Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: make mq encoding protocol related more clear (#19156) #19301

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 54 additions & 31 deletions ticdc/ticdc-avro-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ summary: Learn the concept of TiCDC Avro Protocol and how to use it.

# TiCDC Avro Protocol

Avro is a data exchange format protocol defined by [Apache Avro™](https://avro.apache.org/) and chosen by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html) as the default data exchange format. This document describes the implementation of the Avro data format in TiCDC, including TiDB extension fields, definition of the Avro data format, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html).
TiCDC Avro protocol is a third-party implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). Avro is a data format defined by [Apache Avro™](https://avro.apache.org/)

This document describes how TiCDC implements the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html).

> **Warning:**
>
Expand All @@ -15,8 +17,6 @@ Avro is a data exchange format protocol defined by [Apache Avro™](https://avro

## Use Avro

When using Message Queue (MQ) as a downstream sink, you can specify Avro in `sink-uri`. TiCDC captures TiDB DML events, creates Avro messages from these events, and sends the messages downstream. When Avro detects a schema change, it registers the latest schema with Schema Registry.

The following is a configuration example using Avro:

{{< copyable "shell-regular" >}}
Expand All @@ -25,30 +25,11 @@ The following is a configuration example using Avro:
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

```shell
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
```

The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:[email protected]`. The username and password must be URL-encoded.
The value of `--schema-registry` supports the `https` protocol and `username:password` authentication. The username and password must be URL-encoded. For example, `--schema-registry=https://username:[email protected]`.

## TiDB extension fields

By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation.

- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates.
- `_tidb_commit_ts`: The unique identifier of a transaction.
- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier.

The following is a configuration example:

{{< copyable "shell-regular" >}}

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```
> **Note:**
>
> When using the Avro protocol, one Kafka topic can only contain data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file.

```shell
[sink]
Expand Down Expand Up @@ -95,7 +76,36 @@ The `fields` in the key contains only primary key columns or unique index column
}
```

The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns, not just the primary key columns.
The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns.

> **Note:**
>
> The Avro protocol encodes DML events as follows:
>
> - For Delete events, Avro only encodes the Key part. The Value part is empty.
> - For Insert events, Avro encodes all column data to the Value part.
> - For Update events, Avro encodes only all column data that is updated to the Value part.

## TiDB extension fields

By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation.

- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates.
- `_tidb_commit_ts`: The unique identifier of a transaction.
- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier.

The following is a configuration example:

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

```shell
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
```

After you enable [`enable-tidb-extension`](#tidb-extension-fields), the data format of the Value will be as follows:

Expand Down Expand Up @@ -161,7 +171,7 @@ If one column can be NULL, the Column data format can be:

- `{{ColumnName}}` indicates the column name.
- `{{TIDB_TYPE}}` indicates the type in TiDB, which is not a one-to-one mapping with the SQL type.
- `{{AVRO_TYPE}}` indicates the type in [avro spec](https://avro.apache.org/docs/current/spec.html).
- `{{AVRO_TYPE}}` indicates the type in the [Avro Specification](https://avro.apache.org/docs/++version++/specification).

| SQL TYPE | TIDB_TYPE | AVRO_TYPE | Description |
|------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|
Expand Down Expand Up @@ -274,14 +284,27 @@ DECIMAL(10, 4)

## DDL events and schema changes

Avro does not generate DDL events downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues.
Avro does not send DDL events and Watermark events to downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues.

Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to perform an upgrade to ensure normal running of the system.
For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`, and you add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state.

Assume that the default compatibility policy of Confluent Schema Registry is `BACKWARD` and add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state.
Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the new schema for data encoding and decoding.

For more information about schemas, refer to [Schema Registry related documents](https://docs.confluent.io/platform/current/schema-registry/avro.html).

## Consumer implementation

The TiCDC Avro protocol can be deserialized by [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer).

The consumer program can acquire the latest schema via [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), and then use the schema to deserialize the data encoded by the TiCDC Avro protocol.

### Distinguish event types

The consumer program can distinguish DML event types by the following rules:

* If there is only the Key part, it is a Delete event.
* If there are both Key and Value parts, it is either an Insert or an Update event. If the [TiDB extension fields](#tidb-extension-fields) are enabled, you can use the `_tidb_op` field to identify if it is an Insert or Update event. If the TiDB extension fields are not enabled, you cannot distinguish them.

## Topic distribution

Schema Registry supports three [Subject Name Strategies](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy): TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy. Currently, TiCDC Avro only supports TopicNameStrategy, which means that a Kafka topic can only receive data in one data format. Therefore, TiCDC Avro prohibits mapping multiple tables to the same topic. When you create a changefeed, an error will be reported if the topic rule does not include the `{schema}` and `{table}` placeholders in the configured distribution rule.
Expand Down
36 changes: 34 additions & 2 deletions ticdc/ticdc-sink-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ The following are descriptions of sink URI parameters and values that can be con
| `127.0.0.1` | The IP address of the downstream Kafka services. |
| `9092` | The port for the downstream Kafka. |
| `topic-name` | Variable. The name of the Kafka topic. |
| `protocol` | The protocol with which messages are output to Kafka. The value options are [`canal-json`](/ticdc/ticdc-canal-json.md)、[`open-protocol`](/ticdc/ticdc-open-protocol.md)、[`avro`](/ticdc/ticdc-avro-protocol.md)、[`debezium`](/ticdc/ticdc-debezium.md) and [`simple`](/ticdc/ticdc-simple-protocol.md). |
| `kafka-version` | The version of the downstream Kafka. This value needs to be consistent with the actual version of the downstream Kafka. |
| `kafka-client-id` | Specifies the Kafka client ID of the replication task (optional. `TiCDC_sarama_producer_replication ID` by default). |
| `partition-num` | The number of the downstream Kafka partitions (optional. The value must be **no greater than** the actual number of partitions; otherwise, the replication task cannot be created successfully. `3` by default). |
| `max-message-bytes` | The maximum size of data that is sent to Kafka broker each time (optional, `10MB` by default). From v5.0.6 and v4.0.6, the default value has changed from `64MB` and `256MB` to `10MB`. |
| `replication-factor` | The number of Kafka message replicas that can be saved (optional, `1` by default). This value must be greater than or equal to the value of [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) in Kafka. |
<<<<<<< HEAD
| `compression` | The compression algorithm used when sending messages (value options are `none`, `lz4`, `gzip`, `snappy`, and `zstd`; `none` by default). |
| `protocol` | The protocol with which messages are output to Kafka. The value options are `canal-json`, `open-protocol`, and `avro`. |
=======
| `required-acks` | A parameter used in the `Produce` request, which notifies the broker of the number of replica acknowledgements it needs to receive before responding. Value options are `0` (`NoResponse`: no response, only `TCP ACK` is provided), `1` (`WaitForLocal`: responds only after local commits are submitted successfully), and `-1` (`WaitForAll`: responds after all replicated replicas are committed successfully. You can configure the minimum number of replicated replicas using the [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) configuration item of the broker). (Optional, the default value is `-1`). |
| `compression` | The compression algorithm used when sending messages (value options are `none`, `lz4`, `gzip`, `snappy`, and `zstd`; `none` by default). Note that the Snappy compressed file must be in the [official Snappy format](https://github.com/google/snappy). Other variants of Snappy compression are not supported.|
>>>>>>> 02c320dc5d (cdc: make mq encoding protocol related more clear (#19156))
| `auto-create-topic` | Determines whether TiCDC creates the topic automatically when the `topic-name` passed in does not exist in the Kafka cluster (optional, `true` by default). |
| `enable-tidb-extension` | Optional. `false` by default. When the output protocol is `canal-json`, if the value is `true`, TiCDC sends [WATERMARK events](/ticdc/ticdc-canal-json.md#watermark-event) and adds the [TiDB extension field](/ticdc/ticdc-canal-json.md#tidb-extension-field) to Kafka messages. From v6.1.0, this parameter is also applicable to the `avro` protocol. If the value is `true`, TiCDC adds [three TiDB extension fields](/ticdc/ticdc-avro-protocol.md#tidb-extension-fields) to the Kafka message. |
| `max-batch-size` | New in v4.0.9. If the message protocol supports outputting multiple data changes to one Kafka message, this parameter specifies the maximum number of data changes in one Kafka message. It currently takes effect only when Kafka's `protocol` is `open-protocol` (optional, `16` by default). |
Expand Down Expand Up @@ -105,7 +111,8 @@ The following are descriptions of sink URI parameters and values that can be con

> **Note:**
>
> When `protocol` is `open-protocol`, TiCDC tries to avoid generating messages that exceed `max-message-bytes` in length. However, if a row is so large that a single change alone exceeds `max-message-bytes` in length, to avoid silent failure, TiCDC tries to output this message and prints a warning in the log.
> When `protocol` is `open-protocol`, TiCDC encodes multiple events into one Kafka message and avoids generating messages that exceed the length specified by `max-message-bytes`.
> If the encoded result of a single row change event exceeds the value of `max-message-bytes`, the changefeed reports an error and prints logs.

### TiCDC uses the authentication and authorization of Kafka

Expand Down Expand Up @@ -152,7 +159,7 @@ The following are examples when using Kafka SASL authentication:

### Integrate TiCDC with Kafka Connect (Confluent Platform)

To use the [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) provided by Confluent to stream data to relational or non-relational databases, you need to use the `avro` protocol and provide a URL for [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) in `schema-registry`.
To use the [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) provided by Confluent to stream data to relational or non-relational databases, you need to use the [`avro` protocol](/ticdc/ticdc-avro-protocol.md) and provide a URL for [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) in `schema-registry`.

Sample configuration:

Expand All @@ -169,6 +176,31 @@ dispatchers = [

For detailed integration guide, see [Quick Start Guide on Integrating TiDB with Confluent Platform](/ticdc/integrate-confluent-using-ticdc.md).

<<<<<<< HEAD
=======
### Integrate TiCDC with AWS Glue Schema Registry

Starting from v7.4.0, TiCDC supports using the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) as the Schema Registry when users choose the [Avro protocol](/ticdc/ticdc-avro-protocol.md) for data replication. The configuration example is as follows:

```shell
./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml
```

```toml
[sink]
[sink.kafka-config.glue-schema-registry-config]
region="us-west-1"
registry-name="ticdc-test"
access-key="xxxx"
secret-access-key="xxxx"
token="xxxx"
```

In the above configuration, `region` and `registry-name` are required fields, while `access-key`, `secret-access-key`, and `token` are optional fields. The best practice is to set the AWS credentials as environment variables or store them in the `~/.aws/credentials` file instead of setting them in the changefeed configuration file.

For more information, refer to the [official AWS SDK for Go V2 documentation](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials).

>>>>>>> 02c320dc5d (cdc: make mq encoding protocol related more clear (#19156))
## Customize the rules for Topic and Partition dispatchers of Kafka Sink

### Matcher rules
Expand Down
Loading