fluent-plugin-kafka, a plugin for Fluentd
A fluentd plugin to both consume and produce data for Apache Kafka.
Add this line to your application's Gemfile:
gem 'fluent-plugin-kafka'
And then execute:
$ bundle
Or install it yourself as:
$ gem install fluent-plugin-kafka --no-document
If you want to use zookeeper related parameters, you also need to install zookeeper gem. zookeeper gem includes native extension, so development tools are needed, e.g. ruby-devel, gcc, make and etc.
- Ruby 2.1 or later
- Input plugins work with kafka v0.9 or later
- Output plugins work with kafka v0.8 or later
This fork adds support for using MSK IAM authentication with the rdkafka2
output type in Fluentd. Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the aws-msk-iam-sasl-signer-ruby library.
The aws-msk-iam-sasl-signer-ruby
library provides an example for generating the OAuthBearer token using rdkafka, which is one of the core Kafka libraries supported by the Fluentd fluent-plugin-kafka plugin. This fork integrates that example into the Fluent::Rdkafka2Output
class, enabling AWS IAM authentication.
The key change is the inclusion of a refresh callback:
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
This callback triggers token generation when needed, ensuring continuous authentication with the MSK cluster.
To enable this feature, configure your Fluentd input as follows:
<match *>
@type rdkafka2
# Kafka brokers to connect to (typically port 9098 or 9198 for IAM authentication)
brokers <broker_addresses>
# Topic to write events to
topic_key test-topic-1
default_topic test-topic-1
# AWS Region (required)
aws_msk_region us-east-1
# Use a shared producer for the connection (required)
share_producer true
# MSK IAM authentication settings (required)
rdkafka_options {
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "OAUTHBEARER"
With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.
- ssl_ca_cert
- ssl_client_cert
- ssl_client_cert_key
- ssl_client_cert_key_password
- ssl_ca_certs_from_system
Set path to SSL related files. See Encryption and Authentication using SSL for more detail.
- principal
- keytab
Set principal and path to keytab for SASL/GSSAPI authentication. See Authentication using SASL for more details.
- username
- password
- scram_mechanism
- sasl_over_ssl
Set username, password, scram_mechanism and sasl_over_ssl for SASL/Plain or Scram authentication. See Authentication using SASL for more details.
Consume events by single consumer.
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
# Optionally, you can manage topic offset by using zookeeper
offset_zookeeper <zookeer node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'
# ruby-kafka consumer options
max_bytes (integer) :default => nil (Use default of ruby-kafka)
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
Supports a start of processing from the assigned offset for specific topics.
@type kafka
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
format <input text type (text|json|ltsv|msgpack)>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
topic <listening topic>
partition <listening partition: default=0>
offset <listening start offset: default=-1>
See also ruby-kafka README for more detailed documentation about ruby-kafka.
Consuming topic name is used for event tag. So when the target topic name is app_event
, the tag is app_event
. If you want to modify tag, use add_prefix
or add_suffix
parameters. With add_prefix kafka
, the tag is kafka.app_event
Consume events by kafka consumer group features..
@type kafka_group
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
consumer_group <consumer group name, must set>
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BuffereQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
time_source <source for message timestamp (now|kafka|record)> :default => now
time_format <string (Optional when use_record_time is used)>
# ruby-kafka consumer options
max_bytes (integer) :default => 1048576
max_wait_time (integer) :default => nil (Use default of ruby-kafka)
min_bytes (integer) :default => nil (Use default of ruby-kafka)
offset_commit_interval (integer) :default => nil (Use default of ruby-kafka)
offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
fetcher_max_queue_size (integer) :default => nil (Use default of ruby-kafka)
refresh_topic_interval (integer) :default => nil (Use default of ruby-kafka)
start_from_beginning (bool) :default => true
See also ruby-kafka README for more detailed documentation about ruby-kafka options.
supports regex pattern since v0.13.1. If you want to use regex pattern, use /pattern/
like /foo.*/
Consuming topic name is used for event tag. So when the target topic name is app_event
, the tag is app_event
. If you want to modify tag, use add_prefix
or add_suffix
parameter. With add_prefix kafka
, the tag is kafka.app_event
With the introduction of the rdkafka-ruby based input plugin we hope to support Kafka brokers above version 2.1 where we saw compatibility issues when using the ruby-kafka based @kafka_group input type. The rdkafka-ruby lib wraps the highly performant and production ready librdkafka C lib.
@type rdkafka_group
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BuffereQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
time_source <source for message timestamp (now|kafka|record)> :default => now
time_format <string (Optional when use_record_time is used)>
# kafka consumer options
max_wait_time_ms 500
max_batch_size 10000
kafka_configs {
"bootstrap.servers": "brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>",
"group.id": "<consumer group name>"
See also rdkafka-ruby and librdkafka for more detailed documentation about Kafka consumer options.
Consuming topic name is used for event tag. So when the target topic name is app_event
, the tag is app_event
. If you want to modify tag, use add_prefix
or add_suffix
parameter. With add_prefix kafka
, the tag is kafka.app_event
This kafka2
plugin is for fluentd v1 or later. This plugin uses ruby-kafka
producer for writing data.
If ruby-kafka
doesn't fit your kafka environment, check rdkafka2
plugin instead. This will be out_kafka
plugin in the future.
<match app.**>
@type kafka2
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
# Kafka topic, placerholders are supported. Chunk keys are required in the Buffer section inorder for placeholders
# to work.
topic (string) :default => nil
topic_key (string) :default => 'topic'
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
default_partition_key (string) :default => nil
record_key (string) :default => nil
default_message_key (string) :default => nil
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
exclude_partition (bool) :default => false
exclude_message_key (bool) :default => false
get_kafka_client_log (bool) :default => false
headers (hash) :default => {}
headers_from_record (hash) :default => {}
use_event_time (bool) :default => false
use_default_for_unknown_topic (bool) :default => false
discard_kafka_delivery_failed (bool) :default => false (No discard)
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
share_producer (bool) :default => false
idempotent (bool) :default => false
# If you intend to rely on AWS IAM auth to MSK with long lived credentials
# https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
# For AWS STS support, see status in
# - https://github.com/zendesk/ruby-kafka/issues/944
# - https://github.com/zendesk/ruby-kafka/pull/951
sasl_aws_msk_iam_access_key_id (string) :default => nil
sasl_aws_msk_iam_secret_key_id (string) :default => nil
sasl_aws_msk_iam_aws_region (string) :default => nil
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
# Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
tag_key tag
time_key time
# See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
# Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used.
<buffer topic>
flush_interval 10s
# ruby-kafka producer options
idempotent (bool) :default => false
sasl_over_ssl (bool) :default => true
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (string) :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
The <formatter name>
in <format>
uses fluentd's formatter plugins. See formatter article.
Note: Java based Kafka client uses murmur2
as partitioner function by default. If you want to use same partitioning behavior with fluent-plugin-kafka, change it to murmur2
instead of crc32
. Note that for using murmur2
hash partitioner function, you must install digest-murmurhash
ruby-kafka sometimes returns Kafka::DeliveryFailed
error without good information.
In this case, get_kafka_client_log
is useful for identifying the error cause.
ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in fluentd logs.
Supports following ruby-kafka's producer options.
- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
- required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
- compression_codec - default: nil - The codec the producer uses to compress messages.
- max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), Message more than 1000000 byes will be dropped.
- discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred
If you want to know about detail of monitoring, see also https://github.com/zendesk/ruby-kafka#monitoring
See also Kafka::Client for more detailed documentation about ruby-kafka.
This plugin supports compression codec "snappy" also. Install snappy module before you use snappy compression.
$ gem install snappy --no-document
snappy gem uses native extension, so you need to install several packages before. On Ubuntu, need development packages and snappy library.
$ sudo apt-get install build-essential autoconf automake libtool libsnappy-dev
On CentOS 7 installation is also necessary.
$ sudo yum install gcc autoconf automake libtool snappy-devel
This plugin supports compression codec "lz4" also. Install extlz4 module before you use lz4 compression.
$ gem install extlz4 --no-document
This plugin supports compression codec "zstd" also. Install zstd-ruby module before you use zstd compression.
$ gem install zstd-ruby --no-document
Messages will be assigned a partition at random as default by ruby-kafka, but messages with the same partition key will always be assigned to the same partition by setting default_partition_key
in config file.
If key name partition_key_key
exists in a message, this plugin set the value of partition_key_key as key.
default_partition_key | partition_key_key | behavior |
Not set | Not exists | All messages are assigned a partition at random |
Set | Not exists | All messages are assigned to the specific partition |
Not set | Exists | Messages which have partition_key_key record are assigned to the specific partition, others are assigned a partition at random |
Set | Exists | Messages which have partition_key_key record are assigned to the specific partition with partition_key_key, others are assigned to the specific partition with default_parition_key |
If key name message_key_key
exists in a message, this plugin publishes the value of message_key_key to kafka and can be read by consumers. Same message key will be assigned to all messages by setting default_message_key
in config file. If message_key_key exists and if partition_key_key is not set explicitly, messsage_key_key will be used for partitioning.
It is possible to set headers on Kafka messages. This only works for kafka2 and rdkafka2 output plugin.
The format is like key1:value1,key2:value2. For example:
<match app.**>
@type kafka2
headers some_header_name:some_header_value
You may set header values based on a value of a fluentd record field. For example, imagine a fluentd record like:
{"source": { "ip": "" }, "payload": "hello world" }
And the following fluentd config:
<match app.**>
@type kafka2
headers_from_record source_ip:$.source.ip
The Kafka message will have a header of source_ip=
The configuration format is jsonpath. It is descibed in https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor
Fields can be excluded from output data. Only works for kafka2 and rdkafka2 output plugin.
Fields must be specified using an array of dot notation $.
, for example:
<match app.**>
@type kafka2
exclude_fields $.source.ip,$.HTTP_FOO
This config can be used to remove fields used on another configs.
For example, $.source.ip
can be extracted with config headers_from_record
and excluded from message payload.
Using this config to remove unused fields is discouraged. A filter plugin can be used for this purpose.
If record_key
is provided, the plugin sends only a sub field given by that key.
The configuration format is jsonpath.
e.g. When the following configuration and the incoming record are given:
<match **>
@type kafka2
record_key '$.data'
"specversion" : "1.0",
"type" : "com.example.someevent",
"id" : "C234-1234-1234",
"time" : "2018-04-05T17:31:00Z",
"datacontenttype" : "application/json",
"data" : {
"appinfoA" : "abc",
"appinfoB" : 123,
"appinfoC" : true
only the data
field will be serialized by the formatter and sent to Kafka.
The toplevel data
key will be removed.
This plugin uses ruby-kafka producer for writing data. This plugin is for v0.12. If you use v1, see kafka2
Support of fluentd v0.12 has ended. kafka_buffered
will be an alias of kafka2
and will be removed in the future.
<match app.**>
@type kafka_buffered
# Brokers: you can choose either brokers or zookeeper. If you are not familiar with zookeeper, use brokers parameters.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
topic_key (string) :default => 'topic'
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
default_topic (string) :default => nil
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
exclude_partition (bool) :default => false
exclude_message_key (bool) :default => false
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
output_include_tag (bool) :default => false
output_include_time (bool) :default => false
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
get_kafka_client_log (bool) :default => false
use_event_time (bool) :default => false
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
# See fluentd document for buffer related parameters: https://docs.fluentd.org/v/0.12/buffer
# ruby-kafka producer options
idempotent (bool) :default => false
sasl_over_ssl (bool) :default => true
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (string) :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
kafka_agg_max_bytes (integer) :default => 4096
kafka_agg_max_messages (integer) :default => nil (No limit)
max_send_limit_bytes (integer) :default => nil (No drop)
discard_kafka_delivery_failed (bool) :default => false (No discard)
monitoring_list (array) :default => []
supports the following ruby-kafka
- max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
- required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
- ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
- compression_codec - default: nil - The codec the producer uses to compress messages.
- max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), Message more than 1000000 byes will be dropped.
- discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred
- monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported
has two additional parameters:
- kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
- kafka_agg_max_messages - default: nil - Maximum number of messages to include in one batch transmission.
Note: Java based Kafka client uses murmur2
as partitioner function by default. If you want to use same partitioning behavior with fluent-plugin-kafka, change it to murmur2
instead of crc32
. Note that for using murmur2
hash partitioner function, you must install digest-murmurhash
This plugin uses ruby-kafka producer for writing data. For performance and reliability concerns, use kafka_bufferd
output instead. This is mainly for testing.
<match app.**>
@type kafka
# Brokers: you can choose either brokers or zookeeper.
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
zookeeper <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka
default_topic (string) :default => nil
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
output_data_type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
output_include_tag (bool) :default => false
output_include_time (bool) :default => false
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
# ruby-kafka producer options
max_send_retries (integer) :default => 1
required_acks (integer) :default => -1
ack_timeout (integer) :default => nil (Use default of ruby-kafka)
compression_codec (string) :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
max_buffer_size (integer) :default => nil (Use default of ruby-kafka)
max_buffer_bytesize (integer) :default => nil (Use default of ruby-kafka)
This plugin also supports ruby-kafka related parameters. See Buffered output plugin section.
Note: Java based Kafka client uses murmur2
as partitioner function by default. If you want to use same partitioning behavior with fluent-plugin-kafka, change it to murmur2
instead of crc32
. Note that for using murmur2
hash partitioner function, you must install digest-murmurhash
This plugin uses rdkafka
instead of ruby-kafka
for kafka client.
You need to install rdkafka gem.
# rdkafka is C extension library. Need to install development tools like ruby-devel, gcc and etc
# for v0.12 or later
$ gem install rdkafka --no-document
# for v0.11 or earlier
$ gem install rdkafka -v 0.6.0 --no-document
is for fluentd v1.0 or later.
<match app.**>
@type rdkafka2
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
topic_key (string) :default => 'topic'
default_topic (string) :default => nil
partition_key (string) :default => 'partition'
partition_key_key (string) :default => 'partition_key'
message_key_key (string) :default => 'message_key'
use_default_for_unknown_topic (bool) :default => false
use_default_for_unknown_partition_error (bool) :default => false
default_partition_key (string) :default => nil
default_message_key (string) :default => nil
exclude_topic_key (bool) :default => false
exclude_partition_key (bool) :default => false
discard_kafka_delivery_failed (bool) :default => false (No discard)
discard_kafka_delivery_failed_regex (regexp) :default => nil (No discard)
use_event_time (bool) :default => false
# same with kafka2
headers (hash) :default => {}
headers_from_record (hash) :default => {}
record_key (string) :default => nil
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
# Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
tag_key tag
time_key time
# See fluentd document for buffer section parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
# Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used.
<buffer topic>
flush_interval 10s
# You can set any rdkafka configuration via this parameter: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
rdkafka_options {
"log_level" : 7
# rdkafka2 specific parameters
# share kafka producer between flush threads. This is mainly for reducing kafka operations like kerberos
share_producer (bool) :default => false
# Timeout for polling message wait. If 0, no wait.
rdkafka_delivery_handle_poll_timeout (integer) :default => 30
# If the record size is larger than this value, such records are ignored. Default is no limit
max_send_limit_bytes (integer) :default => nil
# The maximum number of enqueueing bytes per second. It can reduce the
# load of both Fluentd and Kafka when excessive messages are attempted
# to send. Default is no limit.
max_enqueue_bytes_per_second (integer) :default => nil
unrecoverable_error_codes (array) :default => ["topic_authorization_failed", "msg_size_too_large"]
supports discard_kafka_delivery_failed_regex
- default: nil - discard the record where the Kafka::DeliveryFailed occurred and the emitted message matches the given regex pattern, such as/unknown_topic/
If you use v0.12, use rdkafka
<match kafka.**>
@type rdkafka
default_topic kafka
flush_interval 1s
output_data_type json
rdkafka_options {
"log_level" : 7
We got lots of similar questions. Almost cases, this problem happens by version mismatch between ruby-kafka and kafka cluster. See ruby-kafka README for more details: https://github.com/zendesk/ruby-kafka#compatibility
To avoid the problem, there are 2 approaches:
- Upgrade your kafka cluster to latest version. This is better because recent version is faster and robust.
- Downgrade ruby-kafka/fluent-plugin-kafka to work with your older kafka.
- Fork it
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Added some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request