Skip to content

Commit

Permalink
Update changefeed docs to reflect message ordering for re-emitted mes…
Browse files Browse the repository at this point in the history
…sages
  • Loading branch information
kathancox committed Jun 10, 2024
1 parent 506a4ca commit 9f9a50a
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/current/v22.2/change-data-capture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The [Advanced Changefeed Configuration](advanced-changefeed-configuration.html)

When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder](architecture/replication-layer.html#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data.

Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time. See [Ordering Guarantees](changefeed-messages.html#ordering-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted.
Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will re-emit messages starting at the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee and how per-key message ordering is applied.

<img src="{{ 'images/v22.2/changefeed-structure.png' | relative_url }}" alt="Changefeed process in a 3-node cluster" style="border:0px solid #eee;max-width:100%" />

Expand Down
8 changes: 6 additions & 2 deletions src/current/v22.2/changefeed-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ docs_area: stream_data
key: use-changefeeds.html
---

Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options](create-changefeed.html#options) (e.g., `format=avro`).
Changefeeds emit messages per row as change events happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options](create-changefeed.html#options) (e.g., `format=avro`).

This page describes the format and behavior of changefeed messages. You will find the following information on this page:

Expand Down Expand Up @@ -53,7 +53,11 @@ See [changefeed files](create-changefeed.html#files) for more detail on the file

- In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an **at-least-once delivery guarantee**.

- Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a _new_ change for that row at an earlier timestamp.
- For the first emission of a row with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a _new_ change for that row at an earlier timestamp.

{{site.data.alerts.callout_info}}
Under some circumstances (e.g., node restarts, retryable errors), a changefeed will emit duplicate messages of row updates. Changefeeds can emit duplicate messages in any order.
{{site.data.alerts.end}}

For example, if you ran the following:

Expand Down
2 changes: 1 addition & 1 deletion src/current/v23.1/change-data-capture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The main feature of CockroachDB CDC is the _changefeed_, which targets an allowl

## Stream row-level changes with changefeeds

Changefeeds are customizable _jobs_ that track row-level changes and send data in real time in a preferred format to your specified destination, known as a _sink_. Each version of a row emitted to the sink are subject to an at-least-once delivery guarantee and are ordered by timestamp.
Changefeeds are customizable _jobs_ that track row-level changes and send data in realtime in a preferred format to your specified destination, known as a _sink_. The first emission of a row's change event to the sink is subject to an at-least-once delivery guarantee and ordered by timestamp.

CockroachDB has two implementations of changefeeds:

Expand Down
24 changes: 15 additions & 9 deletions src/current/v23.1/changefeed-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ docs_area: stream_data
key: use-changefeeds.html
---

Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`).
Changefeeds emit messages per row as change events happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`).

This page describes the format and behavior of changefeed messages. You will find the following information on this page:

- [Responses](#responses): The general format of changefeed messages.
- [Message envelopes](#message-envelopes): The structure of the changefeed message.
- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering.
- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering and delivery.
- [Delete messages](#delete-messages): The format of messages when a row is deleted.
- [Resolved messages](#resolved-messages): The resolved timestamp option and how to configure it.
- [Duplicate messages](#duplicate-messages): The causes of duplicate messages from a changefeed.
Expand Down Expand Up @@ -198,20 +198,22 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr

Changefeeds provide the following guarantees for message delivery to changefeed sinks:

- [Per-key ordering](#per-key-ordering)
- [At-least-once delivery](#at-least-once-delivery)
- [Per-key ordering](#per-key-ordering) for the first emission of an event's message.
- [At-least-once delivery](#at-least-once-delivery) per event message.

{{site.data.alerts.callout_info}}
Changefeeds do not support total message ordering or transactional ordering of messages.
{{site.data.alerts.end}}

### Per-key ordering

Changefeeds provide a _per-key ordering guarantee_ for messages emitted to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a **new** change for that row at an earlier timestamp.
Changefeeds provide a _per-key ordering guarantee_ for the **first emission** of a message to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a new change for that row at an earlier timestamp.

For example, a changefeed can emit updates to rows `A` at timestamp `T1`, `B` at `T2`, and `C` at `T3` in any order.

When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`. This is because there is no timestamp ordering between keys.
When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`, because there is no timestamp ordering between keys.

Under some circumstances, a changefeed will emit [duplicate messages](#duplicate-messages) of row updates. Changefeeds can emit duplicate messages in any order.

As an example, you run the following sequence of SQL statements to create a changefeed:

Expand Down Expand Up @@ -264,7 +266,7 @@ As an example, you run the following sequence of SQL statements to create a chan
{"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"}
~~~

The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will always be in the correct timestamp order.
The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will be in the correct timestamp order. However, re-emitted messages (duplicates) may not be in the correct timestamp order. For details on when changefeeds might re-emit messages, refer to [Duplicate messages](#duplicate-messages).

The `updated` option adds an `updated` timestamp to each emitted row. You can also use the [`resolved` option](#resolved-messages) to emit a `resolved` timestamp message to each Kafka partition, or to a separate file at a cloud storage sink. A `resolved` timestamp guarantees that no (previously unseen) rows with a lower update timestamp will be emitted on that partition.

Expand Down Expand Up @@ -320,7 +322,11 @@ For example, the checkpoints and changefeed pauses marked in this output show ho
[checkpoint]
~~~
With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).)
In this example, with duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).)
{{site.data.alerts.callout_danger}}
The first time a message is delivered, it will be in the correct timestamp order, which follows the [per-key ordering guarantee](#per-key-ordering). However, [duplicate messages](#duplicate-messages) for an individual row may **not** be in the correct timestamp order.
{{site.data.alerts.end}}
To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. This works across anything in the same cluster (tables, nodes, etc.).
Expand Down Expand Up @@ -365,7 +371,7 @@ Under some circumstances, changefeeds will emit duplicate messages to ensure the

A changefeed job cannot confirm that a message has been received by the sink unless the changefeed has reached a checkpoint. As a changefeed job runs, each node will send checkpoint progress to the job's coordinator node. These progress reports allow the coordinator to update the high-water mark timestamp confirming that all changes before (or at) the timestamp have been emitted.
When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that had sent after the last checkpoint, but were not confirmed in the next checkpoint.
When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that had sent after the last checkpoint, but were not confirmed in the next checkpoint. The re-emitted messages may **not** be in [order](#per-key-ordering) by timestamp.
<img src="{{ 'images/v23.1/changefeed-duplicate-messages-emit.png' | relative_url }}" alt="How checkpoints will re-emit messages when a changefeed pauses. The changefeed returns to the last checkpoint and potentially sends duplicate messages." style="border:0px solid #eee;max-width:100%" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ docs_area: stream_data

When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder]({% link {{ page.version.version }}/architecture/replication-layer.md %}#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data.

Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time. See [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted.
Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will [re-emit messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) starting at the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee and how per-key message ordering is applied.

<img src="{{ 'images/v23.1/changefeed-structure.png' | relative_url }}" alt="Changefeed process in a 3-node cluster" style="border:0px solid #eee;max-width:100%" />

Expand Down
2 changes: 1 addition & 1 deletion src/current/v23.2/change-data-capture-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The main feature of CockroachDB CDC is the _changefeed_, which targets an allowl

## Stream row-level changes with changefeeds

Changefeeds are customizable _jobs_ that track row-level changes and send data in realtime in a preferred format to your specified destination, known as a _sink_. Each version of a row emitted to the sink are subject to an at-least-once delivery guarantee and are ordered by timestamp.
Changefeeds are customizable _jobs_ that track row-level changes and send data in realtime in a preferred format to your specified destination, known as a _sink_. The first emission of a row's change event to the sink is subject to an at-least-once delivery guarantee and ordered by timestamp.

CockroachDB has two implementations of changefeeds:

Expand Down
Loading

0 comments on commit 9f9a50a

Please sign in to comment.