diff --git a/TOC-tidb-cloud.md b/TOC-tidb-cloud.md index d346ee3230748..ea906ec0edb8e 100644 --- a/TOC-tidb-cloud.md +++ b/TOC-tidb-cloud.md @@ -287,6 +287,9 @@ - [To Kafka Sink](/tidb-cloud/changefeed-sink-to-apache-kafka.md) - [To TiDB Cloud Sink](/tidb-cloud/changefeed-sink-to-tidb-cloud.md) - [To Cloud Storage](/tidb-cloud/changefeed-sink-to-cloud-storage.md) + - Reference + - [Set Up Self-Hosted Kafka Private Link Service in AWS](/tidb-cloud/setup-self-hosted-kafka-private-link-service.md) + - [Set Up Self-Hosted Kafka Private Service Connect in Google Cloud](/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md) - Disaster Recovery - [Recovery Group Overview](/tidb-cloud/recovery-group-overview.md) - [Get Started](/tidb-cloud/recovery-group-get-started.md) diff --git a/media/tidb-cloud/changefeed/connect-to-aws-self-hosted-kafka-privatelink-service.jpeg b/media/tidb-cloud/changefeed/connect-to-aws-self-hosted-kafka-privatelink-service.jpeg new file mode 100644 index 0000000000000..d0d5fa35a0555 Binary files /dev/null and b/media/tidb-cloud/changefeed/connect-to-aws-self-hosted-kafka-privatelink-service.jpeg differ diff --git a/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-kafka-proxy.jpeg b/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-kafka-proxy.jpeg new file mode 100644 index 0000000000000..290ee89bde5e3 Binary files /dev/null and b/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-kafka-proxy.jpeg differ diff --git a/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-portmapping.jpeg b/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-portmapping.jpeg new file mode 100644 index 0000000000000..d0b3595663820 Binary files /dev/null and b/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-portmapping.jpeg differ diff --git a/tidb-cloud/changefeed-sink-to-apache-kafka.md b/tidb-cloud/changefeed-sink-to-apache-kafka.md index 5c6f6dd8bed15..3d575b47eb7fa 100644 --- a/tidb-cloud/changefeed-sink-to-apache-kafka.md +++ b/tidb-cloud/changefeed-sink-to-apache-kafka.md @@ -18,6 +18,17 @@ This document describes how to create a changefeed to stream data from TiDB Clou - Currently, TiDB Cloud does not support uploading self-signed TLS certificates to connect to Kafka brokers. - Because TiDB Cloud uses TiCDC to establish changefeeds, it has the same [restrictions as TiCDC](https://docs.pingcap.com/tidb/stable/ticdc-overview#unsupported-scenarios). - If the table to be replicated does not have a primary key or a non-null unique index, the absence of a unique constraint during replication could result in duplicated data being inserted downstream in some retry scenarios. +- If you choose Private Link or Private Service Connect as the network connectivity method, ensure that your TiDB cluster version meets the following requirements: + + - For v6.5.x: version v6.5.9 or later + - For v7.1.x: version v7.1.4 or later + - For v7.5.x: version v7.5.1 or later + - For v8.1.x: all versions of v8.1.x and later are supported +- If you want to use Debezium as your data format, make sure the version of your TiDB cluster is v8.1.0 or later. +- For the partition distribution of Kafka messages, note the following: + + - If you want to distribute changelogs by primary key or index value to Kafka partition with a specified index name, make sure the version of your TiDB cluster is v7.5.0 or later. + - If you want to distribute changelogs by column value to Kafka partition, make sure the version of your TiDB cluster is v7.5.0 or later. ## Prerequisites @@ -28,7 +39,33 @@ Before creating a changefeed to stream data to Apache Kafka, you need to complet ### Network -Make sure that your TiDB cluster can connect to the Apache Kafka service. +Ensure that your TiDB cluster can connect to the Apache Kafka service. You can choose one of the following connection methods: + +- Private Connect (Beta): ideal for avoiding VPC CIDR conflicts and meeting security compliance, but incurs additional [Private Data Link Cost](/tidb-cloud/tidb-cloud-billing-ticdc-rcu.md#private-data-link-cost). +- VPC Peering: suitable as a cost-effective option, but requires managing potential VPC CIDR conflicts and security considerations. +- Public IP: suitable for a quick setup. + + +
+ +Private Connect leverages **Private Link** or **Private Service Connect** technologies from cloud providers to enable resources in your VPC to connect to services in other VPCs using private IP addresses, as if those services were hosted directly within your VPC. + +Currently, TiDB Cloud supports Private Connect for generic Kafka only. It does not include special integration with MSK, Confluent Kafka, or other services. + +- If your Apache Kafka service is hosted in AWS, follow [Set Up Self-Hosted Kafka Private Link Service in AWS](/tidb-cloud/setup-self-hosted-kafka-private-link-service.md) to ensure that the network connection is properly configured. After setup, provide the following information in the TiDB Cloud console to create the changefeed: + + - The ID in Kafka Advertised Listener Pattern + - The Endpoint Service Name + - The Bootstrap Ports + +- If your Apache Kafka service is hosted in Google Cloud, follow [Set Up Self-Hosted Kafka Private Service Connect in Google Cloud](/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md) to ensure that the network connection is properly configured. After setup, provide the following information in the TiDB Cloud console to create the changefeed: + + - The ID in Kafka Advertised Listener Pattern + - The Service Attachment + - The Bootstrap Ports + +
+
If your Apache Kafka service is in an AWS VPC that has no internet access, take the following steps: @@ -39,7 +76,7 @@ If your Apache Kafka service is in an AWS VPC that has no internet access, take 3. If the Apache Kafka URL contains hostnames, you need to allow TiDB Cloud to be able to resolve the DNS hostnames of the Apache Kafka brokers. - 1. Follow the steps in [Enable DNS resolution for a VPC peering connection](https://docs.aws.amazon.com/vpc/latest/peering/modify-peering-connections.html#vpc-peering-dns). + 1. Follow the steps in [Enable DNS resolution for a VPC peering connection](https://docs.aws.amazon.com/vpc/latest/peering/vpc-peering-dns.html). 2. Enable the **Accepter DNS resolution** option. If your Apache Kafka service is in a Google Cloud VPC that has no internet access, take the following steps: @@ -49,6 +86,16 @@ If your Apache Kafka service is in a Google Cloud VPC that has no internet acces You must add the CIDR of the region where your TiDB Cloud cluster is located to the ingress firewall rules. The CIDR can be found on the **VPC Peering** page. Doing so allows the traffic to flow from your TiDB cluster to the Kafka brokers. +
+
+ +If you want to provide Public IP access to your Apache Kafka service, assign Public IP addresses to all your Kafka brokers. + +It is **NOT** recommended to use Public IP in a production environment. + +
+
+ ### Kafka ACL authorization To allow TiDB Cloud changefeeds to stream data to Apache Kafka and create Kafka topics automatically, ensure that the following permissions are added in Kafka: @@ -60,21 +107,71 @@ For example, if your Kafka cluster is in Confluent Cloud, you can see [Resources ## Step 1. Open the changefeed page for Apache Kafka -1. In the [TiDB Cloud console](https://tidbcloud.com), navigate to the cluster overview page of the target TiDB cluster, and then click **Changefeed** in the left navigation pane. -2. Click **Create Changefeed**, and select **Kafka** as **Target Type**. +1. Log in to the [TiDB Cloud console](https://tidbcloud.com). +2. Navigate to the cluster overview page of the target TiDB cluster, and then click **Changefeed** in the left navigation pane. +3. Click **Create Changefeed**, and select **Kafka** as **Target Type**. ## Step 2. Configure the changefeed target -1. Under **Brokers Configuration**, fill in your Kafka brokers endpoints. You can use commas `,` to separate multiple endpoints. -2. Select an authentication option according to your Kafka authentication configuration. +The steps vary depending on the connectivity method you select. + + +
+ +1. In **Connectivity Method**, select **VPC Peering** or **Public IP**, fill in your Kafka brokers endpoints. You can use commas `,` to separate multiple endpoints. +2. Select an **Authentication** option according to your Kafka authentication configuration. - If your Kafka does not require authentication, keep the default option **Disable**. - - If your Kafka requires authentication, select the corresponding authentication type, and then fill in the user name and password of your Kafka account for authentication. + - If your Kafka requires authentication, select the corresponding authentication type, and then fill in the **user name** and **password** of your Kafka account for authentication. -3. Select your Kafka version. If you do not know that, use Kafka V2. -4. Select a desired compression type for the data in this changefeed. +3. Select your **Kafka Version**. If you do not know which one to use, use **Kafka v2**. +4. Select a **Compression** type for the data in this changefeed. 5. Enable the **TLS Encryption** option if your Kafka has enabled TLS encryption and you want to use TLS encryption for the Kafka connection. -6. Click **Next** to check the configurations you set and go to the next page. +6. Click **Next** to test the network connection. If the test succeeds, you will be directed to the next page. + +
+
+ +1. In **Connectivity Method**, select **Private Link**. +2. Authorize the [AWS Principal](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_principal.html#principal-accounts) of TiDB Cloud to create an endpoint for your endpoint service. The AWS Principal is provided in the tip on the web page. +3. Make sure you select the same **Number of AZs** and **AZ IDs of Kafka Deployment**, and fill the same unique ID in **Kafka Advertised Listener Pattern** when you [Set Up Self-Hosted Kafka Private Link Service in AWS](/tidb-cloud/setup-self-hosted-kafka-private-link-service.md) in the **Network** section. +4. Fill in the **Endpoint Service Name** which is configured in [Set Up Self-Hosted Kafka Private Link Service in AWS](/tidb-cloud/setup-self-hosted-kafka-private-link-service.md). +5. Fill in the **Bootstrap Ports**. It is recommended that you set at least one port for one AZ. You can use commas `,` to separate multiple ports. +6. Select an **Authentication** option according to your Kafka authentication configuration. + + - If your Kafka does not require authentication, keep the default option **Disable**. + - If your Kafka requires authentication, select the corresponding authentication type, and then fill in the **user name** and **password** of your Kafka account for authentication. + +7. Select your **Kafka Version**. If you do not know which one to use, use **Kafka v2**. +8. Select a **Compression** type for the data in this changefeed. +9. Enable the **TLS Encryption** option if your Kafka has enabled TLS encryption and you want to use TLS encryption for the Kafka connection. +10. Click **Next** to test the network connection. If the test succeeds, you will be directed to the next page. +11. TiDB Cloud creates the endpoint for **Private Link**, which might take several minutes. +12. Once the endpoint is created, log in to your cloud provider console and accept the connection request. +13. Return to the [TiDB Cloud console](https://tidbcloud.com) to confirm that you have accepted the connection request. TiDB Cloud will test the connection and proceed to the next page if the test succeeds. + +
+
+ +1. In **Connectivity Method**, select **Private Service Connect**. +2. Ensure that you fill in the same unique ID in **Kafka Advertised Listener Pattern** when you [Set Up Self-Hosted Kafka Private Service Connect in Google Cloud](/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md) in the **Network** section. +3. Fill in the **Service Attachment** that you have configured in [Setup Self Hosted Kafka Private Service Connect in Google Cloud](/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md) +4. Fill in the **Bootstrap Ports**. It is recommended that you provide more than one port. You can use commas `,` to separate multiple ports. +5. Select an **Authentication** option according to your Kafka authentication configuration. + + - If your Kafka does not require authentication, keep the default option **Disable**. + - If your Kafka requires authentication, select the corresponding authentication type, and then fill in the **user name** and **password** of your Kafka account for authentication. + +6. Select your **Kafka Version**. If you do not know which one to use, use **Kafka v2**. +7. Select a **Compression** type for the data in this changefeed. +8. Enable the **TLS Encryption** option if your Kafka has enabled TLS encryption and you want to use TLS encryption for the Kafka connection. +9. Click **Next** to test the network connection. If the test succeeds, you will be directed to the next page. +10. TiDB Cloud creates the endpoint for **Private Service Connect**, which might take several minutes. +11. Once the endpoint is created, log in to your cloud provider console and accept the connection request. +12. Return to the [TiDB Cloud console](https://tidbcloud.com) to confirm that you have accepted the connection request. TiDB Cloud will test the connection and proceed to the next page if the test succeeds. + +
+
## Step 3. Set the changefeed @@ -91,8 +188,10 @@ For example, if your Kafka cluster is in Confluent Cloud, you can see [Resources 3. In the **Data Format** area, select your desired format of Kafka messages. - - Avro is a compact, fast, and binary data format with rich data structures, which is widely used in various flow systems. For more information, see [Avro data format](https://docs.pingcap.com/tidb/stable/ticdc-avro-protocol). - - Canal-JSON is a plain JSON text format, which is easy to parse. For more information, see [Canal-JSON data format](https://docs.pingcap.com/tidb/stable/ticdc-canal-json). + - Avro is a compact, fast, and binary data format with rich data structures, which is widely used in various flow systems. For more information, see [Avro data format](https://docs.pingcap.com/tidb/stable/ticdc-avro-protocol). + - Canal-JSON is a plain JSON text format, which is easy to parse. For more information, see [Canal-JSON data format](https://docs.pingcap.com/tidb/stable/ticdc-canal-json). + - Open Protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between different databases. For more information, see [Open Protocol data format](https://docs.pingcap.com/tidb/stable/ticdc-open-protocol). + - Debezium is a tool for capturing database changes. It converts each captured database change into a message called an "event" and sends these events to Kafka. For more information, see [Debezium data format](https://docs.pingcap.com/tidb/stable/ticdc-debezium). 4. Enable the **TiDB Extension** option if you want to add TiDB-extension fields to the Kafka message body. @@ -109,32 +208,40 @@ For example, if your Kafka cluster is in Confluent Cloud, you can see [Resources The distribution mode controls how the changefeed creates Kafka topics, by table, by database, or creating one topic for all changelogs. - - **Distribute changelogs by table to Kafka Topics** + - **Distribute changelogs by table to Kafka Topics** If you want the changefeed to create a dedicated Kafka topic for each table, choose this mode. Then, all Kafka messages of a table are sent to a dedicated Kafka topic. You can customize topic names for tables by setting a topic prefix, a separator between a database name and table name, and a suffix. For example, if you set the separator as `_`, the topic names are in the format of `_`. For changelogs of non-row events, such as Create Schema Event, you can specify a topic name in the **Default Topic Name** field. The changefeed will create a topic accordingly to collect such changelogs. - - **Distribute changelogs by database to Kafka Topics** + - **Distribute changelogs by database to Kafka Topics** If you want the changefeed to create a dedicated Kafka topic for each database, choose this mode. Then, all Kafka messages of a database are sent to a dedicated Kafka topic. You can customize topic names of databases by setting a topic prefix and a suffix. For changelogs of non-row events, such as Resolved Ts Event, you can specify a topic name in the **Default Topic Name** field. The changefeed will create a topic accordingly to collect such changelogs. - - **Send all changelogs to one specified Kafka Topic** + - **Send all changelogs to one specified Kafka Topic** If you want the changefeed to create one Kafka topic for all changelogs, choose this mode. Then, all Kafka messages in the changefeed will be sent to one Kafka topic. You can define the topic name in the **Topic Name** field. -7. In the **Partition Distribution** area, you can decide which partition a Kafka message will be sent to: +7. In the **Partition Distribution** area, you can decide which partition a Kafka message will be sent to. You can define **a single partition dispatcher for all tables**, or **different partition dispatchers for different tables**. TiDB Cloud provides four types of dispatchers: - - **Distribute changelogs by index value to Kafka partition** + - **Distribute changelogs by primary key or index value to Kafka partition** - If you want the changefeed to send Kafka messages of a table to different partitions, choose this distribution method. The index value of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures row-level orderliness. + If you want the changefeed to send Kafka messages of a table to different partitions, choose this distribution method. The primary key or index value of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures row-level orderliness. - - **Distribute changelogs by table to Kafka partition** + - **Distribute changelogs by table to Kafka partition** If you want the changefeed to send Kafka messages of a table to one Kafka partition, choose this distribution method. The table name of a row changelog will determine which partition the changelog is sent to. This distribution method ensures table orderliness but might cause unbalanced partitions. + - **Distribute changelogs by timestamp to Kafka partition** + + If you want the changefeed to send Kafka messages to different Kafka partitions randomly, choose this distribution method. The commitTs of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures orderliness in each partition. However, multiple changes of a data item might be sent to different partitions and the consumer progress of different consumers might be different, which might cause data inconsistency. Therefore, the consumer needs to sort the data from multiple partitions by commitTs before consuming. + + - **Distribute changelogs by column value to Kafka partition** + + If you want the changefeed to send Kafka messages of a table to different partitions, choose this distribution method. The specified column values of a row changelog will determine which partition the changelog is sent to. This distribution method ensures orderliness in each partition and guarantees that the changelog with the same column values is send to the same partition. + 8. In the **Topic Configuration** area, configure the following numbers. The changefeed will automatically create the Kafka topics according to the numbers. - **Replication Factor**: controls how many Kafka servers each Kafka message is replicated to. The valid value ranges from [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) to the number of Kafka brokers. diff --git a/tidb-cloud/setup-self-hosted-kafka-private-link-service.md b/tidb-cloud/setup-self-hosted-kafka-private-link-service.md new file mode 100644 index 0000000000000..f6170830d41f6 --- /dev/null +++ b/tidb-cloud/setup-self-hosted-kafka-private-link-service.md @@ -0,0 +1,761 @@ +--- +title: Set Up Self-Hosted Kafka Private Link Service in AWS +summary: This document explains how to set up Private Link service for self-hosted Kafka in AWS and how to make it work with TiDB Cloud. +--- + +# Set Up Self-Hosted Kafka Private Link Service in AWS + +This document describes how to set up Private Link service for self-hosted Kafka in AWS, and how to make it work with TiDB Cloud. + +The mechanism works as follows: + +1. The TiDB Cloud VPC connects to the Kafka VPC through private endpoints. +2. Kafka clients need to communicate directly to all Kafka brokers. +3. Each Kafka broker is mapped to a unique port of endpoints within the TiDB Cloud VPC. +4. Leverage the Kafka bootstrap mechanism and AWS resources to achieve the mapping. + +The following diagram shows the mechanism. + +![Connect to AWS Self-Hosted Kafka Private Link Service](/media/tidb-cloud/changefeed/connect-to-aws-self-hosted-kafka-privatelink-service.jpeg) + +The document provides an example of connecting to a Kafka Private Link service deployed across three availability zones (AZ) in AWS. While other configurations are possible based on similar port-mapping principles, this document covers the fundamental setup process of the Kafka Private Link service. For production environments, a more resilient Kafka Private Link service with enhanced operational maintainability and observability is recommended. + +## Prerequisites + +1. Ensure that you have the following authorization to set up a Kafka Private Link service in your own AWS account. + + - Manage EC2 nodes + - Manage VPC + - Manage subnets + - Manage security groups + - Manage load balancer + - Manage endpoint services + - Connect to EC2 nodes to configure Kafka nodes + +2. [Create a TiDB Cloud Dedicated cluster](/tidb-cloud/create-tidb-cluster.md) if you do not have one. + +3. Get the Kafka deployment information from your TiDB Cloud Dedicated cluster. + + 1. In the [TiDB Cloud console](https://tidbcloud.com), navigate to the cluster overview page of the TiDB cluster, and then click **Changefeed** in the left navigation pane. + 2. On the overview page, find the region of the TiDB cluster. Ensure that your Kafka cluster will be deployed to the same region. + 3. Click **Create Changefeed**. + 1. In **Target Type**, select **Kafka**. + 2. In **Connectivity Method**, select **Private Link**. + 4. Note down the information of the TiDB Cloud AWS account in **Reminders before proceeding**. You will use it to authorize TiDB Cloud to create an endpoint for the Kafka Private Link service. + 5. Select **Number of AZs**. In this example, select **3 AZs**. Note down the IDs of the AZs in which you want to deploy your Kafka cluster. If you want to know the relationship between your AZ names and AZ IDs, see [Availability Zone IDs for your AWS resources](https://docs.aws.amazon.com/ram/latest/userguide/working-with-az-ids.html) to find it. + 6. Enter a unique **Kafka Advertised Listener Pattern** for your Kafka Private Link service. + 1. Input a unique random string. It can only include numbers or lowercase letters. You will use it to generate **Kafka Advertised Listener Pattern** later. + 2. Click **Check usage and generate** to check if the random string is unique and generate **Kafka Advertised Listener Pattern** that will be used to assemble the EXTERNAL advertised listener for Kafka brokers. + +Note down all the deployment information. You need to use it to configure your Kafka Private Link service later. + +The following table shows an example of the deployment information. + +| Information | Value | Note | +|--------|-----------------|---------------------------| +| Region | Oregon (`us-west-2`) | N/A | +| Principal of TiDB Cloud AWS Account | `arn:aws:iam:::root` | N/A | +| AZ IDs |
  • `usw2-az1`
  • `usw2-az2`
  • `usw2-az3`
| Align AZ IDs to AZ names in your AWS account.
Example:
  • `usw2-az1` => `us-west-2a`
  • `usw2-az2` => `us-west-2c`
  • `usw2-az3` => `us-west-2b`
| +| Kafka Advertised Listener Pattern | The unique random string: `abc`
Generated pattern for AZs:
  • `usw2-az1` => <broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
  • `usw2-az2` => <broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
  • `usw2-az3` => <broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
| Map AZ names to AZ-specified patterns. Make sure that you configure the right pattern to the broker in a specific AZ later.
  • `us-west-2a` => <broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
  • `us-west-2c` => <broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
  • `us-west-2b` => <broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>
| + +## Step 1. Set up a Kafka cluster + +If you need to deploy a new cluster, follow the instructions in [Deploy a new Kafka cluster](#deploy-a-new-kafka-cluster). + +If you need to expose an existing cluster, follow the instructions in [Reconfigure a running Kafka cluster](#reconfigure-a-running-kafka-cluster). + +### Deploy a new Kafka cluster + +#### 1. Set up the Kafka VPC + +The Kafka VPC requires the following: + +- Three private subnets for brokers, one for each AZ. +- One public subnet in any AZ with a bastion node that can connect to the internet and three private subnets, which makes it easy to set up the Kafka cluster. In a production environment, you might have your own bastion node that can connect to the Kafka VPC. + +Before creating subnets, create subnets in AZs based on the mappings of AZ IDs and AZ names. Take the following mapping as an example. + +- `usw2-az1` => `us-west-2a` +- `usw2-az2` => `us-west-2c` +- `usw2-az3` => `us-west-2b` + +Create private subnets in the following AZs: + +- `us-west-2a` +- `us-west-2c` +- `us-west-2b` + +Take the following steps to create the Kafka VPC. + +**1.1. Create the Kafka VPC** + +1. Go to [AWS Console > VPC dashboard](https://console.aws.amazon.com/vpcconsole/home?#vpcs:), and switch to the region in which you want to deploy Kafka. + +2. Click **Create VPC**. Fill in the information on the **VPC settings** page as follows. + + 1. Select **VPC only**. + 2. Enter a tag in **Name tag**, for example, `Kafka VPC`. + 3. Select **IPv4 CIDR manual input**, and enter the IPv4 CIDR, for example, `10.0.0.0/16`. + 4. Use the default values for other options. Click **Create VPC**. + 5. On the VPC detail page, take note of the VPC ID, for example, `vpc-01f50b790fa01dffa`. + +**1.2. Create private subnets in the Kafka VPC** + +1. Go to the [Subnets Listing page](https://console.aws.amazon.com/vpcconsole/home?#subnets:). +2. Click **Create subnet**. +3. Select **VPC ID** (`vpc-01f50b790fa01dffa` in this example) that you noted down before. +4. Add three subnets with the following information. It is recommended that you put the AZ IDs in the subnet names to make it easy to configure the brokers later, because TiDB Cloud requires encoding the AZ IDs in the broker's `advertised.listener` configuration. + + - Subnet1 in `us-west-2a` + - **Subnet name**: `broker-usw2-az1` + - **Availability Zone**: `us-west-2a` + - **IPv4 subnet CIDR block**: `10.0.0.0/18` + + - Subnet2 in `us-west-2c` + - **Subnet name**: `broker-usw2-az2` + - **Availability Zone**: `us-west-2c` + - **IPv4 subnet CIDR block**: `10.0.64.0/18` + + - Subnet3 in `us-west-2b` + - **Subnet name**: `broker-usw2-az3` + - **Availability Zone**: `us-west-2b` + - **IPv4 subnet CIDR block**: `10.0.128.0/18` + +5. Click **Create subnet**. The **Subnets Listing** page is displayed. + +**1.3. Create the public subnet in the Kafka VPC** + +1. Click **Create subnet**. +2. Select **VPC ID** (`vpc-01f50b790fa01dffa` in this example) that you noted down before. +3. Add the public subnet in any AZ with the following information: + + - **Subnet name**: `bastion` + - **IPv4 subnet CIDR block**: `10.0.192.0/18` + +4. Click **Create subnet**. The **Subnets Listing** page is displayed. +5. Configure the bastion subnet to the Public subnet. + + 1. Go to [VPC dashboard > Internet gateways](https://console.aws.amazon.com/vpcconsole/home#igws:). Create an Internet Gateway with the name `kafka-vpc-igw`. + 2. On the **Internet gateways Detail** page, in **Actions**, click **Attach to VPC** to attach the Internet Gateway to the Kafka VPC. + 3. Go to [VPC dashboard > Route tables](https://console.aws.amazon.com/vpcconsole/home#CreateRouteTable:). Create a route table to the Internet Gateway in Kafka VPC and add a new route with the following information: + + - **Name**: `kafka-vpc-igw-route-table` + - **VPC**: `Kafka VPC` + - **Route**: + - **Destination**: `0.0.0.0/0` + - **Target**: `Internet Gateway`, `kafka-vpc-igw` + + 4. Attach the route table to the bastion subnet. On the **Detail** page of the route table, click **Subnet associations > Edit subnet associations** to add the bastion subnet and save changes. + +#### 2. Set up Kafka brokers + +**2.1. Create a bastion node** + +Go to the [EC2 Listing page](https://console.aws.amazon.com/ec2/home#Instances:). Create the bastion node in the bastion subnet. + +- **Name**: `bastion-node` +- **Amazon Machine Image**: `Amazon linux` +- **Instance Type**: `t2.small` +- **Key pair**: `kafka-vpc-key-pair`. Create a new key pair named `kafka-vpc-key-pair`. Download **kafka-vpc-key-pair.pem** to your local for later configuration. +- Network settings + + - **VPC**: `Kafka VPC` + - **Subnet**: `bastion` + - **Auto-assign public IP**: `Enable` + - **Security Group**: create a new security group allow SSH login from anywhere. You can narrow the rule for safety in the production environment. + +**2.2. Create broker nodes** + +Go to the [EC2 Listing page](https://console.aws.amazon.com/ec2/home#Instances:). Create three broker nodes in broker subnets, one for each AZ. + +- Broker 1 in subnet `broker-usw2-az1` + + - **Name**: `broker-node1` + - **Amazon Machine Image**: `Amazon linux` + - **Instance Type**: `t2.large` + - **Key pair**: reuse `kafka-vpc-key-pair` + - Network settings + + - **VPC**: `Kafka VPC` + - **Subnet**: `broker-usw2-az1` + - **Auto-assign public IP**: `Disable` + - **Security Group**: create a new security group to allow all TCP from Kafka VPC. You can narrow the rule for safety in the production environment. + - **Protocol**: `TCP` + - **Port range**: `0 - 65535` + - **Source**: `10.0.0.0/16` + +- Broker 2 in subnet `broker-usw2-az2` + + - **Name**: `broker-node2` + - **Amazon Machine Image**: `Amazon linux` + - **Instance Type**: `t2.large` + - **Key pair**: reuse `kafka-vpc-key-pair` + - Network settings + + - **VPC**: `Kafka VPC` + - **Subnet**: `broker-usw2-az2` + - **Auto-assign public IP**: `Disable` + - **Security Group**: create a new security group to allow all TCP from Kafka VPC. You can narrow the rule for safety in the production environment. + - **Protocol**: `TCP` + - **Port range**: `0 - 65535` + - **Source**: `10.0.0.0/16` + +- Broker 3 in subnet `broker-usw2-az3` + + - **Name**: `broker-node3` + - **Amazon Machine Image**: `Amazon linux` + - **Instance Type**: `t2.large` + - **Key pair**: reuse `kafka-vpc-key-pair` + - Network settings + + - **VPC**: `Kafka VPC` + - **Subnet**: `broker-usw2-az3` + - **Auto-assign public IP**: `Disable` + - **Security Group**: create a new security group to allow all TCP from Kafka VPC. You can narrow the rule for safety in the production environment. + - **Protocol**: `TCP` + - **Port range**: `0 - 65535` + - **Source**: `10.0.0.0/16` + +**2.3. Prepare Kafka runtime binaries** + +1. Go to the detail page of the bastion node. Get the **Public IPv4 address**. Use SSH to log in to the node with the previously downloaded `kafka-vpc-key-pair.pem`. + + ```shell + chmod 400 kafka-vpc-key-pair.pem + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{bastion_public_ip} # replace {bastion_public_ip} with the IP address of your bastion node, for example, 54.186.149.187 + scp -i "kafka-vpc-key-pair.pem" kafka-vpc-key-pair.pem ec2-user@{bastion_public_ip}:~/ + ``` + +2. Download binaries. + + ```shell + # Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. + wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz + tar -zxf kafka_2.13-3.7.1.tgz + wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz + tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz + ``` + +3. Copy binaries to each broker node. + + ```shell + # Replace {broker-node1-ip} with your broker-node1 IP address + scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node1-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf kafka_2.13-3.7.1.tgz" + scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node1-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + + # Replace {broker-node2-ip} with your broker-node2 IP address + scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node2-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf kafka_2.13-3.7.1.tgz" + scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node2-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + + # Replace {broker-node3-ip} with your broker-node3 IP address + scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node3-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf kafka_2.13-3.7.1.tgz" + scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node3-ip}:~/ + ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + ``` + +**2.4. Set up Kafka nodes on each broker node** + +**2.4.1 Set up a KRaft Kafka cluster with three nodes** + +Each node will act as a broker and controller role. Do the following for each broker: + +1. For the `listeners` item, all three brokers are the same and act as broker and controller roles: + + 1. Configure the same CONTROLLER listener for all **controller** role nodes. If you only want to add the **broker** role nodes, you do not need the CONTROLLER listener in `server.properties`. + 2. Configure two **broker** listeners, `INTERNAL` for internal access and `EXTERNAL` for external access from TiDB Cloud. + +2. For the `advertised.listeners` item, do the following: + + 1. Configure an INTERNAL advertised listener for every broker with the internal IP of the broker node. Advertised internal Kafka clients use this address to visit the broker. + 2. Configure an EXTERNAL advertised listener based on **Kafka Advertised Listener Pattern** you get from TiDB Cloud for each broker node to help TiDB Cloud differentiate between different brokers. Different EXTERNAL advertised listeners help the Kafka client from TiDB Cloud route requests to the right broker. + + - `` differentiates brokers from Kafka Private Link Service access points. Plan a port range for EXTERNAL advertised listeners of all brokers. These ports do not have to be actual ports listened to by brokers. They are ports listened to by the load balancer for Private Link Service that will forward requests to different brokers. + - `AZ ID` in **Kafka Advertised Listener Pattern** indicates where the broker is deployed. TiDB Cloud will route requests to different endpoint DNS names based on the AZ ID. + + It is recommended to configure different broker IDs for different brokers to make it easy for troubleshooting. + +3. The planning values are as follows: + + - **CONTROLLER port**: `29092` + - **INTERNAL port**: `9092` + - **EXTERNAL**: `39092` + - **EXTERNAL advertised listener ports range**: `9093~9095` + +**2.4.2. Create a configuration file** + +Use SSH to log in to every broker node. Create a configuration file `~/config/server.properties` with the following content. + +```properties +# brokers in usw2-az1 + +# broker-node1 ~/config/server.properties +# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. +# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. +# 2.1 The pattern for AZ(ID: usw2-az1) is ".usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:". +# 2.2 So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9093) in the port range of the EXTERNAL advertised listener. +# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. +process.roles=broker,controller +node.id=1 +controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 +listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 +inter.broker.listener.name=INTERNAL +advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 +controller.listener.names=CONTROLLER +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +log.dirs=./data +``` + +```properties +# brokers in usw2-az2 + +# broker-node2 ~/config/server.properties +# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. +# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. +# 2.1 The pattern for AZ(ID: usw2-az2) is ".usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:". +# 2.2 So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9094) in the port range of the EXTERNAL advertised listener. +# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. +process.roles=broker,controller +node.id=2 +controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 +listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 +inter.broker.listener.name=INTERNAL +advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 +controller.listener.names=CONTROLLER +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +log.dirs=./data +``` + +```properties +# brokers in usw2-az3 + +# broker-node3 ~/config/server.properties +# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. +# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. +# 2.1 The pattern for AZ(ID: usw2-az3) is ".usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:". +# 2.2 So the EXTERNAL can be "b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9095) in the port range of the EXTERNAL advertised listener. +# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. +process.roles=broker,controller +node.id=3 +controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 +listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 +inter.broker.listener.name=INTERNAL +advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 +controller.listener.names=CONTROLLER +listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +log.dirs=./data +``` + +**2.4.3 Start Kafka brokers** + +Create a script, and then execute it to start the Kafka broker in each broker node. + +```shell +#!/bin/bash + +# Get the directory of the current script +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# Set JAVA_HOME to the Java installation within the script directory +export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" +# Define the vars +KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" +KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh +KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh +KAFKA_DATA_DIR=$SCRIPT_DIR/data +KAFKA_LOG_DIR=$SCRIPT_DIR/log +KAFKA_CONFIG_DIR=$SCRIPT_DIR/config + +# Cleanup step, which makes it easy for multiple experiments +# Find all Kafka process IDs +KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}') +if [ -z "$KAFKA_PIDS" ]; then + echo "No Kafka processes are running." +else + # Kill each Kafka process + echo "Killing Kafka processes with PIDs: $KAFKA_PIDS" + for PID in $KAFKA_PIDS; do + kill -9 $PID + echo "Killed Kafka process with PID: $PID" + done + echo "All Kafka processes have been killed." +fi + +rm -rf $KAFKA_DATA_DIR +mkdir -p $KAFKA_DATA_DIR +rm -rf $KAFKA_LOG_DIR +mkdir -p $KAFKA_LOG_DIR + +# Magic id: BRl69zcmTFmiPaoaANybiw, you can use your own +$KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log +LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" & +``` + +**2.5. Test the cluster setting in the bastion node** + +1. Test the Kafka bootstrap. + + ```shell + export JAVA_HOME=/home/ec2-user/jdk-22.0.2 + + # Bootstrap from INTERNAL listener + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 + # Expected output (the actual order might be different) + {broker-node1-ip}:9092 (id: 1 rack: null) -> ( + {broker-node2-ip}:9092 (id: 2 rack: null) -> ( + {broker-node3-ip}:9092 (id: 3 rack: null) -> ( + + # Bootstrap from EXTERNAL listener + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 + # Expected output for the last 3 lines (the actual order might be different) + # The difference in the output from "bootstrap from INTERNAL listener" is that exceptions or errors might occur because advertised listeners cannot be resolved in Kafka VPC. + # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link. + b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + ``` + +2. Create a producer script `produce.sh` in the bastion node. + + ```shell + #!/bin/bash + BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." + + # Get the directory of the current script + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Set JAVA_HOME to the Java installation within the script directory + export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" + # Define the Kafka directory + KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" + TOPIC="test-topic" + + # Create a topic if it does not exist + create_topic() { + echo "Creating topic if it does not exist..." + $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 + } + + # Produce messages to the topic + produce_messages() { + echo "Producing messages to the topic..." + for ((chrono=1; chrono <= 10; chrono++)); do + message="Test message "$chrono + echo "Create "$message + echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC + done + } + create_topic + produce_messages + ``` + +3. Create a consumer script `consume.sh` in the bastion node. + + ```shell + #!/bin/bash + + BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." + + # Get the directory of the current script + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Set JAVA_HOME to the Java installation within the script directory + export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" + # Define the Kafka directory + KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" + TOPIC="test-topic" + CONSUMER_GROUP="test-group" + # Consume messages from the topic + consume_messages() { + echo "Consuming messages from the topic..." + $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP + } + consume_messages + ``` + +4. Execute `produce.sh` and `consume.sh` to verify that the Kafka cluster is running. These scripts will also be reused for later network connection testing. The script will create a topic with `--partitions 3 --replication-factor 3`. Ensure that all these three brokers contain data. Ensure that the script will connect to all three brokers to guarantee that network connection will be tested. + + ```shell + # Test write message. + ./produce.sh {one_of_broker_ip}:9092 + ``` + + ```shell + # Expected output + Creating topic if it does not exist... + + Producing messages to the topic... + Create Test message 1 + >>Create Test message 2 + >>Create Test message 3 + >>Create Test message 4 + >>Create Test message 5 + >>Create Test message 6 + >>Create Test message 7 + >>Create Test message 8 + >>Create Test message 9 + >>Create Test message 10 + ``` + + ```shell + # Test read message + ./consume.sh {one_of_broker_ip}:9092 + ``` + + ```shell + # Expected example output (the actual message order might be different) + Consuming messages from the topic... + Test message 3 + Test message 4 + Test message 5 + Test message 9 + Test message 10 + Test message 6 + Test message 8 + Test message 1 + Test message 2 + Test message 7 + [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) + org.apache.kafka.common.errors.TimeoutException + Processed a total of 10 messages + ``` + +### Reconfigure a running Kafka cluster + +Ensure that your Kafka cluster is deployed in the same region and AZs as the TiDB cluster. If any brokers are in different AZs, move them to the correct ones. + +#### 1. Configure the EXTERNAL listener for brokers + +The following configuration applies to a Kafka KRaft cluster. The ZK mode configuration is similar. + +1. Plan configuration changes. + + 1. Configure an EXTERNAL **listener** for every broker for external access from TiDB Cloud. Select a unique port as the EXTERNAL port, for example, `39092`. + 2. Configure an EXTERNAL **advertised listener** based on **Kafka Advertised Listener Pattern** you get from TiDB Cloud for every broker node to help TiDB Cloud differentiate between different brokers. Different EXTERNAL advertised listeners help Kafka clients from TiDB Cloud route requests to the right broker. + + - `` differentiates brokers from Kafka Private Link Service access points. Plan a port range for EXTERNAL advertised listeners of all brokers, for example, `range from 9093`. These ports do not have to be actual ports listened to by brokers. They are ports listened to by the load balancer for Private Link Service that will forward requests to different brokers. + - `AZ ID` in **Kafka Advertised Listener Pattern** indicates where the broker is deployed. TiDB Cloud will route requests to different endpoint DNS names based on the AZ ID. + + It is recommended to configure different broker IDs for different brokers to make it easy for troubleshooting. + +2. Use SSH to log in to each broker node. Modify the configuration file of each broker with the following content: + + ```properties + # brokers in usw2-az1 + + # Add EXTERNAL listener + listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 + + # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section + # 1. The pattern for AZ(ID: usw2-az1) is ".usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:" + # 2. So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093", replace with "b" prefix plus "node.id" properties, replace with a unique port(9093) in EXTERNAL advertised listener ports range + advertised.listeners=...,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 + + # Configure EXTERNAL map + listener.security.protocol.map=...,EXTERNAL:PLAINTEXT + ``` + + ```properties + # brokers in usw2-az2 + + # Add EXTERNAL listener + listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 + + # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section + # 1. The pattern for AZ(ID: usw2-az2) is ".usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:" + # 2. So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace with "b" prefix plus "node.id" properties, and replace with a unique port(9094) in EXTERNAL advertised listener ports range. + advertised.listeners=...,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 + + # Configure EXTERNAL map + listener.security.protocol.map=...,EXTERNAL:PLAINTEXT + ``` + + ```properties + # brokers in usw2-az3 + + # Add EXTERNAL listener + listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 + + # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section + # 1. The pattern for AZ(ID: usw2-az3) is ".usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:" + # 2. So the EXTERNAL can be "b2.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace with "b" prefix plus "node.id" properties, and replace with a unique port(9095) in EXTERNAL advertised listener ports range. + advertised.listeners=...,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 + + # Configure EXTERNAL map + listener.security.protocol.map=...,EXTERNAL:PLAINTEXT + ``` + +3. After you reconfigure all the brokers, restart your Kafka brokers one by one. + +#### 2. Test EXTERNAL listener settings in your internal network + +You can download the Kafka and OpenJDK in you Kafka client node. + +```shell +# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. +wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz +tar -zxf kafka_2.13-3.7.1.tgz +wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz +tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz +``` + +Execute the following script to test if the bootstrap works as expected. + +```shell +export JAVA_HOME=/home/ec2-user/jdk-22.0.2 + +# Bootstrap from the EXTERNAL listener +./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 + +# Expected output for the last 3 lines (the actual order might be different) +# There will be some exceptions or errors becasue advertised listeners cannot be resolved in your Kafka network. +# We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link. +b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +``` + +## Step 2. Expose the Kafka cluster as Private Link Service + +### 1. Set up the load balancer + +Create a network load balancer with four target groups with different ports. One target group is for bootstrap, and the others will map to different brokers. + +1. bootstrap target group => 9092 => broker-node1:39092,broker-node2:39092,broker-node3:39092 +2. broker target group 1 => 9093 => broker-node1:39092 +3. broker target group 2 => 9094 => broker-node2:39092 +4. broker target group 3 => 9095 => broker-node3:39092 + +If you have more broker role nodes, you need to add more mappings. Ensure that you have at least one node in the bootstrap target group. It is recommended to add three nodes, one for each AZ for resilience. + +Do the following to set up the load balancer: + +1. Go to [Target groups](https://console.aws.amazon.com/ec2/home#CreateTargetGroup:) to create four target groups. + + - Bootstrap target group + + - **Target type**: `Instances` + - **Target group name**: `bootstrap-target-group` + - **Protocol**: `TCP` + - **Port**: `9092` + - **IP address type**: `IPv4` + - **VPC**: `Kafka VPC` + - **Health check protocol**: `TCP` + - **Register targets**: `broker-node1:39092`, `broker-node2:39092`, `broker-node3:39092` + + - Broker target group 1 + + - **Target type**: `Instances` + - **Target group name**: `broker-target-group-1` + - **Protocol**: `TCP` + - **Port**: `9093` + - **IP address type**: `IPv4` + - **VPC**: `Kafka VPC` + - **Health check protocol**: `TCP` + - **Register targets**: `broker-node1:39092` + + - Broker target group 2 + + - **Target type**: `Instances` + - **Target group name**: `broker-target-group-2` + - **Protocol**: `TCP` + - **Port**: `9094` + - **IP address type**: `IPv4` + - **VPC**: `Kafka VPC` + - **Health check protocol**: `TCP` + - **Register targets**: `broker-node2:39092` + + - Broker target group 3 + + - **Target type**: `Instances` + - **Target group name**: `broker-target-group-3` + - **Protocol**: `TCP` + - **Port**: `9095` + - **IP address type**: `IPv4` + - **VPC**: `Kafka VPC` + - **Health check protocol**: `TCP` + - **Register targets**: `broker-node3:39092` + +2. Go to [Load balancers](https://console.aws.amazon.com/ec2/home#LoadBalancers:) to create a network load balancer. + + - **Load balancer name**: `kafka-lb` + - **Schema**: `Internal` + - **Load balancer IP address type**: `IPv4` + - **VPC**: `Kafka VPC` + - **Availability Zones**: + - `usw2-az1` with `broker-usw2-az1 subnet` + - `usw2-az2` with `broker-usw2-az2 subnet` + - `usw2-az3` with `broker-usw2-az3 subnet` + - **Security groups**: create a new security group with the following rules. + - Inbound rule allows all TCP from Kafka VPC: Type - `All TCP`; Source - `Anywhere-IPv4` + - Outbound rule allows all TCP to Kafka VPC: Type - `All TCP`; Destination - `Anywhere-IPv4` + - Listeners and routing: + - Protocol: `TCP`; Port: `9092`; Forward to: `bootstrap-target-group` + - Protocol: `TCP`; Port: `9093`; Forward to: `broker-target-group-1` + - Protocol: `TCP`; Port: `9094`; Forward to: `broker-target-group-2` + - Protocol: `TCP`; Port: `9095`; Forward to: `broker-target-group-3` + +3. Test the load balancer in the bastion node. This example only tests the Kafka bootstrap. Because the load balancer is listening on the Kafka EXTERNAL listener, the addresses of EXTERNAL advertised listeners can not be resolved in the bastion node. Note down the `kafka-lb` DNS name from the load balancer detail page, for example `kafka-lb-77405fa57191adcb.elb.us-west-2.amazonaws.com`. Execute the script in the bastion node. + + ```shell + # Replace {lb_dns_name} to your actual value + export JAVA_HOME=/home/ec2-user/jdk-22.0.2 + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {lb_dns_name}:9092 + + # Expected output for the last 3 lines (the actual order might be different) + b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + + # You can also try bootstrap in other ports 9093/9094/9095. It will succeed probabilistically because NLB in AWS resolves LB DNS to the IP address of any availability zone and disables cross-zone load balancing by default. + # If you enable cross-zone load balancing in LB, it will succeed. However, it is unnecessary and might cause additional cross-AZ traffic. + ``` + +### 2. Set up Private Link Service + +1. Go to [Endpoint service](https://console.aws.amazon.com/vpcconsole/home#EndpointServices:). Click **Create endpoint service** to create a Private Link service for the Kafka load balancer. + + - **Name**: `kafka-pl-service` + - **Load balancer type**: `Network` + - **Load balancers**: `kafka-lb` + - **Included Availability Zones**: `usw2-az1`,`usw2-az2`, `usw2-az3` + - **Require acceptance for endpoint**: `Acceptance required` + - **Enable private DNS name**: `No` + +2. Note down the **Service name**. You need to provide it to TiDB Cloud, for example `com.amazonaws.vpce.us-west-2.vpce-svc-0f49e37e1f022cd45`. + +3. On the detail page of the kafka-pl-service, click the **Allow principals** tab, and allow the AWS account of TiDB Cloud to create the endpoint. You can get the AWS account of TiDB Cloud in [Prerequistes](#prerequisites), for example, `arn:aws:iam:::root`. + +## Step 3. Connect from TiDB Cloud + +1. Return to the [TiDB Cloud console](https://tidbcloud.com) to create a changefeed for the cluster to connect to the Kafka cluster by **Private Link**. For more information, see [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +2. When you proceed to **Configure the changefeed target > Connectivity Method > Private Link**, fill in the following fields with corresponding values and other fields as needed. + + - **Kafka Type**: `3 AZs`. Ensure that your Kafka cluster is deployed in the same three AZs. + - **Kafka Advertised Listener Pattern**: `abc`. It is the same as the unique random string you use to generate **Kafka Advertised Listener Pattern** in [Prerequistes](#prerequisites). + - **Endpoint Service Name**: the Kafka service name. + - **Bootstrap Ports**: `9092`. A single port is sufficient because you configure a dedicated bootstrap target group behind it. + +3. Proceed with the steps in [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +Now you have successfully finished the task. + +## FAQ + +### How to connect to the same Kafka Private Link service from two different TiDB Cloud projects? + +If you have already followed this document to successfully set up the connection from the first project, you can connect to the same Kafka Private Link service from the second project as follows: + +1. Follow instructions from the beginning of this document. + +2. When you proceed to [Step 1. Set up a Kafka cluster](#step-1-set-up-a-kafka-cluster), follow [Reconfigure a running Kafka cluster](#reconfigure-a-running-kafka-cluster) to create another group of EXTERNAL listeners and advertised listeners. You can name it as **EXTERNAL2**. Note that the port range of **EXTERNAL2** cannot overlap with the **EXTERNAL**. + +3. After reconfiguring brokers, add another target group in the load balancer, including the bootstrap and broker target groups. + +4. Configure the TiDB Cloud connection with the following information: + + - New Bootstrap port + - New Kafka Advertised Listener Group + - The same Endpoint Service diff --git a/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md b/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md new file mode 100644 index 0000000000000..16c1a4ac7e378 --- /dev/null +++ b/tidb-cloud/setup-self-hosted-kafka-private-service-connect.md @@ -0,0 +1,704 @@ +--- +title: Set Up Self-Hosted Kafka Private Service Connect in Google Cloud +summary: This document explains how to set up Private Service Connect for self-hosted Kafka in Google Cloud and how to make it work with TiDB Cloud. +--- + +# Set Up Self-Hosted Kafka Private Service Connect in Google Cloud + +This document explains how to set up Private Service Connect for self-hosted Kafka in Google Cloud, and how to make it work with TiDB Cloud. + +The mechanism works as follows: + +1. The TiDB Cloud VPC connects to the Kafka VPC through private endpoints. +2. Kafka clients need to communicate directly to all Kafka brokers. +3. Each Kafka broker is mapped to a unique port within the TiDB Cloud VPC. +4. Leverage the Kafka bootstrap mechanism and Google Cloud resources to achieve the mapping. + +There are two ways to set up Private Service Connect for self-hosted Kafka in Google Cloud: + +- Use the Private Service Connect (PSC) port mapping mechanism. This method requires static port-broker mapping configuration. You need to reconfigure the existing Kafka cluster to add a group of EXTERNAL listeners and advertised listeners. See [Set up self-hosted Kafka Private Service Connect service by PSC port mapping](#set-up-self-hosted-kafka-private-service-connect-service-by-psc-port-mapping). + +- Use [Kafka-proxy](https://github.com/grepplabs/kafka-proxy). This method introduces an extra running process as the proxy between Kafka clients and Kafka brokers. The proxy dynamically configures the port-broker mapping and forwards requests. You do not need to reconfigure the existing Kafka cluster. See [Set up self-hosted Kafka Private Service Connect by Kafka-proxy](#set-up-self-hosted-kafka-private-service-connect-by-kafka-proxy). + +The document provides an example of connecting to a Kafka Private Service Connect service deployed across three availability zones (AZ) in Google Cloud. While other configurations are possible based on similar port-mapping principles, this document covers the fundamental setup process of the Kafka Private Service Connect service. For production environments, a more resilient Kafka Private Service Connect service with enhanced operational maintainability and observability is recommended. + +## Prerequisites + +1. Ensure that you have the following authorization to set up Kafka Private Service Connect in your own Google Cloud account. + + - Manage VM nodes + - Manage VPC + - Manage subnets + - Manage load balancer + - Manage Private Service Connect + - Connect to VM nodes to configure Kafka nodes + +2. [Create a TiDB Cloud Dedicated cluster](/tidb-cloud/create-tidb-cluster.md) if you do not have one. + +3. Get the Kafka deployment information from your TiDB Cloud Dedicated cluster. + + 1. In the [TiDB Cloud console](https://tidbcloud.com), navigate to the cluster overview page of the TiDB cluster, and then click **Changefeed** in the left navigation pane. + 2. On the overview page, find the region of the TiDB cluster. Ensure that your Kafka cluster will be deployed to the same region. + 3. Click **Create Changefeed**. + 1. In **Target Type**, select **Kafka**. + 2. In **Connectivity Method**, select **Private Service Connect**. + 4. Note down the Google Cloud project in **Reminders before proceeding**. You will use it to authorize the auto-accept endpoint creation request from TiDB Cloud. + 5. Note down the **Zones of TiDB Cluster**. You will deploy your TiDB cluster in these zones. It is recommended that you deploy Kafka in these zones to reduce cross-zone traffic. + 6. Pick a unique **Kafka Advertised Listener Pattern** for your Kafka Private Service Connect service. + 1. Input a unique random string. It can only include numbers or lowercase letters. You will use it to generate **Kafka Advertised Listener Pattern** later. + 2. Click **Check usage and generate** to check if the random string is unique and generate **Kafka Advertised Listener Pattern** that will be used to assemble the EXTERNAL advertised listener for Kafka brokers, or configure Kafka-proxy. + +Note down all the deployment information. You need to use it to configure your Kafka Private Service Connect service later. + +The following table shows an example of the deployment information. + +| Information | Value | +|------------------------------------|---------------------------------| +| Region | Oregon (`us-west1`) | +| Google Cloud project of TiDB Cloud | `tidbcloud-prod-000` | +| Zones |
  • `us-west1-a`
  • `us-west1-b`
  • `us-west1-c`
| +| Kafka Advertised Listener Pattern | The unique random string: `abc`
Generated pattern: <broker_id>.abc.us-west1.gcp.3199745.tidbcloud.com:<port> | + +## Set up self-hosted Kafka Private Service Connect service by PSC port mapping + +Expose each Kafka broker to TiDB Cloud VPC with a unique port by using the PSC port mapping mechanism. The following diagram shows how it works. + +![Connect to Google Cloud self-hosted Kafka Private Service Connect by port mapping](/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-portmapping.jpeg) + +### Step 1. Set up the Kafka cluster + +If you need to deploy a new cluster, follow the instructions in [Deploy a new Kafka cluster](#deploy-a-new-kafka-cluster). + +If you need to expose an existing cluster, follow the instructions in [Reconfigure a running Kafka cluster](#reconfigure-a-running-kafka-cluster). + +#### Deploy a new Kafka cluster + +**1. Set up the Kafka VPC** + +You need to create two subnets for Kafka VPC, one for Kafka brokers, and the other for the bastion node to make it easy to configure the Kafka cluster. + +Go to the [Google Cloud console](https://cloud.google.com/cloud-console), and navigate to the [VPC networks](https://console.cloud.google.com/networking/networks/list) page to create the Kafka VPC with the following attributes: + +- **Name**: `kafka-vpc` +- Subnets + - **Name**: `bastion-subnet`; **Region**: `us-west1`; **IPv4 range**: `10.0.0.0/18` + - **Name**: `brokers-subnet`; **Region**: `us-west1`; **IPv4 range**: `10.64.0.0/18` +- Firewall rules + - `kafka-vpc-allow-custom` + - `kafka-vpc-allow-ssh` + +**2. Provisioning VMs** + +Go to the [VM instances](https://console.cloud.google.com/compute/instances) page to provision VMs: + +1. Bastion node + + - **Name**: `bastion-node` + - **Region**: `us-west1` + - **Zone**: `Any` + - **Machine Type**: `e2-medium` + - **Image**: `Debian GNU/Linux 12` + - **Network**: `kafka-vpc` + - **Subnetwork**: `bastion-subnet` + - **External IPv4 address**: `Ephemeral` + +2. Broker node 1 + + - **Name**: `broker-node1` + - **Region**: `us-west1` + - **Zone**: `us-west1-a` + - **Machine Type**: `e2-medium` + - **Image**: `Debian GNU/Linux 12` + - **Network**: `kafka-vpc` + - **Subnetwork**: `brokers-subnet` + - **External IPv4 address**: `None` + +3. Broker node 2 + + - **Name**: `broker-node2` + - **Region**: `us-west1` + - **Zone**: `us-west1-b` + - **Machine Type**: `e2-medium` + - **Image**: `Debian GNU/Linux 12` + - **Network**: `kafka-vpc` + - **Subnetwork**: `brokers-subnet` + - **External IPv4 address**: `None` + +4. Broker node 3 + + - **Name**: `broker-node3` + - **Region**: `us-west1` + - **Zone**: `us-west1-c` + - **Machine Type**: `e2-medium` + - **Image**: `Debian GNU/Linux 12` + - **Network**: `kafka-vpc` + - **Subnetwork**: `brokers-subnet` + - **External IPv4 address**: `None` + +**3. Prepare Kafka runtime binaries** + +1. Go to the detail page of the bastion node. Click **SSH** to log in to the bastion node. Download binaries. + + ```shell + # Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. + wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz + tar -zxf kafka_2.13-3.7.1.tgz + wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz + tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz + ``` + +2. Copy binaries to each broker node. + + ```shell + # Run this command to authorize gcloud to access the Cloud Platform with Google user credentials + # Follow the instruction in output to finish the login + gcloud auth login + + # Copy binaries to broker nodes + gcloud compute scp kafka_2.13-3.7.1.tgz openjdk-22.0.2_linux-x64_bin.tar.gz broker-node1:~ --zone=us-west1-a + gcloud compute ssh broker-node1 --zone=us-west1-a --command="tar -zxf kafka_2.13-3.7.1.tgz && tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + gcloud compute scp kafka_2.13-3.7.1.tgz openjdk-22.0.2_linux-x64_bin.tar.gz broker-node2:~ --zone=us-west1-b + gcloud compute ssh broker-node2 --zone=us-west1-b --command="tar -zxf kafka_2.13-3.7.1.tgz && tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + gcloud compute scp kafka_2.13-3.7.1.tgz openjdk-22.0.2_linux-x64_bin.tar.gz broker-node3:~ --zone=us-west1-c + gcloud compute ssh broker-node3 --zone=us-west1-c --command="tar -zxf kafka_2.13-3.7.1.tgz && tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" + ``` + +**4. Configure Kafka brokers** + +1. Set up a KRaft Kafka cluster with three nodes. Each node acts as a broker and controller roles. For every broker: + + 1. For `listeners`, all three brokers are the same and act as brokers and controller roles: + 1. Configure the same CONTROLLER listener for all **controller** role nodes. If you only want to add the **broker** role nodes, you do not need the CONTROLLER listener in `server.properties`. + 2. Configure two **broker** listeners. INTERNAL for internal access; EXTERNAL for external access from TiDB Cloud. + + 2. For `advertised.listeners`, do the following: + 1. Configure an INTERNAL advertised listener for each broker using the internal IP address of the broker node, which allows internal Kafka clients to connect to the broker via the advertised address. + 2. Configure an EXTERNAL advertised listener based on **Kafka Advertised Listener Pattern** you get from TiDB Cloud for every broker node to help TiDB Cloud differentiate between different brokers. Different EXTERNAL advertised listeners help Kafka clients from TiDB Cloud side route requests to the right broker. + - `` differentiates brokers from Kafka Private Service Connect access points. Plan a port range for EXTERNAL advertised listeners of all brokers. These ports do not have to be actual ports listened to by brokers. They are ports listened to by the load balancer for Private Service Connect that will forward requests to different brokers. + - It is recommended to configure different broker IDs for different brokers to make it easy for troubleshooting. + + 3. The planning values: + - CONTROLLER port: `29092` + - INTERNAL port: `9092` + - EXTERNAL: `39092` + - EXTERNAL advertised listener ports range: `9093~9095` + +2. Use SSH to log in to every broker node. Create a configuration file `~/config/server.properties` with the following content for each broker node respectively. + + ```properties + # broker-node1 ~/config/server.properties + # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. + # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. + # 2.1 The pattern is ".abc.us-west1.gcp.3199745.tidbcloud.com:". + # 2.2 So the EXTERNAL can be "b1.abc.us-west1.gcp.3199745.tidbcloud.com:9093". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9093) in the port range of the EXTERNAL advertised listener. + process.roles=broker,controller + node.id=1 + controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 + listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 + inter.broker.listener.name=INTERNAL + advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.abc.us-west1.gcp.3199745.tidbcloud.com:9093 + controller.listener.names=CONTROLLER + listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + log.dirs=./data + ``` + + ```properties + # broker-node2 ~/config/server.properties + # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. + # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. + # 2.1 The pattern is ".abc.us-west1.gcp.3199745.tidbcloud.com:". + # 2.2 So the EXTERNAL can be "b2.abc.us-west1.gcp.3199745.tidbcloud.com:9094". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9094) in the port range of the EXTERNAL advertised listener. + process.roles=broker,controller + node.id=2 + controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 + listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 + inter.broker.listener.name=INTERNAL + advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.abc.us-west1.gcp.3199745.tidbcloud.com:9094 + controller.listener.names=CONTROLLER + listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + log.dirs=./data + ``` + + ```properties + # broker-node3 ~/config/server.properties + # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. + # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. + # 2.1 The pattern is ".abc.us-west1.gcp.3199745.tidbcloud.com:". + # 2.2 So the EXTERNAL can be "b3.abc.us-west1.gcp.3199745.tidbcloud.com:9095". Replace with "b" prefix plus "node.id" properties, and replace with a unique port (9095) in the port range of the EXTERNAL advertised listener. + process.roles=broker,controller + node.id=3 + controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 + listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 + inter.broker.listener.name=INTERNAL + advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.abc.us-west1.gcp.3199745.tidbcloud.com:9095 + controller.listener.names=CONTROLLER + listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + log.dirs=./data + ``` + +3. Create a script, and then execute it to start the Kafka broker in each broker node. + + ```shell + #!/bin/bash + + # Get the directory of the current script + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Set JAVA_HOME to the Java installation within the script directory + export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" + # Define the vars + KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" + KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh + KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh + KAFKA_DATA_DIR=$SCRIPT_DIR/data + KAFKA_LOG_DIR=$SCRIPT_DIR/log + KAFKA_CONFIG_DIR=$SCRIPT_DIR/config + + # Cleanup step, which makes it easy for multiple experiments + # Find all Kafka process IDs + KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}') + if [ -z "$KAFKA_PIDS" ]; then + echo "No Kafka processes are running." + else + # Kill each Kafka process + echo "Killing Kafka processes with PIDs: $KAFKA_PIDS" + for PID in $KAFKA_PIDS; do + kill -9 $PID + echo "Killed Kafka process with PID: $PID" + done + echo "All Kafka processes have been killed." + fi + + rm -rf $KAFKA_DATA_DIR + mkdir -p $KAFKA_DATA_DIR + rm -rf $KAFKA_LOG_DIR + mkdir -p $KAFKA_LOG_DIR + + # Magic id: BRl69zcmTFmiPaoaANybiw. You can use your own magic ID. + $KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log + LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" & + ``` + +**5. Test the Kafka cluster in the bastion node** + +1. Test the Kafka bootstrap. + + ```shell + export JAVA_HOME=~/jdk-22.0.2 + + # Bootstrap from INTERNAL listener + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 + # Expected output (the actual order might be different) + {broker-node1-ip}:9092 (id: 1 rack: null) -> ( + {broker-node2-ip}:9092 (id: 2 rack: null) -> ( + {broker-node3-ip}:9092 (id: 3 rack: null) -> ( + + # Bootstrap from EXTERNAL listener + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 + # Expected output for the last 3 lines (the actual order might be different) + # The difference in the output from "bootstrap from INTERNAL listener" is that exceptions or errors might occur because advertised listeners cannot be resolved in Kafka VPC. + # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Service Connect. + b1.abc.us-west1.gcp.3199745.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b2.abc.us-west1.gcp.3199745.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b3.abc.us-west1.gcp.3199745.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + ``` + +2. Create a producer script `produce.sh` in the bastion node. + + ```shell + #!/bin/bash + BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." + + # Get the directory of the current script + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Set JAVA_HOME to the Java installation within the script directory + export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" + # Define the Kafka directory + KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" + TOPIC="test-topic" + + # Create a topic if it does not exist + create_topic() { + echo "Creating topic if it does not exist..." + $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 + } + + # Produce messages to the topic + produce_messages() { + echo "Producing messages to the topic..." + for ((chrono=1; chrono <= 10; chrono++)); do + message="Test message "$chrono + echo "Create "$message + echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC + done + } + create_topic + produce_messages + ``` + +3. Create a consumer script `consume.sh` in the bastion node. + + ```shell + #!/bin/bash + + BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." + + # Get the directory of the current script + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + # Set JAVA_HOME to the Java installation within the script directory + export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" + # Define the Kafka directory + KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" + TOPIC="test-topic" + CONSUMER_GROUP="test-group" + # Consume messages from the topic + consume_messages() { + echo "Consuming messages from the topic..." + $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP + } + consume_messages + ``` + +4. Execute `produce.sh` and `consume.sh` to verify that the Kafka cluster is running. These scripts will also be reused for later network connection testing. The script will create a topic with `--partitions 3 --replication-factor 3`. Ensure that all three brokers contain data. Ensure that the scripts will connect to all three brokers to guarantee that network connection will be tested. + + ```shell + # Test write message. + ./produce.sh {one_of_broker_ip}:9092 + ``` + + ```text + # Expected output + Creating topic if it does not exist... + + Producing messages to the topic... + Create Test message 1 + >>Create Test message 2 + >>Create Test message 3 + >>Create Test message 4 + >>Create Test message 5 + >>Create Test message 6 + >>Create Test message 7 + >>Create Test message 8 + >>Create Test message 9 + >>Create Test message 10 + ``` + + ```shell + # Test read message + ./consume.sh {one_of_broker_ip}:9092 + ``` + + ```text + # Expected example output (the actual message order might be different) + Consuming messages from the topic... + Test message 3 + Test message 4 + Test message 5 + Test message 9 + Test message 10 + Test message 6 + Test message 8 + Test message 1 + Test message 2 + Test message 7 + [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) + org.apache.kafka.common.errors.TimeoutException + Processed a total of 10 messages + ``` + +#### Reconfigure a running Kafka cluster + +Ensure that your Kafka cluster is deployed in the same region as the TiDB cluster. It is recommended that the zones are also in the same region to reduce cross-zone traffic. + +**1. Configure the EXTERNAL listener for brokers** + +The following configuration applies to a Kafka KRaft cluster. The ZK mode configuration is similar. + +1. Plan configuration changes. + + 1. Configure an EXTERNAL **listener** for every broker for external access from TiDB Cloud. Select a unique port as the EXTERNAL port, for example, `39092`. + 2. Configure an EXTERNAL **advertised listener** based on **Kafka Advertised Listener Pattern** you get from TiDB Cloud for every broker node to help TiDB Cloud differentiate between different brokers. Different EXTERNAL advertised listeners help Kafka clients from TiDB Cloud side route requests to the right broker. + - `` differentiates brokers from Kafka Private Service Connect access points. Plan a port range for EXTERNAL advertised listeners of all brokers, for example, `range from 9093`. These ports do not have to be actual ports listened to by brokers. They are ports listened to by the load balancer for Private Service Connect that will forward requests to different brokers. + - It is recommended to configure different broker IDs for different brokers to make it easy for troubleshooting. + +2. Use SSH to log in to each broker node. Modify the configuration file of each broker with the following content: + + ```properties + # Add EXTERNAL listener + listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 + + # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section + # 1. The pattern is ".abc.us-west1.gcp.3199745.tidbcloud.com:". + # 2. So the EXTERNAL can be "bx.abc.us-west1.gcp.3199745.tidbcloud.com:xxxx". Replace with "b" prefix plus "node.id" properties, and replace with a unique port in the port range of the EXTERNAL advertised listener. + # For example + advertised.listeners=...,EXTERNAL://b1.abc.us-west1.gcp.3199745.tidbcloud.com:9093 + + # Configure EXTERNAL map + listener.security.protocol.map=...,EXTERNAL:PLAINTEXT + ``` + +3. After you reconfigure all the brokers, restart your Kafka brokers one by one. + +**2. Test EXTERNAL listener settings in your internal network** + +You can download Kafka and OpenJDK in your Kafka client node. + +```shell +# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. +wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz +tar -zxf kafka_2.13-3.7.1.tgz +wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz +tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz +``` + +Execute the following script to test if the bootstrap works as expected. + +```shell +export JAVA_HOME=~/jdk-22.0.2 + +# Bootstrap from the EXTERNAL listener +./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 + +# Expected output for the last 3 lines (the actual order might be different) +# There will be some exceptions or errors because advertised listeners cannot be resolved in your Kafka network. +# We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Service Connect. +b1.abc.us-west1.gcp.3199745.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +b2.abc.us-west1.gcp.3199745.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +b3.abc.us-west1.gcp.3199745.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException +``` + +### Step 2. Expose the Kafka cluster as Private Service Connect + +1. Go to the [Network endpoint group](https://console.cloud.google.com/compute/networkendpointgroups/list) page. Create a network endpoint group as follows: + + - **Name**: `kafka-neg` + - **Network endpoint group type**: `Port Mapping NEG(Regional)` + - **Region**: `us-west1` + - **Network**: `kafka-vpc` + - **Subnet**: `brokers-subnet` + +2. Go to the detail page of the network endpoint group, and add the network endpoints to configure the port mapping to broker nodes. + + 1. Network endpoint 1 + - **Instance**: `broker-node1` + - **VM Port**: `39092` + - **Client Port**: `9093` + 2. Network endpoint 2 + - **Instance**: `broker-node2` + - **VM Port**: `39092` + - **Client Port**: `9094` + 3. Network endpoint 3 + - **Instance**: `broker-node3` + - **VM Port**: `39092` + - **Client Port**: `9095` + +3. Go to the [Load balancing](https://console.cloud.google.com/net-services/loadbalancing/list/loadBalancers) page. Create a load balancer as follows: + + - **Type of load balancer**: `Network Load Balancer` + - **Proxy or Passthrough**: `Passthrough` + - **Public facing or internal**: `Internal` + - **Load Balancer name**: `kafka-lb` + - **Region**: `us-west1` + - **Network**: `kafka-vpc` + - Backend configuration + - **Backend type**: `Port mapping network endpoint group` + - **Protocol**: `TCP` + - **Port mapping network endpoint group**: `kafka-neg` + - Frontend configuration + - **Subnetwork**: `brokers-subnet` + - **Ports**: `All` + +4. Go to [**Private Service Connect** > **PUBLISH SERVICE**](https://console.cloud.google.com/net-services/psc/list/producers). + + - **Load Balancer Type**: `Internal passthrough Network Load Balancer` + - **Internal load balancer**: `kafka-lb` + - **Service name**: `kafka-psc` + - **Subnets**: `RESERVE NEW SUBNET` + - **Name**: `psc-subnet` + - **VPC Network**: `kafka-vpc` + - **Region**: `us-west1` + - **IPv4 range**: `10.128.0.0/18` + - **Accepted projects**: the Google Cloud project of TiDB Cloud you got in [Prerequisites](#prerequisites), for example, `tidbcloud-prod-000`. + +5. Navigate to the detail page of the `kafka-psc`. Note down the **Service attachment**, for example, `projects/tidbcloud-dp-stg-000/regions/us-west1/serviceAttachments/kafka-psc`. You will use it in TiDB Cloud to connect to this PSC. + +6. Go to the detail page of the VPC network `kafka-vpc`. Add a firewall rule to allow PSC traffic to all brokers. + + - **Name**: `allow-psc-traffic` + - **Direction of traffic**: `Ingress` + - **Action on match**: `Allow` + - **Targets**: `All instances in the network` + - **Source filter**: `IPv4 ranges` + - **Source IPv4 ranges**: `10.128.0.0/18`. The range of psc-subnet. + - **Protocols and ports**: Allow all + +### Step 3. Connect from TiDB Cloud + +1. Go back to the [TiDB Cloud console](https://tidbcloud.com) to create a changefeed for the cluster to connect to the Kafka cluster by **Private Service Connect**. For more information, see [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +2. When you proceed to **Configure the changefeed target > Connectivity Method > Private Service Connect**, fill in the following fields with corresponding values and other fields as needed. + + - **Kafka Advertised Listener Pattern**: `abc`. It is the same as the unique random string you use to generate **Kafka Advertised Listener Pattern** in [Prerequisites](#prerequisites). + - **Service Attachment**: the Kafka service attachment of PSC, for example, `projects/tidbcloud-dp-stg-000/regions/us-west1/serviceAttachments/kafka-psc`. + - **Bootstrap Ports**: `9092,9093,9094` + +3. Proceed with the steps in [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +## Set up self-hosted Kafka Private Service Connect by Kafka-proxy + +Expose each Kafka broker to TiDB Cloud VPC with a unique port by using the Kafka-proxy dynamic port mapping mechanism. The following diagram shows how it works. + +![Connect to Google Cloud self-hosted Kafka Private Service Connect by Kafka proxy](/media/tidb-cloud/changefeed/connect-to-google-cloud-self-hosted-kafka-private-service-connect-by-kafka-proxy.jpeg) + +### Step 1. Set up Kafka-proxy + +Assume that you already have a Kafka cluster running in the same region as the TiDB cluster. You can connect to the Kafka cluster from your VPC network. The Kafka cluster can be self-hosted or provided by third-party providers, such as Confluent. + +1. Go to the [Instance groups](https://console.cloud.google.com/compute/instanceGroups/list) page, and create an instance group for Kafka-proxy. + + - **Name**: `kafka-proxy-ig` + - Instance template: + - **Name**: `kafka-proxy-tpl` + - **Location**: `Regional` + - **Region**: `us-west1` + - **Machine type**: `e2-medium`. You can choose your own machine type based on your workload. + - **Network**: your VPC network that can connect to the Kafka cluster. + - **Subnetwork**: your subnet that can connect to the Kafka cluster. + - **External IPv4 address**: `Ephemeral`. Enable Internet access to make it easy to configure Kafka-proxy. You can select **None** in your production environment and log in to the node in your way. + - **Location**: `Single zone` + - **Region**: `us-west1` + - **Zone**: choose one of your broker's zones. + - **Autoscaling mode**: `Off` + - **Minimum number of instances**: `1` + - **Maximum number of instances**: `1`. Kafka-proxy does not support the cluster mode, so only one instance can be deployed. Each Kafka-proxy randomly maps local ports to the ports of the broker, resulting in different mappings across proxies. Deploying multiple Kafka-proxies behind a load balancer can cause issues. If a Kafka client connects to one proxy and then accesses a broker through another, the request might be routed to the wrong broker. + +2. Go to the detail page of the node in kafka-proxy-ig. Click **SSH** to log in to the node. Download the binaries: + + ```shell + # You can choose another version + wget https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.11/kafka-proxy-v0.3.11-linux-amd64.tar.gz + tar -zxf kafka-proxy-v0.3.11-linux-amd64.tar.gz + ``` + +3. Run Kafka-proxy and connect to Kafka brokers. + + ```shell + # There are three kinds of parameters that need to feed to the Kafka-proxy + # 1. --bootstrap-server-mapping defines the bootstrap mapping. Suggest that you configure three mappings, one for each zone for resilience. + # a) Kafka broker address; + # b) Local address for the broker in Kafka-proxy; + # c) Advertised listener for the broker if Kafka clients bootstrap from Kafka-proxy + # 2. --dynamic-sequential-min-port defines the start port of the random mapping for other brokers + # 3. --dynamic-advertised-listener defines advertised listener address for other brokers based on the pattern obtained from the "Prerequisites" section + # a) The pattern: .abc.us-west1.gcp.3199745.tidbcloud.com: + # b) Make sure to replace with a fixed lowercase string, for example, "brokers". You can use your own string. This step will help TiDB Cloud route requests properly. + # c) Remove ":" + # d) The advertised listener address would be: brokers.abc.us-west1.gcp.3199745.tidbcloud.com + ./kafka-proxy server \ + --bootstrap-server-mapping "{address_of_broker1},0.0.0.0:9092,b1.abc.us-west1.gcp.3199745.tidbcloud.com:9092" \ + --bootstrap-server-mapping "{address_of_broker2},0.0.0.0:9093,b2.abc.us-west1.gcp.3199745.tidbcloud.com:9093" \ + --bootstrap-server-mapping "{address_of_broker3},0.0.0.0:9094,b3.abc.us-west1.gcp.3199745.tidbcloud.com:9094" \ + --dynamic-sequential-min-port=9095 \ + --dynamic-advertised-listener=brokers.abc.us-west1.gcp.3199745.tidbcloud.com > ./kafka_proxy.log 2>&1 & + ``` + +4. Test bootstrap in the Kafka-proxy node. + + ```shell + # Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. + wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz + tar -zxf kafka_2.13-3.7.1.tgz + wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz + tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz + + export JAVA_HOME=~/jdk-22.0.2 + + ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server 0.0.0.0:9092 + # Expected output of the last few lines (the actual order might be different) + # There might be exceptions or errors because advertised listeners cannot be resolved in your network. + # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Service Connect. + b1.abc.us-west1.gcp.3199745.tidbcloud.com:9092 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b2.abc.us-west1.gcp.3199745.tidbcloud.com:9093 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + b3.abc.us-west1.gcp.3199745.tidbcloud.com:9094 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + brokers.abc.us-west1.gcp.3199745.tidbcloud.com:9095 (id: 4 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + brokers.abc.us-west1.gcp.3199745.tidbcloud.com:9096 (id: 5 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException + ... + ``` + +### Step 2. Expose Kafka-proxy as Private Service Connect Service + +1. Go to the [Load balancing](https://console.cloud.google.com/net-services/loadbalancing/list/loadBalancers) page, and create a load balancer. + + - **Type of load balancer**: `Network Load Balancer` + - **Proxy or Passthrough**: `Passthrough` + - **Public facing or internal**: `Internal` + - **Load Balancer name**: `kafka-proxy-lb` + - **Region**: `us-west1` + - **Network**: your network + - Backend configuration + - **Backend type**: `Instance group` + - **Protocol**: `TCP` + - **Instance group**: `kafka-proxy-ig` + - Frontend configuration + - **Subnetwork**: your subnet + - **Ports**: `All` + - Heath check: + - **Name**: `kafka-proxy-hc` + - **Scope**: `Regional` + - **Protocol**: `TCP` + - **Port**: `9092`. You can select one of the bootstrap ports in Kafka-proxy. + +2. Go to [**Private Service Connect** > **PUBLISH SERVICE**](https://console.cloud.google.com/net-services/psc/list/producers). + + - **Load Balancer Type**: `Internal passthrough Network Load Balancer` + - **Internal load balancer**: `kafka-proxy-lb` + - **Service name**: `kafka-proxy-psc` + - **Subnets**: `RESERVE NEW SUBNET` + - **Name**: `proxy-psc-subnet` + - **VPC Network**: your network + - **Region**: `us-west1` + - **IPv4 range**: set the CIDR based on your network planning + - **Accepted projects**: the Google Cloud project of TiDB Cloud you get in [Prerequisites](#prerequisites), for example, `tidbcloud-prod-000`. + +3. Navigate to the detail page of the **kafka-proxy-psc**. Note down the `Service attachment`, for example, `projects/tidbcloud-dp-stg-000/regions/us-west1/serviceAttachments/kafka-proxy-psc`, which will be used in TiDB Cloud to connect to this PSC. + +4. Go to the detail page of your VPC network. Add a firewall rule to allow the PSC traffic for all brokers. + + - **Name**: `allow-proxy-psc-traffic` + - **Direction of traffic**: `Ingress` + - **Action on match**: `Allow` + - **Targets**: All instances in the network + - **Source filter**: `IPv4 ranges` + - **Source IPv4 ranges**: the CIDR of proxy-psc-subnet + - **Protocols and ports**: Allow all + +### Step 3. Connect from TiDB Cloud + +1. Return to the [TiDB Cloud console](https://tidbcloud.com) and create a changefeed for the cluster to connect to the Kafka cluster by **Private Service Connect**. For more information, see [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +2. After you proceed to the **Configure the changefeed target** > **Connectivity Method** > **Private Service Connect**, fill in the following fields with corresponding values and other fields as needed. + + - **Kafka Advertised Listener Pattern**: `abc`. The same as the unique random string you use to generate **Kafka Advertised Listener Pattern** in [Prerequisites](#prerequisites). + - **Service Attachment**: the kafka-proxy service attachment of PSC, for example, `projects/tidbcloud-dp-stg-000/regions/us-west1/serviceAttachments/kafka-proxy-psc`. + - **Bootstrap Ports**: `9092,9093,9094` + +3. Continue to follow the guideline in [Sink to Apache Kafka](/tidb-cloud/changefeed-sink-to-apache-kafka.md). + +## FAQ + +### How to connect to the same Kafka Private Service Connect service from two different TiDB Cloud projects? + +If you have already followed the steps in this document and successfully set up the connection from the first project, and you want to set up a second connection from the second project, you can connect to the same Kafka Private Service Connect service from two different TiDB Cloud projects as follows: + +- If you set up Kafka PSC by PSC port mapping, do the following: + + 1. Follow instructions from the beginning of this document. When you proceed to [Step 1. Set up Kafka Cluster](#step-1-set-up-the-kafka-cluster), follow the [Reconfigure a running Kafka cluster](#reconfigure-a-running-kafka-cluster) section to create another group of EXTERNAL listeners and advertised listeners. You can name it as `EXTERNAL2`. Note that the port range of `EXTERNAL2` cannot overlap with the EXTERNAL. + + 2. After reconfiguring the brokers, add another group of Network endpoints to the Network endpoint group, which maps the ports range to the `EXTERNAL2` listener. + + 3. Configure the TiDB Cloud connection with the following input to create the new changefeed: + + - New Bootstrap ports + - New Kafka Advertised Listener Pattern + - The same Service Attachment + +- If you [set up self-hosted Kafka Private Service Connect by Kafka-proxy](#set-up-self-hosted-kafka-private-service-connect-by-kafka-proxy), create a new Kafka-proxy PSC from the beginning with a new Kafka Advertised Listener Pattern. diff --git a/tidb-cloud/tidb-cloud-billing-ticdc-rcu.md b/tidb-cloud/tidb-cloud-billing-ticdc-rcu.md index 27128bc02f828..adef15f9e404b 100644 --- a/tidb-cloud/tidb-cloud-billing-ticdc-rcu.md +++ b/tidb-cloud/tidb-cloud-billing-ticdc-rcu.md @@ -6,9 +6,11 @@ aliases: ['/tidbcloud/tidb-cloud-billing-tcu'] # Changefeed Billing +## RCU cost + TiDB Cloud measures the capacity of [changefeeds](/tidb-cloud/changefeed-overview.md) in TiCDC Replication Capacity Units (RCUs). When you [create a changefeed](/tidb-cloud/changefeed-overview.md#create-a-changefeed) for a cluster, you can select an appropriate specification. The higher the RCU, the better the replication performance. You will be charged for these TiCDC changefeed RCUs. -## Number of TiCDC RCUs +### Number of TiCDC RCUs The following table lists the specifications and corresponding replication performances for changefeeds: @@ -26,6 +28,12 @@ The following table lists the specifications and corresponding replication perfo > > The preceding performance data is for reference only and might vary in different scenarios. It is strongly recommended that you conduct a real workload test before using the changefeed feature in a production environment. For further assistance, contact [TiDB Cloud support](/tidb-cloud/tidb-cloud-support.md#get-support-for-a-cluster). -## Price +### Price To learn about the supported regions and the price of TiDB Cloud for each TiCDC RCU, see [Changefeed Cost](https://www.pingcap.com/tidb-cloud-pricing-details/#changefeed-cost). + +## Private Data Link cost + +If you choose the **Private Link** or **Private Service Connect** network connectivity method, additional **Private Data Link** costs will be incurred. These charges fall under the [Data Transfer Cost](https://www.pingcap.com/tidb-dedicated-pricing-details/#data-transfer-cost) category. + +The price of **Private Data Link** is **$0.01/GiB**, the same as **Data Processed** of [AWS Interface Endpoint pricing](https://aws.amazon.com/privatelink/pricing/#Interface_Endpoint_pricing) and **Consumer data processing** of [Google Cloud Private Service Connect pricing](https://cloud.google.com/vpc/pricing#psc-forwarding-rules).