|
| 1 | +// This file is part of https://github.com/SpringQL/SpringQL-client-c which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. |
| 2 | + |
| 3 | +use std::ffi::CString; |
| 4 | + |
| 5 | +use crate::*; |
| 6 | + |
| 7 | +unsafe fn command(pipeline: *const SpringPipeline, sql: &str) { |
| 8 | + let sql = CString::new(sql).unwrap(); |
| 9 | + let errno = spring_command(pipeline, sql.as_ptr()); |
| 10 | + assert_eq!(errno, SpringErrno::Ok); |
| 11 | +} |
| 12 | + |
| 13 | +#[test] |
| 14 | +fn test_spring_sink_row() { |
| 15 | + unsafe { |
| 16 | + let config = spring_config_default(); |
| 17 | + assert!(!config.is_null()); |
| 18 | + |
| 19 | + let pipeline = spring_open(config); |
| 20 | + assert!(!config.is_null()); |
| 21 | + |
| 22 | + command(pipeline, "CREATE SOURCE STREAM source_1 (b BLOB NOT NULL);"); |
| 23 | + command(pipeline, "CREATE SINK STREAM sink_1 (b BLOB NOT NULL);"); |
| 24 | + command( |
| 25 | + pipeline, |
| 26 | + " |
| 27 | + CREATE PUMP pump_1 AS |
| 28 | + INSERT INTO sink_1 (b) |
| 29 | + SELECT STREAM source_1.b FROM source_1; |
| 30 | + ", |
| 31 | + ); |
| 32 | + command( |
| 33 | + pipeline, |
| 34 | + " |
| 35 | + CREATE SINK WRITER queue_sink FOR sink_1 |
| 36 | + TYPE IN_MEMORY_QUEUE OPTIONS (NAME 'q_sink'); |
| 37 | + ", |
| 38 | + ); |
| 39 | + command( |
| 40 | + pipeline, |
| 41 | + " |
| 42 | + CREATE SOURCE READER queue_src FOR source_1 |
| 43 | + TYPE IN_MEMORY_QUEUE OPTIONS (NAME 'q_src'); |
| 44 | + ", |
| 45 | + ); |
| 46 | + |
| 47 | + let source_row = { |
| 48 | + let col = CString::new("b").unwrap(); |
| 49 | + let val = vec![0x01u8, 0x02, 0x03]; |
| 50 | + |
| 51 | + let builder = spring_source_row_builder(); |
| 52 | + let builder = spring_source_row_add_column_blob( |
| 53 | + builder, |
| 54 | + col.as_ptr(), |
| 55 | + val.as_ptr().cast(), |
| 56 | + val.len().try_into().unwrap(), |
| 57 | + ); |
| 58 | + assert!(!builder.is_null()); |
| 59 | + |
| 60 | + spring_source_row_build(builder) |
| 61 | + }; |
| 62 | + |
| 63 | + let q_src = CString::new("q_src").unwrap(); |
| 64 | + let errno = spring_push(pipeline, q_src.as_ptr(), source_row); |
| 65 | + assert_eq!(errno, SpringErrno::Ok); |
| 66 | + |
| 67 | + let q_sink = CString::new("q_sink").unwrap(); |
| 68 | + let sink_row = spring_pop(pipeline, q_sink.as_ptr()); |
| 69 | + |
| 70 | + spring_sink_row_close(sink_row); |
| 71 | + |
| 72 | + spring_close(pipeline); |
| 73 | + spring_config_close(config); |
| 74 | + } |
| 75 | +} |
0 commit comments