// spsc_queue.hpp
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <new>
#include <utility>
// Bounded FIFO ring buffer of T backed by storage obtained from Allocator.
//
// Elements are addressed by two monotonically increasing 64-bit counters:
// _head (next element to pop) and _tail (next slot to push into). A counter
// is mapped to a storage slot with `counter & _mask`, so the size passed to
// the constructor must be a power of two; this is NOT checked.
//
// push() is the only function that stores to _tail and pop() is the only
// function that stores to _head; each publishes its store with release
// ordering and reads the other counter with acquire ordering. As the name
// suggests, this is meant for one pushing thread and one popping thread.
template<typename T, typename Allocator = std::allocator<T>>
class spsc_queue : private Allocator {
    // Counter of the next element to pop; advanced only by pop().
    std::atomic<uint64_t> _head;
    // Keeps _head and _tail 64 bytes apart (assuming an 8-byte atomic),
    // presumably so they land on different x86 cache lines. The object itself
    // carries no alignment requirement, so this is best-effort.
    uint64_t _padding[7]; // for x86
    // Counter of the next slot to push into; advanced only by push().
    std::atomic<uint64_t> _tail;
    // Number of slots allocated.
    const uint64_t _len;
    // _len - 1; turns a counter into a slot index when _len is a power of two.
    const uint64_t _mask;
    // Raw storage for _len objects; slots in [_head, _tail) hold live objects.
    T* const _data;

public:
    // Allocates storage for `size` elements without constructing any of them.
    // `size` must be a power of two for the index masking to be correct.
    spsc_queue(const uint64_t size)
        : _head(0), _tail(0), _len(size), _mask(_len - 1),
          _data(Allocator::allocate(size)) {}

    // Returns the slot at the head counter, i.e. the element the next pop()
    // would remove. Only refers to a live object while the queue is non-empty.
    T& front() {
        return _data[_head & _mask];
    }

    // Returns the slot at the tail counter, i.e. the slot the next push()
    // would construct into.
    // NOTE(review): this is not the most recently pushed element, and unless
    // the queue is full the slot holds no live object — confirm callers
    // expect that.
    T& back() {
        return _data[_tail & _mask];
    }

    // Number of elements currently stored (tail counter minus head counter).
    size_t size() const {
        return _tail.load(std::memory_order_acquire) - _head.load(std::memory_order_acquire);
    }

    // Number of slots allocated at construction.
    size_t capacity() const {
        return _len;
    }

    // Moves the oldest element into `elem` and removes it from the queue.
    // Returns false, leaving `elem` untouched, when the queue is empty.
    bool pop(T& elem) {
        const uint64_t head_idx = _head.load(std::memory_order_relaxed);
        // Acquire pairs with the release store in push(), so the element
        // constructed before the tail was advanced is visible here.
        const uint64_t tail_idx = _tail.load(std::memory_order_acquire);
        if(head_idx == tail_idx) {
            return false;
        }
        T& slot = _data[head_idx & _mask];
        elem = std::move(slot);
        slot.~T(); // end the moved-from object's lifetime before the slot is reused
        // Release hands the now-free slot back to push().
        _head.store(head_idx + 1, std::memory_order_release);
        return true;
    }

    // Copy-constructs `elem` into the next free slot.
    // Returns false, storing nothing, when all _len slots are occupied.
    bool push(const T& elem) {
        const uint64_t tail_idx = _tail.load(std::memory_order_relaxed);
        // Acquire pairs with the release store in pop(), so a slot is only
        // reused after its previous occupant has been destroyed.
        const uint64_t head_idx = _head.load(std::memory_order_acquire);
        // Unsigned subtraction stays correct even if the counters wrap.
        if((tail_idx - head_idx) == _len) {
            return false;
        }
        new (&_data[tail_idx & _mask]) T(elem);
        // Release publishes the constructed element to pop().
        _tail.store(tail_idx + 1, std::memory_order_release);
        return true;
    }

    // Destroys every element still in the queue, then releases the storage.
    ~spsc_queue() {
        const uint64_t tail_idx = _tail.load(std::memory_order_acquire);
        // Live elements occupy counters [head, tail): start AT head and stop
        // BEFORE tail. `!=` rather than `<` keeps this correct under wrap.
        for(uint64_t idx = _head.load(std::memory_order_acquire); idx != tail_idx; ++idx) {
            _data[idx & _mask].~T();
        }
        Allocator::deallocate(_data, _len);
    }
};