Cbeam
Loading...
Searching...
No Matches
cbeam/concurrency/message_manager.hpp

Manages message queues and handlers for asynchronous message processing.

Manages message queues and handlers for asynchronous message processing.The message_manager class template is designed for efficient management of asynchronous message processing in a multithreaded environment. It allows sending, receiving, and handling of messages of a specified type. The messages are processed by message handlers which are managed by this class.

MessageDataType The type of message data that will be managed by this instance. For non-Plain Old Data (POD) types, unnecessary copying can be avoided by encapsulating the message in a std::shared_ptr, thus enabling efficient handling of complex data structures.

constexpr size_t check_prime = 1;
constexpr size_t count_prime = 2;
// shared counter to track how many primes are found (our result)
std::atomic<size_t> prime_count{0};
// handler that counts primes
mm.add_handler(count_prime, [&](uint64_t msg) { ++prime_count; });
// Handler that checks primality, and if prime => sends a message to `count_prime`
// Note: You can add more than one handler for an ID (such as check_prime)
// to distribute the CPU load across multiple cores.
mm.add_handler(check_prime, [&](uint64_t number_to_check)
{
if (is_prime(number_to_check))
{
mm.send_message(count_prime, number_to_check);
}
};
// Send all numbers from 1 to 100001 to `check_prime`
for (uint64_t number_to_check = 1; number_to_check <= 100001; ++number_to_check)
{
mm.send_message(check_prime, uint64_t{number_to_check});
}
// Now wait for the check_prime queue to be empty
// (i.e. all primality checks have been popped and processed)
mm.wait_until_empty(check_prime);
// Also wait until the count_prime queue is empty.
// This ensures that all prime-sending messages have been popped and processed.
mm.wait_until_empty(count_prime);
// Dispose the handlers
mm.dispose(check_prime);
mm.dispose(count_prime);
Definition message_manager.hpp:102
void send_message(const message_id_type message_id, const MessageDataType &message_data, const std::size_t max_queued_messages=0)
Sends a message of specified ID and data.
Definition message_manager.hpp:265
void wait_until_empty(const message_id_type message_id)
Definition message_manager.hpp:327
void add_handler(const message_id_type message_id, std::function< void(MessageDataType message_data)> on_message, std::function< void(const std::exception &exception)> on_exception=nullptr, std::function< void()> on_exit=nullptr, const std::string &thread_name={}, const order_type order=order_type::FIFO)
Adds a message handler for a specified message ID.
Definition message_manager.hpp:303
void dispose(const message_id_type message_id)
Definition message_manager.hpp:347