Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.

Commit 87326e2

Browse files
committed
Kafka input adapter.
Simple adapter that reads data from Kafka into DBSP and doesn't do anything clever for transactions or reliable delivery. It is suitable for streaming data in scenarios where atomic transactions are not required and at-least-once delivery is acceptable (i.e., occasional duplicates can be tolerated).
1 parent 3ed830c commit 87326e2

File tree

8 files changed

+777
-38
lines changed

8 files changed

+777
-38
lines changed

Cargo.lock

+75
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

adapters/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@ once_cell = "1.9.0"
1616
serde_yaml = "0.9.14"
1717
csv = { git = "https://github.com/ryzhyk/rust-csv.git" }
1818
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
19+
rdkafka = "0.29.0"
1920

2021
[dev-dependencies]
2122
serde_json = "1.0.87"
2223
size-of = { version = "0.1.2", features = ["time-std"]}
2324
tempfile = "3.3.0"
2425
proptest = "1.0.0"
2526
proptest-derive = "0.3.0"
27+
futures = "0.3.25"
28+
log = "0.4.17"

adapters/src/controller/mod.rs

+6-33
Original file line numberDiff line numberDiff line change
@@ -863,44 +863,17 @@ impl OutputConsumer for OutputProbe {
863863

864864
#[cfg(test)]
865865
mod test {
866-
use crate::{test::wait, Catalog, Controller, ControllerConfig};
867-
use bincode::{Decode, Encode};
866+
use crate::{
867+
test::{generate_test_data, wait, TestStruct},
868+
Catalog, Controller, ControllerConfig,
869+
};
868870
use csv::{ReaderBuilder as CsvReaderBuilder, WriterBuilder as CsvWriterBuilder};
869871
use dbsp::{DBSPHandle, Runtime};
870-
use serde::{Deserialize, Serialize};
871872
use serde_yaml;
872-
use size_of::SizeOf;
873873
use std::fs::remove_file;
874874
use tempfile::NamedTempFile;
875875

876-
use proptest::{collection, prelude::*};
877-
use proptest_derive::Arbitrary;
878-
879-
#[derive(
880-
Debug,
881-
PartialEq,
882-
Eq,
883-
PartialOrd,
884-
Ord,
885-
Serialize,
886-
Deserialize,
887-
Clone,
888-
Hash,
889-
SizeOf,
890-
Encode,
891-
Decode,
892-
Arbitrary,
893-
)]
894-
struct TestStruct {
895-
id: u32,
896-
b: bool,
897-
i: Option<i64>,
898-
s: String,
899-
}
900-
901-
fn test_data(size: usize) -> impl Strategy<Value = Vec<TestStruct>> {
902-
collection::vec(any::<TestStruct>(), 0..=size)
903-
}
876+
use proptest::prelude::*;
904877

905878
fn test_circuit(workers: usize) -> (DBSPHandle, Catalog) {
906879
let (circuit, (input, output)) = Runtime::init_circuit(workers, |circuit| {
@@ -924,7 +897,7 @@ mod test {
924897
#![proptest_config(ProptestConfig::with_cases(30))]
925898
#[test]
926899
fn proptest_csv_file(
927-
data in test_data(5000),
900+
data in generate_test_data(5000),
928901
min_batch_size_records in 1..100usize,
929902
max_buffering_delay_usecs in 1..2000usize,
930903
input_buffer_size_bytes in 1..1000usize,

adapters/src/test/data.rs

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use bincode::{Decode, Encode};
2+
use proptest::{collection, prelude::*};
3+
use proptest_derive::Arbitrary;
4+
use serde::{Deserialize, Serialize};
5+
use size_of::SizeOf;
6+
7+
#[derive(
8+
Debug,
9+
PartialEq,
10+
Eq,
11+
PartialOrd,
12+
Ord,
13+
Serialize,
14+
Deserialize,
15+
Clone,
16+
Hash,
17+
SizeOf,
18+
Encode,
19+
Decode,
20+
Arbitrary,
21+
)]
22+
pub struct TestStruct {
23+
pub id: u32,
24+
pub b: bool,
25+
pub i: Option<i64>,
26+
pub s: String,
27+
}
28+
29+
pub fn generate_test_data(size: usize) -> impl Strategy<Value = Vec<TestStruct>> {
30+
collection::vec(any::<TestStruct>(), 0..=size)
31+
}

adapters/src/test/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ use std::{
88
time::{Duration, Instant},
99
};
1010

11+
mod data;
1112
mod mock_dezset;
1213
mod mock_input_consumer;
1314

15+
pub use data::{generate_test_data, TestStruct};
1416
pub use mock_dezset::MockDeZSet;
1517
pub use mock_input_consumer::MockInputConsumer;
1618

adapters/src/transport/file.rs

-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ impl FileInputEndpoint {
103103
status: Arc<AtomicU32>,
104104
follow: bool,
105105
) {
106-
println!("file reader worker");
107106
loop {
108107
match PipelineState::from_u32(status.load(Ordering::Acquire)) {
109108
Some(PipelineState::Paused) => parker.park(),

0 commit comments

Comments
 (0)