Add preview apache pulsar sink changefeed docs v24.1 (#18529)
kathancox authored Jun 10, 2024
1 parent 3299246 commit f807529
Showing 6 changed files with 165 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/current/_includes/v24.1/cdc/apache-pulsar-unsupported.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Changefeeds emitting to an Apache Pulsar sink do **not** support:

- [`format=avro`]({% link {{ page.version.version }}/create-changefeed.md %}#format)
- [`confluent_schema_registry`]({% link {{ page.version.version }}/create-changefeed.md %}#confluent-registry)
- [`topic_prefix`]({% link {{ page.version.version }}/create-changefeed.md %}#topic-prefix-param)
- Any batching configuration
- [Authentication query parameters]({% link {{ page.version.version }}/create-changefeed.md %}#query-parameters)
- [External connections]({% link {{ page.version.version }}/create-external-connection.md %})
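
As a rough illustration, the named options in this list could be checked before a changefeed is created. The following is a minimal sketch and not a CockroachDB tool: the function name `check_pulsar_options` and the comma-separated option string it accepts are assumptions made for this example, and it covers only `format=avro`, `confluent_schema_registry`, and `topic_prefix`.

```shell
# Hypothetical helper (not part of CockroachDB): scan a comma-separated
# string of option/parameter settings for ones the Pulsar sink rejects.
check_pulsar_options() {
  local options="$1" status=0 opt
  local IFS=','
  for opt in $options; do
    # Normalize "format = avro" to "format=avro" before matching.
    opt=$(printf '%s' "$opt" | tr -d ' ')
    case "$opt" in
      format=avro|confluent_schema_registry*|topic_prefix*)
        echo "not supported with a Pulsar sink: $opt"
        status=1
        ;;
    esac
  done
  return "$status"
}

check_pulsar_options "format = avro, resolved" \
  || echo "adjust the options before creating the changefeed"
```

The function returns a non-zero status when it finds an unsupported setting, so it can gate a deployment script. Batching configuration, authentication query parameters, and external connections are not detected by this sketch.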
6 changes: 6 additions & 0 deletions src/current/_includes/v24.1/cdc/apache-pulsar-uri.md
@@ -0,0 +1,6 @@
{% include_cached copy-clipboard.html %}
~~~
pulsar://{host IP}:6650
~~~

By default, Apache Pulsar listens for client connections on port `6650`. For more detail on configuration, refer to the [Apache Pulsar documentation](https://pulsar.apache.org/docs/2.10.x/reference-configuration).
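
The URI format can be sketched as a small shell helper that fills in the default port when none is given. The function name `pulsar_sink_uri` is hypothetical and exists only for this illustration. The `CREATE CHANGEFEED` statement it prints follows the `movr.rides` example used in the changefeed examples.

```shell
# Hypothetical helper: print a Pulsar sink URI for a host and optional port.
pulsar_sink_uri() {
  local host="$1" port="${2:-6650}"   # 6650 is Pulsar's default client port
  if [ -z "$host" ]; then
    echo "usage: pulsar_sink_uri HOST [PORT]" >&2
    return 1
  fi
  printf 'pulsar://%s:%s\n' "$host" "$port"
}

# Print (without running) a changefeed statement that uses the URI.
printf "CREATE CHANGEFEED FOR TABLE movr.rides INTO '%s';\n" "$(pulsar_sink_uri localhost)"
```

Passing a second argument, for example `pulsar_sink_uri 10.0.0.5 6651`, overrides the default port for brokers configured to listen elsewhere.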
92 changes: 91 additions & 1 deletion src/current/v24.1/changefeed-examples.md
@@ -14,8 +14,9 @@ For a summary of Core and {{ site.data.products.enterprise }} changefeed feature
- [Kafka](#create-a-changefeed-connected-to-kafka)
- [Google Cloud Pub/Sub](#create-a-changefeed-connected-to-a-google-cloud-pub-sub-sink)
- [Cloud Storage](#create-a-changefeed-connected-to-a-cloud-storage-sink) (Amazon S3, Google Cloud Storage, Azure Storage)
- {% include_cached new-in.html version="v24.1" %} [Azure Event Hubs](#create-a-changefeed-connected-to-an-azure-event-hubs-sink)
- [Webhook](#create-a-changefeed-connected-to-a-webhook-sink)
- {% include_cached new-in.html version="v24.1" %} [Azure Event Hubs](#create-a-changefeed-connected-to-an-azure-event-hubs-sink)
- {% include_cached new-in.html version="v24.1" %} [Apache Pulsar](#create-a-changefeed-connected-to-an-apache-pulsar-sink) (Preview)

Refer to the [Changefeed Sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}) page for more detail on forming sink URIs, available sink query parameters, and specifics on configuration.

@@ -697,6 +698,95 @@ In this example, you'll set up a changefeed for a single-node cluster that is co

For more detail on emitted changefeed messages, see [responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses).

## Create a changefeed connected to an Apache Pulsar sink

{{site.data.alerts.callout_info}}
{% include feature-phases/preview.md %}
{{site.data.alerts.end}}

{% include_cached new-in.html version="v24.1" %} In this example, you'll set up a changefeed for a single-node cluster that is connected to an [Apache Pulsar](https://pulsar.apache.org/docs/next/getting-started-standalone/) sink. The changefeed will watch a table and send messages to the sink.
{% include {{ page.version.version }}/cdc/examples-license-workload.md %}
{% include {{ page.version.version }}/cdc/sql-cluster-settings-example.md %}
1. To prepare a Pulsar sink, refer to the [Apache Pulsar documentation](https://pulsar.apache.org/docs/next/getting-started-standalone/) for guides on running Pulsar locally, in Docker, or on a Kubernetes cluster.
1. In a terminal window where your Pulsar sink is hosted, start the cluster:
{% include_cached copy-clipboard.html %}
~~~ shell
bin/pulsar standalone
~~~
If you're running Pulsar in a Docker container, use the `docker run` command to start the cluster:

{% include_cached copy-clipboard.html %}
~~~ shell
docker run -it -p 6650:6650 -p 8080:8080 --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone
~~~

{{site.data.alerts.callout_info}}
If you start a changefeed without creating a topic first, the table name is automatically used as the topic name.

If you want to create a topic first, use the [`pulsar-admin`](https://pulsar.apache.org/docs/2.10.x/reference-cli-tool) tool to specify the topic's tenant and namespace. This example uses the default namespace:
{% include_cached copy-clipboard.html %}
~~~ shell
bin/pulsar-admin topics create persistent://public/default/topic-name
~~~
For more detail on persistent topics and working with topic resources, refer to the [Manage Topics](https://pulsar.apache.org/docs/3.2.x/admin-api-topics/) Apache Pulsar documentation.
{{site.data.alerts.end}}
1. Enter the SQL shell:
{% include_cached copy-clipboard.html %}
~~~ shell
cockroach sql --insecure
~~~
1. Create your changefeed:
{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE movr.rides INTO 'pulsar://{host IP}:6650';
~~~
By default, Apache Pulsar listens for client connections on port `6650`. For more detail on configuration, refer to the [Apache Pulsar documentation](https://pulsar.apache.org/docs/2.10.x/reference-configuration).
Changefeeds emitting to a Pulsar sink do not support external connections and certain changefeed options. For a full list, refer to the [Changefeed Sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#apache-pulsar) page.
1. In a different terminal window, start a [Pulsar consumer](https://pulsar.apache.org/docs/next/tutorials-produce-consume/) to read messages from the changefeed. This example consumes messages from the `rides` topic:
{% include_cached copy-clipboard.html %}
~~~ shell
bin/pulsar-client consume rides -s sub1 -n 0
~~~
If you're running Pulsar in a Docker container, use the `docker run` command to start a consumer:

{% include_cached copy-clipboard.html %}
~~~ shell
docker run -it --network="host" apachepulsar/pulsar:latest bin/pulsar-client consume rides -s sub1 -n 0
~~~

You will receive changefeed messages similar to the following:
~~~
----- got message -----
key:[null], properties:[], content:{"Key":["seattle", "09265ab7-5f3a-40cb-a543-d37c8c893793"],"Value":{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "09265ab7-5f3a-40cb-a543-d37c8c893793", "revenue": 53.00, "rider_id": "44576296-d4a7-4e79-add9-f880dd951064", "start_address": "25795 Alyssa Extensions", "start_time": "2024-05-09T12:18:42.022952", "vehicle_city": "seattle", "vehicle_id": "a0c935f6-8872-408e-bc12-4d0b5a85fa71"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "b3548485-9475-44cf-9769-66617b9cb151"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "b3548485-9475-44cf-9769-66617b9cb151", "revenue": 25.00, "rider_id": "adf4656f-6a0d-4315-b035-eaf7fa6b85eb", "start_address": "49614 Victoria Cliff Apt. 25", "start_time": "2024-05-09T12:18:42.763718", "vehicle_city": "amsterdam", "vehicle_id": "eb1d1d2c-865e-4a40-a7d7-8f396c1c063f"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "d119f344-318f-41c0-bfc0-b778e6e38f9a"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "d119f344-318f-41c0-bfc0-b778e6e38f9a", "revenue": 24.00, "rider_id": "1a242414-f704-4e1f-9f5e-2b468af0c2d1", "start_address": "54909 Douglas Street Suite 51", "start_time": "2024-05-09T12:18:42.369755", "vehicle_city": "amsterdam", "vehicle_id": "99d98e05-3114-460e-bb02-828bcd745d44"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "end_address": null, "end_time": null, "id": "3c7d6676-f713-4985-ba52-4c19fe6c3692", "revenue": 27.00, "rider_id": "c15a4926-fbb2-4931-a9a0-6dfabc6c506b", "start_address": "39415 Brandon Avenue Apt. 29", "start_time": "2024-05-09T12:18:42.055498", "vehicle_city": "rome", "vehicle_id": "627dad1a-3531-4214-a173-16bcc6b93036"}},"Topic":"rides"}
~~~
For more detail on emitted changefeed messages, refer to [Responses]({% link {{ page.version.version }}/changefeed-messages.md %}#responses).
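
The topic naming used in the steps above (`persistent://` followed by tenant, namespace, and topic) can be sketched as a shell helper. The function name `pulsar_topic_name` is hypothetical. The defaults of `public` and `default` match the tenant and namespace in the `pulsar-admin` example, and `rides` is the table name from the changefeed.

```shell
# Hypothetical helper: build a fully qualified persistent topic name.
# Tenant and namespace default to public/default, as in the example above.
pulsar_topic_name() {
  local topic="$1" tenant="${2:-public}" namespace="${3:-default}"
  if [ -z "$topic" ]; then
    echo "usage: pulsar_topic_name TOPIC [TENANT] [NAMESPACE]" >&2
    return 1
  fi
  printf 'persistent://%s/%s/%s\n' "$tenant" "$namespace" "$topic"
}

# Print (without running) the command that would create a topic for movr.rides.
echo "bin/pulsar-admin topics create $(pulsar_topic_name rides)"
```

The script only prints the `pulsar-admin` command, so you can review it before running it on the host where Pulsar is installed.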
</section>
<section class="filter-content" markdown="1" data-scope="core">
32 changes: 32 additions & 0 deletions src/current/v24.1/changefeed-sinks.md
@@ -15,6 +15,7 @@ CockroachDB supports the following sinks:
- [Cloud Storage](#cloud-storage-sink) / HTTP
- [Webhook](#webhook-sink)
- {% include_cached new-in.html version="v24.1" %} [Azure Event Hubs](#azure-event-hubs)
- {% include_cached new-in.html version="v24.1" %} [Apache Pulsar](#apache-pulsar) (in Preview)

The [`CREATE CHANGEFEED`]({% link {{ page.version.version }}/create-changefeed.md %}) page provides detail on using the SQL statement and a complete list of the [query parameters]({% link {{ page.version.version }}/create-changefeed.md %}#query-parameters) and options available when setting up a changefeed.

@@ -608,6 +609,37 @@ The following parameters are also needed, but are **set by default** in Cockroac
- `sasl_handshake=true`
- `sasl_mechanism=PLAIN`

## Apache Pulsar

{{site.data.alerts.callout_info}}
{% include feature-phases/preview.md %}
{{site.data.alerts.end}}

{% include_cached new-in.html version="v24.1" %} Changefeeds can deliver messages to [Apache Pulsar](https://pulsar.apache.org/docs).

Example of a Pulsar sink URI:

{% include {{ page.version.version }}/cdc/apache-pulsar-uri.md %}

Changefeeds emitting to an Apache Pulsar sink support `json` and `csv` [format options]({% link {{ page.version.version }}/create-changefeed.md %}#format).

{% include {{ page.version.version }}/cdc/apache-pulsar-unsupported.md %}

For an Apache Pulsar setup example, refer to the [Changefeed Examples]({% link {{ page.version.version }}/changefeed-examples.md %}#create-a-changefeed-connected-to-an-apache-pulsar-sink) page.

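### Apache Pulsar sink messages

The following shows messages from a changefeed on the `movr.rides` table, as printed by a `pulsar-client` consumer reading the `rides` topic: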

~~~
----- got message -----
key:[null], properties:[], content:{"Key":["seattle", "09265ab7-5f3a-40cb-a543-d37c8c893793"],"Value":{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "09265ab7-5f3a-40cb-a543-d37c8c893793", "revenue": 53.00, "rider_id": "44576296-d4a7-4e79-add9-f880dd951064", "start_address": "25795 Alyssa Extensions", "start_time": "2024-05-09T12:18:42.022952", "vehicle_city": "seattle", "vehicle_id": "a0c935f6-8872-408e-bc12-4d0b5a85fa71"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "b3548485-9475-44cf-9769-66617b9cb151"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "b3548485-9475-44cf-9769-66617b9cb151", "revenue": 25.00, "rider_id": "adf4656f-6a0d-4315-b035-eaf7fa6b85eb", "start_address": "49614 Victoria Cliff Apt. 25", "start_time": "2024-05-09T12:18:42.763718", "vehicle_city": "amsterdam", "vehicle_id": "eb1d1d2c-865e-4a40-a7d7-8f396c1c063f"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["amsterdam", "d119f344-318f-41c0-bfc0-b778e6e38f9a"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "d119f344-318f-41c0-bfc0-b778e6e38f9a", "revenue": 24.00, "rider_id": "1a242414-f704-4e1f-9f5e-2b468af0c2d1", "start_address": "54909 Douglas Street Suite 51", "start_time": "2024-05-09T12:18:42.369755", "vehicle_city": "amsterdam", "vehicle_id": "99d98e05-3114-460e-bb02-828bcd745d44"}},"Topic":"rides"}
----- got message -----
key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "end_address": null, "end_time": null, "id": "3c7d6676-f713-4985-ba52-4c19fe6c3692", "revenue": 27.00, "rider_id": "c15a4926-fbb2-4931-a9a0-6dfabc6c506b", "start_address": "39415 Brandon Avenue Apt. 29", "start_time": "2024-05-09T12:18:42.055498", "vehicle_city": "rome", "vehicle_id": "627dad1a-3531-4214-a173-16bcc6b93036"}},"Topic":"rides"}
~~~
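
For downstream processing, the JSON payload can be separated from the consumer's `key:`, `properties:`, and `content:` framing. The following is a minimal sketch that assumes the consumer prints messages exactly in the shape shown above. Other `pulsar-client` versions may format output differently. The function names `extract_content` and `extract_topic` are hypothetical, and the sample message is abbreviated.

```shell
# Hypothetical helpers, assuming the consumer output format shown above.

# Keep only the JSON payload that follows "content:" on each message line.
extract_content() {
  sed -n 's/^key:\[[^]]*], properties:\[[^]]*], content://p'
}

# Print the value of the trailing "Topic" field of each payload.
extract_topic() {
  sed -n 's/.*"Topic":"\([^"]*\)"}$/\1/p'
}

# Abbreviated sample in the same shape as the messages above.
sample='----- got message -----
key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "revenue": 27.00}},"Topic":"rides"}'

printf '%s\n' "$sample" | extract_content
printf '%s\n' "$sample" | extract_content | extract_topic
```

The separator lines (`----- got message -----`) are dropped because they do not match the `key:` prefix. A JSON-aware tool is a better fit than `sed` once you need fields inside `Value`.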

## See also

- [Use Cloud Storage]({% link {{ page.version.version }}/use-cloud-storage.md %})
15 changes: 15 additions & 0 deletions src/current/v24.1/cockroachdb-feature-availability.md
@@ -197,6 +197,21 @@ $ cockroach sql --user=jpointsman --insecure
Enter password:
~~~

### Apache Pulsar changefeed sink

Changefeeds can deliver messages to [Apache Pulsar](https://pulsar.apache.org/docs).

Example of a Pulsar sink URI:

{% include_cached copy-clipboard.html %}
~~~
pulsar://localhost:6650
~~~

{% include {{ page.version.version }}/cdc/apache-pulsar-unsupported.md %}

For an Apache Pulsar setup example, refer to the [Changefeed Examples]({% link {{ page.version.version }}/changefeed-examples.md %}#create-a-changefeed-connected-to-an-apache-pulsar-sink) page.

### Core implementation of changefeeds

The [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.version.version }}/changefeed-for.md %}) statement creates a new core changefeed, which streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled. A core changefeed can watch one table or multiple tables in a comma-separated list.
14 changes: 13 additions & 1 deletion src/current/v24.1/create-changefeed.md
@@ -128,6 +128,18 @@ Example of a webhook URI:

Refer to [Changefeed Sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink) for specifics on webhook sink configuration.

#### Apache Pulsar

{{site.data.alerts.callout_info}}
{% include feature-phases/preview.md %}
{{site.data.alerts.end}}

{% include_cached new-in.html version="v24.1" %} Example of an [Apache Pulsar sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#apache-pulsar) URI:

{% include {{ page.version.version }}/cdc/apache-pulsar-uri.md %}

Changefeeds emitting to a Pulsar sink do not support external connections and certain changefeed options. For a full list, refer to the [Changefeed Sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#apache-pulsar) page.

### Query parameters

{% include {{ page.version.version }}/cdc/url-encoding.md %}
@@ -186,7 +198,7 @@ Option | Value | Description
<a name="lagging-ranges-polling"></a>`lagging_ranges_polling_interval` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set the interval rate for when lagging ranges are checked and the `lagging_ranges` metric is updated. Polling adds latency to the `lagging_ranges` metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.<br><br>**Default:** `1m`
`metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.<br><br>The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.<br><br>`WITH metrics_label=label_name` <br><br>For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels).
<a name="min-checkpoint-frequency"></a>`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. <br><br>**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.<br><br>**Default:** `30s`
`mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill.
<a name="mvcc-timestamp"></a>`mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill.
<a name="on-error"></a>`on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified. <br><br>Use with [`protect_data_from_gc_on_pause`](#protect-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).<br><br>If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.<br><br>Default: `on_error=fail`
<a name="protect-pause"></a>`protect_data_from_gc_on_pause` | N/A | This option is deprecated as of v23.2 and will be removed in a future release.<br><br>When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed. <br><br>Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors. <br><br>Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.<br><br>**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expire) option to set a limit for protected data and for how long a changefeed will remain paused.
`pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:<br><br> `{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`. <br><br>**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided. <br><br>Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option.