message_manager

Overview

cbeam::concurrency::message_manager manages asynchronous message queues across multiple threads. It provides a flexible way to send, process, and wait for messages identified by numeric IDs. You can register multiple handlers per message ID, each running in its own thread. This supports several order types, including FIFO, FILO, and even RANDOM.

This class is header-only, requires no dependencies on external libraries (notably, no Boost), and works cross-platform with native synchronization primitives. It is designed to facilitate concurrency patterns involving large or small message volumes, while avoiding complexities typically found in frameworks that require file-based or OS-dependent overhead.

Key Features

  • Multiple Handlers: Each message ID can be processed by one or more handlers (threads).
  • Configurable Ordering: Choose FIFO, FILO, or RANDOM ordering. In some algorithmic scenarios—like flood-fill for raster graphics—RANDOM can help optimize queue lengths.
  • Efficient Thread Management: Handlers are instantiated on demand, and each handler runs until disposed, ensuring a clear lifecycle.
  • Thread-Safe Enqueuing: Messages can be sent concurrently from any thread, with an optional max queue size to prevent uncontrolled growth.
  • Wait Mechanisms: Built-in wait_until_empty to ensure a queue is completely processed, making it simpler to enforce correct ordering or finalize results.

Contrast with Boost

Boost does not offer a direct equivalent to cbeam::concurrency::message_manager for in-process concurrency. The closest “message-based” library in Boost is Boost.MPI, which focuses on communication across multiple processes (often in high-performance computing clusters). By contrast, message_manager is designed for robust thread-to-thread messaging within a single process.

When you need an in-process message queue, you typically resort to raw std::condition_variable and manual queue handling (as shown in cppreference.com’s example). However, this is error-prone—easy to introduce deadlocks or race conditions if not done carefully. message_manager provides a ready-to-use abstraction on top of standard C++ synchronization, adding minimal overhead while significantly reducing the risk of concurrency bugs.

For cases where you need to exchange data between shared libraries (e.g., plugins) that might not be ABI-compatible, cbeam::concurrency::message_manager can also be combined with Cbeam’s serialization features. For instance, a nested_map can store complex data structures that are safely serialized and deserialized, enabling seamless message passing between components compiled with different compilers or compiler versions.

Basic Usage Example

#include <cbeam/concurrency/message_manager.hpp>

static bool is_prime(uint64_t n) {
    // trivial prime check
    if (n < 2) return false;
    if (n % 2 == 0) return (n == 2);
    for (uint64_t i = 3, stop = static_cast<uint64_t>(std::sqrt(n)); i <= stop; i += 2) {
        if (n % i == 0) return false;
    }
    return true;
}

constexpr size_t check_prime = 1;
constexpr size_t count_prime = 2;

int main() {
    cbeam::concurrency::message_manager<uint64_t> mm;
    std::atomic<size_t> prime_count{0};

    mm.add_handler(count_prime, [&](uint64_t) {
        ++prime_count;
    });

    mm.add_handler(check_prime, [&](uint64_t number) {
        if (is_prime(number)) {
            mm.send_message(count_prime, number);
        }
    });

    // Enqueue many numbers
    for (uint64_t i = 1; i <= 100000; ++i) {
        mm.send_message(check_prime, i);
    }

    // Wait until queues are empty
    mm.wait_until_empty(check_prime);
    mm.wait_until_empty(count_prime);

    // Dispose handlers
    mm.dispose(check_prime);
    mm.dispose(count_prime);

    // prime_count now holds total primes in [1..100000]
    return 0;
}

Additional Details

  • Use add_handler to attach a new thread/handler for a given message ID.
  • Use send_message to enqueue work in a thread-safe way.
  • wait_until_empty is vital for synchronization points, ensuring all tasks have completed before proceeding.
  • Each handler can optionally log messages for debugging, and the library’s message_logger_type allows custom logging integration.