-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathc61_lock_free_queue.cpp
124 lines (110 loc) · 2.94 KB
/
c61_lock_free_queue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <atomic>
#include <list>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
/*!
* \brief An error-prone lock free queue implementation
* \note The queue maintain two iterators: the head points to the element
* before the oldest element, while the tail points to the element after the
* newest.
*
* The consumer only increments the head iterator and doesn't modify the queue.
* The producer inserts elements and increments the tail. It also erases any
* elements already read by the consumer.
* These two methods never overlap with each other.
* The two iterators never refer to the same position.
*
* This implementation is still problematic because the iterators are accessed
* from different threads while being modified in one of the threads. It has a
* data race unless the iterators are protected by mutexes as iterators cannot
* be atomic.
* Maybe an alternative way is to use atomic pointers. Instead of using list
* internally, it could be:
template<typename T>
class LockFreeQueue
{
public:
// methods
private:
struct Node
{
std::shared_ptr<T> data;
std::atomic<Node*> next;
Node(const T& d): data(std::make_shared<T>(d)) { }
};
std::atomic<Node*> _i_head;
std::atomic<Node*> _i_tail;
};
*/
template<typename T>
class LockFreeQueue
{
public:
// insert an initial element so that the two iterators don't point to the
// same position.
// _queue: [ ][T()][ ]
// ^ ^
// | |
// _i_head _i_tail
LockFreeQueue()
{
_queue.push_back(T());
_i_head = _queue.begin();
_i_tail = _queue.end();
}
~LockFreeQueue() = default;
void Produce(const T& t)
{
_queue.push_back(t);
_i_tail = _queue.end();
_queue.erase(_queue.begin(), _i_head);
}
bool Consume(T& t)
{
auto i_first = _i_head;
++i_first;
// if queue is not empty
if (i_first != _i_tail)
{
_i_head = i_first;
t = *_i_head;
return true;
}
return false;
}
void Print()
{
auto i_first = _i_head;
++i_first;
for (auto it = i_first; it != _i_tail; ++it)
{
std::cout << *it << ", ";
}
std::cout << "\n";
}
private:
std::list<T> _queue;
typename std::list<T>::iterator _i_head, _i_tail;
};
int main()
{
LockFreeQueue<int> queue;
std::vector<std::thread> threads;
int result{0};
for (int i = 0; i < 10; ++i)
{
std::thread producer{&LockFreeQueue<int>::Produce, &queue, std::ref(i)};
threads.push_back(std::move(producer));
std::thread consumer{&LockFreeQueue<int>::Consume, &queue, std::ref(result)};
threads.push_back(std::move(consumer));
}
for (auto& t : threads)
{
t.join();
}
std::cout << "Elements in queue: \n";
queue.Print();
return 0;
}