Skip to content

Commit

Permalink
filter_log_to_metrics: Add timer callback for emitting metrics
Browse files Browse the repository at this point in the history
This commit will change the log_to_metrics filter to use a timer based
metric inject and not directly inject metrics on every incoming log
record anymore. This will lower the overall load and memory consumption
 especially in high-volume and high-cardinality situations.

Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work committed Aug 20, 2024
1 parent 8a1d830 commit c6d7766
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 8 deletions.
102 changes: 96 additions & 6 deletions plugins/filter_log_to_metrics/log_to_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,35 @@ static int fill_labels(struct log_to_metrics_ctx *ctx, char **label_values,
return label_counter;
}

/* Timer callback to inject metrics into the pipeline */
static void cb_send_metric_chunk(struct flb_config *config, void *data)
{
int ret;
struct log_to_metrics_ctx *ctx = data;

/* Check that metric context is not empty */
if (ctx->cmt == NULL || ctx->input_ins == NULL) {
return;
}

if (ctx->new_data == FLB_TRUE) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
}
}

/* Check if we are shutting down. If so, stop our timer */
if (config->is_shutting_down) {
if(ctx->timer && ctx->timer->active) {
flb_plg_debug(ctx->ins, "Stopping callback timer");
flb_sched_timer_cb_disable(ctx->timer);
}
}
ctx->new_data = FLB_FALSE;
}

static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
struct flb_config *config, void *data)
{
Expand All @@ -462,6 +491,7 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
char metric_subsystem[MAX_METRIC_LENGTH];
char value_field[MAX_METRIC_LENGTH];
struct flb_input_instance *input_ins;
struct flb_sched *sched;

int i;
/* Create context */
Expand Down Expand Up @@ -729,6 +759,43 @@ static int cb_log_to_metrics_init(struct flb_filter_instance *f_ins,
}
ctx->input_ins = input_ins;


if (ctx->interval_sec <= 0) {
ctx->interval_sec = atoi(DEFAULT_INTERVAL_SEC);
}
if (ctx->interval_nsec <= 0) {
ctx->interval_nsec = atoi(DEFAULT_INTERVAL_NSEC);
}
if (ctx->interval_sec == 0 && ctx->interval_nsec == 0) {
flb_plg_debug(ctx->ins, "Interval is set to 0, will not use timer and "
"send metrics immediately");
ctx->timer_mode = FLB_FALSE;
return 0;
}

/* Initialize timer for scheduled metric updates */
sched = flb_sched_ctx_get();
if(sched == 0) {
flb_plg_error(f_ins, "could not get scheduler context");
log_to_metrics_destroy(ctx);
return -1;
}
//Convert interval_sec and interval_nsec to milliseconds
ctx->timer_interval = (ctx->interval_sec * 1000) +
(ctx->interval_nsec / 1000000);
flb_plg_debug(ctx->ins,
"Creating metric timer with frequency %d ms",
ctx->timer_interval);

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->timer_interval, cb_send_metric_chunk,
ctx, &ctx->timer);
if (ret < 0) {
flb_plg_error(f_ins, "could not create timer callback");
log_to_metrics_destroy(ctx);
return -1;
}
ctx->timer_mode = FLB_TRUE;
return 0;
}

Expand Down Expand Up @@ -920,9 +987,17 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
return -1;
}

ret = flb_input_metrics_append(ctx->input_ins, ctx->tag, strlen(ctx->tag), ctx->cmt);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics");
if (ctx->timer_mode == FLB_FALSE) {
ret = flb_input_metrics_append(ctx->input_ins, ctx->tag,
strlen(ctx->tag), ctx->cmt);

if (ret != 0) {
flb_plg_error(ctx->ins, "could not append metrics. "
"Please consider to use interval_sec and interval_nsec");
}
}
else {
ctx->new_data = FLB_TRUE;
}

/* Cleanup */
Expand All @@ -941,6 +1016,7 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
}
}


if (ctx->discard_logs) {
*out_buf = NULL;
*out_size = 0;
Expand All @@ -958,7 +1034,10 @@ static int cb_log_to_metrics_filter(const void *data, size_t bytes,
static int cb_log_to_metrics_exit(void *data, struct flb_config *config)
{
struct log_to_metrics_ctx *ctx = data;

if(ctx->timer != NULL) {
flb_plg_debug(ctx->ins, "Destroying callback timer");
flb_sched_timer_destroy(ctx->timer);
}
return log_to_metrics_destroy(ctx);
}

Expand Down Expand Up @@ -1037,13 +1116,24 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_name),
"Name of the emitter (advanced users)"
},

{
FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_MEM_BUF_LIMIT_DEFAULT,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, emitter_mem_buf_limit),
"set a buffer limit to restrict memory usage of metrics emitter"
},

{
FLB_CONFIG_MAP_INT, "interval_sec", DEFAULT_INTERVAL_SEC,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, interval_sec),
"Set the timer interval for metrics emission. If interval_sec and "
"interval_nsec are set to 0, the timer is disabled (default)."
},
{
FLB_CONFIG_MAP_INT, "interval_nsec", DEFAULT_INTERVAL_NSEC,
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, interval_nsec),
"Set the timer interval (subseconds) for metrics emission. "
"If interval_sec and interval_nsec are set to 0, the timer is disabled "
"(default)."
},
{
FLB_CONFIG_MAP_BOOL, "discard_logs", "false",
0, FLB_TRUE, offsetof(struct log_to_metrics_ctx, discard_logs),
Expand Down
10 changes: 8 additions & 2 deletions plugins/filter_log_to_metrics/log_to_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@

#define FLB_MEM_BUF_LIMIT_DEFAULT "10M"
#define DEFAULT_LOG_TO_METRICS_NAMESPACE "log_metric"

#define DEFAULT_INTERVAL_SEC "0"
#define DEFAULT_INTERVAL_NSEC "0"

struct log_to_metrics_ctx {
struct mk_list rules;
struct flb_filter_instance *ins;
struct cmt *cmt;

struct flb_input_instance *input_ins;

char **label_keys;
Expand All @@ -83,6 +83,12 @@ struct log_to_metrics_ctx {
flb_sds_t tag;
flb_sds_t emitter_name;
size_t emitter_mem_buf_limit;
int interval_sec;
int interval_nsec;
int timer_interval;
int timer_mode;
struct flb_sched_timer *timer;
int new_data;
};

struct grep_rule
Expand Down

0 comments on commit c6d7766

Please sign in to comment.