Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.

Commit dfcb435

Browse files
committed
The output half of the I/O pipeline.
Implemented the output half of the I/O framework. Similar to input pipelines, an output pipeline consists of an (output) transport endpoint and an encoder that serializes output batches into a particular format.
1 parent 62558c0 commit dfcb435

File tree

15 files changed

+1400
-238
lines changed

15 files changed

+1400
-238
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

adapters/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@ bincode = { version = "2.0.0-rc.2", features = ["serde"] }
2121
serde_json = "1.0.87"
2222
size-of = { version = "0.1.2", features = ["time-std"]}
2323
tempfile = "3.3.0"
24+
proptest = "1.0.0"
25+
proptest-derive = "0.3.0"

adapters/src/catalog.rs

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{DeCollectionHandle, DeZSetHandle};
1+
use crate::{DeCollectionHandle, DeZSetHandle, SerOutputBatchHandle};
22
use dbsp::{algebra::ZRingValue, CollectionHandle, DBData, DBWeight};
33
use serde::Deserialize;
44
use std::collections::BTreeMap;
@@ -13,7 +13,8 @@ use std::collections::BTreeMap;
1313
/// method).
1414
#[derive(Default)]
1515
pub struct Catalog {
16-
input_collections: BTreeMap<String, Box<dyn DeCollectionHandle>>,
16+
input_collection_handles: BTreeMap<String, Box<dyn DeCollectionHandle>>,
17+
output_batch_handles: BTreeMap<String, Box<dyn SerOutputBatchHandle>>,
1718
}
1819

1920
impl Catalog {
@@ -22,25 +23,39 @@ impl Catalog {
2223
Self::default()
2324
}
2425

25-
pub fn register_input_zset<K, R>(&mut self, name: &str, handle: CollectionHandle<K, R>)
26+
pub fn register_input_zset_handle<K, R>(&mut self, name: &str, handle: CollectionHandle<K, R>)
2627
where
2728
K: DBData + for<'de> Deserialize<'de>,
2829
R: DBWeight + ZRingValue,
2930
{
30-
self.register_input(name, DeZSetHandle::new(handle));
31+
self.register_input_collection_handle(name, DeZSetHandle::new(handle));
3132
}
3233

3334
/// Add a named input stream handle to the catalog.
34-
pub fn register_input<H>(&mut self, name: &str, handle: H)
35+
pub fn register_input_collection_handle<H>(&mut self, name: &str, handle: H)
3536
where
3637
H: DeCollectionHandle + 'static,
3738
{
38-
self.input_collections
39+
self.input_collection_handles
40+
.insert(name.to_owned(), Box::new(handle));
41+
}
42+
43+
/// Add a named output stream handle to the catalog.
44+
pub fn register_output_batch_handle<H>(&mut self, name: &str, handle: H)
45+
where
46+
H: SerOutputBatchHandle + 'static,
47+
{
48+
self.output_batch_handles
3949
.insert(name.to_owned(), Box::new(handle));
4050
}
4151

4252
/// Look up an input stream handle by name.
43-
pub fn input_collection(&self, name: &str) -> Option<&dyn DeCollectionHandle> {
44-
self.input_collections.get(name).map(|b| &**b)
53+
pub fn input_collection_handle(&self, name: &str) -> Option<&dyn DeCollectionHandle> {
54+
self.input_collection_handles.get(name).map(|b| &**b)
55+
}
56+
57+
/// Look up an output stream handle by name.
58+
pub fn output_batch_handle(&self, name: &str) -> Option<&dyn SerOutputBatchHandle> {
59+
self.output_batch_handles.get(name).map(|b| &**b)
4560
}
4661
}

adapters/src/controller/config.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ pub struct ControllerConfig {
2727

2828
/// Input endpoint configuration.
2929
pub inputs: BTreeMap<Cow<'static, str>, InputEndpointConfig>,
30+
31+
/// Output endpoint configuration.
32+
#[serde(default)]
33+
pub outputs: BTreeMap<Cow<'static, str>, OutputEndpointConfig>,
3034
}
3135

3236
/// Internal representaion of controller configuration stored inside the
@@ -36,13 +40,15 @@ pub struct ControllerConfig {
3640
pub struct ControllerInnerConfig {
3741
pub global: GlobalControllerConfig,
3842
pub inputs: BTreeMap<EndpointId, InputEndpointConfig>,
43+
pub outputs: BTreeMap<EndpointId, OutputEndpointConfig>,
3944
}
4045

4146
impl ControllerInnerConfig {
4247
pub fn new(global: GlobalControllerConfig) -> Self {
4348
Self {
4449
global,
4550
inputs: BTreeMap::new(),
51+
outputs: BTreeMap::new(),
4652
}
4753
}
4854
}
@@ -87,24 +93,47 @@ pub struct InputEndpointConfig {
8793
pub max_buffered_records: u64,
8894
}
8995

96+
#[derive(Clone, Serialize, Deserialize)]
97+
pub struct OutputEndpointConfig {
98+
/// The name of the output stream of the circuit that this pipeline is
99+
/// connected to.
100+
pub stream: Cow<'static, str>,
101+
102+
/// Transport endpoint configuration.
103+
pub transport: TransportConfig,
104+
105+
/// Encoder configuration.
106+
pub format: FormatConfig,
107+
108+
/// Backpressure threshold.
109+
///
110+
/// The default is 1 million.
111+
#[serde(default = "default_max_buffered_records")]
112+
pub max_buffered_records: u64,
113+
}
114+
90115
/// Transport endpoint configuration.
91116
#[derive(Clone, Serialize, Deserialize)]
92117
pub struct TransportConfig {
93118
/// Data transport name, e.g., "file", "kafka", "kinesis", etc.
94119
pub name: Cow<'static, str>,
95120

96121
/// Transport-specific endpoint configuration passed to
97-
/// [`Transport::new_endpoint`](`crate::Transport::new_endpoint`).
122+
/// [`OutputTransport::new_endpoint`](`crate::OutputTransport::new_endpoint`)
123+
/// and
124+
/// [`InputTransport::new_endpoint`](`crate::InputTransport::new_endpoint`).
125+
#[serde(default)]
98126
pub config: YamlValue,
99127
}
100128

101129
/// Data format specification used to parse raw data received from the
102-
/// endpoint.
130+
/// endpoint or to encode data sent to the endpoint.
103131
#[derive(Clone, Serialize, Deserialize)]
104132
pub struct FormatConfig {
105133
/// Format name, e.g., "csv", "json", "bincode", etc.
106134
pub name: Cow<'static, str>,
107135

108-
/// Format-specific parser configuration.
136+
/// Format-specific parser or encoder configuration.
137+
#[serde(default)]
109138
pub config: YamlValue,
110139
}

0 commit comments

Comments
 (0)