diff --git a/README.md b/README.md index 22cdfb3b..70ab044d 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,11 @@ ![Lifecycle Active](https://badgen.net/badge/Lifecycle/Active/green) -## Introduction -The [Modern C++ Kafka API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) is a layer of C++ wrapper based on [librdkafka](https://github.com/edenhill/librdkafka) (the C part), with high quality, but more friendly to users. +The [modern-cpp-kafka API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) is a layer of ***C++*** wrapper based on [librdkafka](https://github.com/confluentinc/librdkafka) (the ***C*** part only), with high quality, but more friendly to users. + +- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.2](https://github.com/confluentinc/librdkafka/releases/tag/v1.9.2). -- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.2](https://github.com/edenhill/librdkafka/releases/tag/v1.9.2). ``` KAFKA is a registered trademark of The Apache Software Foundation and @@ -14,23 +14,25 @@ has been licensed for use by modern-cpp-kafka. modern-cpp-kafka has no affiliation with and is not endorsed by The Apache Software Foundation. ``` + ## Why it's here The ***librdkafka*** is a robust high performance C/C++ library, widely used and well maintained. -Unfortunately, to maintain C++98 compatibility, the C++ interface of ***librdkafka*** is not quite object-oriented or user-friendly. +Unfortunately, to maintain ***C++98*** compatibility, the ***C++*** interface of ***librdkafka*** is not quite object-oriented or user-friendly. Since C++ is evolving quickly, we want to take advantage of new C++ features, thus make the life easier for developers. And this led us to create a new C++ API for Kafka clients. -Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library that uses idiomatic C++ features to provide a safe, efficient and easy to use way of producing and consuming Kafka messages. +Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** library that uses idiomatic ***C++*** features to provide a safe, efficient and easy to use way of producing and consuming Kafka messages. + ## Features -* Header-only +* __Header-only__ * Easy to deploy, and no extra library required to link -* Ease of Use +* __Ease of Use__ * Interface/Naming matches the Java API @@ -40,7 +42,7 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t * ***librdkafka***'s polling and queue management is now hidden -* Robust +* __Robust__ * Verified with kinds of test cases, which cover many abnormal scenarios (edge cases) @@ -50,187 +52,160 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t * Client failure and taking over, etc. 
-* Efficient +* __Efficient__ * No extra performance cost (No deep copy introduced internally) * Much better (2~4 times throughput) performance result than those native language (Java/Scala) implementation, in most commonly used cases (message size: 256 B ~ 2 KB) -## Build - -* No need to build for installation - -* To build its `tools`/`tests`/`examples`, you should - - * Specify library locations with environment variables - - * `LIBRDKAFKA_INCLUDE_DIR` -- ***librdkafka*** headers - - * `LIBRDKAFKA_LIBRARY_DIR` -- ***librdkafka*** libraries - - * `GTEST_ROOT` -- ***googletest*** headers and libraries - - * `BOOST_ROOT` -- ***boost*** headers and libraries - - * `SASL_LIBRARYDIR`/`SASL_LIBRARY` -- if SASL connection support is wanted - - * `RAPIDJSON_INCLUDE_DIRS` -- `addons/KafkaMetrics` requires **rapidjson** headers - - * Create an empty directory for the build, and `cd` to it - - * Build commands - - * Type `cmake path-to-project-root` - - * Type `make` (could follow build options with `-D`) - - * `BUILD_OPTION_USE_ASAN=ON` -- Use Address Sanitizer - - * `BUILD_OPTION_USE_TSAN=ON` -- Use Thread Sanitizer - - * `BUILD_OPTION_USE_UBSAN=ON` -- Use Undefined Behavior Sanitizer - - * `BUILD_OPTION_CLANG_TIDY=ON` -- Enable clang-tidy checking +## Installation / Requirements - * `BUILD_OPTION_GEN_DOC=ON` -- Generate documentation as well +* Just include the [`include/kafka`](https://github.com/morganstanley/modern-cpp-kafka/tree/main/include/kafka) directory for your project - * `BUILD_OPTION_DOC_ONLY=ON` -- Only generate documentation +* The compiler should support ***C++17*** - * `BUILD_OPTION_GEN_COVERAGE=ON` -- Generate test coverage, only support by clang currently + * Or, ***C++14***, but with pre-requirements - * Type `make install` + - Need ***boost*** headers (for `boost::optional`) -## Install + - For ***GCC*** compiler, it needs optimization options (e.g. `-O2`) -* Include the `include/kafka` directory in your project +* Dependencies -* To work together with ***modern-cpp-kafka*** API, the compiler should support + * [**librdkafka**](https://github.com/confluentinc/librdkafka) headers and library (only the C part) - * Option 1: C++17 + - Also see the [requirements from **librdkafka**](https://github.com/confluentinc/librdkafka#requirements) - * Option 2: C++14 (with pre-requirements) + * [**rapidjson**](https://github.com/Tencent/rapidjson) headers: only required by `addons/KafkaMetrics.h` - * Need ***boost*** headers (for `boost::optional`) - - * GCC only (with optimization, e.g. -O2) - -## How to Run Tests - -* Unit test (`tests/unit`) - - * The test could be run with no Kafka cluster depolyed - -* Integration test (`tests/integration`) - - * The test should be run with Kafka cluster depolyed - - * The environment variable `KAFKA_BROKER_LIST` should be set - - * E.g. `export KAFKA_BROKER_LIST=127.0.0.1:29091,127.0.0.1:29092,127.0.0.1:29093` - -* Robustness test (`tests/robustness`) - - * The test should be run with Kafka cluster depolyed locally - - * The environment variable `KAFKA_BROKER_LIST` should be set - - * The environment variable `KAFKA_BROKER_PIDS` should be set - - * Make sure the test runner gets the privilege to stop/resume the pids - - * E.g. `export KAFKA_BROKER_PIDS=61567,61569,61571` - -* Additional settings for clients - - * The environment variable `KAFKA_CLIENT_ADDITIONAL_SETTINGS` could be used for customized test environment - - * Especially for Kafka cluster with SASL(or SSL) connections - - * E.g. 
`export KAFKA_CLIENT_ADDITIONAL_SETTINGS="security.protocol=SASL_PLAINTEXT;sasl.kerberos.service.name=...;sasl.kerberos.keytab=...;sasl.kerberos.principal=..."` ## To Start -* Tutorial +* Tutorials * Confluent Blog [Debuting a Modern C++ API for Apache Kafka](https://www.confluent.io/blog/modern-cpp-kafka-api-for-safe-easy-messaging) + * Note: it's a bit out of date, since [the API changed from time to time](https://github.com/morganstanley/modern-cpp-kafka/releases) + * [KafkaProducer Quick Start](doc/KafkaProducerQuickStart.md) * [KafkaConsumer Quick Start](doc/KafkaConsumerQuickStart.md) -* User's Manual +* User Manual * [Kafka Client API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) - * `Properties` for Kafka clients - - * `Properties` is a map which contains all configuration info needed to initialize a Kafka client. These configuration items are key-value pairs, -- the "key" is a `std::string`, while the "value" could be a `std::string`, a `std::function<...>`, or an `Interceptors`. + * About [`Properties`](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/classKAFKA__API_1_1Properties.html) - * K-V Types: `std::string` -> `std::string` + * It is a map which contains all configuration info needed to initialize a Kafka client. - * Most are identical with [librdkafka configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) + * The configuration items are ***key-value*** pairs, -- the type of ***key*** is always `std::string`, while the type for a ***value*** could be one of the following - * But with Exceptions + * `std::string` - * Default Value Changes + * Most items are identical with [librdkafka configuration](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) - * `log_level`(default: `5`): default was `6` from **librdkafka** + * But with exceptions - * `client.id` (default: random string): no default string from **librdkafka** + * Default value changes - * `group.id` (default: random string, for `KafkaConsumer` only): no default string from **librdkafka** + | Key String | Default | Description | + | ----------- | ------------- | --------------------------------------------------------- | + | `log_level` | `5` | Default was `6` from **librdkafka** | + | `client.id` | random string | No default from **librdkafka** | + | `group.id` | random string | (for `KafkaConsumer` only) No default from **librdkafka** | - * Additional Options + * Additional options - * `enable.manual.events.poll` (default: `false`): To poll the (offset-commit/message-delivery callback) events manually + | Key String | Default | Description | + | --------------------------- | ------------- | --------------------------------------------------------------------------------------------------- | + | `enable.manual.events.poll` | `false` | To poll the (offset-commit/message-delivery callback) events manually | + | `max.poll.records` | `500` | (for `KafkaConsumer` only) The maximum number of records that a single call to `poll()` would return | - * `max.poll.records` (default: `500`, for `KafkaConsumer` only): The maxmum number of records that a single call to `poll()` would return + * Ignored options - * Ignored Options + | Key String | Explanation | + | --------------------------- | ---------------------------------------------------------------------------------- | + | `enable.auto.offset.store` | ***modern-cpp-kafka*** will save the offsets in its own way | + | `auto.commit.interval.ms` | ***modern-cpp-kafka*** will only commit the offsets within each `poll()` operation | - * `enable.auto.offset.store`: ***modern-cpp-kafka*** will save the offsets in its own way + * `std::function<...>` - * `auto.commit.interval.ms`: ***modern-cpp-kafka*** will not commit the offsets periodically, instead, it would do it in the next `poll()`. + | Key String | Value Type | + | ------------------------------ | --------------------------------------------------------------------------------------------- | + | `log_cb` | `LogCallback` (`std::function`) | + | `error_cb` | `ErrorCallback` (`std::function`) | + | `stats_cb` | `StatsCallback` (`std::function`) | + | `oauthbearer_token_refresh_cb` | `OauthbearerTokenRefreshCallback` (`std::function`) | + * `Interceptors` - * K-V Types: `std::string` -> `std::function<...>` + | Key String | Value Type | + | -------------- | -------------- | + | `interceptors` | `Interceptors` |
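A minimal sketch of putting a `Properties` object together (the broker list, client id and values below are just placeholders; callback and `Interceptors` entries would use the value types listed in the tables above):

```cpp
    using namespace kafka;

    // Plain string key-value entries (see the tables above)
    Properties props({
        { "bootstrap.servers", {"192.168.0.1:9092,192.168.0.2:9092"} },   // placeholder broker list
        { "client.id",         {"my-client"}                         },   // placeholder client id
        { "log_level",         {"4"}                                 }
    });

    // An entry could also be added (or overridden) afterwards
    props.put("max.poll.records", "100");
```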
- * `log_cb` -> `LogCallback` (`std::function`) - * `error_cb` -> `ErrorCallback` (`std::function`) - * `stats_cb` -> `StatsCallback` (`std::function`) +## For Developers - * `oauthbearer_token_refresh_cb` -> `OauthbearerTokenRefreshCallback` (`std::function`) +### Build (for [tests](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests)/[tools](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tools)/[examples](https://github.com/morganstanley/modern-cpp-kafka/tree/main/examples)) - * K-V Types: `std::string` -> `Interceptors` +* Specify library locations with environment variables - * `interceptors`: takes `Interceptors` as the value type + | Environment Variable | Description | + | -------------------------------- | -------------------------------------------------------- | + | `LIBRDKAFKA_INCLUDE_DIR` | ***librdkafka*** headers | + | `LIBRDKAFKA_LIBRARY_DIR` | ***librdkafka*** libraries | + | `GTEST_ROOT` | ***googletest*** headers and libraries | + | `BOOST_ROOT` | ***boost*** headers and libraries | + | `SASL_LIBRARYDIR`/`SASL_LIBRARY` | [optional] for SASL connection support | + | `RAPIDJSON_INCLUDE_DIRS` | `addons/KafkaMetrics.h` requires ***rapidjson*** headers | -* Test Environment (ZooKeeper/Kafka cluster) Setup +* Build commands - * [Start the servers](https://kafka.apache.org/documentation/#quickstart_startserver) + * `cd empty-folder-for-build` + * `cmake path-to-project-root` -## How to Achieve High Availability & Performance + * `make` (the following options could be used with `-D`) -* [Kafka Broker Configuration](doc/KafkaBrokerConfiguration.md) + | Build Option | Description | + | -------------------------------- | ------------------------------------------------------------- | + | `BUILD_OPTION_USE_TSAN=ON` | Use Thread Sanitizer | + | `BUILD_OPTION_USE_ASAN=ON` | Use Address Sanitizer | + | `BUILD_OPTION_USE_UBSAN=ON` | Use Undefined Behavior Sanitizer | + | `BUILD_OPTION_CLANG_TIDY=ON` | Enable clang-tidy checking | + | `BUILD_OPTION_GEN_DOC=ON` | Generate documentation as well | + | `BUILD_OPTION_DOC_ONLY=ON` | Only generate documentation | + | `BUILD_OPTION_GEN_COVERAGE=ON` | Generate test coverage, only supported by clang currently | -* [Good Practices to Use KafkaProducer](doc/GoodPracticesToUseKafkaProducer.md) + * `make install` (to install `tools`) -* [Good Practices to Use KafkaConsumer](doc/GoodPracticesToUseKafkaConsumer.md) +### Run Tests -* [How to Make KafkaProducer Reliable](doc/HowToMakeKafkaProducerReliable.md) +* Kafka cluster setup + * [Quick Start For Cluster Setup](https://kafka.apache.org/documentation/#quickstart) + + * [Cluster Setup Scripts For 
Test](https://github.com/morganstanley/modern-cpp-kafka/blob/main/scripts/start-local-kafka-cluster.py) -## Other References + * [Kafka Broker Configuration](doc/KafkaBrokerConfiguration.md) -* Java API for Kafka clients +* To run the binary, the test runner requires the following environment variables - * [org.apache.kafka.clients.producer](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/package-summary.html) + | Environment Variable | Description | Example | + | ---------------------------------- | ----------------------------------------------------------- | -------------------------------------------------------------------------- | + | `KAFKA_BROKER_LIST` | The broker list for the Kafka cluster | `export KAFKA_BROKER_LIST=127.0.0.1:29091,127.0.0.1:29092,127.0.0.1:29093` | + | `KAFKA_BROKER_PIDS` | The broker PIDs for the test runner to manipulate | `export KAFKA_BROKER_PIDS=61567,61569,61571` | + | `KAFKA_CLIENT_ADDITIONAL_SETTINGS` | Could be used for additional configuration for Kafka clients | `export KAFKA_CLIENT_ADDITIONAL_SETTINGS="security.protocol=SASL_PLAINTEXT;sasl.kerberos.service.name=...;sasl.kerberos.keytab=...;sasl.kerberos.principal=..."` | - * [org.apache.kafka.clients.consumer](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/package-summary.html) + * The environment variable `KAFKA_BROKER_LIST` is mandatory for integration/robustness tests - * [org.apache.kafka.clients.admin](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/admin/package-summary.html) + * The environment variable `KAFKA_BROKER_PIDS` is mandatory for robustness tests + | Test Type | Requires Kafka Cluster | Requires Privilege to Stop/Resume the Brokers | + | -------------------------------------------------------------------------------------------------- | ------------------------ | --------------------------------------------- | + | [tests/unit](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/unit) | - | - | + | [tests/integration](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/integration) | Y (`KAFKA_BROKER_LIST`) | - | + | [tests/robustness](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/robustness) | Y (`KAFKA_BROKER_LIST`) | Y (`KAFKA_BROKER_PIDS`) | diff --git a/doc/GoodPracticesToUseKafkaConsumer.md b/doc/GoodPracticesToUseKafkaConsumer.md deleted file mode 100644 index 7575f077..00000000 --- a/doc/GoodPracticesToUseKafkaConsumer.md +++ /dev/null @@ -1,22 +0,0 @@ -# Good Practices to Use a KafkaConsumer - -If we want to achieve high performance/availability, here're some rules of thumb. - -## How to distribute the messages (for the same topics) to different KafkaConsumers - -* Use a consumer group for these KafkaConsumers, thus they will work together -- each one deals with different partitions. - -* Besides `subscribe` (topics), users could also choose to explicitly `assign` certain partitions to a `KafkaConsumer`. - -## How to enhance the throughput - -* Try with a larger `QUEUED_MIN_MESSAGES`, especially for small messages. - -* Use multiple KafkaConsumers to distribute the payload. - -## How to avoid polling duplicated messages - -* To commit the offsets more frequently (e.g, always do commit after finishing processing a message). - -* Don't use quite a large `MAX_POLL_RECORDS` for a `KafkaConsumer` (with `enable.auto.commit=true`) -- you might fail to commit all these messages before crash, thus more duplications with the next `poll`. 
- diff --git a/doc/GoodPracticesToUseKafkaProducer.md b/doc/GoodPracticesToUseKafkaProducer.md deleted file mode 100644 index b6af5c74..00000000 --- a/doc/GoodPracticesToUseKafkaProducer.md +++ /dev/null @@ -1,48 +0,0 @@ -# Good Practices to Use a KafkaProducer - -If we want to achieve high performance/availability, here're some rules of thumb. - -## Avoid using `syncSend` for better throughput - -You should never call `syncSend` if you want to get a high throughput. The `syncSend` is a synchronous operation, and would not go on until the `acks` are received. - -## The `message.max.bytes` must be consistent with Kafka servers' setting - -* Default value: 1000,000 - -* The default setting for brokers is `message.max.bytes = 1000012`, and do MAKE SURE the client side setting no larger than it. Otherwise, it might construct a MessageSet which would be rejected (error: INVALID_MESSAGE_SIZE) by brokers. - -## Calculate `batch.num.messages` with the average message size - -* Default value: 10,000 - -* It defines the maximum number of messages batched in one MessageSet. - - Normally, larger value, better performance. However, since the size of MessageSet is limited by `message.max.bytes`, a too large value would not help any more. - - E.g, with the default `message.max.bytes=1000000` and `batch.num.messages=10000` settings, you could get the best performance while the average message size is larger than 100 bytes. - - However, if the average message size is small, you have to enlarge it (to `message.max.bytes/average_message_size` at least). - -## Choose `acks` wisely - -* The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful. - - * `acks=0`, the producer will not wait for a reply from the broker before assuming the message was sent successfully. - - * `acks=1`, the producer will receive a success response from the broker the moment the leader replica received the message. - - * `acks=all`, the producer will receive a success response from the broker once all in-sync replicas received the message. - - * Note: if "ack=all", please make sure the topic's replication factor is larger than 1. - -* The `acks=all` setting will highly impact the throughput & latency, and it would be obvious if the traffic latency between kafka brokers is high. But it's mandatory if we want to achieve high availability. - -## How could a message miss after send? - -* The message might even not have been received by the partition leader! (with `acks=0`) - -* Once the message received by the partition leader, the leader crashed just after responding to the producer, but has no chance to synchronize the message to other replicas. (with `acks=1`) - -* Once the message received by the partition leader, the leader crashed just after responding to the producer, but with no in-sync replica to synchronize for the message. (with `acks=all`, while brokers are with `min.insync.replicas=1`) - diff --git a/doc/HowToMakeKafkaProducerReliable.md b/doc/HowToMakeKafkaProducerReliable.md deleted file mode 100644 index 5c72ca6d..00000000 --- a/doc/HowToMakeKafkaProducerReliable.md +++ /dev/null @@ -1,188 +0,0 @@ -# How to Make KafkaProducer Reliable - -While using message dispatching systems, we always suffer from message lost, duplication and disordering. 
- -Since the application (using the `KafkaProducer`) might crash/restart, we might consider using certain mechanism to achieve `At most once`/`At least once`, and `Ordering`, -- such as locally persisting the messages until successful delivery, using embedded sequence number to de-duplicate, or responding data-source to acknowledgement the delivery result, etc. These are common topics, which are not quite specific to Kafka. - -Here we'd focus on `KafkaProducer`, together with the `idempotence` feature. Let's see, in which cases problems might happen, how to avoid them, and what's the best practise,-- to achieve `No Message Lost`, `Exactly Once` and `Ordering`. - - -## About `No Message Lost` - -### When might a message actually be lost - -* The producer gets a successful delivery response after sending the message, but the `partition leader` failed to sync it to other `replicas`. - -### How could a message be lost even with successful delivery - -* First, the `partition leader` doesn't sync-up the latest message to enough `in-sync replicas` before responding with the `ack` - - * The `partition leader` just don't need to wait for other `replica`s response - - - E.g, the producer is configured with `acks=1` - - * No available `in-sync replica` to wait for the response - - - E.g, all other replicas are not in-sync - -* Then, the `partition leader` crashes, and one `in-sync replica` becomes new `partition leader` - - * The new `partition leader` has no acknowledgement with the latest messages. Later, while new messages arrive, it would use conflicting record offsets (same with those records which the `partition leader` knows only). Then, even if the previous `partition leader` comes up again, these records have no chance to be recovered (just internally overwritten to be consistent with other replicas). - -### How to make sure `No Message Lost` - -* Make sure the leader would wait for responses from all in-sync replicas before the response - - * Configuration `acks=all` is a MUST for producer - -* Ensure enough `In-Sync partition replicas` - - * Configuration `min.insync.replicas >= 2` is a MUST for brokers - - - Take `min.insync.replicas = 2` for example, it means, - - 1. At most `replication.factor - min.insync.replicas` replicas are out-of-sync, -- the producer would still be able to send messages, otherwise, it could fail with 'no enough replica' error, and keeps retrying. - - 2. Occasionally no more than `min.insync.replicas` in-sync-replica failures. -- otherwise, messages might be missed. In this case, if just one in-sync replica crashes after sending back the ack to the producer, the message would not be lost; if two failed, it would! Since the new leader might be a replica which was not in-sync previously, and has no acknowledgement with these latest messages. - - * Please refer to [Kafka Broker Configuration](KafkaBrokerConfiguration.md) for more details. - - * Then, what would happen if replicas fail - - 1. Fails to send (`not enough in-sync replica failure`), -- while number of `in-sync replicas` could not meet `min.insync.replication` - - 2. Lost messages (after sending messages), -- with no `in-sync replica` survived from multi-failures - - 3. No message lost (while with all `in-sync replicas` acknowledged, and at least one `in-sync replica` available) - - -## About `Exactly Once` - -### How duplications happen - -* After brokers successfully persisted a message, it sent the `ack` to the producer. 
But for some abnormal reasons (such as network failure, etc), the producer might fail to receive the `ack`. The `librdkafka`'s internal queue would retry, thus another (duplicated) message would be persisted by brokers. - -### How to guarantee `Exactly Once` - -* The `enable.idempotence` configuration is RECOMMENDED. - - -## About `Ordering` - -### No ordering between partitions - -* Make sure these `ProducerRecord`s be with the same partition - - - Explicitly assigned with the same `topic-partition` - - - Use the same `key` for these records - -### How disordering happens within one partition - -* The `librdkafka` uses internal partition queues, and once a message fails to be sent successfully(e.g, brokers are down), it would be put back on the queue and retries again while `retry.backoff.ms` expires. However, before that (retry with the failed message), the brokers might recover and the messages behind (if with configuration `max.in.flight > 1`) happened to be sent successfully. In this case (with configuration `max.in.flight > 1` and `retries > 0`), disordering could happen, and the user would not even be aware of it. - -* Furthermore, while the last retry still failed, delivery callback would eventually be triggered. The user has to determine what to do for that (might want to re-send the message, etc). But there might be a case, -- some later messages had already been saved successfully by the server, thus no way to revert the disordering. - - -## More About `Idempotent producer` - -Please refer to the document from librdkafka, [Idempotent Producer](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#idempotent-producer) for more details. - -### Extra fields to maintain the message sequence - -The `librdkafka` maintains the original produce() ordering per-partition for all messages produced, using an internal per-partition 64-bit counter called the `msgid` which starts at 1. This `msgid` allows messages to be re-inserted in the partition message queue in the original order in the case of retries. - -The Idempotent Producer functionality in the Kafka protocol also has a per-message `sequence number`, which is a signed 32-bit wrapping counter that is reset each time the `Producer's ID (PID)` or `Epoch` changes. - -The `msgid` is used, (along with a base `msgid` value stored at the time the `PID/Epoch` was bumped), to calculate the Kafka protocol's message `sequence number`. - -### Configuration conflicts - -* Since the following configuration properties are adjusted automatically (if not modified by the user). Producer instantiation will fail if user-supplied configuration is incompatible. - - - `acks = all` - - - `max.in.flight (i.e, `max.in.flight.requests.per.connection`) = 5` - - - `retries = INT32_MAX` - -### Error handling - -* Exception thrown during `send` - - * For these errors which could be detected locally (and could not be recovered with retrying), an exception would be thrown. E.g, invalid message, as RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE (conflicting with local configuration `message.max.bytes`). - -* Permanent errors (respond from brokers) - - * Typical errors are: - - * Invalid message: RD_KAFKA_RESP_ERR_CORRUPT_MESSAGE, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS, RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT, RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE. 
- - * Topic/Partition not exist: ERR_UNKNOWN_TOPIC_OR_PART, -- automatic topic creation is disabled on the broker or the application is specifying a partition that does not exist. - - * Authorization failure: ERR_TOPIC_AUTHORIZATION_FAILED, ERR_CLUSTER_AUTHORIZATION_FAILED - - * Normally, `Permanent error` means careless design, or wrong configuration, which should be avoided from the very beginning. - - * Unless with `enable.gapless.guarantee`(EXPERIMENTAL) configured, producer would keep going with the following messages; otherwise, it would purge all messages in-flight/in-queue (with RD_KAFKA_RESP_ERR__PURGE_INFLIGHT/RD_KAFKA_RESP_ERR__PURGE_QUEUE). - -* Temporary errors - - * Apart from those `permanent errors`, most of the left are temporary errors, which will be retried (if retry count permits); and while `message.timeout` expired, message delivery callback would be triggered with `RD_KAFKA_RESP_ERR__TIEMD_OUT`. - -* Be careful with the `RD_KAFKA_RESP_ERR__TIEMD_OUT` failure - - * There's some corner cases, such as a message that has been persisted by brokers but `KafkaProducer` failed to get the response. If `message.timeout.ms` has not expired, the producer could retry and eventually get the response. Otherwise, (i.e, `message.timeout.ms` expired before the producer receives the successful `ack`), it would be considered as a delivery failure by the producer (while the brokers wouldn't). Users might re-transmit the message thus causing duplications. - - * To avoid this tricky situation, a longer `message.timeout.ms` is RECOMMENDED, to make sure there's enough time for transmission retries / on-flight responses. - -### Performance impact - -* The main impact comes from `max.in.flight=5` limitation. Currently, `max.in.flight` means `max.in.flight.per.connection`, -- that's 5 message batches (with size of ~1MB at the most) in flight (not get the `ack` response yet) at the most, towards per broker. Within low-latency networks, it would not be a problem; while in other cases, it might be! Good news is, there might be a plan (in `librdkafka`) to improve that `per.connection` limit to `per.partition`, thus boost the performance a lot. - - -## The best practice for `KafkaProducer` - -* Enable `enable.idempotence` configuration - -* Use a long `message.timeout.ms`, which would let `librdkafka` keep retrying, before triggering the delivery failure callback. - - -## Some examples - -### `KafkaProducer` demo - -```cpp - using namespace kafka::clients; - using namespace kafka::clients::producer; - - std::atomic running = true; - - KafkaProducer producer( - Properties({ - { Config::BOOTSTRAP_SERVERS, {"192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"} }, - { ProducerConfig::ENABLE_IDEMPOTENCE, {"true"} }, - { ProducerConfig::MESSAGE_TIMEOUT_MS, {"86400000"} } // as long as 1 day - }) - ); - - while (running) { - auto msg = fetchMsgFromUpstream(); - auto record = ProducerRecord(topic, msg.key, msg.value, msg.id); - producer.send(record, - // Ack callback - [&msg](const RecordMetadata& metadata, std::error_code ec) { - // the message could be identified by `metadata.recordId()` - auto recordId = metadata.recordId(); - if (ec) { - std::cerr << "Cannot send out message with recordId: " << recordId << ", error:" << ec.message() << std::endl; - } else { - commitMsgToUpstream(recordId); - } - }); - } - - producer.close(); -``` - -* With a long `message.timeout.ms`, we're not likely to catch an error with delivery callback, --it would retry for temporary errors anyway. 
But be aware with permanent errors, it might be caused by careless design. diff --git a/doc/KafkaConsumerQuickStart.md b/doc/KafkaConsumerQuickStart.md index 184dd586..baa062b7 100644 --- a/doc/KafkaConsumerQuickStart.md +++ b/doc/KafkaConsumerQuickStart.md @@ -1,17 +1,22 @@ # KafkaConsumer Quick Start -Generally speaking, The `Modern C++ Kafka API` is quite similar with [Kafka Java's API](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html) +Generally speaking, The `modern-cpp-kafka API` is quite similar with [Kafka Java's API](https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html) We'd recommend users to cross-reference them, --especially the examples. -## KafkaConsumer (`enable.auto.commit=true`) + +## KafkaConsumer (with `enable.auto.commit=true`) * Automatically commits previously polled offsets on each `poll` (and the final `close`) operations. * Note, the internal `offset commit` is asynchronous, and is not guaranteed to succeed. It's supposed to be triggered (within each `poll` operation) periodically, thus the occasional failure doesn't quite matter. -### Example +### [Example](https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_auto_commit_consumer.cc) + ```cpp + using namespace kafka; + using namespace kafka::clients::consumer; + // Create configuration object const Properties props ({ {"bootstrap.servers", {brokers}}, @@ -57,11 +62,13 @@ We'd recommend users to cross-reference them, --especially the examples. * At the end, we could `close` the consumer explicitly, or just leave it to the destructor. -## KafkaConsumer (`enable.auto.commit=false`) + +## KafkaConsumer (with `enable.auto.commit=false`) * Users must commit the offsets for received records manually. -### Example +### [Example](https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_manual_commit_consumer.cc) + ```cpp // Create configuration object const Properties props ({ @@ -126,7 +133,7 @@ We'd recommend users to cross-reference them, --especially the examples. * `commitSync` and `commitAsync` are both available here. Normally, use `commitSync` to guarantee the commitment, or use `commitAsync`(with `OffsetCommitCallback`) to get a better performance. -## Option `enable.manual.events.poll` +## About `enable.manual.events.poll` While we construct a `KafkaConsumer` with `enable.manual.events.poll=false` (i.e. the default option), an internal thread would be created for `OffsetCommit` callbacks handling. @@ -135,6 +142,7 @@ This might not be what you want, since then you have to use 2 different threads Here we have another choice, -- using `enable.manual.events.poll=true`, thus the `OffsetCommit` callbacks would be called within member function `pollEvents()`. ### Example + ```cpp KafkaConsumer consumer(props.put("enable.manual.events.poll", "true")); @@ -156,49 +164,48 @@ Here we have another choice, -- using `enable.manual.events.poll=true`, thus the } ``` -## Error handling -No exception would be thrown from a consumer's `poll` operation. +## Error handling -Instead, once an error occurs, the `Error` would be embedded in the `Consumer::ConsumerRecord`. +Normally, exceptions might be thrown while operations fail, but not for a `poll` operation, -- if an error occurs, the `Error` would be embedded in the `Consumer::ConsumerRecord`. -About `Error`'s `value()`s, there are 2 cases +About the `Error` `value()`, there are 2 cases -1. 
Success +* Success - `RD_KAFKA_RESP_ERR__NO_ERROR` (`0`), -- got a message successfully - `RD_KAFKA_RESP_ERR__PARTITION_EOF`, -- reached the end of a partition (no message got) -2. Failure +* Failure - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes) + ## Frequently Asked Questions -* What're the available configurations? +* How to enhance the throughput + + * Try with a larger `queued.min.messages`, especially for small messages. - - [KafkaProducerConfiguration](KafkaClientConfiguration.md#kafkaconsumer-configuration) + * Use multiple KafkaConsumers to distribute the payload. - - [Inline doxygen page](../doxygen/classKAFKA__CPP__APIS__NAMESPACE_1_1ConsumerConfig.html) +* How to avoid polling duplicated messages -* How to enhance the polling performance? + * To commit the offsets more frequently (e.g, always do commit after finishing processing a message). - `ConsumerConfig::QUEUED_MIN_MESSAGES` determines how frequently the consumer would send the FetchRequest towards brokers. - The default configuration (i.e, 100000) might not be good enough for small (less than 1KB) messages, and suggest using a larger value (e.g, 1000000) for it. + * Don't use quite a large `max.poll.records` for a `KafkaConsumer` (with `enable.auto.commit=true`) -- you might fail to commit all these messages before crash, thus more duplications with the next `poll`. * How many threads would be created by a KafkaConsumer? - 1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a seperate thread to transmit messages towards a kafka cluster server. + * Each broker (in the list of `bootstrap.servers`) would take a separate thread to transmit messages towards a kafka cluster server. - 2. Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc. + * Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc. - 3. To enable the auto events-polling, one more background thread would be created, which keeps polling/processing the offset-commit callback event. + * By default (`enable.manual.events.poll=false`), one more background thread would be created, which keeps polling/processing the offset-commit callback events. * Which one of these threads will handle the callbacks? - There are 2 kinds of callbacks for a KafkaConsumer, - - 1. `RebalanceCallback` will be triggered internally by the user's thread, -- within the `poll` function. + * `RebalanceCallback` will be triggered internally by the user's thread, -- within the `poll` function. - 2. If `enable.auto.commit=true`, the `OffsetCommitCallback` will be triggered by the user's `poll` thread; otherwise, it would be triggered by a background thread (if `enable.manual.events.poll=false`), or by the `pollEvents()` call (if `enable.manual.events.poll=true`). diff --git a/doc/KafkaProducerQuickStart.md b/doc/KafkaProducerQuickStart.md index 3da2bd51..e2e7a2d9 100644 --- a/doc/KafkaProducerQuickStart.md +++ b/doc/KafkaProducerQuickStart.md @@ -1,17 +1,18 @@ # KafkaProducer Quick Start -Generally speaking, The `Modern C++ Kafka API` is quite similar to the [Kafka Java's API](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html). 
+Generally speaking, The `modern-cpp-kafka API` is quite similar to the [Kafka Java's API](https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html). We'd recommend users to cross-reference them, --especially the examples. + ## KafkaProducer * The `send` is an unblock operation, and the result (including errors) could only be got from the delivery callback. -### Example +### [Example](https://github.com/morganstanley/modern-cpp-kafka/blob/main/examples/kafka_async_producer_not_copy_payload.cc) + ```cpp using namespace kafka; - using namespace kafka::clients; using namespace kafka::clients::producer; // Create configuration object @@ -60,7 +61,8 @@ We'd recommend users to cross-reference them, --especially the examples. * It's guaranteed that the delivery callback would be triggered anyway after `send`, -- a producer would even be waiting for it before `close`. So, it's a good way to release these memory resources in the `Producer::Callback` function. -## `KafkaProducer` with `kafka::clients::KafkaClient::EventsPollingOption` + +## About `enable.manual.events.poll` While we construct a `KafkaProducer` with `enable.manual.events.poll=false` (the default option), an internal thread would be created for `MessageDelivery` callbacks handling. @@ -68,12 +70,12 @@ This might not be what you want, since then you have to use 2 different threads Here we have another choice, -- using `enable.manual.events.poll=true`, thus the `MessageDelivery` callbacks would be called within member function `pollEvents()`. -* Note, if you constructed the `KafkaProducer` with `enable.manual.events.poll=true`, the `send()` will be an `unblocked` operation even if the `message buffering queue` is full. In that case, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there. And you might want to call `pollEvents()`, thus delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`). +* Note: if you constructed the `KafkaProducer` with `enable.manual.events.poll=true`, the `send()` will be an `unblocked` operation even if the `message buffering queue` is full. In that case, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there. And you might want to call `pollEvents()`, thus delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`). ### Example + ```cpp using namespace kafka; - using namespace kafka::clients; using namespace kafka::clients::producer; KafkaProducer producer(props.put("enable.manual.events.poll", "true")); @@ -99,17 +101,18 @@ Here we have another choice, -- using `enable.manual.events.poll=true`, thus the } // Here we call the `MessageDelivery` callbacks - // Note, we can only do this while the producer was constructed with `enable.manual.events.poll=true`. + // Note: we can only do this while the producer was constructed with `enable.manual.events.poll=true`. producer.pollEvents(); ``` -## Headers in ProducerRecord +## `Headers` in `ProducerRecord` * A `ProducerRecord` could take extra information with `headers`. - * Note, the `header` within `headers` contains the pointer of the memory block for its `value`. The memory block MUST be valid until the `ProducerRecord` is read by `producer.send()`. + * Note: the `header` within `headers` contains the pointer of the memory block for its `value`. 
The memory block __MUST__ be valid until the `ProducerRecord` is read by `producer.send()`. ### Example + ```cpp using namespace kafka::clients; @@ -139,81 +142,140 @@ Here we have another choice, -- using `enable.manual.events.poll=true`, thus the } ``` -## Error handling +## To Make KafkaProducer Reliable -`Error` might occur at different places while sending a message, +While using message dispatching systems, we always suffer from message loss, duplication and disordering. -1. A `KafkaException` would be triggered if `KafkaProducer` failed to trigger the send operation. +Since the application (using the `KafkaProducer`) might crash/restart, we might consider using certain mechanisms to achieve `At most once`/`At least once`, and `Ordering`, -- such as locally persisting the messages until successful delivery, using embedded sequence numbers to de-duplicate, or responding to the data source to acknowledge the delivery result, etc. These are common topics, which are not quite specific to Kafka. -2. Delivery `Error` would be passed through the delivery-callback. +Here we'd focus on `KafkaProducer`, together with the `idempotence` feature. Let's see, in which cases problems might happen, how to avoid them, and what's the best practice, -- to achieve `No Message Lost`, `Exactly Once` and `Ordering`. -About `Error`'s `value()`s, there are 2 cases +### No Message Lost -1. Local errors, +#### How could a message be lost even with successful delivery - - `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` -- The topic doesn't exist +* First, the `partition leader` doesn't sync-up the latest message to enough `in-sync replicas` before responding with the `acks` - - `RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION` -- The partition doesn't exist + * The `partition leader` just doesn't need to wait for other `replica`s' responses - - `RD_KAFKA_RESP_ERR__INVALID_ARG` -- Invalid topic (topic is null or the length is too long (>512)) + - E.g, the producer is configured with `acks=1` - - `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT` -- No ack received within the time limit + * No available `in-sync replica` to wait for the response -2. Broker errors, + - E.g, all other replicas are not in-sync and brokers are configured with `min.insync.replicas=1` - - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes) +* Then, the `partition leader` crashes, and one `in-sync replica` becomes new `partition leader` -## Frequently Asked Questions + * The new `partition leader` has no acknowledgement of the latest messages. Later, while new messages arrive, it would use conflicting record offsets (same with those records which the `partition leader` knows only). Then, even if the previous `partition leader` comes up again, these records have no chance to be recovered (just internally overwritten to be consistent with other replicas). + +#### How to make sure __No Message Lost__ + +* Make sure the leader would wait for responses from all in-sync replicas before the response + + * Configuration `acks=all` is a __MUST__ for producer + +* Ensure enough `In-Sync partition replicas` + + * Configuration `min.insync.replicas >= 2` is a __MUST__ for brokers + + - Take `min.insync.replicas = 2` for example, it means, + + 1. At most `replication.factor - min.insync.replicas` replicas are out-of-sync, -- the producer would still be able to send messages, otherwise, it could fail with 'not enough replicas' error, and keeps retrying. + + 2. Occasionally no more than `min.insync.replicas` in-sync-replica failures. -- otherwise, messages might be missed. In this case, if just one in-sync replica crashes after sending back the ack to the producer, the message would not be lost; if two failed, it would! Since the new leader might be a replica which was not in-sync previously, and has no acknowledgement of these latest messages. + + * Please refer to [Kafka Broker Configuration](KafkaBrokerConfiguration.md) for more details. + + * Then, what would happen if replicas fail + + 1. Fails to send (`not enough in-sync replica failure`), -- while the number of `in-sync replicas` could not meet `min.insync.replicas` + + 2. Lost messages (after sending messages), -- with no `in-sync replica` survived from multi-failures -### What are the available configurations? + 3. No message lost (while with all `in-sync replicas` acknowledged, and at least one `in-sync replica` available) -- [KafkaProducerConfiguration](KafkaClientConfiguration.md#kafkaproducer-configuration) +### Exactly Once & Ordering -- [Inline doxygen page](../doxygen/classKAFKA__CPP__APIS__NAMESPACE_1_1ProducerConfig.html) #### How duplications happen -### About the automatic `topic creation` +* After brokers successfully persisted a message, it sent the `ack` to the producer. But for some abnormal reasons (such as network failure, etc), the producer might fail to receive the `ack`. The `librdkafka`'s internal queue would retry, thus another (duplicated) message would be persisted by brokers. -If the cluster's configuration is with `auto.create.topics.enable=true`, the producer/consumer could trigger the brokers to create a new topic (with `send`, `subscribe`, etc) #### How disordering happens within one partition -Note, the default created topic may be not what you want (e.g, with `default.replication.factor=1` configuration as default, etc), thus causing other unexpected problems. +* The `librdkafka` uses internal partition queues, and once a message fails to be sent successfully (e.g, brokers are down), it would be put back on the queue and retried again once `retry.backoff.ms` expires. However, before that (retry with the failed message), the brokers might recover and the messages behind (if with configuration `max.in.flight > 1`) happened to be sent successfully. In this case (with configuration `max.in.flight > 1` and `retries > 0`), disordering could happen, and the user would not even be aware of it. -### How to enhance the sending performance? +* Furthermore, if the last retry still fails, the delivery callback would eventually be triggered. The user has to determine what to do for that (might want to re-send the message, etc). But there might be a case, -- some later messages had already been saved successfully by the server, thus no way to revert the disordering. -Enlarging the default `BATCH_NUM_MESSAGES` and `LINGER_MS` might improve message batching, thus enhancing the throughput. #### No ordering between partitions -While, on the other hand, `LINGER_MS` would highly impact the latency. +* Make sure these `ProducerRecord`s go to the same partition (as shown in the sketch below) -The `QUEUE_BUFFERING_MAX_MESSAGES` and `QUEUE_BUFFERING_MAX_KBYTES` would determine the `max in flight requests (some materials about Kafka would call it in this way)`. If the queue buffer is full, the `send` operation would be blocked. + - Explicitly assigned with the same `topic-partition` -Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help to improve throughput as well, while also means more messages locally buffering. + - Use the same `key` for these records
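A minimal sketch of these two options (the topic name, key and payload below are just placeholders):

```cpp
    using namespace kafka;
    using namespace kafka::clients::producer;

    const Topic       topic   = "my-topic";     // placeholder topic name
    const std::string keyStr  = "order-42";     // records sharing this key stay in one partition
    const std::string payload = "some value";   // placeholder payload

    // Option 1: use the same key -- the default partitioner maps it to a fixed partition
    ProducerRecord keyedRecord(topic,
                               Key(keyStr.c_str(), keyStr.size()),
                               Value(payload.c_str(), payload.size()));

    // Option 2: assign the partition explicitly
    const Partition partition = 0;
    ProducerRecord assignedRecord(topic,
                                  partition,
                                  Key(keyStr.c_str(), keyStr.size()),
                                  Value(payload.c_str(), payload.size()));
```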
-### How to achieve reliable delivery +### Idempotent Producer -* Quick Answer, +* The `enable.idempotence=true` configuration is highly **RECOMMENDED** (a configuration sketch follows below). - 1. The Kafka cluster should be configured with `min.insync.replicas = 2` at least +* Please refer to the [document from librdkafka](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#idempotent-producer) for more details. - 2. Configure the `KafkaProducer` with property `{ProducerConfig::ENABLE_IDEMPOTENCE, "true"}`, together with proper error-handling (within the delivery callback). +* Note: the Kafka cluster should be configured with `min.insync.replicas=2` at least
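A minimal configuration sketch, adapted from the demo formerly in `doc/HowToMakeKafkaProducerReliable.md` (the broker list below is a placeholder):

```cpp
    using namespace kafka;
    using namespace kafka::clients;
    using namespace kafka::clients::producer;

    // Idempotence for exactly-once & ordering, plus a long delivery timeout so
    // that librdkafka keeps retrying on temporary errors.
    const Properties props({
        { "bootstrap.servers",  {"192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"} },  // placeholder brokers
        { "enable.idempotence", {"true"}     },
        { "message.timeout.ms", {"86400000"} }   // as long as 1 day
    });

    KafkaProducer producer(props);

    // Send records with a delivery callback (same pattern as the example at the
    // top of this document); treat a record as lost only if the callback reports
    // an error after `message.timeout.ms` has expired.
```

With such a long `message.timeout.ms`, a failure reported by the delivery callback usually points to a permanent error rather than a temporary one (see `Error handling` below).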
-* Complete Answer, - * [How to Make KafkaProducer Reliable](HowToMakeKafkaProducerReliable.md) + +## Error handling + +`Error` might occur at different places while sending a message, + +- A `KafkaException` would be triggered if `KafkaProducer` failed to trigger the `send` operation. + +- Delivery `Error` would be passed through the delivery-callback. + +There are 2 cases for the `Error` `value()` + +* Local errors + + - `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` -- The topic doesn't exist + + - `RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION` -- The partition doesn't exist + + - `RD_KAFKA_RESP_ERR__INVALID_ARG` -- Invalid topic (topic is null or the length is too long (>512)) + + - `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT` -- No ack received within the time limit + + - `RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE` -- The message size conflicts with local configuration `message.max.bytes` + +* Broker errors + + - [Error Codes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes) + + - Typical errors are + + * Invalid message: `RD_KAFKA_RESP_ERR_CORRUPT_MESSAGE`, `RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE`, `RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS`, `RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT`, `RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE`. + + * Topic/Partition not exist: `RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART`, -- automatic topic creation is disabled on the broker or the application is specifying a partition that does not exist. + + * Authorization failure: `RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED`, `RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED` + + +## Frequently Asked Questions -### How many threads would be created by a KafkaProducer? +* About the automatic `topic creation` -Most of these background threads are started internally by librdkafka. + - If the cluster's configuration is with `auto.create.topics.enable=true`, the producer/consumer could trigger the brokers to create a new topic (with `send`, `subscribe`, etc) -Here is a brief introduction what they're used for, + - Note: the default created topic may not be what you want (e.g, with `default.replication.factor=1` configuration as default, etc), thus causing other unexpected problems. -1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards a kafka cluster server. +* How to enhance the sending performance? -2. Another 2 threads would handle internal operations and kinds of timers, etc. + - Enlarging the default `batch.num.messages` and `linger.ms` might improve message batching, thus enhancing the throughput. 
(while, on the other hand, `linger.ms` would highly impact the latency) -3. To enale the auto events-polling, one more background thread would be created, which keeps polling the delivery callback event. + - The `queue.buffering.max.messages` and `queue.buffering.max.kbytes` would determine the `max in flight requests (some materials about Kafka would call it in this way)`. If the queue buffer is full, the `send` operation would be blocked. Larger `queue.buffering.max.messages`/`queue.buffering.max.kbytes` might help to improve throughput, while also means more messages locally buffering. -### Which one of these threads will handle the callbacks +* How many threads would be created by a KafkaProducer? -It will be handled by a background thread, not by the user's thread. + - Each broker (in the list of `bootstrap.servers`) would take a separate thread to transmit messages towards a kafka cluster server. -Note, should be careful if both the `KafkaProducer::send()` and the `producer::Callback` might access the same container at the same time. + - Another 2 threads would handle internal operations and kinds of timers, etc. + - By default, `enable.manual.events.poll=false`, then one more background thread would be created, which keeps polling the delivery events and triggering the callbacks. diff --git a/include/kafka/Interceptors.h b/include/kafka/Interceptors.h index b9994631..7897afb3 100644 --- a/include/kafka/Interceptors.h +++ b/include/kafka/Interceptors.h @@ -7,18 +7,45 @@ namespace KAFKA_API { namespace clients { +/** + * Interceptors for Kafka clients. + */ class Interceptors { public: + /** + * Callback type for thread-start interceptor. + */ using ThreadStartCallback = std::function; + + /** + * Callback type for thread-exit interceptor. + */ using ThreadExitCallback = std::function; + /** + * Set interceptor for thread start. + */ Interceptors& onThreadStart(ThreadStartCallback cb) { _valid = true; _threadStartCb = std::move(cb); return *this; } + + /** + * Set interceptor for thread exit. + */ Interceptors& onThreadExit(ThreadExitCallback cb) { _valid = true; _threadExitCb = std::move(cb); return *this; } + /** + * Get interceptor for thread start. + */ ThreadStartCallback onThreadStart() const { return _threadStartCb; } + + /** + * Get interceptor for thread exit. + */ ThreadExitCallback onThreadExit() const { return _threadExitCb; } + /** + * Check if there's no interceptor. + */ bool empty() const { return !_valid; } private: