From 9f9a50ae0955d8324577c8695458e3aeb799e84f Mon Sep 17 00:00:00 2001 From: Kathryn Hancox Date: Thu, 6 Jun 2024 15:08:54 -0400 Subject: [PATCH] Update changefeed docs to reflect message ordering for re-emitted messages --- .../v22.2/change-data-capture-overview.md | 2 +- src/current/v22.2/changefeed-messages.md | 8 +++++-- .../v23.1/change-data-capture-overview.md | 2 +- src/current/v23.1/changefeed-messages.md | 24 ++++++++++++------- .../how-does-an-enterprise-changefeed-work.md | 2 +- .../v23.2/change-data-capture-overview.md | 2 +- src/current/v23.2/changefeed-messages.md | 24 ++++++++++++------- .../how-does-an-enterprise-changefeed-work.md | 2 +- .../v24.1/change-data-capture-overview.md | 2 +- src/current/v24.1/changefeed-messages.md | 24 ++++++++++++------- .../how-does-an-enterprise-changefeed-work.md | 2 +- 11 files changed, 58 insertions(+), 36 deletions(-) diff --git a/src/current/v22.2/change-data-capture-overview.md b/src/current/v22.2/change-data-capture-overview.md index 1582287f657..e08f292f421 100644 --- a/src/current/v22.2/change-data-capture-overview.md +++ b/src/current/v22.2/change-data-capture-overview.md @@ -33,7 +33,7 @@ The [Advanced Changefeed Configuration](advanced-changefeed-configuration.html) When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder](architecture/replication-layer.html#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data. -Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. 
The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time. See [Ordering Guarantees](changefeed-messages.html#ordering-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted. +Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will re-emit messages starting at the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-guarantees) for details on CockroachDB's at-least-once delivery guarantee and how per-key message ordering is applied. Changefeed process in a 3-node cluster diff --git a/src/current/v22.2/changefeed-messages.md b/src/current/v22.2/changefeed-messages.md index 133b1b8aef3..966edd23430 100644 --- a/src/current/v22.2/changefeed-messages.md +++ b/src/current/v22.2/changefeed-messages.md @@ -6,7 +6,7 @@ docs_area: stream_data key: use-changefeeds.html --- -Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees.
You can also configure the format of changefeed messages with different [options](create-changefeed.html#options) (e.g., `format=avro`). +Changefeeds emit messages per row as change events happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options](create-changefeed.html#options) (e.g., `format=avro`). This page describes the format and behavior of changefeed messages. You will find the following information on this page: @@ -53,7 +53,11 @@ See [changefeed files](create-changefeed.html#files) for more detail on the file - In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an **at-least-once delivery guarantee**. -- Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a _new_ change for that row at an earlier timestamp. +- For the first emission of a row with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a _new_ change for that row at an earlier timestamp. + + {{site.data.alerts.callout_info}} + Under some circumstances (e.g., node restarts, retryable errors), a changefeed will emit duplicate messages of row updates. Changefeeds can emit duplicate messages in any order. 
+ {{site.data.alerts.end}} For example, if you ran the following: diff --git a/src/current/v23.1/change-data-capture-overview.md b/src/current/v23.1/change-data-capture-overview.md index 4116691b846..26b38f7837b 100644 --- a/src/current/v23.1/change-data-capture-overview.md +++ b/src/current/v23.1/change-data-capture-overview.md @@ -19,7 +19,7 @@ The main feature of CockroachDB CDC is the _changefeed_, which targets an allowl ## Stream row-level changes with changefeeds -Changefeeds are customizable _jobs_ that track row-level changes and send data in real time in a preferred format to your specified destination, known as a _sink_. Each version of a row emitted to the sink are subject to an at-least-once delivery guarantee and are ordered by timestamp. +Changefeeds are customizable _jobs_ that track row-level changes and send data in real time in a preferred format to your specified destination, known as a _sink_. The first emission of a row's change event to the sink is subject to an at-least-once delivery guarantee and is ordered by timestamp. CockroachDB has two implementations of changefeeds: diff --git a/src/current/v23.1/changefeed-messages.md b/src/current/v23.1/changefeed-messages.md index 4260d125014..24235d35416 100644 --- a/src/current/v23.1/changefeed-messages.md +++ b/src/current/v23.1/changefeed-messages.md @@ -6,13 +6,13 @@ docs_area: stream_data key: use-changefeeds.html --- -Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees.
You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`). This page describes the format and behavior of changefeed messages. You will find the following information on this page: - [Responses](#responses): The general format of changefeed messages. - [Message envelopes](#message-envelopes): The structure of the changefeed message. -- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering. +- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering and delivery. - [Delete messages](#delete-messages): The format of messages when a row is deleted. - [Resolved messages](#resolved-messages): The resolved timestamp option and how to configure it. - [Duplicate messages](#duplicate-messages): The causes of duplicate messages from a changefeed. @@ -198,8 +198,8 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr Changefeeds provide the following guarantees for message delivery to changefeed sinks: -- [Per-key ordering](#per-key-ordering) -- [At-least-once delivery](#at-least-once-delivery) +- [Per-key ordering](#per-key-ordering) for the first emission of an event's message. +- [At-least-once delivery](#at-least-once-delivery) per event message. {{site.data.alerts.callout_info}} Changefeeds do not support total message ordering or transactional ordering of messages. @@ -207,11 +207,13 @@ Changefeeds do not support total message ordering or transactional ordering of m ### Per-key ordering -Changefeeds provide a _per-key ordering guarantee_ for messages emitted to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. 
Therefore, you will never receive a **new** change for that row at an earlier timestamp. +Changefeeds provide a _per-key ordering guarantee_ for the **first emission** of a message to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a new change for that row at an earlier timestamp. For example, a changefeed can emit updates to rows `A` at timestamp `T1`, `B` at `T2`, and `C` at `T3` in any order. -When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`. This is because there is no timestamp ordering between keys. +When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`, because there is no timestamp ordering between keys. + +Under some circumstances, a changefeed will emit [duplicate messages](#duplicate-messages) of row updates. Changefeeds can emit duplicate messages in any order. As an example, you run the following sequence of SQL statements to create a changefeed: @@ -264,7 +266,7 @@ As an example, you run the following sequence of SQL statements to create a chan {"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"} ~~~ - The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will always be in the correct timestamp order. + The messages received at the sink are in order by timestamp **for each key**. 
Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will be in the correct timestamp order. However, re-emitted messages (duplicates) may not be in the correct timestamp order. For details on when changefeeds might re-emit messages, refer to [Duplicate messages](#duplicate-messages). The `updated` option adds an `updated` timestamp to each emitted row. You can also use the [`resolved` option](#resolved-messages) to emit a `resolved` timestamp message to each Kafka partition, or to a separate file at a cloud storage sink. A `resolved` timestamp guarantees that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. @@ -320,7 +322,11 @@ For example, the checkpoints and changefeed pauses marked in this output show ho [checkpoint] ~~~ -With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) +In this example, with duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) + +{{site.data.alerts.callout_danger}} +The first time a message is delivered, it will be in the correct timestamp order, which follows the [per-key ordering guarantee](#per-key-ordering). However, [duplicate messages](#duplicate-messages) for an individual row may **not** be in the correct timestamp order. +{{site.data.alerts.end}} To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. 
This works across anything in the same cluster (tables, nodes, etc.). @@ -365,7 +371,7 @@ Under some circumstances, changefeeds will emit duplicate messages to ensure the A changefeed job cannot confirm that a message has been received by the sink unless the changefeed has reached a checkpoint. As a changefeed job runs, each node will send checkpoint progress to the job's coordinator node. These progress reports allow the coordinator to update the high-water mark timestamp confirming that all changes before (or at) the timestamp have been emitted. -When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that had sent after the last checkpoint, but were not confirmed in the next checkpoint. +When a changefeed pauses and then resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that were sent after the last checkpoint, but were not confirmed in the next checkpoint. The re-emitted messages may **not** be in [order](#per-key-ordering) by timestamp. How checkpoints will re-emit messages when a changefeed pauses. The changefeed returns to the last checkpoint and potentially sends duplicate messages.
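Since re-emitted duplicates can arrive in any order, a sink consumer can restore per-key timestamp order by tracking the highest `updated` timestamp it has applied for each key and skipping anything at or below it. A minimal Python sketch (illustrative only, not part of this patch; the `apply_if_new` helper and in-memory `seen` map are assumptions — the message shape matches the `key`/`updated` examples in these docs):

```python
# Consumer-side deduplication sketch: changefeeds deliver at-least-once,
# and re-emitted duplicates may arrive out of timestamp order, so track
# the highest `updated` timestamp applied per key and skip anything at
# or below it.
import json
from decimal import Decimal

seen = {}  # key (serialized) -> highest `updated` timestamp applied so far

def apply_if_new(raw_message: str) -> bool:
    """Return True if the message was applied, False if deduplicated."""
    msg = json.loads(raw_message)
    key = json.dumps(msg["key"])   # e.g. "[1]"
    ts = Decimal(msg["updated"])   # HLC timestamp; Decimal keeps full precision
    if key in seen and ts <= seen[key]:
        return False               # duplicate or stale re-emission: skip
    seen[key] = ts
    # ... apply msg["after"] to the downstream system here ...
    return True
```

The per-key ordering guarantee makes this safe: once a version of a key has been applied, no previously unseen version of that key will ever arrive first with a lower timestamp.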
diff --git a/src/current/v23.1/how-does-an-enterprise-changefeed-work.md b/src/current/v23.1/how-does-an-enterprise-changefeed-work.md index ab2b08d5086..e235060117f 100644 --- a/src/current/v23.1/how-does-an-enterprise-changefeed-work.md +++ b/src/current/v23.1/how-does-an-enterprise-changefeed-work.md @@ -7,7 +7,7 @@ docs_area: stream_data When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder]({% link {{ page.version.version }}/architecture/replication-layer.md %}#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data. -Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time. See [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted. +Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. 
The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will [re-emit messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) starting at the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for details on CockroachDB's at-least-once delivery guarantee and how per-key message ordering is applied. Changefeed process in a 3-node cluster diff --git a/src/current/v23.2/change-data-capture-overview.md b/src/current/v23.2/change-data-capture-overview.md index 2fbd21bacfe..1c96626c45c 100644 --- a/src/current/v23.2/change-data-capture-overview.md +++ b/src/current/v23.2/change-data-capture-overview.md @@ -19,7 +19,7 @@ The main feature of CockroachDB CDC is the _changefeed_, which targets an allowl ## Stream row-level changes with changefeeds -Changefeeds are customizable _jobs_ that track row-level changes and send data in realtime in a preferred format to your specified destination, known as a _sink_. Each version of a row emitted to the sink are subject to an at-least-once delivery guarantee and are ordered by timestamp. +Changefeeds are customizable _jobs_ that track row-level changes and send data in real time in a preferred format to your specified destination, known as a _sink_. The first emission of a row's change event to the sink is subject to an at-least-once delivery guarantee and is ordered by timestamp.
CockroachDB has two implementations of changefeeds: diff --git a/src/current/v23.2/changefeed-messages.md b/src/current/v23.2/changefeed-messages.md index 57af36ec66b..6f0cc1b4975 100644 --- a/src/current/v23.2/changefeed-messages.md +++ b/src/current/v23.2/changefeed-messages.md @@ -6,13 +6,13 @@ docs_area: stream_data key: use-changefeeds.html --- -Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`). +Changefeeds emit messages per row as change events happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`). This page describes the format and behavior of changefeed messages. You will find the following information on this page: - [Responses](#responses): The general format of changefeed messages. - [Message envelopes](#message-envelopes): The structure of the changefeed message. -- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering. +- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering and delivery. - [Delete messages](#delete-messages): The format of messages when a row is deleted. - [Resolved messages](#resolved-messages): The resolved timestamp option and how to configure it. - [Duplicate messages](#duplicate-messages): The causes of duplicate messages from a changefeed. 
@@ -196,8 +196,8 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr Changefeeds provide the following guarantees for message delivery to changefeed sinks: -- [Per-key ordering](#per-key-ordering) -- [At-least-once delivery](#at-least-once-delivery) +- [Per-key ordering](#per-key-ordering) for the first emission of an event's message. +- [At-least-once delivery](#at-least-once-delivery) per event message. {{site.data.alerts.callout_info}} Changefeeds do not support total message ordering or transactional ordering of messages. @@ -205,11 +205,13 @@ Changefeeds do not support total message ordering or transactional ordering of m ### Per-key ordering -Changefeeds provide a _per-key ordering guarantee_ for messages emitted to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a **new** change for that row at an earlier timestamp. +Changefeeds provide a _per-key ordering guarantee_ for the **first emission** of a message to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a new change for that row at an earlier timestamp. For example, a changefeed can emit updates to rows `A` at timestamp `T1`, `B` at `T2`, and `C` at `T3` in any order. -When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`. This is because there is no timestamp ordering between keys. +When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`, because there is no timestamp ordering between keys. 
+ +Under some circumstances, a changefeed will emit [duplicate messages](#duplicate-messages) of row updates. Changefeeds can emit duplicate messages in any order. As an example, you run the following sequence of SQL statements to create a changefeed: @@ -262,7 +264,7 @@ As an example, you run the following sequence of SQL statements to create a chan {"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"} ~~~ - The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will always be in the correct timestamp order. + The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will be in the correct timestamp order. However, re-emitted messages (duplicates) may not be in the correct timestamp order. For details on when changefeeds might re-emit messages, refer to [Duplicate messages](#duplicate-messages). The `updated` option adds an `updated` timestamp to each emitted row. You can also use the [`resolved` option](#resolved-messages) to emit a `resolved` timestamp message to each Kafka partition, or to a separate file at a cloud storage sink. A `resolved` timestamp guarantees that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. @@ -318,7 +320,11 @@ For example, the checkpoints and changefeed pauses marked in this output show ho [checkpoint] ~~~ -With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. 
However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) +In this example, with duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) + +{{site.data.alerts.callout_danger}} +The first time a message is delivered, it will be in the correct timestamp order, which follows the [per-key ordering guarantee](#per-key-ordering). However, [duplicate messages](#duplicate-messages) for an individual row may **not** be in the correct timestamp order. +{{site.data.alerts.end}} To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. This works across anything in the same cluster (tables, nodes, etc.). @@ -363,7 +369,7 @@ Under some circumstances, changefeeds will emit duplicate messages to ensure the A changefeed job cannot confirm that a message has been received by the sink unless the changefeed has reached a checkpoint. As a changefeed job runs, each node will send checkpoint progress to the job's coordinator node. These progress reports allow the coordinator to update the high-water mark timestamp confirming that all changes before (or at) the timestamp have been emitted. -When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that had sent after the last checkpoint, but were not confirmed in the next checkpoint. +When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. 
As a result, when the changefeed resumes, it will re-emit the messages that were sent after the last checkpoint, but were not confirmed in the next checkpoint. The re-emitted messages may **not** be in [order](#per-key-ordering) by timestamp. How checkpoints will re-emit messages when a changefeed pauses. The changefeed returns to the last checkpoint and potentially sends duplicate messages. diff --git a/src/current/v23.2/how-does-an-enterprise-changefeed-work.md b/src/current/v23.2/how-does-an-enterprise-changefeed-work.md index 8cfa657b230..8d8312aef27 100644 --- a/src/current/v23.2/how-does-an-enterprise-changefeed-work.md +++ b/src/current/v23.2/how-does-an-enterprise-changefeed-work.md @@ -7,7 +7,7 @@ docs_area: stream_data When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder]({% link {{ page.version.version }}/architecture/replication-layer.md %}#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data. -Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time.
See [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted. +Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will [re-emit messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) starting at the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for details on CockroachDB's at-least-once delivery guarantee and how per-key message ordering is applied. Changefeed process in a 3-node cluster diff --git a/src/current/v24.1/change-data-capture-overview.md b/src/current/v24.1/change-data-capture-overview.md index a5027cd5b38..1e65eac8373 100644 --- a/src/current/v24.1/change-data-capture-overview.md +++ b/src/current/v24.1/change-data-capture-overview.md @@ -19,7 +19,7 @@ The main feature of CockroachDB CDC is the _changefeed_, which targets an allowl ## Stream row-level changes with changefeeds -Changefeeds are customizable _jobs_ that track row-level changes and send data in realtime in a preferred format to your specified destination, known as a _sink_. Each version of a row emitted to the sink are subject to an at-least-once delivery guarantee and are ordered by timestamp.
+Changefeeds are customizable _jobs_ that track row-level changes and send data in real time in a preferred format to your specified destination, known as a _sink_. The first emission of a row's change event to the sink is subject to an at-least-once delivery guarantee and is ordered by timestamp. CockroachDB has two implementations of changefeeds: diff --git a/src/current/v24.1/changefeed-messages.md b/src/current/v24.1/changefeed-messages.md index 67492e0b81e..9a62cf7324d 100644 --- a/src/current/v24.1/changefeed-messages.md +++ b/src/current/v24.1/changefeed-messages.md @@ -6,13 +6,13 @@ docs_area: stream_data key: use-changefeeds.html --- -Changefeeds emit messages as changes happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`). +Changefeeds emit messages per row as change events happen to watched tables. CockroachDB changefeeds have an at-least-once delivery guarantee as well as message ordering guarantees. You can also configure the format of changefeed messages with different [options]({% link {{ page.version.version }}/create-changefeed.md %}#options) (e.g., `format=avro`). This page describes the format and behavior of changefeed messages. You will find the following information on this page: - [Responses](#responses): The general format of changefeed messages. - [Message envelopes](#message-envelopes): The structure of the changefeed message. -- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering. +- [Ordering and delivery guarantees](#ordering-and-delivery-guarantees): CockroachDB's guarantees for a changefeed's message ordering and delivery. - [Delete messages](#delete-messages): The format of messages when a row is deleted.
- [Resolved messages](#resolved-messages): The resolved timestamp option and how to configure it. - [Duplicate messages](#duplicate-messages): The causes of duplicate messages from a changefeed. @@ -196,8 +196,8 @@ CREATE CHANGEFEED FOR TABLE rides INTO 'external://kafka' WITH diff, envelope=wr Changefeeds provide the following guarantees for message delivery to changefeed sinks: -- [Per-key ordering](#per-key-ordering) -- [At-least-once delivery](#at-least-once-delivery) +- [Per-key ordering](#per-key-ordering) for the first emission of an event's message. +- [At-least-once delivery](#at-least-once-delivery) per event message. {{site.data.alerts.callout_info}} Changefeeds do not support total message ordering or transactional ordering of messages. @@ -205,11 +205,13 @@ Changefeeds do not support total message ordering or transactional ordering of m ### Per-key ordering -Changefeeds provide a _per-key ordering guarantee_ for messages emitted to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a **new** change for that row at an earlier timestamp. +Changefeeds provide a _per-key ordering guarantee_ for the **first emission** of a message to the sink. Once the changefeed has emitted a row with a timestamp, the changefeed will not emit any previously unseen versions of that row with a lower timestamp. Therefore, you will never receive a new change for that row at an earlier timestamp. For example, a changefeed can emit updates to rows `A` at timestamp `T1`, `B` at `T2`, and `C` at `T3` in any order. -When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`. This is because there is no timestamp ordering between keys. 
+When there are updates to rows `A` at `T1`, `B` at `T2`, and `A` at `T3`, the changefeed will always emit `A` at `T3` **after** `A` at `T1`. However, `A` at `T3` could precede or follow `B` at `T2`, because there is no timestamp ordering between keys. + +Under some circumstances, a changefeed will emit [duplicate messages](#duplicate-messages) of row updates. Changefeeds can emit duplicate messages in any order. As an example, you run the following sequence of SQL statements to create a changefeed: @@ -262,7 +264,7 @@ As an example, you run the following sequence of SQL statements to create a chan {"after": {"id": 4, "name": "Danny", "office": "los angeles"}, "key": [4], "updated": "1701102561022789676.0000000000"} ~~~ - The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will always be in the correct timestamp order. + The messages received at the sink are in order by timestamp **for each key**. Here, the update for key `[1]` is emitted before the insertion of key `[2]` even though the timestamp for the update to key `[1]` is higher. That is, if you follow the sequence of updates for a particular key at the sink, they will be in the correct timestamp order. However, re-emitted messages (duplicates) may not be in the correct timestamp order. For details on when changefeeds might re-emit messages, refer to [Duplicate messages](#duplicate-messages). The `updated` option adds an `updated` timestamp to each emitted row. You can also use the [`resolved` option](#resolved-messages) to emit a `resolved` timestamp message to each Kafka partition, or to a separate file at a cloud storage sink. A `resolved` timestamp guarantees that no (previously unseen) rows with a lower update timestamp will be emitted on that partition. 
@@ -318,7 +320,11 @@ For example, the checkpoints and changefeed pauses marked in this output show ho [checkpoint] ~~~ -With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) +In this example, with duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table. (Refer to [Per-key ordering](#per-key-ordering).) + +{{site.data.alerts.callout_danger}} +The first time a message is delivered, it will be in the correct timestamp order, which follows the [per-key ordering guarantee](#per-key-ordering). However, [duplicate messages](#duplicate-messages) for an individual row may **not** be in the correct timestamp order. +{{site.data.alerts.end}} To compare two different rows for [happens-before](https://wikipedia.org/wiki/Happened-before), compare the `updated` timestamp. This works across anything in the same cluster (tables, nodes, etc.). @@ -363,7 +369,7 @@ Under some circumstances, changefeeds will emit duplicate messages to ensure the A changefeed job cannot confirm that a message has been received by the sink unless the changefeed has reached a checkpoint. As a changefeed job runs, each node will send checkpoint progress to the job's coordinator node. These progress reports allow the coordinator to update the high-water mark timestamp confirming that all changes before (or at) the timestamp have been emitted. -When a changefeed must pause and then it resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. 
As a result, when the changefeed resumes, it will re-emit the messages that had sent after the last checkpoint, but were not confirmed in the next checkpoint. +When a changefeed pauses and then resumes, it will return to the last checkpoint (**A**), which is the last point at which the coordinator confirmed all changes for the given timestamp. As a result, when the changefeed resumes, it will re-emit the messages that were sent after the last checkpoint, but were not confirmed in the next checkpoint. The re-emitted messages may **not** be in [order](#per-key-ordering) by timestamp. How checkpoints will re-emit messages when a changefeed pauses. The changefeed returns to the last checkpoint and potentially sends duplicate messages. diff --git a/src/current/v24.1/how-does-an-enterprise-changefeed-work.md b/src/current/v24.1/how-does-an-enterprise-changefeed-work.md index c6da1c53777..b0da0ca3af8 100644 --- a/src/current/v24.1/how-does-an-enterprise-changefeed-work.md +++ b/src/current/v24.1/how-does-an-enterprise-changefeed-work.md @@ -7,7 +7,7 @@ docs_area: stream_data When an {{ site.data.products.enterprise }} changefeed is started on a node, that node becomes the _coordinator_ for the changefeed job (**Node 2** in the diagram). The coordinator node acts as an administrator: keeping track of all other nodes during job execution and the changefeed work as it completes. The changefeed job will run across all nodes in the cluster to access changed data in the watched table. Typically, the [leaseholder]({% link {{ page.version.version }}/architecture/replication-layer.md %}#leases) for a particular range (or the range’s replica) determines which node emits the changefeed data. -Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. 
The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will send duplicate messages starting at the high-water mark time to the current time. See [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for detail on CockroachDB's at-least-once-delivery-guarantee as well as an explanation on how rows are emitted. +Each node uses its aggregator processors to send back checkpoint progress to the coordinator, which gathers this information to update the high-water mark timestamp. The high-water mark acts as a checkpoint for the changefeed’s job progress, and guarantees that all changes before (or at) the timestamp have been emitted. In the unlikely event that the changefeed’s coordinating node were to fail during the job, that role will move to a different node and the changefeed will restart from the last checkpoint. If restarted, the changefeed will [re-emit messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) from the high-water mark time to the current time. Refer to [Ordering Guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) for details on CockroachDB's at-least-once delivery guarantee and how per-key message ordering is applied. Changefeed process in a 3-node cluster
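The checkpointing behavior described across these pages reduces to a small model. The following toy sketch (hypothetical; not CockroachDB's implementation) shows why restarting from the high-water mark produces duplicate messages, and therefore at-least-once rather than exactly-once delivery:

```python
def emit_from(events, high_water):
    """Model a (re)started changefeed: emit every (key, timestamp) event
    above the last checkpointed high-water mark, in timestamp order."""
    return [e for e in sorted(events, key=lambda e: e[1]) if e[1] > high_water]

events = [("A", 1), ("B", 2), ("A", 3), ("C", 4)]

# First run: emit everything; suppose the coordinator checkpoints at ts=2.
first_run = emit_from(events, high_water=0)

# The job fails after emitting ("A", 3) but before the next checkpoint,
# so on restart it returns to the checkpoint (ts=2) and emits ("A", 3)
# again: a duplicate message, hence at-least-once delivery.
resumed = emit_from(events, high_water=2)  # -> [("A", 3), ("C", 4)]
```

In this model, anything emitted between two checkpoints is exactly the set of messages at risk of re-emission after a restart.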