Skip to content

Commit

Permalink
in_kafka: add async commit mode
Browse files Browse the repository at this point in the history
Signed-off-by: HaChanHo <[email protected]>
  • Loading branch information
HaChanho authored and kakao-champ-ion committed May 30, 2024
1 parent dffcd42 commit 047120c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
9 changes: 8 additions & 1 deletion plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ static int in_kafka_collect(struct flb_input_instance *ins,
rd_kafka_message_destroy(rkm);

/* TO-DO: commit the record based on `ret` */
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
rd_kafka_commit(ctx->kafka.rk, NULL, ctx->async_commit);

flb_plg_debug(ins, "offset committed(%s)", ctx->async_commit ? "async" : "sync");

/* Break from the loop when reaching the limit of polling if available */
if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
Expand Down Expand Up @@ -428,6 +430,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size),
"Set the maximum size of chunk"
},
{
FLB_CONFIG_MAP_BOOL, "async_commit", "false",
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, async_commit),
"When enabled the offset commit operation is asynchronous"
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct flb_in_kafka_config {
int coll_fd;
size_t buffer_max_size; /* Maximum size of chunk allocation */
size_t polling_threshold;
int async_commit;
};

#endif

0 comments on commit 047120c

Please sign in to comment.