Skip to content

Commit bd6ee70

Browse files
authored
Merge pull request #33 from SpringQL/feat/spring_pop_non_blocking
feat: add spring_pop_non_blocking() API
2 parents e6bc4a8 + aeae5a8 commit bd6ee70

File tree

5 files changed

+272
-0
lines changed

5 files changed

+272
-0
lines changed

Makefile.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,20 @@ echo '-- Start doc_app1'
5252
echo '-- End doc_app1'
5353
echo
5454
55+
echo '-- Start doc_app2'
56+
./run_doc_app2 &
57+
sleep 1
58+
echo '{"ts": "2022-01-01 13:00:00.000000000", "symbol": "ORCL", "amount": 10}' |nc localhost 54300
59+
echo '{"ts": "2022-01-01 13:00:01.000000000", "symbol": "ORCL", "amount": 30}' |nc localhost 54300
60+
echo '{"ts": "2022-01-01 13:00:01.000000000", "symbol": "GOOGL", "amount": 50}' |nc localhost 54300
61+
echo '{"ts": "2022-01-01 13:00:02.000000000", "symbol": "ORCL", "amount": 40}' |nc localhost 54300
62+
echo '{"ts": "2022-01-01 13:00:05.000000000", "symbol": "GOOGL", "amount": 60}' |nc localhost 54300
63+
echo '{"ts": "2022-01-01 13:00:10.000000000", "symbol": "APPL", "amount": 100}' |nc localhost 54300
64+
sleep 1
65+
pkill run_doc_app2
66+
echo '-- End doc_app2'
67+
echo
68+
5569
echo '-- Start print_trade'
5670
(python trade_projection/print_trade.py | nc -l 19876) &
5771
sleep 1

c_example/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,6 @@ target_link_libraries(run_trade_projection springql_client)
1111

1212
add_executable(run_doc_app1 doc_app1/doc_app1.c)
1313
target_link_libraries(run_doc_app1 springql_client)
14+
15+
add_executable(run_doc_app2 doc_app2/doc_app2.c)
16+
target_link_libraries(run_doc_app2 springql_client)

c_example/doc_app2/doc_app2.c

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// This file is part of https://github.com/SpringQL/SpringQL-client-c which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2+
3+
// Usage:
4+
//
5+
// $ ./a.out # waiting for connection...
6+
// $ echo '{"ts": "2022-01-01 13:00:00.000000000", "symbol": "ORCL", "amount": 10}' |nc localhost 54300
7+
// $ echo '{"ts": "2022-01-01 13:00:01.000000000", "symbol": "ORCL", "amount": 30}' |nc localhost 54300
8+
// $ echo '{"ts": "2022-01-01 13:00:01.000000000", "symbol": "GOOGL", "amount": 50}' |nc localhost 54300
9+
// $ echo '{"ts": "2022-01-01 13:00:02.000000000", "symbol": "ORCL", "amount": 40}' |nc localhost 54300
10+
// $ echo '{"ts": "2022-01-01 13:00:05.000000000", "symbol": "GOOGL", "amount": 60}' |nc localhost 54300
11+
// $ echo '{"ts": "2022-01-01 13:00:10.000000000", "symbol": "APPL", "amount": 100}' |nc localhost 54300
12+
13+
#include <assert.h>
14+
#include <string.h>
15+
#include <stdio.h>
16+
#include <unistd.h>
17+
18+
#include <springql.h>
19+
20+
void abort_with_report()
21+
{
22+
SpringErrno errno;
23+
char errmsg[1024];
24+
spring_last_err(&errno, errmsg, 1024);
25+
fprintf(stderr, "Error occurred (%d): %s", errno, errmsg);
26+
abort();
27+
}
28+
29+
void assert_ok(SpringErrno ret)
30+
{
31+
if (ret != Ok)
32+
{
33+
abort_with_report();
34+
}
35+
}
36+
37+
void assert_not_null(void *p)
38+
{
39+
if (p == NULL)
40+
{
41+
abort_with_report();
42+
}
43+
}
44+
45+
int main()
46+
{
47+
SpringErrno ret;
48+
49+
SpringConfig *config = spring_config_default();
50+
assert_not_null(config);
51+
52+
SpringPipeline *pipeline = spring_open(config);
53+
assert_not_null(pipeline);
54+
55+
ret = spring_command(
56+
pipeline,
57+
"CREATE SOURCE STREAM source_trade ("
58+
" ts TIMESTAMP NOT NULL ROWTIME,"
59+
" symbol TEXT NOT NULL,"
60+
" amount INTEGER NOT NULL"
61+
");");
62+
assert_ok(ret);
63+
64+
ret = spring_command(
65+
pipeline,
66+
"CREATE SINK STREAM sink_avg_all ("
67+
" ts TIMESTAMP NOT NULL ROWTIME,"
68+
" avg_amount FLOAT NOT NULL"
69+
");");
70+
assert_ok(ret);
71+
72+
ret = spring_command(
73+
pipeline,
74+
"CREATE SINK STREAM sink_avg_by_symbol ("
75+
" ts TIMESTAMP NOT NULL ROWTIME,"
76+
" symbol TEXT NOT NULL,"
77+
" avg_amount FLOAT NOT NULL"
78+
");");
79+
assert_ok(ret);
80+
81+
// Creates windows per 10 seconds ([:00, :10), [:10, :20), ...),
82+
// and calculate the average amount over the rows inside each window.
83+
//
84+
// Second parameter `DURATION_SECS(0)` means allowed latency for late data. You can ignore here.
85+
ret = spring_command(
86+
pipeline,
87+
"CREATE PUMP avg_all AS"
88+
" INSERT INTO sink_avg_all (ts, avg_amount)"
89+
" SELECT STREAM"
90+
" FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS min_ts,"
91+
" AVG(source_trade.amount) AS avg_amount"
92+
" FROM source_trade"
93+
" GROUP BY min_ts"
94+
" FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0);");
95+
assert_ok(ret);
96+
97+
// Creates windows per 2 seconds ([:00, :02), [:02, :04), ...),
98+
// and then group the rows inside each window having the same symbol.
99+
// Calculate the average amount for each group.
100+
ret = spring_command(
101+
pipeline,
102+
"CREATE PUMP avg_by_symbol AS"
103+
" INSERT INTO sink_avg_by_symbol (ts, symbol, avg_amount)"
104+
" SELECT STREAM"
105+
" FLOOR_TIME(source_trade.ts, DURATION_SECS(2)) AS min_ts,"
106+
" source_trade.symbol AS symbol,"
107+
" AVG(source_trade.amount) AS avg_amount"
108+
" FROM source_trade"
109+
" GROUP BY min_ts, symbol"
110+
" FIXED WINDOW DURATION_SECS(2), DURATION_SECS(0);");
111+
assert_ok(ret);
112+
113+
ret = spring_command(
114+
pipeline,
115+
"CREATE SINK WRITER queue_avg_all FOR sink_avg_all"
116+
" TYPE IN_MEMORY_QUEUE OPTIONS ("
117+
" NAME 'q_avg_all'"
118+
" );");
119+
assert_ok(ret);
120+
121+
ret = spring_command(
122+
pipeline,
123+
"CREATE SINK WRITER queue_avg_by_symbol FOR sink_avg_by_symbol"
124+
" TYPE IN_MEMORY_QUEUE OPTIONS ("
125+
" NAME 'q_avg_by_symbol'"
126+
" );");
127+
assert_ok(ret);
128+
129+
ret = spring_command(
130+
pipeline,
131+
"CREATE SOURCE READER tcp_trade FOR source_trade"
132+
" TYPE NET_SERVER OPTIONS ("
133+
" PROTOCOL 'TCP',"
134+
" PORT '54300'"
135+
" );");
136+
assert_ok(ret);
137+
138+
fprintf(stderr, "waiting JSON records in tcp/54300...\n");
139+
140+
SpringRow *row;
141+
bool is_err = false;
142+
while (1)
143+
{
144+
#define TS_LEN 128
145+
#define SYMBOL_LEN 6
146+
char ts[TS_LEN];
147+
char symbol[SYMBOL_LEN];
148+
149+
// Fetching rows from q_avg_all.
150+
{
151+
row = spring_pop_non_blocking(pipeline, "q_avg_all", &is_err);
152+
if (row)
153+
{
154+
int r = spring_column_text(row, 0, (char *)ts, TS_LEN);
155+
assert((size_t)r == strlen(ts));
156+
157+
float avg_amount;
158+
ret = spring_column_float(row, 1, &avg_amount);
159+
assert_ok(ret);
160+
161+
fprintf(stderr, "[q_avg_all] %s\t%f\n", ts, avg_amount);
162+
spring_row_close(row);
163+
}
164+
else
165+
{
166+
assert(!is_err);
167+
}
168+
}
169+
170+
// Fetching rows from q_avg_by_symbol.
171+
row = spring_pop_non_blocking(pipeline, "q_avg_by_symbol", &is_err);
172+
if (row)
173+
{
174+
int r = spring_column_text(row, 0, (char *)ts, TS_LEN);
175+
assert((size_t)r == strlen(ts));
176+
177+
r = spring_column_text(row, 1, (char *)symbol, SYMBOL_LEN);
178+
assert((size_t)r == strlen(symbol));
179+
180+
float avg_amount;
181+
ret = spring_column_float(row, 2, &avg_amount);
182+
assert_ok(ret);
183+
184+
fprintf(stderr, "[q_avg_by_symbol] %s\t%s\t%f\n", ts, symbol, avg_amount);
185+
spring_row_close(row);
186+
}
187+
else
188+
{
189+
assert(!is_err);
190+
}
191+
192+
// Avoid busy sleep.
193+
usleep(100000);
194+
}
195+
196+
ret = spring_close(pipeline);
197+
assert_ok(ret);
198+
199+
ret = spring_config_close(config);
200+
assert_ok(ret);
201+
202+
return 0;
203+
}

springql.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,22 @@ enum SpringErrno spring_command(const SpringPipeline *pipeline, const char *sql)
129129
*/
130130
SpringRow *spring_pop(const SpringPipeline *pipeline, const char *queue);
131131

132+
/**
133+
* See: springql_core::api::spring_pop_non_blocking
134+
*
135+
* # Returns
136+
*
137+
* - non-NULL: Successfully get a row.
138+
* - NULL: Error occurred if `is_err` is true (check spring_last_err() for details). Otherwise, any row is not in the queue.
139+
*
140+
* # Safety
141+
*
142+
* This function is unsafe because it uses raw pointer.
143+
*/
144+
SpringRow *spring_pop_non_blocking(const SpringPipeline *pipeline,
145+
const char *queue,
146+
bool *is_err);
147+
132148
/**
133149
* # Returns
134150
*

src/lib.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,42 @@ pub unsafe extern "C" fn spring_pop(
174174
)
175175
}
176176

177+
/// See: springql_core::api::spring_pop_non_blocking
178+
///
179+
/// # Returns
180+
///
181+
/// - non-NULL: Successfully get a row.
182+
/// - NULL: Error occurred if `is_err` is true (check spring_last_err() for details). Otherwise, any row is not in the queue.
183+
///
184+
/// # Safety
185+
///
186+
/// This function is unsafe because it uses raw pointer.
187+
#[no_mangle]
188+
pub unsafe extern "C" fn spring_pop_non_blocking(
189+
pipeline: *const SpringPipeline,
190+
queue: *const c_char,
191+
is_err: *mut bool,
192+
) -> *mut SpringRow {
193+
*is_err = false;
194+
195+
let pipeline = &*((*pipeline).0 as *const springql_core::SpringPipeline);
196+
let queue = CStr::from_ptr(queue).to_string_lossy().into_owned();
197+
198+
with_catch(|| springql_core::spring_pop_non_blocking(pipeline, &queue)).map_or_else(
199+
|_| {
200+
*is_err = true;
201+
ptr::null_mut()
202+
},
203+
|opt_row| {
204+
if let Some(row) = opt_row {
205+
Box::into_raw(Box::new(SpringRow(mem::transmute(Box::new(row)))))
206+
} else {
207+
ptr::null_mut()
208+
}
209+
},
210+
)
211+
}
212+
177213
/// # Returns
178214
///
179215
/// - `0`: if there are no recent errors.

0 commit comments

Comments
 (0)