threaded_object
Overview
cbeam::concurrency::threaded_object
is a helper class template based on the Curiously Recurring Template Pattern (CRTP). It spawns and manages a dedicated thread that executes user-defined logic. You derive your own class from threaded_object<Derived, MessageDataType>
and implement specific methods such as on_start()
, is_message_available()
, get_message()
, on_message()
, and on_exit()
.
When your derived object is created via the static create()
method, the base class starts a new thread. That thread will:
- Call
on_start()
once, before entering the main loop. - In the loop, check
is_message_available()
to decide whether to process work or wait on a condition variable. - If work is available,
get_message()
retrieves a single unit of work (e.g., a message or task). on_message()
is then invoked to process that retrieved message or task.- When
threaded_object
is destroyed (or the_running
flag is set to false internally),on_exit()
is called just before the thread terminates.
The threaded_object
automatically joins its thread in the destructor, ensuring a clean shutdown with no resource leaks.
How to Derive and Use
1. Derive a Class
Below is a minimal example based on the unit tests (test_threaded_object.cpp
). Notice how we override only on_start()
and on_exit()
for demonstration, while is_message_available()
remains false in the base class (so the thread effectively waits until destruction). In other scenarios, you might override all relevant methods.
#include <cbeam/concurrency/threaded_object.hpp>
#include <atomic>
#include <chrono>
#include <thread>
class MyThreadedExample
: public cbeam::concurrency::threaded_object<MyThreadedExample, int>
{
public:
// The constructor must accept a `construction_token` as its first parameter
MyThreadedExample(typename cbeam::concurrency::threaded_object<MyThreadedExample, int>::construction_token)
{
// Any extra initialization goes here
}
void on_start() override {
// Called once when the thread begins
std::this_thread::sleep_for(std::chrono::milliseconds(50));
_has_run = true;
}
// is_message_available() and get_message() can be overridden
// if you need to handle actual queue-based data. For this demo,
// we leave them as inherited from the base class, which does nothing.
// on_message(...) is similarly optional if you do want to process messages.
void on_exit() override {
// Called right before the thread returns
_finished = true;
}
// Public or private flags that let you observe the thread’s work
std::atomic<bool> _has_run{false};
std::atomic<bool> _finished{false};
};
2. Instantiate via create()
Because the constructor is protected by a construction_token
, you cannot construct MyThreadedExample
directly. Instead, call:
#include <cbeam/concurrency/threaded_object.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
int main() {
// Shared mutex and condition variable
// (used internally by threaded_object to wait/notify)
auto mtx = std::make_shared<std::mutex>();
auto cv = std::make_shared<std::condition_variable>();
// Create your derived object and start the thread
auto obj = cbeam::concurrency::threaded_object<MyThreadedExample, int>
::create(mtx, cv);
// The new thread calls on_start() internally
// ... do other work ...
// On scope exit or obj.reset(), destructor joins the thread
// and calls on_exit().
return 0;
}
3. Overriding is_message_available()
, get_message()
, and on_message()
For more complex scenarios (for example, you want a real message queue), you can override:
is_message_available()
: Return true if you have data (like a non-empty queue).get_message()
: Pop or retrieve the front of that data queue.on_message(...)
: Process the retrieved item.
Real-World Example in message_manager
In message_manager.hpp
, threaded_object::create(...)
is used to spawn a new thread that processes messages in a loop, waiting on a condition variable between iterations if no message is available:
class message_handler : public threaded_object<message_handler, MessageDataType> {
public:
message_handler(typename threaded_object<message_handler, MessageDataType>::construction_token,
const std::shared_ptr<message_queue>& queue,
/* ... other params ... */)
: threaded_object<message_handler, MessageDataType>{}
, _message_queue{queue}
// etc...
{}
void on_start() override {
// set thread name, log details ...
}
bool is_message_available() override {
// return true if _message_queue->queue is non-empty
return !_message_queue->queue.empty();
}
MessageDataType get_message() override {
// pop from front/back depending on queue's order type
// update counters, logging, etc.
// then return the retrieved message
}
void on_message(const MessageDataType& message_data) noexcept override {
// the actual user-defined message handler logic
}
void on_exit() override {
// final cleanup if needed
}
private:
std::shared_ptr<message_queue> _message_queue;
// other fields ...
};
Lifecycle Details
Construction
The static method create(...)
returns a std::unique_ptr<Derived>
. Internally, it:
- Allocates the derived object via
std::make_unique<Derived>(construction_token{}, ...)
. - Stores shared pointers to a mutex (
_mtx
) and a condition variable (_cv
). - Launches the thread with a private
worker()
method, which callson_start()
and enters the loop.
Running Loop
- While
_running
is true:- Acquire the mutex and check
is_message_available()
. - If no message is available, wait on
_cv->wait(...)
. - Otherwise, call
get_message()
, then outside the lock, callon_message(...)
.
- Acquire the mutex and check
- When
_running
is set to false (e.g., in the destructor), the loop ends.
Destruction
- In the
threaded_object
destructor, we set_running
to false, notify the condition variable, and join the thread. - Just before the thread exits, it calls
on_exit()
.
Key Points
- This design ensures that each derived object can hold custom data and logic while the
threaded_object
base class handles synchronization and clean shutdown. - You do not instantiate derived classes directly;
create()
is the recommended pattern. - Condition variables help avoid spinning or polling; the thread sleeps if there's no current work, then wakes up when data arrives.
Unit Test References
Please see test_threaded_object.cpp for a unit test of how threaded_object
gets subclassed and used. This includes checks that:
on_start()
runs in the spawned thread.- The destructor cleanly joins the thread.
- The derived class flags
_has_run
and_finished
are updated as expected.
All these confirm the reliable lifecycle approach that threaded_object
provides.