Add more example pipelines #1447

Merged · 10 commits · Apr 1, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -47,7 +47,7 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io).
1. Download and extract
the [latest release](https://github.com/conduitio/conduit/releases/latest).
2. Download
-   the [example pipeline](/examples/pipelines/file-to-file.yml)
+   the [example pipeline](/examples/pipelines/file-to-file.yaml)
and put it in the directory named `pipelines` in the same directory as the
Conduit binary.
3. Run Conduit (`./conduit`). The example pipeline will start automatically.
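The example pipeline starts automatically because Conduit loads every pipeline configuration file it finds in the `pipelines` directory at startup. Below is a minimal sketch of what that discovery step could look like; the trimmed-down `PipelineConfig` type is a hypothetical stand-in, not Conduit's actual structure:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"gopkg.in/yaml.v3"
)

// PipelineConfig is a hypothetical, trimmed-down view of a pipeline
// configuration file; Conduit's real structures carry much more.
type PipelineConfig struct {
	Version   string `yaml:"version"`
	Pipelines []struct {
		ID     string `yaml:"id"`
		Status string `yaml:"status"`
	} `yaml:"pipelines"`
}

func main() {
	// Collect every .yml and .yaml file in the pipelines directory.
	files, err := filepath.Glob("pipelines/*.y*ml")
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		data, err := os.ReadFile(f)
		if err != nil {
			panic(err)
		}
		var cfg PipelineConfig
		if err := yaml.Unmarshal(data, &cfg); err != nil {
			panic(err)
		}
		for _, p := range cfg.Pipelines {
			fmt.Printf("%s: pipeline %q (status: %s)\n", f, p.ID, p.Status)
		}
	}
}
```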
34 changes: 34 additions & 0 deletions examples/pipelines/javascript-processor.yaml
@@ -0,0 +1,34 @@
---
version: 2.2
pipelines:
- id: pipeline-with-js-processor
status: running
description: >
An example pipeline that uses the built-in JavaScript processor (`custom.javascript`).

It reads data from the `example.in` file, prefixes every line with 'hi there' and writes the results
to the `example.out` file.

Results can be viewed with `tail -f example.out | jq '.payload.after |= @base64d'`.
connectors:
- id: file-source
type: source
plugin: "builtin:file"
name: file-source
settings:
path: ./example.in
- id: file-destination
type: destination
plugin: "builtin:file"
name: file-destination
settings:
path: ./example.out
processors:
- id: greeting-processor
plugin: "custom.javascript"
settings:
script: |
function process(rec) {
rec.Payload.After = RawData("hi there " + String.fromCharCode.apply(String, rec.Payload.After));
return rec;
}
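The `process` function above receives one record at a time, rewrites `Payload.After`, and returns the record. For readers who prefer Go, here is the same transformation sketched with a hypothetical stand-in for Conduit's record type (in reality the JavaScript runs inside Conduit itself):

```go
package main

import "fmt"

// Record is a hypothetical stand-in for Conduit's record type; only the
// field the script touches is modeled here.
type Record struct {
	Payload struct {
		After []byte
	}
}

// process mirrors the JavaScript snippet: it prefixes the raw payload
// with "hi there " and returns the modified record.
func process(rec Record) Record {
	rec.Payload.After = append([]byte("hi there "), rec.Payload.After...)
	return rec
}

func main() {
	var rec Record
	rec.Payload.After = []byte("world")
	fmt.Printf("%s\n", process(rec).Payload.After) // hi there world
}
```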
25 changes: 25 additions & 0 deletions examples/pipelines/multiple-destinations.yaml
@@ -0,0 +1,25 @@
version: 2.2
pipelines:
- id: multiple-destinations
status: running
description: >
An example pipeline that reads data from one generator source and writes it
to multiple file destinations.
connectors:
- id: employees-source
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
recordCount: "1"
- id: file-destination-1
type: destination
plugin: builtin:file
settings:
path: ./example-1.out
- id: file-destination-2
type: destination
plugin: builtin:file
settings:
path: ./example-2.out
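Every record the generator produces is delivered to each configured destination. A minimal sketch of that fan-out behavior, with plain `io.Writer`s standing in for the file connectors:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// fanOut delivers every record to every destination, which is the
// behavior a pipeline with multiple destination connectors exhibits.
func fanOut(records [][]byte, destinations []io.Writer) error {
	for _, rec := range records {
		for _, dst := range destinations {
			if _, err := fmt.Fprintf(dst, "%s\n", rec); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	// Stand-ins for example-1.out and example-2.out.
	records := [][]byte{[]byte(`{"id":1,"name":"Ada"}`)}
	if err := fanOut(records, []io.Writer{os.Stdout, os.Stderr}); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```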
35 changes: 35 additions & 0 deletions examples/pipelines/multiple-sources-with-processor.yml
@@ -0,0 +1,35 @@
version: 2.2
pipelines:
- id: add-department
status: running
description: >
An example pipeline that reads data (imaginary employees) from two generator
sources and uses the built-in `field.set` processor to add a department field
to each record. This is a pipeline processor, meaning that it processes records
from all sources.
connectors:
- id: employees-1
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
recordCount: "1"
- id: employees-2
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
recordCount: "2"
- id: file-destination
type: destination
plugin: builtin:file
settings:
path: ./example.out
processors:
- id: set-department
plugin: field.set
settings:
field: '.Payload.After.department'
value: 'finance'
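A pipeline-level processor sits after all sources are merged into a single stream, so it sees records from `employees-1` and `employees-2` alike. A minimal sketch of that topology, with channels standing in for the generator sources:

```go
package main

import (
	"fmt"
	"sync"
)

// merge funnels every source channel into one stream; a pipeline-level
// processor sits here, after the merge and before the destination.
func merge(sources ...<-chan map[string]any) <-chan map[string]any {
	out := make(chan map[string]any)
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan map[string]any) {
			defer wg.Done()
			for rec := range src {
				out <- rec
			}
		}(src)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

func main() {
	emit := func(recs ...map[string]any) <-chan map[string]any {
		ch := make(chan map[string]any, len(recs))
		for _, r := range recs {
			ch <- r
		}
		close(ch)
		return ch
	}
	src1 := emit(map[string]any{"name": "Ada"})
	src2 := emit(map[string]any{"name": "Grace"})

	for rec := range merge(src1, src2) {
		rec["department"] = "finance" // the field.set step applies to all sources
		fmt.Println(rec)
	}
}
```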
34 changes: 34 additions & 0 deletions examples/pipelines/pipeline-with-dlq.yaml
@@ -0,0 +1,34 @@
version: 2.2
pipelines:
- id: pipeline-with-dlq
status: running
description: >
An example pipeline with a dead-letter queue backed by the built-in file plugin.
The failed records will be written to `dlq.out`.
The failures are simulated with the `field.convert` processor that tries to
convert a non-numerical string into a float.
connectors:
- id: generator-source
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string"
recordCount: "1"
processors:
- id: convert-name
plugin: field.convert
settings:
field: .Payload.After.name
type: float
- id: file-destination
type: destination
plugin: builtin:file
settings:
path: ./example.out
dead-letter-queue: # Dead-letter queue (DLQ) configuration
plugin: "builtin:file" # DLQ Connector plugin
settings: # Configure the file plugin used for DLQ
path: "./dlq.out"
window-size: 2 # DLQ nack window size
window-nack-threshold: 1 # DLQ nack window threshold
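The `window-size` and `window-nack-threshold` settings bound how many failed (nacked) records the pipeline tolerates before stopping. The sketch below illustrates one assumed reading of those semantics (nacks counted over a sliding window of the most recent `window-size` records, pipeline stopped once the count exceeds `window-nack-threshold`); it is not Conduit's actual code:

```go
package main

import "fmt"

// nackWindow tracks acks/nacks over a sliding window of recent records.
// This is a sketch of assumed DLQ semantics, not Conduit's implementation.
type nackWindow struct {
	size      int
	threshold int
	recent    []bool // true = nack, oldest first
}

// observe records one ack/nack result and reports whether the pipeline
// should keep running.
func (w *nackWindow) observe(nacked bool) (ok bool) {
	w.recent = append(w.recent, nacked)
	if len(w.recent) > w.size {
		w.recent = w.recent[1:] // slide the window forward
	}
	nacks := 0
	for _, n := range w.recent {
		if n {
			nacks++
		}
	}
	return nacks <= w.threshold
}

func main() {
	// window-size: 2, window-nack-threshold: 1, as in the pipeline above.
	w := &nackWindow{size: 2, threshold: 1}
	for i, nacked := range []bool{true, false, true, true} {
		if !w.observe(nacked) {
			fmt.Printf("record %d: threshold exceeded, stopping pipeline\n", i)
			return
		}
		fmt.Printf("record %d: within tolerance, failed record goes to dlq.out\n", i)
	}
}
```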
19 changes: 19 additions & 0 deletions examples/pipelines/pipeline-with-standalone-connector.yaml
@@ -0,0 +1,19 @@
version: 2.2
pipelines:
- id: file-to-file
status: running
description: >
An example pipeline reading from the standalone chaos connector and writing to a file destination.

To use the chaos connector, download the appropriate binary from https://github.com/conduitio-labs/conduit-connector-chaos/releases/latest.

The binary needs to be stored in the `connectors` directory, adjacent to the `conduit` binary.
connectors:
- id: chaos-source
type: source
plugin: standalone:chaos
- id: file-destination
type: destination
plugin: builtin:file
settings:
path: ./example.out
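A `standalone:` plugin reference points Conduit at a connector binary on disk rather than a built-in plugin. The sketch below shows how such a reference could be resolved against the `connectors` directory; the `conduit-connector-<name>` naming convention is an assumption made for illustration:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// resolveStandalone maps a plugin reference such as "standalone:chaos" to
// a binary path like connectors/conduit-connector-chaos. The
// "conduit-connector-<name>" convention is an assumption in this sketch.
func resolveStandalone(plugin, connectorsDir string) (string, error) {
	name, ok := strings.CutPrefix(plugin, "standalone:")
	if !ok {
		return "", fmt.Errorf("not a standalone plugin: %s", plugin)
	}
	path := filepath.Join(connectorsDir, "conduit-connector-"+name)
	if _, err := os.Stat(path); err != nil {
		return "", fmt.Errorf("connector binary not found: %w", err)
	}
	return path, nil
}

func main() {
	path, err := resolveStandalone("standalone:chaos", "connectors")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Println("launching", path)
}
```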
37 changes: 37 additions & 0 deletions examples/pipelines/source-processor.yml
@@ -0,0 +1,37 @@
version: 2.2
pipelines:
- id: add-department
status: running
description: >
An example pipeline that reads data (imaginary employees) from two generator
sources, processes it and writes it to a file.

It attaches the built-in `field.set` processor to one of the sources
to add a `department` field to its records. The records from the other source
are not processed.
connectors:
- id: employees-1
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
recordCount: "1"
processors:
- id: set-department
plugin: field.set
settings:
field: '.Payload.After.department'
value: 'finance'
- id: employees-2
type: source
plugin: builtin:generator
settings:
format.type: "structured"
format.options: "id:int,name:string,company:string,trial:bool"
recordCount: "2"
- id: file-destination
type: destination
plugin: builtin:file
settings:
path: ./example.out
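Unlike the pipeline-level processor in `multiple-sources-with-processor.yml`, this processor is attached to a single source, so only records from `employees-1` gain the `department` field. A small sketch of that per-connector attachment, again with hypothetical types:

```go
package main

import "fmt"

type record map[string]any

// source pairs a stream of records with the processors attached to it.
// Processors attached to a connector run only on that connector's records.
type source struct {
	id         string
	records    []record
	processors []func(record) record
}

func main() {
	setDepartment := func(r record) record { r["department"] = "finance"; return r }

	sources := []source{
		{id: "employees-1", records: []record{{"name": "Ada"}}, processors: []func(record) record{setDepartment}},
		{id: "employees-2", records: []record{{"name": "Grace"}}}, // no processors attached
	}

	for _, src := range sources {
		for _, rec := range src.records {
			for _, proc := range src.processors {
				rec = proc(rec)
			}
			fmt.Println(src.id, rec) // only employees-1 records gain "department"
		}
	}
}
```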
@@ -89,7 +89,7 @@ func SpecifierParameter(in *connectorv1.Specifier_Parameter) (connector.Paramete
}
// needed for backward compatibility, in.Required is converted to a validation of type ValidationTypeRequired
// making sure not to duplicate the required validation
-	if in.Required && !requiredExists { //nolint: staticcheck // required is still supported for now
+	if in.Required && !requiredExists { //nolint:staticcheck // required is still supported for now
validations = append(validations, connector.Validation{ //nolint: makezero // list is full so need to append
Type: connector.ValidationTypeRequired,
})