The Kafka connector supports the following functional points:

- `At Least Once` write in batch scenarios
- `Exactly Once` read in streaming scenarios

The Kafka connector internally uses `org.apache.kafka:kafka-clients` (version 1.0.1) for data writing. When writing with the Kafka connector, make sure the target Kafka cluster is compatible with this version of kafka-clients.
```xml
<dependency>
    <groupId>com.bytedance.bitsail</groupId>
    <artifactId>bitsail-connector-kafka</artifactId>
    <version>${revision}</version>
</dependency>
```
Param name | Required | Default value | Description |
---|---|---|---|
class | Yes | | Reader class name for the Kafka connector: `com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder` |
child_connector_type | Yes | | Must be `kafka` |
reader_parallelism_num | No | | Reader parallelism |
The Kafka connector uses FlinkKafkaConsumer for reading. The properties and Kafka connection information used to initialize the FlinkKafkaConsumer are passed in through the `job.reader.connector` option. You can specify them as follows:
```json
{
  "job": {
    "reader": {
      "connector": {
        "prop_key": "prop_value"   // "prop_key" is the property key, "prop_value" is the property value
      }
    }
  }
}
```
`job.reader.connector` supports KV configuration in the form of <string,string>, where:

- `prop_key`: FlinkKafkaConsumer property key
- `prop_value`: FlinkKafkaConsumer property value
Some commonly used properties are listed below:

1. Kafka cluster properties

Property key | Required | Default value | Optional value | Description |
---|---|---|---|---|
connector.bootstrap.servers | Yes | | | Kafka cluster address |
connector.topic | Yes | | | Topic to read |
connector.group.id | Yes | | | Kafka consumer group |
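
As a rough sketch of how the required reader parameters and cluster properties might fit together, the fragment below fills in one possible `job.reader` block. The broker address, topic name, and consumer group are placeholder values, not defaults. Placing the `connector.`-prefixed keys directly inside the `connector` object mirrors the other examples in this document; treat the layout as an assumption and check it against your BitSail version.

```json
{
  "job": {
    "reader": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.source.KafkaSourceFunctionDAGBuilder",
      "child_connector_type": "kafka",
      "connector": {
        "connector.bootstrap.servers": "localhost:9092",
        "connector.topic": "example_topic",
        "connector.group.id": "example_consumer_group"
      }
    }
  }
}
```
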
2. Where to start consuming

Property key | Required | Default value | Optional value | Description |
---|---|---|---|---|
connector.startup-mode | No | group-offsets | 1. `earliest-offset`: Consume from the earliest offset of the partition<br/>2. `latest-offset`: Consume from the latest offset of the partition<br/>3. `group-offsets`: Consume from the offset of the current consumer group<br/>4. `specific-offsets`: Specify the offset for each partition, used together with `connector.specific-offsets`<br/>5. `specific-timestamp`: Consume messages after a certain point in time, used together with `connector.specific-timestamp` | Decides from which offsets to consume |
connector.specific-offsets | No | | | Used with `specific-offsets`. The format is a standard JSON string, for example: `[{"partition":1,"offset":100},{"partition":2,"offset":200}]` |
connector.specific-timestamp | No | | | Used with `specific-timestamp`. A timestamp (ms) that determines the offset to start consuming from |
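
To illustrate the `specific-offsets` mode, here is a minimal sketch that starts partition 1 at offset 100 and partition 2 at offset 200, reusing the example value from the table. Because `job.reader.connector` takes <string,string> pairs, the offset list is written here as an escaped JSON string; that encoding is an assumption based on the "standard JSON string" wording, not something confirmed by the connector source.

```json
{
  "job": {
    "reader": {
      "connector": {
        "connector.startup-mode": "specific-offsets",
        "connector.specific-offsets": "[{\"partition\":1,\"offset\":100},{\"partition\":2,\"offset\":200}]"
      }
    }
  }
}
```
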
3. Other FlinkKafkaConsumer parameters
FlinkKafkaConsumer supports many parameters; refer to the ConsumerConfig (2.1.0) API for details.
To set any of these parameters, configure it as `connector.XXX`.
For example, to set MAX_PARTITION_FETCH_BYTES_CONFIG to 1024, add the parameter:
```json
{
  "job": {
    "reader": {
      "connector": {
        "connector.max.partition.fetch.bytes": "1024"
      }
    }
  }
}
```
The Kafka read connector is used in streaming scenarios, so under normal circumstances it keeps consuming indefinitely.
To debug by consuming only a limited amount of data, configure the following parameters. Note that these parameters need to be added to the `job.reader` block.

Property key | Required | Default value | Description |
---|---|---|---|
enable_count_mode | No | false | Whether to end the current task after a limited amount of data has been sent; generally used for testing |
count_mode_record_threshold | No | 10000 | If `enable_count_mode=true`, the current task ends after consuming `count_mode_record_threshold` messages |
count_mode_run_time_threshold | No | 600 | If `enable_count_mode=true`, the current task ends after running for `count_mode_run_time_threshold` seconds |
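
A minimal sketch of a debugging setup using the count-mode parameters above might look like the following. The thresholds (100 messages, 60 seconds) are illustrative values chosen for a quick test run, and writing them as a JSON boolean and numbers rather than strings is an assumption.

```json
{
  "job": {
    "reader": {
      "enable_count_mode": true,
      "count_mode_record_threshold": 100,
      "count_mode_run_time_threshold": 60
    }
  }
}
```
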
Messages are pulled from KafkaConsumer in the form of ConsumerRecord. BitSail supports two ways to handle a ConsumerRecord. Use `job.reader.format_type` to decide which one to use.

- `job.reader.format_type="json"`: Parse according to JSON format
  - In this mode, BitSail parses the JSON string carried by the value of the ConsumerRecord according to the `job.reader.columns` parameter set by the user.
  - Therefore, the `job.reader.columns` parameter is required in this mode.
- `job.reader.format_type="streaming_file"`: Use raw byte value
  - In this mode, BitSail directly delivers the raw bytes of the ConsumerRecord. The specific structure is as follows:

```json
[
  {"index":0, "name":"key", "type":"binary"},        // message key
  {"index":1, "name":"value", "type":"binary"},      // message value
  {"index":2, "name":"partition", "type":"string"},  // partition of the message
  {"index":3, "name":"offset", "type":"long"}        // offset of the message in the partition
]
```
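
For the `json` mode, a hedged sketch of a matching `job.reader` fragment is shown below. The column names `id` and `name` are hypothetical, and the entry shape (`index`, `name`, `type`) is borrowed from the `streaming_file` structure above on the assumption that user-defined columns are declared the same way.

```json
{
  "job": {
    "reader": {
      "format_type": "json",
      "columns": [
        {"index": 0, "name": "id", "type": "long"},
        {"index": 1, "name": "name", "type": "string"}
      ]
    }
  }
}
```

With a configuration like this, a message whose value is `{"id": 1, "name": "a"}` would be expected to yield one record with those two fields.
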
Note that these parameters should be added to the `job.writer` block.

Param names | Default value | Description |
---|---|---|
class | | Writer class name of the Kafka connector: `com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat` |
kafka_servers | | Kafka bootstrap server address; multiple addresses are separated by `,` |
topic_name | | Kafka topic |
columns | | Describes the fields' names and data types |

Param names | Default value | Description |
---|---|---|
writer_parallelism_num | | Writer parallelism |
partition_field | | Contains one or several fields from `job.writer.columns`, separated by commas (e.g. "id,timestamp"). If `partition_field` is not empty, the partition to write to is chosen based on the hash of these fields in each record |
log_failures_only | false | When KafkaProducer fails to perform an asynchronous send operation:<br/>1. If `log_failures_only=true`, only log the failure information<br/>2. If `log_failures_only=false`, throw an exception |
retries | 10 | Number of retries after a KafkaProducer failure |
retry_backoff_ms | 1000 | KafkaProducer's retry interval after a failure (ms) |
linger_ms | 5000 | The maximum time (ms) KafkaProducer waits while building a single batch |
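
Putting the writer parameters together, one possible `job.writer` block is sketched below. The server addresses and topic name are placeholders, the two columns are hypothetical and chosen to match the "id,timestamp" example for `partition_field`, and the column entry shape and value types are assumptions rather than confirmed requirements.

```json
{
  "job": {
    "writer": {
      "class": "com.bytedance.bitsail.connector.legacy.kafka.sink.KafkaOutputFormat",
      "kafka_servers": "localhost:9092,localhost:9093",
      "topic_name": "example_topic",
      "columns": [
        {"index": 0, "name": "id", "type": "long"},
        {"index": 1, "name": "timestamp", "type": "string"}
      ],
      "partition_field": "id,timestamp",
      "retries": 10,
      "linger_ms": 5000
    }
  }
}
```
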
When initializing the KafkaProducer, the user can use `job.common.optional` to pass initialization parameters, for example:
```json
{
  "job": {
    "common": {
      "optional": {
        "batch.size": 16384,
        "buffer.memory": 33554432
      }
    }
  }
}
```
Configuration examples: Kafka connector example