-
Notifications
You must be signed in to change notification settings - Fork 1
/
concurrent_queue.tcc
105 lines (92 loc) · 2.18 KB
/
concurrent_queue.tcc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#define concurrent_queue_tcc
#include <stdexcept>
#include "concurrent_queue.h"
namespace kaiu {
template <typename T>
ConcurrentQueue<T>::ConcurrentQueue(bool nowaiting)
: nowaiting(nowaiting)
{
}
template <typename T>
void ConcurrentQueue<T>::push(const T& item)
{
std::lock_guard<std::mutex> lock(queue_mutex);
events.push(item);
notify();
}
template <typename T>
void ConcurrentQueue<T>::push(T&& item)
{
std::lock_guard<std::mutex> lock(queue_mutex);
events.push(std::move(item));
notify();
}
template <typename T>
template <typename... Args> void ConcurrentQueue<T>::emplace(Args&&... args)
{
std::lock_guard<std::mutex> lock(queue_mutex);
events.emplace(std::forward<Args...>(args...));
notify();
}
template <typename T>
bool ConcurrentQueue<T>::pop(T& out)
{
return pop<bool>(out, false);
}
template <typename T>
template <typename WaitGuard, typename... GuardParam>
bool ConcurrentQueue<T>::pop(T& out, GuardParam&&... guard_param)
{
/* Lock the queue */
std::unique_lock<std::mutex> lock(queue_mutex);
/* Queue is always locked when this is called */
auto end_wait_condition = [this] {
return is_nowaiting() || !events.empty();
};
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
if (!end_wait_condition()) {
/* Externally supplied wait callback guard */
WaitGuard guard(std::forward<GuardParam>(guard_param)...);
/*
* Unlocks queue, re-locks it when calling end_wait_condition and upon
* return
*/
unblock.wait(lock, end_wait_condition);
}
#pragma GCC diagnostic pop
/* Queue is locked at this point whether or not we waited */
if (events.empty()) {
return false;
}
out = std::move(events.front());
events.pop();
return true;
}
template <typename T>
void ConcurrentQueue<T>::notify()
{
unblock.notify_one();
}
template <typename T>
void ConcurrentQueue<T>::set_nowaiting(bool value)
{
nowaiting = value;
unblock.notify_all();
}
template <typename T>
bool ConcurrentQueue<T>::is_nowaiting() const
{
return nowaiting;
}
template <typename T>
bool ConcurrentQueue<T>::isEmpty(bool is_locked) const
{
if (is_locked) {
return events.empty();
} else {
std::lock_guard<std::mutex> lock(queue_mutex);
return events.empty();
}
}
}