diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 25f11295fd6..eaaa0aebe44 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -181,7 +181,9 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + rd_kafka_commit(ctx->kafka.rk, NULL, ctx->async_commit); + + flb_plg_debug(ins, "offset committed(%s)", ctx->async_commit ? "async" : "sync"); /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && @@ -428,6 +430,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "async_commit", "false", + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, async_commit), + "When enabled the offset commit operation is asynchronous" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..e05a946fda0 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -48,6 +48,7 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + int async_commit; }; #endif