From cdfbe6c2aeadf46f18a3ceccff5835af117cc256 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 19 Mar 2024 15:59:05 +0100 Subject: [PATCH 1/7] Example: multiple sources, single processor --- .../multiple-sources-with-processor.yml | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 examples/pipelines/multiple-sources-with-processor.yml diff --git a/examples/pipelines/multiple-sources-with-processor.yml b/examples/pipelines/multiple-sources-with-processor.yml new file mode 100644 index 000000000..23167a7e2 --- /dev/null +++ b/examples/pipelines/multiple-sources-with-processor.yml @@ -0,0 +1,35 @@ +version: 2.2 +pipelines: + - id: add-department + status: running + description: > + An example pipeline which gets data (imaginary employees) from two generator + sources and uses the built-in `field.set` processor to add a department field + to each of those. This is a pipeline processor, meaning that it processes records + from all sources. + connectors: + - id: company-1 + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string,company:string,trial:bool" + recordCount: "1" + - id: company-2 + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string,company:string,trial:bool" + recordCount: "2" + - id: file-destination + type: destination + plugin: builtin:file + settings: + path: /tmp/file-destination.txt + processors: + - id: extract-name + plugin: field.set + settings: + field: '.Payload.After.department' + value: 'finance' From ecbadcc58e9c153a4a34ba7e21014cb52aab1b10 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 19 Mar 2024 16:18:15 +0100 Subject: [PATCH 2/7] Example: multiple destinations --- examples/pipelines/multiple-destinations.yaml | 25 +++++++++++++++++++ .../multiple-sources-with-processor.yml | 4 +-- 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 
examples/pipelines/multiple-destinations.yaml diff --git a/examples/pipelines/multiple-destinations.yaml b/examples/pipelines/multiple-destinations.yaml new file mode 100644 index 000000000..662138014 --- /dev/null +++ b/examples/pipelines/multiple-destinations.yaml @@ -0,0 +1,25 @@ +version: 2.2 +pipelines: + - id: multiple-destinations + status: running + description: > + An example pipeline which gets data a generator source and writes it + to multiple file destinations. + connectors: + - id: employees-source + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string,company:string,trial:bool" + recordCount: "1" + - id: file-destination-1 + type: destination + plugin: builtin:file + settings: + path: /tmp/file-destination-1.txt + - id: file-destination-2 + type: destination + plugin: builtin:file + settings: + path: /tmp/file-destination-2.txt diff --git a/examples/pipelines/multiple-sources-with-processor.yml b/examples/pipelines/multiple-sources-with-processor.yml index 23167a7e2..00af6d23a 100644 --- a/examples/pipelines/multiple-sources-with-processor.yml +++ b/examples/pipelines/multiple-sources-with-processor.yml @@ -8,14 +8,14 @@ pipelines: to each of those. This is a pipeline processor, meaning that it processes records from all sources. 
connectors: - - id: company-1 + - id: employees-1 type: source plugin: builtin:generator settings: format.type: "structured" format.options: "id:int,name:string,company:string,trial:bool" recordCount: "1" - - id: company-2 + - id: employees-2 type: source plugin: builtin:generator settings: From dd2c0687dd9c29435e2aadf89989b1468ff16c55 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Tue, 19 Mar 2024 17:55:28 +0100 Subject: [PATCH 3/7] DLQ example --- README.md | 2 +- .../{file-to-file.yml => file-to-file.yaml} | 0 examples/pipelines/multiple-destinations.yaml | 4 +-- .../multiple-sources-with-processor.yml | 2 +- examples/pipelines/pipeline-with-dlq.yaml | 34 +++++++++++++++++++ 5 files changed, 38 insertions(+), 4 deletions(-) rename examples/pipelines/{file-to-file.yml => file-to-file.yaml} (100%) create mode 100644 examples/pipelines/pipeline-with-dlq.yaml diff --git a/README.md b/README.md index 22c60620d..8487d9fee 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ Conduit was created and open-sourced by [Meroxa](https://meroxa.io). 1. Download and extract the [latest release](https://github.com/conduitio/conduit/releases/latest). 2. Download - the [example pipeline](/examples/pipelines/file-to-file.yml) + the [example pipeline](/examples/pipelines/file-to-file.yaml) and put it in the directory named `pipelines` in the same directory as the Conduit binary. 3. Run Conduit (`./conduit`). The example pipeline will start automatically. 
diff --git a/examples/pipelines/file-to-file.yml b/examples/pipelines/file-to-file.yaml similarity index 100% rename from examples/pipelines/file-to-file.yml rename to examples/pipelines/file-to-file.yaml diff --git a/examples/pipelines/multiple-destinations.yaml b/examples/pipelines/multiple-destinations.yaml index 662138014..ee0ac3182 100644 --- a/examples/pipelines/multiple-destinations.yaml +++ b/examples/pipelines/multiple-destinations.yaml @@ -17,9 +17,9 @@ pipelines: type: destination plugin: builtin:file settings: - path: /tmp/file-destination-1.txt + path: ./example-1.out - id: file-destination-2 type: destination plugin: builtin:file settings: - path: /tmp/file-destination-2.txt + path: ./example-2.out diff --git a/examples/pipelines/multiple-sources-with-processor.yml b/examples/pipelines/multiple-sources-with-processor.yml index 00af6d23a..890fcab25 100644 --- a/examples/pipelines/multiple-sources-with-processor.yml +++ b/examples/pipelines/multiple-sources-with-processor.yml @@ -26,7 +26,7 @@ pipelines: type: destination plugin: builtin:file settings: - path: /tmp/file-destination.txt + path: ./example.out processors: - id: extract-name plugin: field.set diff --git a/examples/pipelines/pipeline-with-dlq.yaml b/examples/pipelines/pipeline-with-dlq.yaml new file mode 100644 index 000000000..20094b7e0 --- /dev/null +++ b/examples/pipelines/pipeline-with-dlq.yaml @@ -0,0 +1,34 @@ +version: 2.0 +pipelines: + - id: pipeline-with-dlq + status: running + description: > + An example pipeline with a dead-letter queue backed by the file connector. + The failed records will be written to dlq.out. + The failures are simulated with the field.convert processor that tries to + convert a non-numerical string into a float. 
+ connectors: + - id: generator-source + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string" + recordCount: "1" + processors: + - id: convert-name + plugin: field.convert + settings: + field: .Payload.After.name + type: float + - id: file-destination + type: destination + plugin: builtin:file + settings: + path: ./example.out + dead-letter-queue: # Dead-letter queue (DLQ) configuration + plugin: "builtin:file" # DLQ Connector plugin + settings: # A map of configuration keys and values for the plugin (specific to the chosen plugin) + path: "./dlq.out" + window-size: 2 # DLQ nack window size + window-nack-threshold: 1 # DLQ nack window threshold From 4603ae16f180da393655351efb24f7bd8fe2fcb2 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 20 Mar 2024 11:41:29 +0100 Subject: [PATCH 4/7] pipeline with source processor --- examples/pipelines/multiple-destinations.yaml | 2 +- .../multiple-sources-with-processor.yml | 2 +- examples/pipelines/pipeline-with-dlq.yaml | 6 +-- examples/pipelines/source-processor.yml | 37 +++++++++++++++++++ 4 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 examples/pipelines/source-processor.yml diff --git a/examples/pipelines/multiple-destinations.yaml b/examples/pipelines/multiple-destinations.yaml index ee0ac3182..64c0837f7 100644 --- a/examples/pipelines/multiple-destinations.yaml +++ b/examples/pipelines/multiple-destinations.yaml @@ -3,7 +3,7 @@ pipelines: - id: multiple-destinations status: running description: > - An example pipeline which gets data a generator source and writes it + An example pipeline that reads data from one generator source and writes it to multiple file destinations.
connectors: - id: employees-source diff --git a/examples/pipelines/multiple-sources-with-processor.yml b/examples/pipelines/multiple-sources-with-processor.yml index 890fcab25..248f64f0c 100644 --- a/examples/pipelines/multiple-sources-with-processor.yml +++ b/examples/pipelines/multiple-sources-with-processor.yml @@ -3,7 +3,7 @@ pipelines: - id: add-department status: running description: > - An example pipeline which gets data (imaginary employees) from two generator + An example pipeline that reads data (imaginary employees) from two generator sources and uses the built-in `field.set` processor to add a department field to each of those. This is a pipeline processor, meaning that it processes records from all sources. diff --git a/examples/pipelines/pipeline-with-dlq.yaml b/examples/pipelines/pipeline-with-dlq.yaml index 20094b7e0..2db5519ce 100644 --- a/examples/pipelines/pipeline-with-dlq.yaml +++ b/examples/pipelines/pipeline-with-dlq.yaml @@ -3,9 +3,9 @@ pipelines: - id: pipeline-with-dlq status: running description: > - An example pipeline with a dead-letter queue backed by the file connector. + An example pipeline with a dead-letter queue backed by the built-in file plugin. The failed records will be written to dlq.out. - The failures are simulated with the field.convert processor that tries to + The failures are simulated with the `field.convert` processor that tries to convert a non-numerical string into a float. 
connectors: - id: generator-source @@ -28,7 +28,7 @@ pipelines: path: ./example.out dead-letter-queue: # Dead-letter queue (DLQ) configuration plugin: "builtin:file" # DLQ Connector plugin - settings: # A map of configuration keys and values for the plugin (specific to the chosen plugin) + settings: # Configure the file plugin used for DLQ path: "./dlq.out" window-size: 2 # DLQ nack window size window-nack-threshold: 1 # DLQ nack window threshold diff --git a/examples/pipelines/source-processor.yml b/examples/pipelines/source-processor.yml new file mode 100644 index 000000000..648e60108 --- /dev/null +++ b/examples/pipelines/source-processor.yml @@ -0,0 +1,37 @@ +version: 2.2 +pipelines: + - id: add-department + status: running + description: > + An example pipeline which reads data (imaginary employees) from two generator + sources, processes it and writes it to a file. + + It attaches the built-in `field.set` processor to one of the sources + to add a `department` field to its records. The records from the other source + are not processed. 
+ connectors: + - id: employees-1 + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string,company:string,trial:bool" + recordCount: "1" + processors: + - id: extract-name + plugin: field.set + settings: + field: '.Payload.After.department' + value: 'finance' + - id: employees-2 + type: source + plugin: builtin:generator + settings: + format.type: "structured" + format.options: "id:int,name:string,company:string,trial:bool" + recordCount: "2" + - id: file-destination + type: destination + plugin: builtin:file + settings: + path: ./example.out \ No newline at end of file From ba3a775de456b5b273d74482a8ea1c42d585fed1 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 20 Mar 2024 11:59:29 +0100 Subject: [PATCH 5/7] add js processor --- examples/pipelines/javascript-processor.yaml | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 examples/pipelines/javascript-processor.yaml diff --git a/examples/pipelines/javascript-processor.yaml b/examples/pipelines/javascript-processor.yaml new file mode 100644 index 000000000..6bb6a1a6c --- /dev/null +++ b/examples/pipelines/javascript-processor.yaml @@ -0,0 +1,34 @@ +--- +version: 2.2 +pipelines: + - id: pipeline-with-js-processor + status: running + description: > + An example pipeline that uses the built-in JavaScript processor (`custom.javascript`). + + It reads data from the `example.in` file, prefixes every line with 'hi there' and writes the results + to the `example.out` file. + + Results can be viewed with `tail -f example.out | jq '.payload.after |= @base64d'`. 
+ connectors: + - id: file-source + type: source + plugin: "builtin:file" + name: file-source + settings: + path: ./example.in + - id: file-destination + type: destination + plugin: "builtin:file" + name: file-destination + settings: + path: ./example.out + processors: + - id: greeting-processor + plugin: "custom.javascript" + settings: + script: | + function process(rec) { + rec.Payload.After = RawData("hi there " + String.fromCharCode.apply(String, rec.Payload.After)); + return rec; + } From 8d3ef9e87b6c41a077ac9ca4a849b05cba125da5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 20 Mar 2024 12:05:23 +0100 Subject: [PATCH 6/7] linter --- .../connector/standalone/v1/internal/fromproto/specifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plugin/connector/standalone/v1/internal/fromproto/specifier.go b/pkg/plugin/connector/standalone/v1/internal/fromproto/specifier.go index c3af4b737..58a8e86ed 100644 --- a/pkg/plugin/connector/standalone/v1/internal/fromproto/specifier.go +++ b/pkg/plugin/connector/standalone/v1/internal/fromproto/specifier.go @@ -89,7 +89,7 @@ func SpecifierParameter(in *connectorv1.Specifier_Parameter) (connector.Paramete } // needed for backward compatibility, in.Required is converted to a validation of type ValidationTypeRequired // making sure not to duplicate the required validation - if in.Required && !requiredExists { //nolint: staticcheck // required is still supported for now + if in.Required && !requiredExists { //nolint:staticcheck // required is still supported for now validations = append(validations, connector.Validation{ //nolint: makezero // list is full so need to append Type: connector.ValidationTypeRequired, }) From 3667fd3a76d82aafa4eac0d024765e98245c18cd Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 20 Mar 2024 13:18:49 +0100 Subject: [PATCH 7/7] example with standalone connector --- examples/pipelines/pipeline-with-dlq.yaml | 2 +- .../pipeline-with-standalone-connector.yaml | 
19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 examples/pipelines/pipeline-with-standalone-connector.yaml diff --git a/examples/pipelines/pipeline-with-dlq.yaml b/examples/pipelines/pipeline-with-dlq.yaml index 2db5519ce..aa31301a3 100644 --- a/examples/pipelines/pipeline-with-dlq.yaml +++ b/examples/pipelines/pipeline-with-dlq.yaml @@ -1,4 +1,4 @@ -version: 2.0 +version: 2.2 pipelines: - id: pipeline-with-dlq status: running diff --git a/examples/pipelines/pipeline-with-standalone-connector.yaml b/examples/pipelines/pipeline-with-standalone-connector.yaml new file mode 100644 index 000000000..1770712a5 --- /dev/null +++ b/examples/pipelines/pipeline-with-standalone-connector.yaml @@ -0,0 +1,19 @@ +version: 2.2 +pipelines: + - id: file-to-file + status: running + description: > + An example pipeline reading from the standalone chaos connector and writing into a file destination. + + To use the chaos connector, download the appropriate binary from https://github.com/conduitio-labs/conduit-connector-chaos/releases/latest. + + The binary needs to be stored in the `connectors` directory, adjacent to the `conduit` binary. + connectors: + - id: chaos-source + type: source + plugin: standalone:chaos + - id: file-destination + type: destination + plugin: builtin:file + settings: + path: ./example.out