Cbeam
Loading...
Searching...
No Matches
message_manager.hpp
Go to the documentation of this file.
1/*
2Copyright (c) 2025 acrion innovations GmbH
3Authors: Stefan Zipproth, s.zipproth@acrion.ch
4
5This file is part of Cbeam, see https://github.com/acrion/cbeam and https://cbeam.org
6
7Cbeam is offered under a commercial and under the AGPL license.
8For commercial licensing, contact us at https://acrion.ch/sales. For AGPL licensing, see below.
9
10AGPL licensing:
11
12Cbeam is free software: you can redistribute it and/or modify
13it under the terms of the GNU Affero General Public License as published by
14the Free Software Foundation, either version 3 of the License, or
15(at your option) any later version.
16
17Cbeam is distributed in the hope that it will be useful,
18but WITHOUT ANY WARRANTY; without even the implied warranty of
19MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20GNU Affero General Public License for more details.
21
22You should have received a copy of the GNU Affero General Public License
23along with Cbeam. If not, see <https://www.gnu.org/licenses/>.
24*/
25
26#pragma once
27
33
34#include <atomic> // for std::atomic
35#include <condition_variable> // for std::condition_variable
36#include <deque>
37#include <functional>
38#include <map>
39#include <memory> // for std::unique_ptr, std::make_unique
40#include <mutex> // for std::mutex, std::lock_guard, std::unique_lock
41#include <set> // for std::set, std::operator!=, std::_Rb_tree_const_iterator
42#include <thread> // for std::thread
43#include <unordered_set>
44#include <utility> // for std::forward
45
47{
100 template <typename MessageDataType>
102 {
103 public:
104 using message_id_type = std::size_t;
105
112 using message_logger_type = std::function<void(message_id_type, MessageDataType, bool)>;
113
125 enum class order_type
126 {
130 };
131
132 private:
133 struct message_queue
134 {
135 std::deque<MessageDataType> queue;
136 std::shared_ptr<std::mutex> queue_mutex{std::make_shared<std::mutex>()};
137 std::shared_ptr<std::condition_variable> queue_cv{std::make_shared<std::condition_variable>()};
138 std::condition_variable queue_cv_empty;
139 std::atomic<std::size_t> busy_count{0};
140 message_logger_type message_logger;
141 std::mutex message_logger_mutex;
142 };
143
144 class message_handler : public threaded_object<message_handler, MessageDataType>
145 {
146 public:
148 const std::shared_ptr<message_queue>& queue,
149 const message_id_type message_id,
150 std::function<void(MessageDataType message_type)> on_message,
151 const std::function<void(const std::exception& exception)>& on_exception,
152 const std::function<void()>& on_exit,
153 const std::string& thread_name,
154 const order_type order)
155 : threaded_object<message_handler, MessageDataType>{}
156 , _message_queue{queue}
157 , _message_id{message_id}
158 , _on_message{on_message}
159 , _on_exception{on_exception}
160 , _on_exit{on_exit}
161 , _thread_name{thread_name}
162 , _order{order} { CBEAM_LOG_DEBUG("cbeam::concurrency::message_manager: constructor for thread + '" + thread_name + "' received message queue " + cbeam::convert::to_string(&queue->queue) + " to message handler"); }
163
164 ~message_handler() override = default;
165
166 void on_start() override
167 {
168 set_thread_name(_thread_name.c_str());
169 CBEAM_LOG_DEBUG("cbeam::concurrency::message_manager: Thread '" + _thread_name + "' now waits for messages in queue " + cbeam::convert::to_string(&_message_queue->queue));
170 }
171
172 bool is_message_available() override { return !_message_queue->queue.empty(); }
173
174 MessageDataType get_message() override
175 {
176 MessageDataType message;
177
178 switch (_order)
179 {
180 case order_type::FIFO:
181 message = _message_queue->queue.front();
182 _message_queue->queue.pop_front();
183 break;
184 case order_type::FILO:
185 message = _message_queue->queue.back();
186 _message_queue->queue.pop_back();
187 break;
189 {
190 const size_t rnd = cbeam::random::random_number(_message_queue->queue.size());
191 message = _message_queue->queue.at(rnd);
192 _message_queue->queue.erase(_message_queue->queue.begin() + rnd);
193 break;
194 }
195 }
196
197 _message_queue->busy_count.fetch_add(1, std::memory_order_acq_rel);
198
199 CBEAM_SUPPRESS_WARNINGS_PUSH() // suppress false positive "may be used uninitialized"
200 return message;
202 }
203
204 void on_message(const MessageDataType& message_data) noexcept override
205 {
206 try
207 {
208 {
209 std::lock_guard<std::mutex> lock_logger(_message_queue->message_logger_mutex);
210 if (_message_queue->message_logger) { _message_queue->message_logger(_message_id, message_data, false); }
211 }
212
213 _on_message(message_data);
214 }
215 catch (const std::exception& ex)
216 {
217 _on_exception(ex);
218 }
219
220 // Decrement the busy_count now that we're done
221 auto still_busy = _message_queue->busy_count.fetch_sub(1, std::memory_order_acq_rel) - 1;
222
223 // We also need to ensure we hold the queue mutex to safely check queue->queue.empty().
224 // Because multiple threads might pop from the queue concurrently.
225 std::lock_guard<std::mutex> lock_queue_mutex(*_message_queue->queue_mutex);
226
227 // If the queue is empty and no thread is busy => notify waiting threads
228 if (_message_queue->queue.empty() && still_busy == 0)
229 {
230 _message_queue->queue_cv_empty.notify_all();
231 }
232 }
233
234 void on_exit() override { _on_exit(); }
235
236 private:
237 std::shared_ptr<message_queue> _message_queue;
238 const message_id_type _message_id;
239 std::function<void(MessageDataType message_data)> _on_message;
240 std::function<void(const std::exception& exception)> _on_exception;
241 std::function<void()> _on_exit;
242 std::string _thread_name;
243 order_type _order;
244 }; // class message_handler
245
246 std::mutex _threads_mutex;
247 std::map<message_id_type, std::unordered_set<std::unique_ptr<message_handler>>> _threads;
248 std::mutex _message_queues_mutex;
249 std::map<message_id_type, std::shared_ptr<message_queue>> _message_queues;
250
251 public:
265 void send_message(const message_id_type message_id, const MessageDataType& message_data, const std::size_t max_queued_messages = 0)
266 {
267 std::shared_ptr<message_queue> queue;
268 {
269 std::lock_guard<std::mutex> lock(_message_queues_mutex);
270 queue = _message_queues[message_id];
271 if (!queue) { queue = _message_queues[message_id] = std::make_shared<message_queue>(); }
272 }
273
274 {
275 std::unique_lock<std::mutex> lock(*queue->queue_mutex);
276 queue->queue_cv->wait(lock, [&]
277 { return max_queued_messages == 0 || queue->queue.size() < max_queued_messages; });
278 queue->queue.emplace_back(message_data);
279 }
280
281 CBEAM_LOG_DEBUG("cbeam::concurrency::message_manager(" + cbeam::convert::to_string(this) + ")::send_message: adding message to receiver " + std::to_string(message_id) + " to queue " + cbeam::convert::to_string(&queue->queue));
282 queue->queue_cv->notify_all();
283
284 std::lock_guard<std::mutex> lock_logger(queue->message_logger_mutex);
285 if (queue->message_logger) { queue->message_logger(message_id, message_data, true); }
286 }
287
303 void add_handler(const message_id_type message_id,
304 std::function<void(MessageDataType message_data)> on_message,
305 std::function<void(const std::exception& exception)> on_exception = nullptr,
306 std::function<void()> on_exit = nullptr,
307 const std::string& thread_name = {},
308 const order_type order = order_type::FIFO)
309 {
310 std::lock_guard<std::mutex> lock(_threads_mutex);
311 std::lock_guard<std::mutex> lock2(_message_queues_mutex);
312
313 auto& queue = _message_queues[message_id];
314 if (!queue) { queue = std::make_shared<message_queue>(); }
315 CBEAM_LOG_DEBUG("cbeam::concurrency::message_manager(" + cbeam::convert::to_string(this) + ")::add_handler(" + std::to_string(message_id) + ", ...): passing queue " + cbeam::convert::to_string(&queue->queue) + " to message handler");
316 _threads[message_id].emplace(message_handler::create(queue->queue_mutex,
317 queue->queue_cv,
318 queue,
319 message_id,
320 on_message,
321 on_exception,
322 on_exit,
323 thread_name + "_" + std::to_string(message_id),
324 order));
325 }
326
327 void wait_until_empty(const message_id_type message_id)
328 {
329 std::shared_ptr<message_queue> queue;
330 {
331 std::lock_guard<std::mutex> lock(_message_queues_mutex);
332 auto it = _message_queues.find(message_id);
333 if (it == _message_queues.end() || !(it->second))
334 {
335 // no known queue => nothing to wait for
336 return;
337 }
338 queue = it->second;
339 }
340
341 std::unique_lock<std::mutex> lock_q(*queue->queue_mutex);
342
343 queue->queue_cv_empty.wait(lock_q, [&]
344 { return queue->queue.empty() && (queue->busy_count.load(std::memory_order_acquire) == 0); });
345 }
346
347 void dispose(const message_id_type message_id)
348 {
349 std::lock_guard<std::mutex> lock(_threads_mutex);
350 std::lock_guard<std::mutex> lock2(_message_queues_mutex);
351 _threads[message_id].clear();
352 }
353
354 void set_logger(const message_id_type message_id,
355 message_logger_type on_message)
356 {
357 std::lock_guard<std::mutex> lock_logger(_message_queues[message_id]->message_logger_mutex);
358 _message_queues[message_id]->message_logger = on_message;
359 }
360 };
361}
Definition message_manager.hpp:102
void send_message(const message_id_type message_id, const MessageDataType &message_data, const std::size_t max_queued_messages=0)
Sends a message of specified ID and data.
Definition message_manager.hpp:265
void wait_until_empty(const message_id_type message_id)
Definition message_manager.hpp:327
std::size_t message_id_type
Definition message_manager.hpp:104
std::function< void(message_id_type, MessageDataType, bool)> message_logger_type
Typedef for a message logging function.
Definition message_manager.hpp:112
void add_handler(const message_id_type message_id, std::function< void(MessageDataType message_data)> on_message, std::function< void(const std::exception &exception)> on_exception=nullptr, std::function< void()> on_exit=nullptr, const std::string &thread_name={}, const order_type order=order_type::FIFO)
Adds a message handler for a specified message ID.
Definition message_manager.hpp:303
void dispose(const message_id_type message_id)
Definition message_manager.hpp:347
order_type
Enum class defining the order in which messages are processed.
Definition message_manager.hpp:126
@ RANDOM
Process messages in a random order, reducing path dependencies in certain algorithms but less efficie...
Definition message_manager.hpp:129
@ FILO
Process the most recently received message first.
Definition message_manager.hpp:128
@ FIFO
Process messages in the order they were received.
Definition message_manager.hpp:127
void set_logger(const message_id_type message_id, message_logger_type on_message)
Definition message_manager.hpp:354
static std::unique_ptr< message_handler > create(std::shared_ptr< std::mutex > mtx, std::shared_ptr< std::condition_variable > cv, Args &&... args)
Definition threaded_object.hpp:149
Header file containing macros for compiler compatibility and warning suppression.
#define CBEAM_SUPPRESS_WARNINGS_PUSH()
Definition compiler_compatibility.hpp:64
#define CBEAM_SUPPRESS_WARNINGS_POP()
Definition compiler_compatibility.hpp:86
#define CBEAM_LOG_DEBUG(s)
Logs a debug message if CBEAM_DEBUG_LOGGING is enabled.
Definition log_manager.hpp:138
Provides concurrency primitives and abstractions for multithreaded programming. It features the power...
Definition message_manager.hpp:47
void set_thread_name(uint32_t dwThreadID, const char *thread_name)
Sets the name for a thread with a specified Thread ID.
Definition thread.hpp:92
std::string to_string(const container::buffer &b)
Creates a std::string from the contents of a container::buffer.
Definition buffer.hpp:42
std::size_t random_number(const std::size_t n, std::mt19937 &gen=default_generator())
Returns a random number in the range [0, n-1].
Definition generators.hpp:52