diff --git a/integrations/destinations/apache-kafka.mdx b/integrations/destinations/apache-kafka.mdx
index 3ac85f31..c411712b 100644
--- a/integrations/destinations/apache-kafka.mdx
+++ b/integrations/destinations/apache-kafka.mdx
@@ -23,29 +23,31 @@ FORMAT data_format ENCODE data_encode [ (
```
-Names and unquoted identifiers are case-insensitive. Therefore, you must double-quote any of these fields for them to be case-sensitive. See also [Identifiers](/sql/identifiers).
+ Names and unquoted identifiers are case-insensitive. Therefore, you must
+ double-quote any of these fields for them to be case-sensitive. See also
+ [Identifiers](/sql/identifiers).
## Basic parameters
All `WITH` options are required unless explicitly mentioned as optional.
-| Parameter or clause | Description |
-| :-------------------------- | :------------- |
-| sink\_name | Name of the sink to be created. |
-| sink\_from | A clause that specifies the direct source from which data will be output. `sink_from` can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
-| AS select\_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See [SELECT](/sql/commands/sql-select) for the syntax and examples of the SELECT command. |
-| connector | Sink connector type must be `kafka` for Kafka sink. |
-| properties.bootstrap.server | Address of the Kafka broker. Format: `ip:port`. If there are multiple brokers, separate them with commas. |
-| topic | Address of the Kafka topic. One sink can only correspond to one topic. |
-| primary\_key | Conditional. The primary keys of the sink. Use `,` to delimit the primary key columns. This field is optional if creating a `PLAIN` sink, but required if creating a `DEBEZIUM` or `UPSERT` sink. |
+| Parameter or clause | Description |
+| :-------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| sink_name | Name of the sink to be created. |
+| sink_from | A clause that specifies the direct source from which data will be output. `sink_from` can be a materialized view or a table. Either this clause or a SELECT query must be specified. |
+| AS select_query | A SELECT query that specifies the data to be output to the sink. Either this query or a FROM clause must be specified. See [SELECT](/sql/commands/sql-select) for the syntax and examples of the SELECT command. |
+| connector | Sink connector type must be `kafka` for Kafka sink. |
+| properties.bootstrap.server | Address of the Kafka broker. Format: `ip:port`. If there are multiple brokers, separate them with commas. |
+| topic | Address of the Kafka topic. One sink can only correspond to one topic. |
+| primary_key | Conditional. The primary keys of the sink. Use `,` to delimit the primary key columns. This field is optional if creating a `PLAIN` sink, but required if creating a `DEBEZIUM` or `UPSERT` sink. |
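+
+As a quick illustration of how these parameters fit together, the following sketch creates an append-only sink from a materialized view; the sink name, materialized view, broker addresses, and topic are placeholders.
+
+```sql
+CREATE SINK my_kafka_sink
+FROM my_materialized_view
+WITH (
+   connector = 'kafka',
+   properties.bootstrap.server = 'broker1:9092,broker2:9092',
+   topic = 'my_topic'
+)
+FORMAT PLAIN ENCODE JSON (
+   force_append_only = 'true'
+);
+```
+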
## Additional Kafka parameters
When creating a Kafka sink in RisingWave, you can specify the following Kafka-specific parameters. To set the parameter, add the RisingWave equivalent of the Kafka parameter as a `WITH` option. For additional details on these parameters, see the [Configuration properties](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
-| Kafka parameter name | RisingWave parameter name | Type |
-| :----------------------- | :------------------------------ | :----- |
+| Kafka parameter name | RisingWave parameter name | Type |
+| :------------------------------------ | :----------------------------------------------- | :----- |
| allow.auto.create.topics | properties.allow.auto.create.topics | bool |
| batch.num.messages | properties.batch.num.messages | int |
| batch.size | properties.batch.size | int |
@@ -64,39 +66,38 @@ When creating a Kafka sink in RisingWave, you can specify the following Kafka-sp
| receive.message.max.bytes | properties.receive.message.max.bytes | int |
| ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
-
Set `properties.ssl.endpoint.identification.algorithm` to `none` to bypass the verification of CA certificates and resolve SSL handshake failure. This parameter can be set to either `https` or `none`. By default, it is `https`.
Starting with version 2.0, the default value for `properties.message.timeout.ms` has changed from 5 seconds to **5 minutes**, aligning with the default setting in the [official Kafka library](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
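+
+For example, the additional Kafka parameters go into the same `WITH` clause as the required options; the broker, topic, and tuning values below are illustrative only.
+
+```sql
+CREATE SINK tuned_kafka_sink
+FROM my_materialized_view
+WITH (
+   connector = 'kafka',
+   properties.bootstrap.server = 'broker1:9092',
+   topic = 'my_topic',
+   properties.message.timeout.ms = '60000',
+   properties.batch.num.messages = '5000'
+)
+FORMAT PLAIN ENCODE JSON (
+   force_append_only = 'true'
+);
+```
+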
-
## FORMAT and ENCODE options
-These options should be set in `FORMAT data_format ENCODE data_encode (key = 'value')`, instead of the `WITH` clause.
+ These options should be set in `FORMAT data_format ENCODE data_encode (key =
+ 'value')`, instead of the `WITH` clause.
-| Field | Notes |
-| :------------------------ | :-------------------------- |
-| data\_format | Data format. Allowed formats: - `PLAIN`: Output data with insert operations. - `DEBEZIUM`: Output change data capture (CDC) log in Debezium format. - `UPSERT`: Output data as a changelog stream. `primary_key` must be specified in this case. To learn about when to define the primary key if creating an UPSERT sink, see the [Overview](/delivery/overview). |
-| data\_encode | Data encode. Allowed encodes: - `JSON`: Supports `PLAIN JSON`, `UPSERT JSON` and `DEBEZIUM JSON` sinks. - `AVRO`: Supports `UPSERT AVRO` and `PLAIN AVRO` sinks. - `PROTOBUF`: Supports `PLAIN PROTOBUF` and `UPSERT PROTOBUF` sinks. For `UPSERT PROTOBUF` sinks, you must specify `key encode text`, while it remains optional for other format/encode combinations. |
-| force\_append\_only | If true, forces the sink to be `PLAIN` (also known as append-only), even if it cannot be. |
-| timestamptz.handling.mode | Controls the timestamptz output format. This parameter specifically applies to append-only or upsert sinks using JSON encoding. - If omitted, the output format of timestamptz is `2023-11-11T18:30:09.453000Z` which includes the UTC suffix `Z`. - When `utc_without_suffix` is specified, the format is changed to `2023-11-11 18:30:09.453000`. |
+| Field | Notes |
+| :------------------------ | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| data_format | Data format. Allowed formats: <br/> - `PLAIN`: Output data with insert operations. <br/> - `DEBEZIUM`: Output change data capture (CDC) log in Debezium format. <br/> - `UPSERT`: Output data as a changelog stream. `primary_key` must be specified in this case. <br/> To learn about when to define the primary key if creating an UPSERT sink, see the [Overview](/delivery/overview). |
+| data_encode | Data encode. Allowed encodes: <br/> - `JSON`: Supports `PLAIN JSON`, `UPSERT JSON` and `DEBEZIUM JSON` sinks. <br/> - `AVRO`: Supports `UPSERT AVRO` and `PLAIN AVRO` sinks. <br/> - `PROTOBUF`: Supports `PLAIN PROTOBUF` and `UPSERT PROTOBUF` sinks. <br/> For `UPSERT PROTOBUF` sinks, you must specify `key encode text`, while it remains optional for other format/encode combinations. |
+| force_append_only | If true, forces the sink to be `PLAIN` (also known as append-only), even if it cannot be. |
+| timestamptz.handling.mode | Controls the timestamptz output format. This parameter specifically applies to append-only or upsert sinks using JSON encoding. <br/> - If omitted, the output format of timestamptz is `2023-11-11T18:30:09.453000Z` which includes the UTC suffix `Z`. <br/> - When `utc_without_suffix` is specified, the format is changed to `2023-11-11 18:30:09.453000`. |
| schemas.enable | Only configurable for upsert JSON sinks. By default, this value is false for upsert JSON sinks and true for debezium JSON sinks. If true, RisingWave will sink the data with the schema to the Kafka sink. This is not referring to a schema registry containing a JSON schema, but rather schema formats defined using [Kafka Connect](https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas). |
-| key\_encode | Optional. When specified, the key encode can only be TEXT, and the primary key should be one and only one of the following types: `varchar`, `bool`, `smallint`, `int`, and `bigint`; When absent, both key and value will use the same setting of `ENCODE data_encode ( ... )`. |
+| key_encode | Optional. When specified, the key encode can only be TEXT or BYTES. If set to TEXT, the primary key must be exactly one column of type `varchar`, `bool`, `smallint`, `int`, or `bigint`. If set to BYTES, the primary key must be exactly one column of type `bytea`. When absent, both key and value use the same setting of `ENCODE data_encode ( ... )`. |
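+
+For instance, a hypothetical `UPSERT` sink combines `primary_key` in the `WITH` clause with format options passed after `ENCODE`; the source view, key column, broker, and topic below are placeholders.
+
+```sql
+CREATE SINK users_upsert_sink
+FROM users_mv
+WITH (
+   connector = 'kafka',
+   properties.bootstrap.server = 'broker1:9092',
+   topic = 'users',
+   primary_key = 'user_id'
+)
+FORMAT UPSERT ENCODE JSON (
+   timestamptz.handling.mode = 'utc_without_suffix'
+);
+```
+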
### Avro specific parameters
When creating an Avro sink, the following options can be used following `FORMAT UPSERT ENCODE AVRO` or `FORMAT PLAIN ENCODE AVRO`.
-| Field | Notes |
-| :---------------------------- | :----------------------------------------------------------------------------------------------------------------------------- |
-| schema.registry | Required. The address of the schema registry. |
-| schema.registry.username | Optional. The user name used to access the schema registry. |
-| schema.registry.password | Optional. The password associated with the user name. |
-| schema.registry.name.strategy | Optional. Accepted options include topic\_name\_strategy (default), record\_name\_strategy, and topic\_record\_name\_strategy. |
-| key.message | Required if schema.registry.name.strategy is set to record\_name\_strategy or topic\_record\_name\_strategy. |
-| message | Required if schema.registry.name.strategy is set to record\_name\_strategy or topic\_record\_name\_strategy. |
+| Field | Notes |
+| :---------------------------- | :---------------------------------------------------------------------------------------------------------------------- |
+| schema.registry | Required. The address of the schema registry. |
+| schema.registry.username | Optional. The user name used to access the schema registry. |
+| schema.registry.password | Optional. The password associated with the user name. |
+| schema.registry.name.strategy | Optional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy. |
+| key.message | Required if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy. |
+| message | Required if schema.registry.name.strategy is set to record_name_strategy or topic_record_name_strategy. |
Syntax:
@@ -118,17 +119,18 @@ For data type mapping, the serial type is supported. We map the serial type to t
When creating a Protobuf sink, the following options can be used following `FORMAT PLAIN ENCODE PROTOBUF` or `FORMAT UPSERT ENCODE PROTOBUF`.
-| Field | Notes |
-| :---------------------------- | :----------------------- |
-| message | Required. Package qualified message name of the main Message in the schema definition. |
+| Field | Notes |
+| :---------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| message | Required. Package qualified message name of the main Message in the schema definition. |
| schema.location | Required if schema.registry is not specified. Only one of schema.location or schema.registry can be defined. The schema location. This can be in `file://`, `http://`, or `https://` format. |
-| schema.registry | Required if schema.location is not specified. Only one of schema.location or schema.registry can be defined. The address of the schema registry. |
-| schema.registry.username | Optional. The user name used to access the schema registry. |
-| schema.registry.password | Optional. The password associated with the user name. |
-| schema.registry.name.strategy | Optional. Accepted options include topic\_name\_strategy (default), record\_name\_strategy, and topic\_record\_name\_strategy. |
+| schema.registry | Required if schema.location is not specified. Only one of schema.location or schema.registry can be defined. The address of the schema registry. |
+| schema.registry.username | Optional. The user name used to access the schema registry. |
+| schema.registry.password | Optional. The password associated with the user name. |
+| schema.registry.name.strategy | Optional. Accepted options include topic_name_strategy (default), record_name_strategy, and topic_record_name_strategy. |
-The `file://` format is not recommended for production use. If it is used, it needs to be available for both meta and compute nodes.
+ The `file://` format is not recommended for production use. If it is used, it
+ needs to be available for both meta and compute nodes.
Syntax:
@@ -157,8 +159,8 @@ For data type mapping, the serial type is supported. We map the serial type to t
The `jsonb.handling.mode` determines how `jsonb` data types are encoded. This parameter has two possible values:
-* `string`: Encodes the `jsonb` type to a string. For example, if you set this parameter, `{"k": 2}` will be converted to `"{\"k\": 2}"`.
-* `dynamic`: Dynamically encodes a `jsonb` type value to a JSON type value. For example, if you set this parameter, `{"k": 2}` will be converted to `{"k": 2}`. Here the `jsonb` value is encoded to a JSON object type value.
+- `string`: Encodes the `jsonb` type to a string. For example, if you set this parameter, `{"k": 2}` will be converted to `"{\"k\": 2}"`.
+- `dynamic`: Dynamically encodes a `jsonb` type value to a JSON type value. For example, if you set this parameter, `{"k": 2}` will be converted to `{"k": 2}`. Here the `jsonb` value is encoded to a JSON object type value.
You can set this parameter in the `WITH` clause of `ENCODE JSON`.
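+
+As a sketch, assuming a materialized view `events_mv` that contains a `jsonb` column, the option is passed together with the other `ENCODE JSON` options:
+
+```sql
+CREATE SINK events_sink
+FROM events_mv
+WITH (
+   connector = 'kafka',
+   properties.bootstrap.server = 'broker1:9092',
+   topic = 'events'
+)
+FORMAT PLAIN ENCODE JSON (
+   force_append_only = 'true',
+   jsonb.handling.mode = 'dynamic'
+);
+```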
@@ -202,7 +204,7 @@ The schema of `taxi_trips` is like this:
"id": VARCHAR,
"distance": DOUBLE PRECISION,
"duration": DOUBLE PRECISION,
- "fare": DOUBLE PRECISION
+ "fare": DOUBLE PRECISION,
}
```
@@ -241,7 +243,6 @@ To create a Kafka sink with a PrivateLink connection, in the WITH section of you
| :------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| privatelink.targets | The PrivateLink targets that correspond to the Kafka brokers. The targets should be in JSON format. Note that each target listed corresponds to each broker specified in the properties.bootstrap.server field. If the order is incorrect, there will be connectivity issues. |
| privatelink.endpoint | The DNS name of the VPC endpoint. If you're using RisingWave Cloud, you can find the auto-generated endpoint after you created a connection. See details in [Create a VPC connection](/cloud/create-a-connection/#whats-next). |
-| connection.name | The name of the connection, which comes from the connection created using the [CREATE CONNECTION](/sql/commands/sql-create-connection) statement. Omit this parameter if you have provisioned a VPC endpoint using privatelink.endpoint (recommended). |
Here is an example of creating a Kafka sink using a PrivateLink connection. Notice that `{"port": 8001}` corresponds to the broker `ip1:9092`, and `{"port": 8002}` corresponds to the broker `ip2:9092`.
@@ -260,6 +261,7 @@ FORMAT PLAIN ENCODE JSON (
```
## TLS/SSL encryption and SASL authentication
+
RisingWave can sink data to Kafka that is encrypted with [Transport Layer Security (TLS)](https://en.wikipedia.org/wiki/Transport%5FLayer%5FSecurity) and/or authenticated with SASL.
Secure Sockets Layer (SSL) was the predecessor of Transport Layer Security (TLS), and has been deprecated since June 2015. For historical reasons, `SSL` is used in configuration and code instead of `TLS`.
@@ -268,10 +270,10 @@ Simple Authentication and Security Layer (SASL) is a framework for authenticatio
RisingWave supports these SASL authentication mechanisms:
-* `SASL/PLAIN`
-* `SASL/SCRAM`
-* `SASL/GSSAPI`
-* `SASL/OAUTHBEARER`
+- `SASL/PLAIN`
+- `SASL/SCRAM`
+- `SASL/GSSAPI`
+- `SASL/OAUTHBEARER`
SSL encryption can be used concurrently with SASL authentication mechanisms.
@@ -292,7 +294,10 @@ To sink data encrypted with SSL without SASL authentication, specify these param
| properties.ssl.key.password | |
-For the definitions of the parameters, see the [librdkafka properties list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that the parameters in the list assumes all parameters start with `properties.` and therefore do not include this prefix.
+ For the definitions of the parameters, see the [librdkafka properties
+ list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+ Note that the list assumes all parameters start with `properties.` and
+ therefore does not include this prefix.
Here is an example of creating a sink encrypted with SSL without using SASL authentication.
@@ -311,6 +316,7 @@ WITH (
)
FORMAT PLAIN ENCODE JSON;
```
+
| Parameter | Notes |
@@ -321,15 +327,18 @@ FORMAT PLAIN ENCODE JSON;
| properties.sasl.password | |
-For the definitions of the parameters, see the [librdkafka properties list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that the parameters in the list assumes all parameters start with `properties.` and therefore do not include this prefix.
+ For the definitions of the parameters, see the [librdkafka properties
+ list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+ Note that the list assumes all parameters start with `properties.` and
+ therefore does not include this prefix.
For SASL/PLAIN with SSL, you need to include these SSL parameters:
-* `properties.ssl.ca.location`
-* `properties.ssl.certificate.location`
-* `properties.ssl.key.location`
-* `properties.ssl.key.password`
+- `properties.ssl.ca.location`
+- `properties.ssl.certificate.location`
+- `properties.ssl.key.location`
+- `properties.ssl.key.password`
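+
+Putting the SASL/PLAIN and SSL parameters together, a sink definition might look like the following sketch; the broker address, credentials, and certificate paths are placeholders.
+
+```sql
+CREATE SINK sink_sasl_plain_ssl
+FROM my_materialized_view
+WITH (
+   connector = 'kafka',
+   properties.bootstrap.server = 'broker1:9093',
+   topic = 'my_topic',
+   properties.security.protocol = 'SASL_SSL',
+   properties.sasl.mechanism = 'PLAIN',
+   properties.sasl.username = 'admin',
+   properties.sasl.password = 'admin-secret',
+   properties.ssl.ca.location = '/path/to/ca-cert',
+   properties.ssl.certificate.location = '/path/to/client-cert.pem',
+   properties.ssl.key.location = '/path/to/client-key.pem',
+   properties.ssl.key.password = 'key-password'
+)
+FORMAT PLAIN ENCODE JSON (
+   force_append_only = 'true'
+);
+```
+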
Here is an example of creating a sink authenticated with SASL/PLAIN without SSL encryption.
@@ -366,6 +375,7 @@ WITH (
)
FORMAT PLAIN ENCODE JSON;
```
+
| Parameter | Notes |
@@ -376,15 +386,18 @@ FORMAT PLAIN ENCODE JSON;
| properties.sasl.password | |
-For the definitions of the parameters, see the [librdkafka properties list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that the parameters in the list assumes all parameters start with `properties.` and therefore do not include this prefix.
+ For the definitions of the parameters, see the [librdkafka properties
+ list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+ Note that the list assumes all parameters start with `properties.` and
+ therefore does not include this prefix.
For SASL/SCRAM with SSL, you also need to include these SSL parameters:
-* properties.ssl.ca.location
-* properties.ssl.certificate.location
-* properties.ssl.key.location
-* properties.ssl.key.password
+- properties.ssl.ca.location
+- properties.ssl.certificate.location
+- properties.ssl.key.location
+- properties.ssl.key.password
Here is an example of creating a sink authenticated with SASL/SCRAM without SSL encryption.
@@ -402,6 +415,7 @@ WITH (
FORMAT PLAIN ENCODE JSON;
```
+
| Parameter | Notes |
@@ -415,7 +429,10 @@ FORMAT PLAIN ENCODE JSON;
| properties.sasl.kerberos.min.time.before.relogin | |
-For the definitions of the parameters, see the [librdkafka properties list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that the parameters in the list assumes all parameters start with `properties.` and therefore do not include this prefix.
+ For the definitions of the parameters, see the [librdkafka properties
+ list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+ Note that the list assumes all parameters start with `properties.` and
+ therefore does not include this prefix.
Here is an example of creating a sink authenticated with SASL/GSSAPI without SSL encryption.
@@ -436,29 +453,37 @@ WITH (
properties.sasl.kerberos.min.time.before.relogin='10000'
) FORMAT PLAIN ENCODE JSON;
```
+
The implementation of SASL/OAUTHBEARER in RisingWave validates only [unsecured client side tokens](https://docs.confluent.io/platform/current/kafka/authentication%5Fsasl/authentication%5Fsasl%5Foauth.html#unsecured-client-side-token-creation-options-for-sasl-oauthbearer), and does not support OpenID Connect (OIDC) authentication. Therefore, it should not be used in production environments.
+
-| Parameter | Notes |
-| :--------------------------------- | :--------------------------------------------------------------------------------------------------------- |
-| properties.security.protocol | For SASL/OAUTHBEARER without SSL, set to SASL\_PLAINTEXT. For SASL/OAUTHBEARER with SSL, set to SASL\_SSL. |
-| properties.sasl.mechanism | Set to OAUTHBEARER. |
-| properties.sasl.oauthbearer.config | |
+| Parameter | Notes |
+| :--------------------------------- | :------------------------------------------------------------------------------------------------------- |
+| properties.security.protocol | For SASL/OAUTHBEARER without SSL, set to SASL_PLAINTEXT. For SASL/OAUTHBEARER with SSL, set to SASL_SSL. |
+| properties.sasl.mechanism | Set to OAUTHBEARER. |
+| properties.sasl.oauthbearer.config | |
-For the definitions of the parameters, see the [librdkafka properties list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Note that the parameters in the list assumes all parameters start with `properties.` and therefore do not include this prefix. Also, due to the limitation of the SASL/OAUTHBEARER implementation, you only need to specify one OAUTHBEARER parameter: `properties.sasl.oauthbearer.config`. Other OAUTHBEARER parameters are not applicable.
+ For the definitions of the parameters, see the [librdkafka properties
+ list](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+ Note that the list assumes all parameters start with `properties.` and
+ therefore does not include this prefix. Also, due to the limitation of the
+ SASL/OAUTHBEARER implementation, you only need to specify one OAUTHBEARER
+ parameter: `properties.sasl.oauthbearer.config`. Other OAUTHBEARER
+ parameters are not applicable.
For SASL/OAUTHBEARER with SSL, you also need to include these SSL parameters:
-* `properties.ssl.ca.location`
-* `properties.ssl.certificate.location`
-* `properties.ssl.key.location`
-* `properties.ssl.key.password`
+- `properties.ssl.ca.location`
+- `properties.ssl.certificate.location`
+- `properties.ssl.key.location`
+- `properties.ssl.key.password`
This is an example of creating a sink authenticated with SASL/OAUTHBEARER without SSL encryption.
@@ -475,6 +500,7 @@ WITH (
) FORMAT PLAIN ENCODE JSON;
```
+