// queue_test.cpp
#include <atomic>
#include <gtest/gtest.h>
#include <pomagma/util/queue.hpp>
#include <thread>
#include <typeinfo>
namespace pomagma {
namespace {
// Producer thread body used by the queue tests.
//
// Pushes the bytes of `message` onto `queue` exactly `message_count` times,
// calling usleep(10) on every 100th iteration (including the first) so the
// writers do not run in one uninterrupted burst. When all pushes are done,
// `*worker_count` is atomically decremented so the reader can tell that this
// writer has finished.
void writer_thread(SharedQueueBase* queue, std::string message,
                   size_t message_count,
                   std::atomic<uint_fast64_t>* worker_count) {
    POMAGMA_INFO("sending " << message_count << " messages");

    const char* const bytes = message.data();
    const size_t length = message.size();

    size_t sent = 0;
    while (sent != message_count) {
        queue->push(bytes, length);
        const bool pause_now = (sent % 100 == 0);
        if (pause_now) {
            usleep(10);
        }
        ++sent;
    }

    // Signal completion only after the last push has been issued.
    worker_count->fetch_sub(1);
}
// Empty typed-test fixture; the type parameter selects the queue
// implementation under test.
template <typename Queue>
class QueueTest : public ::testing::Test {};

// Queue implementations that every TYPED_TEST of QueueTest is run against.
using QueueTypes = ::testing::Types<VectorQueue, FileBackedQueue>;
TYPED_TEST_CASE(QueueTest, QueueTypes);
// Checks that no message is lost when several writer threads push
// concurrently while this thread pops.
//
// One writer thread is started per payload; each pushes its payload
// `message_count` times and then decrements `worker_count`. The reader polls
// the queue while any writer is still running, then joins the writers and
// drains the queue one final time before comparing the number of received
// messages with the number sent.
TYPED_TEST(QueueTest, IsCorrect) {
    typedef TypeParam Queue;
    POMAGMA_INFO("Testing " << demangle(typeid(Queue).name()));

    Queue queue;
    const size_t message_count = 10000;

    // Payloads of varying length, one writer thread per entry.
    const char* const payloads[] = {
        "test",
        "test-test-test-test-test-test-test-test-test-test-test-test-test",
        "t",
        "e",
        "s",
        "yet-another-test",
    };
    const size_t writer_count = sizeof(payloads) / sizeof(payloads[0]);

    // Must be fully initialized before the first writer starts, because each
    // writer decrements it on exit.
    std::atomic<uint_fast64_t> worker_count(writer_count);

    std::vector<std::thread> threads;
    threads.reserve(writer_count);
    for (const char* payload : payloads) {
        threads.emplace_back(&writer_thread, &queue, payload, message_count,
                             &worker_count);
    }

    size_t actual_message_count = 0;
    // One extra byte so a terminating NUL can be appended for logging.
    char message[Queue::max_message_size() + 1];

    // Pops until try_pop returns 0, which this test treats as "queue is
    // currently empty".
    auto drain = [&] {
        while (size_t size = queue.try_pop(message)) {
            message[size] = 0;
            POMAGMA_INFO("received: " << message);
            ++actual_message_count;
        }
    };

    while (worker_count) {
        POMAGMA_INFO("receiving...");
        usleep(10);
        drain();
    }

    // A writer may push its last messages and decrement worker_count after
    // the drain above but before the loop condition is re-checked. Without a
    // final drain those messages would stay in the queue and the count below
    // would intermittently come up short. Join first so every push has
    // completed, then drain once more.
    for (auto& thread : threads) {
        thread.join();
    }
    drain();

    POMAGMA_INFO("received " << actual_message_count << " messages");
    EXPECT_EQ(actual_message_count, writer_count * message_count);
}
} // namespace
} // namespace pomagma