forked from microsoft/arcana.cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblocking_concurrent_queue.h
More file actions
167 lines (137 loc) · 4.21 KB
/
blocking_concurrent_queue.h
File metadata and controls
167 lines (137 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#pragma once
#include "cancellation.h"
#include <atomic>
#include <condition_variable>
#include <limits>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
#ifdef ARCANA_TESTING_HOOKS
#include <functional>
#endif
namespace arcana
{
#ifdef ARCANA_TESTING_HOOKS
namespace detail
{
inline std::function<void()> beforeWaitCallback{[]() {}};
}
// Set a callback to be invoked while holding the queue mutex, right before
// condition_variable::wait(). This is used for deterministic testing of
// lost-wakeup race conditions. Pass an empty lambda [](){} to reset.
inline void set_before_wait_callback(std::function<void()> callback)
{
detail::beforeWaitCallback = std::move(callback);
}
#endif
template<typename T, size_t max_size = std::numeric_limits<size_t>::max()>
class blocking_concurrent_queue
{
// Reasoning 1: notify should be called outside the lock to avoid "hurry up and wait"
// http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one
public:
template<typename G>
void push(G&& data)
{
bool notify = false;
{
std::unique_lock<std::mutex> lock{ m_mutex };
notify = m_data.empty();
m_data.push(std::forward<G>(data));
while (m_data.size() > max_size)
{
m_data.pop();
}
}
if (notify)
{
// See Reasoning 1
m_dataReady.notify_one();
}
}
bool empty() const
{
std::unique_lock<std::mutex> lock{ m_mutex };
return m_data.empty();
}
bool blocking_pop(T& dest, const cancellation& cancel)
{
return internal_pop(dest, cancel, true);
}
bool blocking_drain(std::vector<T>& dest, const cancellation& cancel)
{
return internal_drain(dest, cancel, true);
}
bool try_pop(T& dest, const cancellation& cancel)
{
return internal_pop(dest, cancel, false);
}
bool try_drain(std::vector<T>& dest, const cancellation& cancel)
{
return internal_drain(dest, cancel, false);
}
void clear()
{
std::queue<T> empty;
{
std::unique_lock<std::mutex> lock{ m_mutex };
// swap with empty so that destruction of the queue items
// don't occure in the lock
std::swap(m_data, empty);
}
// See Reasoning 1
m_dataReady.notify_all();
}
void cancelled()
{
// See Reasoning 1
m_dataReady.notify_all();
}
private:
bool internal_pop(T& dest, const cancellation& cancel, bool block)
{
std::unique_lock<std::mutex> lock{ m_mutex };
if (block)
{
while (!cancel.cancelled() && m_data.empty())
{
#ifdef ARCANA_TESTING_HOOKS
detail::beforeWaitCallback();
#endif
m_dataReady.wait(lock);
}
}
if (m_data.empty() || cancel.cancelled())
return false;
dest = std::move(m_data.front());
m_data.pop();
return true;
}
bool internal_drain(std::vector<T>& dest, const cancellation& cancel, bool block)
{
std::unique_lock<std::mutex> lock{ m_mutex };
if (block)
{
while (!cancel.cancelled() && m_data.empty())
{
#ifdef ARCANA_TESTING_HOOKS
detail::beforeWaitCallback();
#endif
m_dataReady.wait(lock);
}
}
if (m_data.empty() || cancel.cancelled())
return false;
while (!m_data.empty())
{
dest.emplace_back(std::move(m_data.front()));
m_data.pop();
}
return true;
}
std::queue<T> m_data;
mutable std::mutex m_mutex;
std::condition_variable m_dataReady;
};
}