Skip to content

Commit aff0db3

Browse files
committed
simple mpmsc example
1 parent 8f2de7b commit aff0db3

File tree

4 files changed

+100
-26
lines changed

4 files changed

+100
-26
lines changed

ddcommon-ffi/src/array_queue.rs

+27-23
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,18 @@ pub unsafe extern "C" fn ddog_array_queue_drop(queue_ptr: *mut ArrayQueue) {
9797
#[allow(unused)]
9898
#[repr(C)]
9999
pub enum ArrayQueuePushResult {
100-
Ok(bool),
100+
Ok,
101+
Full(*mut c_void),
101102
Err(Error),
102103
}
103104

104-
impl From<Result<(), anyhow::Error>> for ArrayQueuePushResult {
105-
fn from(result: Result<(), anyhow::Error>) -> Self {
105+
impl From<Result<Result<(), *mut c_void>, anyhow::Error>> for ArrayQueuePushResult {
106+
fn from(result: Result<Result<(), *mut c_void>, anyhow::Error>) -> Self {
106107
match result {
107-
Ok(_) => ArrayQueuePushResult::Ok(true),
108+
Ok(value) => match value {
109+
Ok(()) => ArrayQueuePushResult::Ok,
110+
Err(value) => ArrayQueuePushResult::Full(value),
111+
},
108112
Err(err) => ArrayQueuePushResult::Err(err.into()),
109113
}
110114
}
@@ -121,9 +125,7 @@ pub unsafe extern "C" fn ddog_array_queue_push(
121125
) -> ArrayQueuePushResult {
122126
(|| {
123127
let queue = ddog_array_queue_ptr_to_inner(queue_ptr)?;
124-
queue
125-
.push(value)
126-
.map_err(|_| anyhow::anyhow!("array_queue full"))
128+
anyhow::Ok(queue.push(value))
127129
})()
128130
.context("array_queue_push failed")
129131
.into()
@@ -133,13 +135,17 @@ pub unsafe extern "C" fn ddog_array_queue_push(
133135
#[repr(C)]
134136
pub enum ArrayQueuePopResult {
135137
Ok(*mut c_void),
138+
Empty,
136139
Err(Error),
137140
}
138141

139-
impl From<anyhow::Result<*mut c_void>> for ArrayQueuePopResult {
140-
fn from(result: anyhow::Result<*mut c_void>) -> Self {
142+
impl From<anyhow::Result<Option<*mut c_void>>> for ArrayQueuePopResult {
143+
fn from(result: anyhow::Result<Option<*mut c_void>>) -> Self {
141144
match result {
142-
Ok(value) => ArrayQueuePopResult::Ok(value),
145+
Ok(value) => match value {
146+
Some(value) => ArrayQueuePopResult::Ok(value),
147+
None => ArrayQueuePopResult::Empty,
148+
},
143149
Err(err) => ArrayQueuePopResult::Err(err.into()),
144150
}
145151
}
@@ -152,26 +158,24 @@ impl From<anyhow::Result<*mut c_void>> for ArrayQueuePopResult {
152158
pub unsafe extern "C" fn ddog_array_queue_pop(queue_ptr: *mut ArrayQueue) -> ArrayQueuePopResult {
153159
(|| {
154160
let queue = ddog_array_queue_ptr_to_inner(queue_ptr)?;
155-
queue
156-
.pop()
157-
.ok_or_else(|| anyhow::anyhow!("array_queue empty"))
161+
anyhow::Ok(queue.pop())
158162
})()
159163
.context("array_queue_pop failed")
160164
.into()
161165
}
162166

163167
#[allow(unused)]
164168
#[repr(C)]
165-
pub enum ArrayQueueIsEmptyResult {
169+
pub enum ArrayQueueBoolResult {
166170
Ok(bool),
167171
Err(Error),
168172
}
169173

170-
impl From<anyhow::Result<bool>> for ArrayQueueIsEmptyResult {
174+
impl From<anyhow::Result<bool>> for ArrayQueueBoolResult {
171175
fn from(result: anyhow::Result<bool>) -> Self {
172176
match result {
173-
Ok(value) => ArrayQueueIsEmptyResult::Ok(value),
174-
Err(err) => ArrayQueueIsEmptyResult::Err(err.into()),
177+
Ok(value) => ArrayQueueBoolResult::Ok(value),
178+
Err(err) => ArrayQueueBoolResult::Err(err.into()),
175179
}
176180
}
177181
}
@@ -182,7 +186,7 @@ impl From<anyhow::Result<bool>> for ArrayQueueIsEmptyResult {
182186
#[no_mangle]
183187
pub unsafe extern "C" fn ddog_array_queue_is_empty(
184188
queue_ptr: *mut ArrayQueue,
185-
) -> ArrayQueueIsEmptyResult {
189+
) -> ArrayQueueBoolResult {
186190
(|| {
187191
let queue = ddog_array_queue_ptr_to_inner(queue_ptr)?;
188192
anyhow::Ok(queue.is_empty())
@@ -193,16 +197,16 @@ pub unsafe extern "C" fn ddog_array_queue_is_empty(
193197

194198
#[allow(unused)]
195199
#[repr(C)]
196-
pub enum ArrayQueueLenResult {
200+
pub enum ArrayQueueUsizeResult {
197201
Ok(usize),
198202
Err(Error),
199203
}
200204

201-
impl From<anyhow::Result<usize>> for ArrayQueueLenResult {
205+
impl From<anyhow::Result<usize>> for ArrayQueueUsizeResult {
202206
fn from(result: anyhow::Result<usize>) -> Self {
203207
match result {
204-
Ok(value) => ArrayQueueLenResult::Ok(value),
205-
Err(err) => ArrayQueueLenResult::Err(err.into()),
208+
Ok(value) => ArrayQueueUsizeResult::Ok(value),
209+
Err(err) => ArrayQueueUsizeResult::Err(err.into()),
206210
}
207211
}
208212
}
@@ -211,7 +215,7 @@ impl From<anyhow::Result<usize>> for ArrayQueueLenResult {
211215
/// # Safety
212216
/// The pointer is null or points to a valid memory location allocated by array_queue_new.
213217
#[no_mangle]
214-
pub unsafe extern "C" fn ddog_array_queue_len(queue_ptr: *mut ArrayQueue) -> ArrayQueueLenResult {
218+
pub unsafe extern "C" fn ddog_array_queue_len(queue_ptr: *mut ArrayQueue) -> ArrayQueueUsizeResult {
215219
(|| {
216220
let queue = ddog_array_queue_ptr_to_inner(queue_ptr)?;
217221
anyhow::Ok(queue.len())

examples/ffi/.gitignore

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
CMakeFiles/
2-
array_queue
32
CMakeCache.txt
3+
cmake_install.cmake
4+
Makefile
5+
compile_commands.json
6+
7+
array_queue
48
crashinfo
59
crashtracking
610
exporter
7-
Makefile
811
profiles
912
telemetry
1013
telemetry_metrics
14+
trace_exporter

examples/ffi/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ project(datadog_profiling_ffi_examples LANGUAGES C CXX)
44
find_package(Datadog REQUIRED)
55

66
set(VCRUNTIME_LINK_TYPE DLL CACHE STRING "Specify the Runtime Library to use when compiling with MSVC")
7+
set(CMAKE_EXPORT_COMPILE_COMMANDS on)
78

89
function(set_vcruntime_link_type binary link_type)
910
if (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")

examples/ffi/array_queue.cpp

+66-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
extern "C" {
55
#include <datadog/common.h>
66
}
7+
#include <atomic>
8+
#include <cassert>
79
#include <cstdio>
810
#include <memory>
11+
#include <thread>
12+
#include <vector>
913

1014
struct Sample {
1115
int x;
@@ -24,11 +28,72 @@ void print_error(const char *s, const ddog_Error &err) {
2428
}
2529

2630
int main(void) {
27-
ddog_ArrayQueue_NewResult array_queue_new_result = ddog_array_queue_new(10, delete_fn);
31+
ddog_ArrayQueue_NewResult array_queue_new_result = ddog_array_queue_new(5, delete_fn);
2832
if (array_queue_new_result.tag != DDOG_ARRAY_QUEUE_NEW_RESULT_OK) {
2933
print_error("Failed to create array queue", array_queue_new_result.err);
3034
ddog_Error_drop(&array_queue_new_result.err);
3135
return 1;
3236
}
3337
std::unique_ptr<ddog_ArrayQueue, Deleter> array_queue(&array_queue_new_result.ok);
38+
39+
size_t num_threads = 4;
40+
size_t num_elements = 50;
41+
std::vector<std::atomic<size_t>> counts(num_elements);
42+
for (size_t i = 0; i < num_elements; ++i) {
43+
counts[i].store(0);
44+
}
45+
46+
auto consumer = [&array_queue, &counts, num_elements]() {
47+
for (size_t i = 0; i < num_elements; ++i) {
48+
do {
49+
ddog_ArrayQueue_PopResult pop_result = ddog_array_queue_pop(array_queue.get());
50+
if (pop_result.tag == DDOG_ARRAY_QUEUE_POP_RESULT_OK) {
51+
Sample *sample = (Sample *)pop_result.ok;
52+
counts[sample->x].fetch_add(1, std::memory_order_seq_cst);
53+
delete sample;
54+
break;
55+
} else if (pop_result.tag == DDOG_ARRAY_QUEUE_POP_RESULT_EMPTY) {
56+
std::this_thread::yield();
57+
} else {
58+
print_error("Failed to pop from array queue", pop_result.err);
59+
ddog_Error_drop(&pop_result.err);
60+
return;
61+
}
62+
} while (true);
63+
}
64+
};
65+
66+
auto producer = [&array_queue, num_elements]() {
67+
for (size_t i = 0; i < num_elements; ++i) {
68+
Sample *sample = new Sample();
69+
sample->x = i;
70+
sample->y = i;
71+
do {
72+
ddog_ArrayQueue_PushResult push_result = ddog_array_queue_push(array_queue.get(), sample);
73+
if (push_result.tag == DDOG_ARRAY_QUEUE_PUSH_RESULT_OK) {
74+
break;
75+
} else if (push_result.tag == DDOG_ARRAY_QUEUE_PUSH_RESULT_FULL) {
76+
std::this_thread::yield();
77+
} else {
78+
print_error("Failed to push to array queue", push_result.err);
79+
ddog_Error_drop(&push_result.err);
80+
return;
81+
}
82+
} while (true);
83+
}
84+
};
85+
86+
std::vector<std::thread> threads;
87+
for (size_t i = 0; i < num_threads; ++i) {
88+
threads.emplace_back(consumer);
89+
threads.emplace_back(producer);
90+
}
91+
92+
for (auto &t : threads) {
93+
t.join();
94+
}
95+
96+
for (const auto &c : counts) {
97+
assert(c.load(std::memory_order_seq_cst) == num_threads);
98+
}
3499
}

0 commit comments

Comments
 (0)