threaded_object
Overview
cbeam::concurrency::threaded_object is a helper class template based on the Curiously Recurring Template Pattern (CRTP). It spawns and manages a dedicated thread that executes user-defined logic. You derive your own class from threaded_object<Derived, MessageDataType> and implement specific methods such as on_start(), is_message_available(), get_message(), on_message(), and on_exit().
When your derived object is created via the static create() method, the base class starts a new thread. That thread will:
- Call on_start() once, before entering the main loop.
- In the loop, check is_message_available() to decide whether to process work or wait on a condition variable.
- If work is available, get_message() retrieves a single unit of work (e.g., a message or task). on_message() is then invoked to process that retrieved message or task.
- When threaded_object is destroyed (or the _running flag is set to false internally), on_exit() is called just before the thread terminates.
The threaded_object automatically joins its thread in the destructor, ensuring a clean shutdown with no resource leaks.
How to Derive and Use
1. Derive a Class
Below is a minimal example based on the unit tests (test_threaded_object.cpp). For demonstration it overrides only on_start() and on_exit(); the inherited is_message_available() keeps returning false, so the thread effectively waits until the object is destroyed. In other scenarios, you might override all relevant methods.
#include <cbeam/concurrency/threaded_object.hpp>
#include <atomic>
#include <chrono>
#include <thread>
class MyThreadedExample
    : public cbeam::concurrency::threaded_object<MyThreadedExample, int>
{
public:
    // The constructor must accept a `construction_token` as its first parameter
    MyThreadedExample(typename cbeam::concurrency::threaded_object<MyThreadedExample, int>::construction_token)
    {
        // Any extra initialization goes here
    }

    void on_start() override {
        // Called once when the thread begins
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        _has_run = true;
    }

    // is_message_available() and get_message() can be overridden
    // if you need to handle actual queue-based data. For this demo,
    // we leave them as inherited from the base class, which does nothing.
    // on_message(...) is likewise optional; override it if you want to process messages.

    void on_exit() override {
        // Called right before the thread returns
        _finished = true;
    }

    // Public or private flags that let you observe the thread's work
    std::atomic<bool> _has_run{false};
    std::atomic<bool> _finished{false};
};
2. Instantiate via create()
Because the constructor is protected by a construction_token, you cannot construct MyThreadedExample directly. Instead, call:
#include <cbeam/concurrency/threaded_object.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>
int main() {
    // Shared mutex and condition variable
    // (used internally by threaded_object to wait/notify)
    auto mtx = std::make_shared<std::mutex>();
    auto cv = std::make_shared<std::condition_variable>();

    // Create your derived object and start the thread
    auto obj = cbeam::concurrency::threaded_object<MyThreadedExample, int>
        ::create(mtx, cv);

    // The new thread calls on_start() internally
    // ... do other work ...

    // On scope exit or obj.reset(), the destructor stops the thread
    // (on_exit() runs just before it ends) and joins it.
    return 0;
}
3. Overriding is_message_available(), get_message(), and on_message()
For more complex scenarios (for example, you want a real message queue), you can override:
- is_message_available(): Return true if you have data (like a non-empty queue).
- get_message(): Pop or retrieve the front of that data queue.
- on_message(...): Process the retrieved item.
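The contract of these three hooks can be sketched without any threading. The snippet below is only an illustration: queue_hooks, drain(), and usage_example() are hypothetical names, the struct does not derive from threaded_object, and the hooks are driven by hand on the calling thread in place of the worker thread. In a real derived class the same three bodies would be overrides, and the queue would be modified only while holding the shared mutex.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical stand-in: in real code these three functions would be the
// overrides in a class derived from threaded_object<Derived, int>.
struct queue_hooks
{
    std::deque<int>  pending;    // work waiting to be processed
    std::vector<int> processed;  // results, kept so the effect is observable

    // True while at least one item is queued.
    bool is_message_available() { return !pending.empty(); }

    // Remove and return the oldest item (FIFO order).
    int get_message()
    {
        const int front = pending.front();
        pending.pop_front();
        return front;
    }

    // Process one item; here it is simply doubled and recorded.
    // noexcept mirrors the on_message signature used in this document.
    void on_message(const int& message_data) noexcept
    {
        processed.push_back(message_data * 2);
    }
};

// Calls the hooks in the order the worker thread is described as using them.
inline void drain(queue_hooks& hooks)
{
    while (hooks.is_message_available())
    {
        const int message = hooks.get_message();
        hooks.on_message(message);
    }
}

void usage_example()
{
    queue_hooks hooks;
    hooks.pending = {1, 2, 3};
    drain(hooks);
    assert(hooks.pending.empty());
    assert((hooks.processed == std::vector<int>{2, 4, 6}));
}
```

Keeping get_message() limited to removing the item and leaving all processing to on_message() matches the split described in this document, where retrieval happens under the lock and processing happens outside it.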
Real-World Example in message_manager
In message_manager.hpp, threaded_object::create(...) is used to spawn a new thread that processes messages in a loop, waiting on a condition variable between iterations if no message is available:
class message_handler : public threaded_object<message_handler, MessageDataType> {
public:
    message_handler(typename threaded_object<message_handler, MessageDataType>::construction_token,
                    const std::shared_ptr<message_queue>& queue,
                    /* ... other params ... */)
        : threaded_object<message_handler, MessageDataType>{}
        , _message_queue{queue}
        // etc...
    {}

    void on_start() override {
        // set thread name, log details ...
    }

    bool is_message_available() override {
        // return true if _message_queue->queue is non-empty
        return !_message_queue->queue.empty();
    }

    MessageDataType get_message() override {
        // pop from front/back depending on queue's order type
        // update counters, logging, etc.
        // then return the retrieved message
    }

    void on_message(const MessageDataType& message_data) noexcept override {
        // the actual user-defined message handler logic
    }

    void on_exit() override {
        // final cleanup if needed
    }

private:
    std::shared_ptr<message_queue> _message_queue;
    // other fields ...
};
Lifecycle Details
Construction
The static method create(...) returns a std::unique_ptr<Derived>. Internally, it:
- Allocates the derived object via std::make_unique<Derived>(construction_token{}, ...).
- Stores shared pointers to a mutex (_mtx) and a condition variable (_cv).
- Launches the thread with a private worker() method, which calls on_start() and enters the loop.
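The token-plus-factory part of these steps can be sketched in isolation. This is a simplified illustration, not cbeam's code: token_base, greeter, and usage_example() are hypothetical names, and the mutex, condition variable, and thread launch are left out on purpose.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the base class; only the token and factory are shown.
template <typename Derived>
class token_base
{
protected:
    // Code outside the class hierarchy cannot name this type.
    struct construction_token {};

public:
    virtual ~token_base() = default;

    // The factory creates the token itself and forwards any extra arguments
    // to the derived constructor.
    template <typename... Args>
    static std::unique_ptr<Derived> create(Args&&... args)
    {
        return std::make_unique<Derived>(construction_token{}, std::forward<Args>(args)...);
    }
};

class greeter : public token_base<greeter>
{
public:
    // The constructor is public so std::make_unique can call it, but its
    // first parameter is the token type described above.
    greeter(construction_token, std::string name) : _name(std::move(name)) {}

    const std::string& name() const { return _name; }

private:
    std::string _name;
};

void usage_example()
{
    std::unique_ptr<greeter> g = greeter::create(std::string{"hello"});
    assert(g->name() == "hello");
}
```

Routing construction through a factory gives the base class a single place to finish setup, such as starting a thread, after the derived object has been fully constructed.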
Running Loop
- While _running is true:
  - Acquire the mutex and check is_message_available().
  - If no message is available, wait on _cv->wait(...).
  - Otherwise, call get_message(), then outside the lock, call on_message(...).
- When _running is set to false (e.g., in the destructor), the loop ends.
Destruction
- In the threaded_object destructor, we set _running to false, notify the condition variable, and join the thread.
- Just before the thread exits, it calls on_exit().
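The running loop and the destruction sequence can be sketched together with standard library primitives only. The class below is a hypothetical stand-in written for this page, not cbeam's implementation: loop_sketch, post(), handled(), and the string log are assumptions, and the real class calls the on_start(), on_message(), and on_exit() hooks where this sketch writes log entries.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the loop and shutdown described above.
class loop_sketch
{
public:
    loop_sketch(std::shared_ptr<std::mutex> mtx,
                std::shared_ptr<std::condition_variable> cv,
                std::vector<std::string>& log)
        : _mtx(std::move(mtx)), _cv(std::move(cv)), _log(log), _thread([this] { worker(); })
    {
    }

    ~loop_sketch()
    {
        {
            // Flip the flag under the lock so the worker cannot miss the wake-up.
            std::lock_guard<std::mutex> lock(*_mtx);
            _running = false;
        }
        _cv->notify_all();
        _thread.join();
    }

    // Producer side: enqueue under the shared mutex, then wake the worker.
    void post(int message)
    {
        {
            std::lock_guard<std::mutex> lock(*_mtx);
            _pending.push_back(message);
        }
        _cv->notify_all();
    }

    int handled() const { return _handled.load(); }

private:
    void worker()
    {
        _log.push_back("on_start");
        while (true)
        {
            int message = 0;
            {
                std::unique_lock<std::mutex> lock(*_mtx);
                // Sleep until there is work or shutdown was requested.
                _cv->wait(lock, [this] { return !_running || !_pending.empty(); });
                if (!_running) break;
                message = _pending.front();
                _pending.pop_front();
            } // the lock is released here
            // The message is handled outside the lock.
            _log.push_back("on_message " + std::to_string(message));
            ++_handled;
        }
        _log.push_back("on_exit");
    }

    std::shared_ptr<std::mutex>              _mtx;
    std::shared_ptr<std::condition_variable> _cv;
    std::vector<std::string>&                _log;
    std::deque<int>                          _pending;
    std::atomic<bool>                        _running{true};
    std::atomic<int>                         _handled{0};
    std::thread                              _thread; // declared last: starts after the other members exist
};

void usage_example()
{
    std::vector<std::string> log;
    {
        auto mtx = std::make_shared<std::mutex>();
        auto cv  = std::make_shared<std::condition_variable>();
        loop_sketch sketch(mtx, cv, log);
        sketch.post(7);
        sketch.post(8);
        while (sketch.handled() < 2) std::this_thread::yield(); // wait until both were processed
    } // destructor: clear the flag, notify, join
    assert((log == std::vector<std::string>{"on_start", "on_message 7", "on_message 8", "on_exit"}));
}
```

In this sketch, messages still queued when the destructor runs are dropped, which is why the usage example waits for both messages before leaving the scope. Whether the real class behaves the same way is not stated here and should be checked against the source.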
Key Points
- This design ensures that each derived object can hold custom data and logic while the threaded_object base class handles synchronization and clean shutdown.
- You do not instantiate derived classes directly; create() is the recommended pattern.
- Condition variables help avoid spinning or polling; the thread sleeps if there's no current work, then wakes up when data arrives.
Unit Test References
Please see test_threaded_object.cpp for a unit test of how threaded_object gets subclassed and used. This includes checks that:
- on_start() runs in the spawned thread.
- The destructor cleanly joins the thread.
- The derived class flags _has_run and _finished are updated as expected.
Together, these checks confirm that threaded_object starts, runs, and shuts down its thread as described above.