Cbeam
Loading...
Searching...
No Matches
threaded_object.hpp
Go to the documentation of this file.
1/*
2Copyright (c) 2025 acrion innovations GmbH
3Authors: Stefan Zipproth, s.zipproth@acrion.ch
4
5This file is part of Cbeam, see https://github.com/acrion/cbeam and https://cbeam.org
6
7Cbeam is offered under a commercial and under the AGPL license.
8For commercial licensing, contact us at https://acrion.ch/sales. For AGPL licensing, see below.
9
10AGPL licensing:
11
12Cbeam is free software: you can redistribute it and/or modify
13it under the terms of the GNU Affero General Public License as published by
14the Free Software Foundation, either version 3 of the License, or
15(at your option) any later version.
16
17Cbeam is distributed in the hope that it will be useful,
18but WITHOUT ANY WARRANTY; without even the implied warranty of
19MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20GNU Affero General Public License for more details.
21
22You should have received a copy of the GNU Affero General Public License
23along with Cbeam. If not, see <https://www.gnu.org/licenses/>.
24*/
25
26#pragma once
27
29
30#include <atomic> // for std::atomic
31#include <condition_variable> // for std::condition_variable
32#include <functional>
33#include <memory> // for std::unique_ptr, std::make_unique
34#include <mutex> // for std::mutex, std::lock_guard, std::unique_lock
35#include <set> // for std::set, std::operator!=, std::_Rb_tree_const_iterator
36#include <string>
37#include <thread> // for std::thread
38#include <utility> // for std::forward
39
40namespace cbeam::concurrency
41{
95 template <typename Derived, typename MessageDataType>
97 {
98 public:
/// Default constructor. Leaves `_mtx` and `_cv` empty and starts no thread; create() assigns
/// both pointers and launches the worker thread afterwards.
threaded_object() = default;
108 virtual ~threaded_object() noexcept
109 {
110 {
111 std::unique_lock<std::mutex> lock(*_mtx);
112 _running = false;
113 }
114 _cv->notify_all();
115
116 if (_t.joinable())
117 {
118 try
119 {
120 _t.join();
121 }
122 catch (const std::system_error& ex)
123 {
124 CBEAM_LOG(std::string{"cbeam::concurrency::threaded_object::~threaded_object: "} + ex.what());
125 assert(false);
126 }
127 }
128 }
129
148 template <typename... Args>
149 static std::unique_ptr<Derived> create(std::shared_ptr<std::mutex> mtx, std::shared_ptr<std::condition_variable> cv, Args&&... args)
150 {
151 std::unique_ptr<Derived> obj = std::make_unique<Derived>(construction_token{}, std::forward<Args>(args)...);
152 obj->_mtx = mtx;
153 obj->_cv = cv;
154 obj->_t = std::thread(&threaded_object::worker, obj.get());
155 return obj;
156 }
157
158 protected:
/**
 * @brief Construction token for controlled instantiation of derived classes.
 *
 * create() passes a value of this type as the first constructor argument of `Derived`.
 * The constructor is private and only `threaded_object<Derived, MessageDataType>` is a
 * friend, so only this base class can produce a token.
 */
class construction_token
{
    friend class threaded_object<Derived, MessageDataType>;
    // Keep this constructor user-provided (`{}` rather than `= default`): with a defaulted
    // constructor the type would be an aggregate under C++17, and `construction_token{}`
    // would then compile from any context, bypassing the private access.
    construction_token() {}
};
166
/**
 * @brief Hook that worker() calls once, before entering the message loop.
 *
 * The default implementation does nothing.
 */
virtual void on_start()
{
}
170
/**
 * @brief Hook that reports whether a message is ready to be fetched.
 *
 * worker() calls this while holding `*_mtx`, both directly and from the predicate of
 * its condition-variable wait.
 *
 * @return `true` if get_message() should be called; the default implementation returns `false`.
 */
virtual bool is_message_available()
{
    return false;
}
175
/**
 * @brief Hook that fetches the next message.
 *
 * worker() calls this while holding `*_mtx`, and only while the object is still running.
 *
 * @return The message to pass to on_message(); the default implementation returns an
 *         empty-brace-initialized `MessageDataType`.
 */
virtual MessageDataType get_message()
{
    return {};
}
180
/**
 * @brief Hook that processes one message obtained from get_message().
 *
 * worker() calls this after it has released `*_mtx`. The function is declared `noexcept`.
 * The default implementation does nothing.
 *
 * @param message_data The message to process (unused by the default implementation).
 */
virtual void on_message(const MessageDataType& /*message_data*/) noexcept
{
}
184
/**
 * @brief Hook that worker() calls once, after the message loop has ended.
 *
 * The default implementation does nothing.
 */
virtual void on_exit()
{
}
188
std::shared_ptr<std::mutex> _mtx; ///< Mutex assigned by create(); locked by worker() around the availability check and by the destructor when clearing the running flag.
std::shared_ptr<std::condition_variable> _cv; ///< Condition variable assigned by create(); worker() waits on it and the destructor notifies all waiters.
191
192 private:
193 void worker()
194 {
195 on_start();
196
197 while (_running)
198 {
199 MessageDataType message_data;
200
201 {
202 std::unique_lock<std::mutex> lock(*_mtx);
203
204 // We might have missed a notification during previous work outside this lock,
205 // so check if a message is available prior waiting for a notification.
206 if (!is_message_available() && _running)
207 {
208 // wait for other thread to notify us via _cv->notify_one() or _cv->notify_all()
209 _cv->wait(lock, [&]
210 { return is_message_available() || !_running; });
211 }
212
213 if (_running)
214 {
215 message_data = get_message();
216 }
217 }
218
219 if (_running)
220 {
221 on_message(message_data);
222 }
223 }
224
225 on_exit();
226 }
227
std::thread _t; ///< Thread executing worker(); started by create() and joined by the destructor.
std::atomic<bool> _running{true}; ///< Loop flag read by worker(); the destructor sets it to false to request shutdown.

// Copying is disabled: the worker thread is bound to the address of this object.
threaded_object(const threaded_object&) = delete;

threaded_object& operator=(const threaded_object&) = delete;
234 };
235}
std::shared_ptr< std::condition_variable > _cv
Definition threaded_object.hpp:190
virtual bool is_message_available()
Definition threaded_object.hpp:171
virtual void on_exit()
Definition threaded_object.hpp:185
std::shared_ptr< std::mutex > _mtx
Definition threaded_object.hpp:189
virtual ~threaded_object() noexcept
Destructor. Safely shuts down the managed thread by setting _running to false, notifying the conditio...
Definition threaded_object.hpp:108
static std::unique_ptr< Derived > create(std::shared_ptr< std::mutex > mtx, std::shared_ptr< std::condition_variable > cv, Args &&... args)
Definition threaded_object.hpp:149
virtual void on_start()
Definition threaded_object.hpp:167
virtual MessageDataType get_message()
Definition threaded_object.hpp:176
virtual void on_message(const MessageDataType &) noexcept
Definition threaded_object.hpp:181
#define CBEAM_LOG(s)
Logs a message using cbeam::logging::log_manager.
Definition log_manager.hpp:124
Provides concurrency primitives and abstractions for multithreaded programming. It features the power...
Definition message_manager.hpp:47