35#include <condition_variable>
43#include <unordered_set>
100 template <
typename MessageDataType>
135 std::deque<MessageDataType> queue;
136 std::shared_ptr<std::mutex> queue_mutex{std::make_shared<std::mutex>()};
137 std::shared_ptr<std::condition_variable> queue_cv{std::make_shared<std::condition_variable>()};
138 std::condition_variable queue_cv_empty;
139 std::atomic<std::size_t> busy_count{0};
141 std::mutex message_logger_mutex;
144 class message_handler :
public threaded_object<message_handler, MessageDataType>
148 const std::shared_ptr<message_queue>& queue,
150 std::function<
void(MessageDataType message_type)> on_message,
151 const std::function<
void(
const std::exception& exception)>& on_exception,
152 const std::function<
void()>& on_exit,
153 const std::string& thread_name,
156 , _message_queue{queue}
157 , _message_id{message_id}
158 , _on_message{on_message}
159 , _on_exception{on_exception}
161 , _thread_name{thread_name}
162 , _order{order} {
CBEAM_LOG_DEBUG(
"cbeam::concurrency::message_manager: constructor for thread + '" + thread_name +
"' received message queue " +
cbeam::convert::to_string(&queue->queue) +
" to message handler"); }
/**
 * @brief Defaulted destructor.
 *
 * No cleanup logic is defined in this class itself. The `override` specifier
 * makes the compiler verify that the threaded_object base declares a virtual
 * destructor.
 */
164 ~message_handler()
override =
default;
166 void on_start()
override
/**
 * @brief Reports whether the shared message queue currently holds a message.
 *
 * @return true if the deque inside `_message_queue` is non-empty, false otherwise.
 *
 * NOTE(review): no lock is taken here; presumably the caller already holds
 * the queue mutex when this is evaluated - confirm against threaded_object.
 */
172 bool is_message_available()
override
{
    const bool queue_is_empty = _message_queue->queue.empty();
    return !queue_is_empty;
}
174 MessageDataType get_message()
override
176 MessageDataType message;
181 message = _message_queue->queue.front();
182 _message_queue->queue.pop_front();
185 message = _message_queue->queue.back();
186 _message_queue->queue.pop_back();
191 message = _message_queue->queue.at(rnd);
192 _message_queue->queue.erase(_message_queue->queue.begin() + rnd);
197 _message_queue->busy_count.fetch_add(1, std::memory_order_acq_rel);
204 void on_message(const MessageDataType& message_data) noexcept
override
209 std::lock_guard<std::mutex> lock_logger(_message_queue->message_logger_mutex);
210 if (_message_queue->message_logger) { _message_queue->message_logger(_message_id, message_data,
false); }
213 _on_message(message_data);
215 catch (
const std::exception& ex)
221 auto still_busy = _message_queue->busy_count.fetch_sub(1, std::memory_order_acq_rel) - 1;
225 std::lock_guard<std::mutex> lock_queue_mutex(*_message_queue->queue_mutex);
228 if (_message_queue->queue.empty() && still_busy == 0)
230 _message_queue->queue_cv_empty.notify_all();
/**
 * @brief Runs the user-supplied exit callback, if one was provided.
 *
 * `_on_exit` is a std::function<void()> that may be empty, because
 * add_handler defaults its `on_exit` argument to nullptr. Invoking an empty
 * std::function throws std::bad_function_call, so the call is guarded and
 * an absent callback is treated as "nothing to do".
 *
 * NOTE(review): presumably invoked by the threaded_object base when the
 * worker stops - confirm against threaded_object.
 */
234 void on_exit()
override
{
    // An empty std::function must not be called.
    if (_on_exit)
    {
        _on_exit();
    }
}
237 std::shared_ptr<message_queue> _message_queue;
239 std::function<void(MessageDataType message_data)> _on_message;
240 std::function<void(
const std::exception& exception)> _on_exception;
241 std::function<void()> _on_exit;
242 std::string _thread_name;
246 std::mutex _threads_mutex;
247 std::map<message_id_type, std::unordered_set<std::unique_ptr<message_handler>>> _threads;
248 std::mutex _message_queues_mutex;
249 std::map<message_id_type, std::shared_ptr<message_queue>> _message_queues;
267 std::shared_ptr<message_queue> queue;
269 std::lock_guard<std::mutex> lock(_message_queues_mutex);
270 queue = _message_queues[message_id];
271 if (!queue) { queue = _message_queues[message_id] = std::make_shared<message_queue>(); }
275 std::unique_lock<std::mutex> lock(*queue->queue_mutex);
276 queue->queue_cv->wait(lock, [&]
277 {
return max_queued_messages == 0 || queue->queue.size() < max_queued_messages; });
278 queue->queue.emplace_back(message_data);
282 queue->queue_cv->notify_all();
284 std::lock_guard<std::mutex> lock_logger(queue->message_logger_mutex);
285 if (queue->message_logger) { queue->message_logger(message_id, message_data,
true); }
304 std::function<
void(MessageDataType message_data)> on_message,
305 std::function<
void(
const std::exception& exception)> on_exception =
nullptr,
306 std::function<
void()> on_exit =
nullptr,
307 const std::string& thread_name = {},
310 std::lock_guard<std::mutex> lock(_threads_mutex);
311 std::lock_guard<std::mutex> lock2(_message_queues_mutex);
313 auto& queue = _message_queues[message_id];
314 if (!queue) { queue = std::make_shared<message_queue>(); }
323 thread_name +
"_" + std::to_string(message_id),
329 std::shared_ptr<message_queue> queue;
331 std::lock_guard<std::mutex> lock(_message_queues_mutex);
332 auto it = _message_queues.find(message_id);
333 if (it == _message_queues.end() || !(it->second))
341 std::unique_lock<std::mutex> lock_q(*queue->queue_mutex);
343 queue->queue_cv_empty.wait(lock_q, [&]
344 {
return queue->queue.empty() && (queue->busy_count.load(std::memory_order_acquire) == 0); });
349 std::lock_guard<std::mutex> lock(_threads_mutex);
350 std::lock_guard<std::mutex> lock2(_message_queues_mutex);
351 _threads[message_id].clear();
357 std::lock_guard<std::mutex> lock_logger(_message_queues[message_id]->message_logger_mutex);
358 _message_queues[message_id]->message_logger = on_message;
Definition message_manager.hpp:102
void send_message(const message_id_type message_id, const MessageDataType &message_data, const std::size_t max_queued_messages=0)
Sends a message of specified ID and data.
Definition message_manager.hpp:265
void wait_until_empty(const message_id_type message_id)
Definition message_manager.hpp:327
std::size_t message_id_type
Definition message_manager.hpp:104
std::function< void(message_id_type, MessageDataType, bool)> message_logger_type
Typedef for a message logging function.
Definition message_manager.hpp:112
void add_handler(const message_id_type message_id, std::function< void(MessageDataType message_data)> on_message, std::function< void(const std::exception &exception)> on_exception=nullptr, std::function< void()> on_exit=nullptr, const std::string &thread_name={}, const order_type order=order_type::FIFO)
Adds a message handler for a specified message ID.
Definition message_manager.hpp:303
void dispose(const message_id_type message_id)
Definition message_manager.hpp:347
order_type
Enum class defining the order in which messages are processed.
Definition message_manager.hpp:126
@ RANDOM
Process messages in a random order, reducing path dependencies in certain algorithms but less efficient...
Definition message_manager.hpp:129
@ FILO
Process the most recently received message first.
Definition message_manager.hpp:128
@ FIFO
Process messages in the order they were received.
Definition message_manager.hpp:127
void set_logger(const message_id_type message_id, message_logger_type on_message)
Definition message_manager.hpp:354
static std::unique_ptr< message_handler > create(std::shared_ptr< std::mutex > mtx, std::shared_ptr< std::condition_variable > cv, Args &&... args)
Definition threaded_object.hpp:149
threaded_object()=default
Header file containing macros for compiler compatibility and warning suppression.
#define CBEAM_SUPPRESS_WARNINGS_PUSH()
Definition compiler_compatibility.hpp:64
#define CBEAM_SUPPRESS_WARNINGS_POP()
Definition compiler_compatibility.hpp:86
#define CBEAM_LOG_DEBUG(s)
Logs a debug message if CBEAM_DEBUG_LOGGING is enabled.
Definition log_manager.hpp:138
Provides concurrency primitives and abstractions for multithreaded programming. It features the power...
Definition message_manager.hpp:47
void set_thread_name(uint32_t dwThreadID, const char *thread_name)
Sets the name for a thread with a specified Thread ID.
Definition thread.hpp:92
std::string to_string(const container::buffer &b)
Creates a std::string from the contents of a container::buffer.
Definition buffer.hpp:42
std::size_t random_number(const std::size_t n, std::mt19937 &gen=default_generator())
Returns a random number in the range [0, n-1].
Definition generators.hpp:52