paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
thread_queue.h
Go to the documentation of this file.
1
8
9/*******************************************************************************
10 * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
11 *
12 * All rights reserved. This program and the accompanying materials
13 * are made available under the terms of the Eclipse Public License v2.0
14 * and Eclipse Distribution License v1.0 which accompany this distribution.
15 *
16 * The Eclipse Public License is available at
17 * http://www.eclipse.org/legal/epl-v20.html
18 * and the Eclipse Distribution License is available at
19 * http://www.eclipse.org/org/documents/edl-v10.php.
20 *
21 * Contributors:
22 * Frank Pagliughi - initial implementation and documentation
23 *******************************************************************************/
24
25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
27
28#include <thread>
29#include <mutex>
30#include <condition_variable>
31#include <limits>
32#include <deque>
33#include <queue>
34#include <algorithm>
35
36namespace mqtt {
37
39
/**
 * A thread-safe, bounded FIFO queue for handing items from one or more
 * producer threads to one or more consumer threads.
 *
 * Every public member function takes the internal mutex, so the object
 * can be shared between threads without external locking. Producers
 * block (or fail, for the `try_` variants) while the queue holds
 * `capacity()` items; consumers block (or fail) while it is empty.
 *
 * @tparam T The type of the items held in the queue.
 * @tparam Container The underlying container used by the std::queue
 *  		adapter.
 */
template <typename T, class Container=std::deque<T>>
class thread_queue
{
public:
    /** The underlying container type to use for the queue. */
    using container_type = Container;
    /** The type of items to be held in the queue. */
    using value_type = T;
    /** The type used to specify number of items in the container. */
    using size_type = typename Container::size_type;

    /** The maximum capacity of the queue. */
    static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();

private:
    /** Object lock; guards `cap_` and `que_`. */
    mutable std::mutex lock_;
    /** Condition signaled when an item was added to the queue. */
    std::condition_variable notEmptyCond_;
    /** Condition signaled when an item was removed or capacity grew. */
    std::condition_variable notFullCond_;
    /** The capacity of the queue; always at least 1. */
    size_type cap_;
    /** The actual STL container holding the data. */
    std::queue<T,Container> que_;

    /** Simple, scope-based lock guard. */
    using guard = std::lock_guard<std::mutex>;
    /** General purpose guard, as required by the condition variables. */
    using unique_guard = std::unique_lock<std::mutex>;

    /** Whether another item fits. Caller must hold `lock_`. */
    bool has_room() const { return que_.size() < cap_; }

    /** Whether an item is available. Caller must hold `lock_`. */
    bool has_item() const { return !que_.empty(); }

    /**
     * Appends an item, then releases the lock and wakes one consumer.
     * The notification is sent after unlocking so that the woken thread
     * does not immediately block on the mutex.
     * @param g A guard that currently owns `lock_`.
     * @param val The item to append.
     */
    void finish_put(unique_guard& g, value_type&& val) {
        que_.emplace(std::move(val));
        g.unlock();
        notEmptyCond_.notify_one();
    }

    /**
     * Discards the front item (the caller has already moved out of it),
     * then releases the lock and wakes one producer.
     * @param g A guard that currently owns `lock_`.
     */
    void finish_get(unique_guard& g) {
        que_.pop();
        g.unlock();
        notFullCond_.notify_one();
    }

public:
    /**
     * Constructs a queue with the maximum capacity.
     * NOTE(review): the body of this constructor is not visible in the
     * listing this was restored from; MAX_CAPACITY is presumed to be
     * the intended default - confirm against the original header.
     */
    thread_queue() : cap_(MAX_CAPACITY) {}
    /**
     * Constructs a queue with the specified capacity.
     * @param cap The maximum number of items the queue may hold. A
     *  		  value of zero is raised to one.
     */
    explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
    /**
     * Determines if the queue is empty.
     * @return @em true if there are no items in the queue, @em false
     *  	   otherwise.
     */
    bool empty() const {
        guard g(lock_);
        return que_.empty();
    }
    /**
     * Gets the capacity of the queue.
     * @return The maximum number of items the queue accepts before
     *  	   producers block or fail.
     */
    size_type capacity() const {
        guard g(lock_);
        return cap_;
    }
    /**
     * Sets the capacity of the queue.
     *
     * As in the constructor, a capacity of zero is raised to one;
     * otherwise no `put` could ever succeed again. Items already in the
     * queue are kept even if they exceed the new capacity. If the new
     * capacity leaves room, all blocked producers are woken so they can
     * re-check the limit.
     * @param cap The new capacity.
     */
    void capacity(size_type cap) {
        unique_guard g(lock_);
        cap_ = std::max<size_type>(cap, 1);
        const bool room = has_room();
        g.unlock();
        if (room)
            notFullCond_.notify_all();
    }
    /**
     * Gets the number of items in the queue.
     * @return The number of items currently held.
     */
    size_type size() const {
        guard g(lock_);
        return que_.size();
    }
    /**
     * Puts an item into the queue, blocking while the queue is full.
     * @param val The item to add; it is moved into the queue.
     */
    void put(value_type val) {
        unique_guard g(lock_);
        notFullCond_.wait(g, [this]{ return has_room(); });
        finish_put(g, std::move(val));
    }
    /**
     * Non-blocking attempt to put an item into the queue.
     * @param val The item to add; it is moved into the queue.
     * @return @em true if the item was added, @em false if the queue
     *  	   is currently full.
     */
    bool try_put(value_type val) {
        unique_guard g(lock_);
        if (!has_room())
            return false;

        finish_put(g, std::move(val));
        return true;
    }
    /**
     * Attempts to put an item into the queue, waiting up to a relative
     * timeout for space to become available.
     * @param val The item to add; it is moved into the queue.
     * @param relTime The maximum amount of time to wait.
     * @return @em true if the item was added, @em false if the queue
     *  	   was still full when the timeout expired.
     */
    template <typename Rep, class Period>
    bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
        unique_guard g(lock_);
        if (!notFullCond_.wait_for(g, relTime, [this]{ return has_room(); }))
            return false;

        finish_put(g, std::move(val));
        return true;
    }
    /**
     * Attempts to put an item into the queue, waiting until an absolute
     * point in time for space to become available.
     * @param val The item to add; it is moved into the queue.
     * @param absTime The time point at which to give up.
     * @return @em true if the item was added, @em false if the queue
     *  	   was still full when the time point was reached.
     */
    template <class Clock, class Duration>
    bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
        unique_guard g(lock_);
        if (!notFullCond_.wait_until(g, absTime, [this]{ return has_room(); }))
            return false;

        finish_put(g, std::move(val));
        return true;
    }
    /**
     * Retrieves the oldest item, blocking while the queue is empty.
     * @param val Receives the item. If null the call returns at once
     *  		  without waiting and without removing anything.
     */
    void get(value_type* val) {
        if (!val)
            return;

        unique_guard g(lock_);
        notEmptyCond_.wait(g, [this]{ return has_item(); });

        *val = std::move(que_.front());
        finish_get(g);
    }
    /**
     * Retrieves the oldest item, blocking while the queue is empty.
     * @return The item removed from the queue.
     */
    value_type get() {
        unique_guard g(lock_);
        notEmptyCond_.wait(g, [this]{ return has_item(); });

        value_type val = std::move(que_.front());
        finish_get(g);
        return val;
    }
    /**
     * Non-blocking attempt to retrieve the oldest item.
     * @param val Receives the item on success.
     * @return @em true if an item was removed and stored in `*val`,
     *  	   @em false if `val` is null or the queue is empty.
     */
    bool try_get(value_type* val) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (!has_item())
            return false;

        *val = std::move(que_.front());
        finish_get(g);
        return true;
    }
    /**
     * Attempts to retrieve the oldest item, waiting up to a relative
     * timeout for one to arrive.
     * @param val Receives the item on success.
     * @param relTime The maximum amount of time to wait.
     * @return @em true if an item was removed and stored in `*val`,
     *  	   @em false if `val` is null or the queue was still empty
     *  	   when the timeout expired.
     */
    template <typename Rep, class Period>
    bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (!notEmptyCond_.wait_for(g, relTime, [this]{ return has_item(); }))
            return false;

        *val = std::move(que_.front());
        finish_get(g);
        return true;
    }
    /**
     * Attempts to retrieve the oldest item, waiting until an absolute
     * point in time for one to arrive.
     * @param val Receives the item on success.
     * @param absTime The time point at which to give up.
     * @return @em true if an item was removed and stored in `*val`,
     *  	   @em false if `val` is null or the queue was still empty
     *  	   when the time point was reached.
     */
    template <class Clock, class Duration>
    bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
        if (!val)
            return false;

        unique_guard g(lock_);
        if (!notEmptyCond_.wait_until(g, absTime, [this]{ return has_item(); }))
            return false;

        *val = std::move(que_.front());
        finish_get(g);
        return true;
    }
};
323
325// end namespace mqtt
326}
327
328#endif // __mqtt_thread_queue_h
329
Definition thread_queue.h:70
typename Container::size_type size_type
Definition thread_queue.h:77
T value_type
Definition thread_queue.h:75
size_type size() const
Definition thread_queue.h:141
void capacity(size_type cap)
Definition thread_queue.h:133
Container container_type
Definition thread_queue.h:73
bool try_put(value_type val)
Definition thread_queue.h:165
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:283
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:185
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:80
bool try_get(value_type *val)
Definition thread_queue.h:258
thread_queue()
Definition thread_queue.h:103
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:206
void get(value_type *val)
Definition thread_queue.h:222
bool empty() const
Definition thread_queue.h:115
size_type capacity() const
Definition thread_queue.h:123
thread_queue(size_t cap)
Definition thread_queue.h:109
void put(value_type val)
Definition thread_queue.h:151
value_type get()
Definition thread_queue.h:240
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:308
Definition async_client.h:49