
Commit 3a33e32

I/O adapter framework.
Started building an infrastructure to ingest data into a DBSP circuit from external data sources and to stream the outputs of the circuit to external consumers. The framework defines the APIs to integrate different transport technologies (files, Kafka streams, database connections, etc.) and data formats (CSV, bincode, JSON, etc.) into the DBSP input and output pipelines. This PR covers only the input half of the API.

Overview
========

The data ingestion pipeline consists of two kinds of adapters: **data transport** adapters and **data format** adapters.

```text
                  ┌──────────────────────────┐
                  │        controller        │
                  │ (catalog, config, stats) │
                  └────────────┬─────────────┘
                               │ control commands
               ┌───────────────┼────────────────┐
               ▼               ▼                ▼
 transport ┌────────┐ bytes ┌──────┐ records ┌──────┐
 protocol ►│endpoint├──────►│parser├────────►│handle├───► circuit
           └────────┘       └──────┘         └──────┘
                ▲               ▲
                │               │
          ┌────┴────┐      ┌───┴────┐
          │  input  │      │ input  │
          │transport│      │ format │
          └─────────┘      └────────┘
```

A data transport implements support for a specific streaming technology like Kafka. It provides an API to create transport **endpoints**, which connect to specified data sources, e.g., Kafka topics. An endpoint reads raw binary data from the source and provides basic flow control and error reporting facilities, but is agnostic of the contents or format of the data.

A data format adapter implements support for a data encapsulation format like CSV, JSON, or bincode. It provides an API to create **parsers**, which transform raw binary data into a stream of **records** and push this data to the DBSP circuit.

The Controller component serves as a centralized control plane that coordinates the creation, reconfiguration, and teardown of the pipeline, and implements runtime flow control. It instantiates the pipeline according to a user-provided configuration (see below) and exposes an API to reconfigure and monitor the pipeline at runtime.

Adapter API
===========

The transport adapter API consists of two traits:

* `InputTransport` is a factory trait that creates `InputEndpoint` instances.
* `InputEndpoint` represents an individual data connection, e.g., a file, an S3 bucket, or a Kafka topic.

Similarly, the format adapter API consists of:

* `InputFormat` - a factory trait that creates `Parser` instances.
* `Parser` - a parser that consumes a raw binary stream and outputs a stream of records.

------

This PR introduces a workspace, with the I/O framework implemented as a separate crate within this workspace.
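To make the pipeline structure concrete, the sketch below shows one way the two trait pairs could be declared in Rust. Only the trait names (`InputTransport`, `InputEndpoint`, `InputFormat`, `Parser`) come from the description above; the method names, the `InputConsumer` callback, the string-typed configuration parameters, and the error alias are illustrative assumptions, not the actual signatures in the adapter crate.

```rust
// Illustrative sketch only: trait names follow the PR description, but the
// methods, the InputConsumer callback, and the config/error types are
// assumptions rather than the real adapter-crate API.

use std::error::Error;

type AnyError = Box<dyn Error + Send + Sync>;

/// Callback through which an endpoint pushes raw bytes (and errors) downstream.
pub trait InputConsumer: Send {
    /// Consume the next chunk of raw binary data.
    fn input(&mut self, data: &[u8]);
    /// Report a transport error.
    fn error(&mut self, error: AnyError);
    /// Signal end of input.
    fn eoi(&mut self);
}

/// Factory for transport endpoints; one implementation per technology
/// (file, Kafka, S3, ...).
pub trait InputTransport {
    /// Transport name referenced by the pipeline configuration, e.g., "file".
    fn name(&self) -> &'static str;

    /// Create an endpoint that reads bytes from a concrete data source
    /// (a path, a Kafka topic, ...) and feeds them to `consumer`.
    fn new_endpoint(
        &self,
        config: &str, // transport-specific config; assumed opaque here
        consumer: Box<dyn InputConsumer>,
    ) -> Result<Box<dyn InputEndpoint>, AnyError>;
}

/// An individual data connection.  Offers basic flow control but is agnostic
/// of the contents or format of the data it carries.
pub trait InputEndpoint: Send {
    fn pause(&self) -> Result<(), AnyError>;
    fn start(&self) -> Result<(), AnyError>;
    fn disconnect(&self);
}

/// Factory for parsers; one implementation per encapsulation format
/// (CSV, JSON, bincode, ...).
pub trait InputFormat {
    /// Format name referenced by the pipeline configuration, e.g., "csv".
    fn name(&self) -> &'static str;

    /// Create a parser that turns raw bytes into records and pushes them
    /// into the circuit through the input handle it was constructed with.
    fn new_parser(&self, config: &str) -> Result<Box<dyn Parser>, AnyError>;
}

/// Consumes a raw binary stream and outputs a stream of records.
pub trait Parser: Send {
    /// Parse a chunk of bytes; returns the number of complete records ingested.
    fn input(&mut self, data: &[u8]) -> Result<usize, AnyError>;

    /// Flush any buffered records into the circuit's input handle.
    fn flush(&mut self);
}
```

Under these assumptions, the controller stays generic over transports and formats: for each configured input it looks up the named `InputTransport` and `InputFormat` in its catalog, calls `new_parser` and `new_endpoint`, and hands the endpoint a consumer that forwards bytes to the parser, so that data flows from the endpoint through the parser into the circuit's input handle, as in the diagram above.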
1 parent 6f037e0 commit 3a33e32

File tree

23 files changed (+2815 −354 lines)

.github/workflows/main.yml

+11 −9

```diff
@@ -89,19 +89,19 @@ jobs:
         uses: actions-rs/cargo@v1
         with:
           command: test
-          args: --no-run --all-features --target ${{ matrix.target }}
+          args: --workspace --no-run --all-features --target ${{ matrix.target }}
 
       - name: Build tests with no features
         uses: actions-rs/cargo@v1
         with:
           command: test
-          args: --no-run --no-default-features --target ${{ matrix.target }}
+          args: --workspace --no-run --no-default-features --target ${{ matrix.target }}
 
       - name: Run tests
         uses: actions-rs/cargo@v1
         with:
           command: test
-          args: ${{ env.ALMOST_ALL_FEATURES }} --target ${{ matrix.target }}
+          args: --workspace ${{ env.ALMOST_ALL_FEATURES }} --target ${{ matrix.target }}
 
   # miri:
   #   name: Miri
@@ -215,11 +215,11 @@ jobs:
           # of test threads. Crashes happen in random tests, but always at the
           # same location in the sanitizer code:
           # https://github.com/llvm-mirror/compiler-rt/blob/69445f095c22aac2388f939bedebf224a6efcdaf/lib/sanitizer_common/sanitizer_thread_registry.h#L104.
-          # So we run tests with only 1 threads at a time to mitigate this.
+          # So we run tests one at a time to mitigate this.
           THREADS: "${{ matrix.sanitizer == 'leak' && '--test-threads=1' || '' }}"
         with:
           command: test
-          args: ${{ env.ALMOST_ALL_FEATURES }} --target ${{ matrix.target }} -Z build-std -- --skip 'proptest' --skip 'persistent' ${{ env.THREADS }}
+          args: --workspace ${{ env.ALMOST_ALL_FEATURES }} --target ${{ matrix.target }} -Z build-std -- --skip 'proptest' --skip 'persistent' ${{ env.THREADS }}
   clippy:
     name: Clippy
     runs-on: ubuntu-latest
@@ -245,7 +245,7 @@ jobs:
         uses: actions-rs/clippy-check@v1
         with:
           token: ${{ secrets.GITHUB_TOKEN }}
-          args: --all-features --all -- -D warnings
+          args: --all-features --workspace -- -D warnings
 
   fmt:
     name: Rustfmt
@@ -293,9 +293,11 @@ jobs:
 
       - name: Check links
         uses: actions-rs/cargo@v1
+        env:
+          RUSTRDOCFLAGS: "-D warnings --cfg docsrs"
         with:
-          command: rustdoc
-          args: ${{ env.ALMOST_ALL_FEATURES }} -- -D warnings --cfg docsrs
+          command: doc
+          args: --workspace ${{ env.ALMOST_ALL_FEATURES }}
 
   udeps:
     name: Unused Dependencies
@@ -327,4 +329,4 @@ jobs:
         uses: actions-rs/cargo@v1
         with:
           command: udeps
-          args: --all --all-features --all-targets
+          args: --workspace --all-features --all-targets
```
