in_kafka: Add async commit mode #8894

Open
wants to merge 1 commit into
base: master

@HaChanho HaChanho commented May 30, 2024

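We've added an async_commit option to Fluent Bit's in_kafka plugin, which lets Kafka offset commits be handled asynchronously. When the option is enabled, the input no longer waits for each commit to complete, which is expected to improve performance and make resource usage more efficient. The option is off by default in the test runs below, so existing behaviour (synchronous commit) is unchanged unless async_commit is set to true.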
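For readers unfamiliar with the mechanism: librdkafka's `rd_kafka_commit(rk, offsets, async)` takes an `async` flag as its third argument, so one plausible way to wire the option is to pass the configured value through. The sketch below is only an illustration under that assumption, not the code in this PR. `kafka_ctx_sketch`, `fake_commit` and `commit_offsets` are hypothetical names, and `fake_commit` is a stand-in that records its argument instead of calling librdkafka.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical plugin context: only the field relevant to this sketch. */
struct kafka_ctx_sketch {
    bool async_commit;   /* mirrors the async_commit configuration key */
};

/* Last value passed as the async argument; -1 means "never called". */
int last_async_arg = -1;

/* Stand-in for rd_kafka_commit(rk, offsets, async). It does not talk to a
 * broker; it only records how it was called. Returns 0 for "no error". */
int fake_commit(int async)
{
    last_async_arg = async;
    return 0;
}

/* Commit the current offsets, blocking only when async_commit is off. */
int commit_offsets(const struct kafka_ctx_sketch *ctx)
{
    assert(ctx != NULL);

    int ret = fake_commit(ctx->async_commit ? 1 : 0);

    /* Same wording as the debug lines shown in the test logs. */
    printf("offset committed(%s)\n", ctx->async_commit ? "async" : "sync");
    return ret;
}
```

With `async_commit` set to false the stand-in receives 0, which corresponds to the blocking call quoted later in this conversation; with true it receives 1.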


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
[INPUT]
    name kafka
    brokers 127.0.0.1:9092
    topics test
    async_commit true
  • Debug log output from testing the change
async_commit mode OFF
<snip>
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:23:57] [debug] [input:kafka:kafka.0] offset committed(sync)
<snip>
async_commit mode ON
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] kafka message received
[2024/05/30 09:49:19] [debug] [input:kafka:kafka.0] offset committed(async)
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@edsiper
Member

edsiper commented Jun 1, 2024

@HaChanho thanks for this contribution.

Just a side question: what if rd_kafka_commit() is called once after the iteration, when the while loop has finished? Right now it runs right after every message that has been consumed:

rd_kafka_commit(ctx->kafka.rk, NULL, 0);
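To make the two placements concrete, here is a minimal self-contained sketch. It is an assumption-laden illustration, not the plugin's actual loop: `fake_commit_current` is a hypothetical stand-in for the call above that only counts invocations, and the message processing itself is elided.

```c
#include <assert.h>

/* Number of times the stand-in commit below has been called. */
int commit_calls = 0;

/* Hypothetical stand-in for rd_kafka_commit(ctx->kafka.rk, NULL, 0). */
void fake_commit_current(void)
{
    commit_calls++;
}

/* Behaviour described above: commit right after every consumed message. */
int consume_commit_each(int n_messages)
{
    assert(n_messages >= 0);
    commit_calls = 0;

    for (int i = 0; i < n_messages; i++) {
        /* ... process message i ... */
        fake_commit_current();
    }
    return commit_calls;
}

/* Suggested variant: commit once, after the loop has finished. */
int consume_commit_after_loop(int n_messages)
{
    assert(n_messages >= 0);
    commit_calls = 0;

    int consumed = 0;
    for (int i = 0; i < n_messages; i++) {
        /* ... process message i ... */
        consumed++;
    }
    if (consumed > 0) {
        /* Nothing to commit if the loop consumed no messages. */
        fake_commit_current();
    }
    return commit_calls;
}
```

For a batch of seven messages the first function issues seven commits and the second issues one, which is where the saving would come from.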

@HaChanho
Author

HaChanho commented Jun 3, 2024

@edsiper

Just a side question: what if rd_kafka_commit() is called once after the iteration, when the while loop has finished? Right now it runs right after every message that has been consumed:

Committing once after the loop ends would certainly perform better than committing right after each consumed message. However, it increases the chance of duplicate processing: if the consumer restarts before the delayed commit has updated the offset, the messages consumed since the last commit are delivered again.

I believe that consumer restart is rare under normal circumstances, and handling duplicate messages may not be a significant issue compared to message loss. It might be better to handle duplicate messages in the final processing system (output).
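The trade-off can be shown with a toy model. This is a simplified sketch under stated assumptions: offsets start at 0, a crash happens after a fixed number of processed messages, and in-flight asynchronous commits and rebalances are ignored. `duplicates_after_restart` is a hypothetical helper written for this comment, not part of Fluent Bit or librdkafka.

```c
#include <assert.h>

/* Returns how many messages are processed twice when the consumer handles
 * a batch of `batch` messages starting at offset 0, crashes after
 * processing `crash_after` of them, and restarts from the last committed
 * offset.
 *   commit_each != 0: the offset is committed after every message.
 *   commit_each == 0: the offset is committed once, after the whole batch. */
int duplicates_after_restart(int batch, int crash_after, int commit_each)
{
    assert(batch >= 0 && crash_after >= 0 && crash_after <= batch);

    int committed = 0;   /* next offset to read after a restart */
    int processed = 0;   /* messages already handed downstream */

    for (int off = 0; off < batch; off++) {
        if (processed == crash_after) {
            break;       /* simulated crash before the batch finishes */
        }
        processed++;     /* message at `off` is handed downstream */
        if (commit_each) {
            committed = off + 1;
        }
    }
    if (!commit_each && processed == batch) {
        committed = batch;   /* the single commit after the loop */
    }

    /* After the restart, offsets [committed, processed) are read again. */
    return processed - committed;
}
```

In this model a crash after three of five messages produces no duplicates with per-message commits and three duplicates with a single commit after the loop. If no crash occurs, both strategies produce none.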

Do you have any plans to improve the code related to this?

Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Dec 14, 2024