threaded_object

Overview

cbeam::concurrency::threaded_object is a helper class template based on the Curiously Recurring Template Pattern (CRTP). It spawns and manages a dedicated thread that executes user-defined logic. You derive your own class from threaded_object<Derived, MessageDataType> and implement specific methods such as on_start(), is_message_available(), get_message(), on_message(), and on_exit().

When your derived object is created via the static create() method, the base class starts a new thread. That thread will:

  1. Call on_start() once, before entering the main loop.
  2. In the loop, check is_message_available() to decide whether to process work or wait on a condition variable.
  3. If work is available, get_message() retrieves a single unit of work (e.g., a message or task).
  4. on_message() is then invoked to process that retrieved message or task.
  5. When threaded_object is destroyed (or the _running flag is set to false internally), on_exit() is called just before the thread terminates.

The threaded_object automatically joins its thread in the destructor, ensuring a clean shutdown with no resource leaks.

How to Derive and Use

1. Derive a Class

Below is a minimal example based on the unit tests (test_threaded_object.cpp). Notice how we override only on_start() and on_exit() for demonstration, while is_message_available() remains false in the base class (so the thread effectively waits until destruction). In other scenarios, you might override all relevant methods.

#include <cbeam/concurrency/threaded_object.hpp>
#include <atomic>
#include <chrono>
#include <thread>

class MyThreadedExample
  : public cbeam::concurrency::threaded_object<MyThreadedExample, int>
{
public:
    // The constructor must accept a `construction_token` as its first parameter
    MyThreadedExample(typename cbeam::concurrency::threaded_object<MyThreadedExample, int>::construction_token)
    {
        // Any extra initialization goes here
    }

    void on_start() override {
        // Called once when the thread begins
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        _has_run = true;
    }

    // is_message_available() and get_message() can be overridden
    // if you need to handle actual queue-based data. For this demo,
    // we leave them as inherited from the base class, which does nothing.

    // on_message(...) is similarly optional if you do want to process messages.

    void on_exit() override {
        // Called right before the thread returns
        _finished = true;
    }

    // Public or private flags that let you observe the thread’s work
    std::atomic<bool> _has_run{false};
    std::atomic<bool> _finished{false};
};

2. Instantiate via create()

Because the constructor is protected by a construction_token, you cannot construct MyThreadedExample directly. Instead, call:

#include <cbeam/concurrency/threaded_object.hpp>
#include <memory>
#include <mutex>
#include <condition_variable>

int main() {
    // Shared mutex and condition variable 
    // (used internally by threaded_object to wait/notify)
    auto mtx = std::make_shared<std::mutex>();
    auto cv  = std::make_shared<std::condition_variable>();

    // Create your derived object and start the thread
    auto obj = cbeam::concurrency::threaded_object<MyThreadedExample, int>
                   ::create(mtx, cv);

    // The new thread calls on_start() internally

    // ... do other work ...

    // On scope exit or obj.reset(), destructor joins the thread 
    // and calls on_exit(). 
    return 0;
}

3. Overriding is_message_available(), get_message(), and on_message()

For more complex scenarios (for example, you want a real message queue), you can override:

  • is_message_available(): Return true if you have data (like a non-empty queue).
  • get_message(): Pop or retrieve the front of that data queue.
  • on_message(...): Process the retrieved item.

Real-World Example in message_manager

In message_manager.hpp, threaded_object::create(...) is used to spawn a new thread that processes messages in a loop, waiting on a condition variable between iterations if no message is available:

class message_handler : public threaded_object<message_handler, MessageDataType> {
public:
    message_handler(typename threaded_object<message_handler, MessageDataType>::construction_token,
                    const std::shared_ptr<message_queue>& queue,
                    /* ... other params ... */)
        : threaded_object<message_handler, MessageDataType>{}
        , _message_queue{queue}
        // etc...
    {}

    void on_start() override {
        // set thread name, log details ...
    }

    bool is_message_available() override {
        // return true if _message_queue->queue is non-empty
        return !_message_queue->queue.empty();
    }

    MessageDataType get_message() override {
        // pop from front/back depending on queue's order type
        // update counters, logging, etc.
        // then return the retrieved message
    }

    void on_message(const MessageDataType& message_data) noexcept override {
        // the actual user-defined message handler logic
    }

    void on_exit() override {
        // final cleanup if needed
    }

private:
    std::shared_ptr<message_queue> _message_queue;
    // other fields ...
};

Lifecycle Details

Construction

The static method create(...) returns a std::unique_ptr<Derived>. Internally, it:

  • Allocates the derived object via std::make_unique<Derived>(construction_token{}, ...).
  • Stores shared pointers to a mutex (_mtx) and a condition variable (_cv).
  • Launches the thread with a private worker() method, which calls on_start() and enters the loop.

Running Loop

  • While _running is true:
    • Acquire the mutex and check is_message_available().
    • If no message is available, wait on _cv->wait(...).
    • Otherwise, call get_message(), then outside the lock, call on_message(...).
  • When _running is set to false (e.g., in the destructor), the loop ends.

Destruction

  • In the threaded_object destructor, we set _running to false, notify the condition variable, and join the thread.
  • Just before the thread exits, it calls on_exit().

Key Points

  • This design ensures that each derived object can hold custom data and logic while the threaded_object base class handles synchronization and clean shutdown.
  • You do not instantiate derived classes directly; create() is the recommended pattern.
  • Condition variables help avoid spinning or polling; the thread sleeps if there's no current work, then wakes up when data arrives.

Unit Test References

Please see test_threaded_object.cpp for a unit test of how threaded_object gets subclassed and used. This includes checks that:

  • on_start() runs in the spawned thread.
  • The destructor cleanly joins the thread.
  • The derived class flags _has_run and _finished are updated as expected.

All these confirm the reliable lifecycle approach that threaded_object provides.