From 1d4d9ed37a9fc8159decb8689425c87b27420628 Mon Sep 17 00:00:00 2001 From: Gautam Punhani Date: Tue, 27 Jul 2021 09:36:31 +0530 Subject: [PATCH] output: added multithreaded worker support for proxy plugins This patch adds worker support to golang output plugins, without which if golang plugins were becoming unresponsive it was blocking other i/p and o/p plugins as well This fixes issue https://github.com/fluent/fluent-bit-go/issues/45 Signed-off-by: Gautam Punhani --- src/flb_output.c | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/flb_output.c b/src/flb_output.c index e582efdbc43..adfdd62c65f 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -912,6 +912,15 @@ int flb_output_init_all(struct flb_config *config) flb_output_instance_destroy(ins); return -1; } + + /* Multi-threading enabled if configured */ + ret = flb_output_enable_multi_threading(ins, config); + if (ret == -1) { + flb_error("[output] could not start thread pool for '%s' plugin", + p->name); + return -1; + } + continue; } #endif @@ -1017,18 +1026,28 @@ int flb_output_init_all(struct flb_config *config) return -1; } - /* Multi-threading enabled ? (through 'workers' property) */ - if (ins->tp_workers > 0) { - ret = flb_output_thread_pool_create(config, ins); - if (ret == -1) { - flb_error("[output] could not start thread pool for '%s' plugin", - p->name); - flb_output_instance_destroy(ins); - return -1; - } + /* Multi-threading enabled if configured */ + ret = flb_output_enable_multi_threading(ins, config); + if (ret == -1) { + flb_error("[output] could not start thread pool for '%s' plugin", + p->name); + return -1; + } + } + + return 0; +} - flb_output_thread_pool_start(ins); +/* Add thread pool for output plugin if configured with workers */ +int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config) +{ + /* Multi-threading enabled ? (through 'workers' property) */ + if (ins->tp_workers > 0) { + if(flb_output_thread_pool_create(config, ins) != 0) { + flb_output_instance_destroy(ins); + return -1; } + flb_output_thread_pool_start(ins); } return 0;