Skip to content

Commit

Permalink
Merge pull request axoflow#446 from bazsi/filterx-unmarshal-does-not-…
Browse files Browse the repository at this point in the history
…cause-variables-to-be-dirty

Filterx unmarshal does not cause variables to be dirty
  • Loading branch information
OverOrion authored Jan 22, 2025
2 parents 7a1ef06 + 795ba18 commit ca7e054
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 118 deletions.
2 changes: 1 addition & 1 deletion lib/filterx/expr-template.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ _eval(FilterXExpr *s)
/* FIXME: we could go directly to filterx_string_new() here to avoid a round trip in FilterXMessageValue */
/* FIXME/2: let's make this handle literal and trivial templates */

log_template_format_value_and_type_with_context(self->template, context->msgs, context->num_msg,
log_template_format_value_and_type_with_context(self->template, &context->msg, 1,
&context->template_eval_options, value, &t);

/* NOTE: we borrow value->str here which is stored in a scratch buffer
Expand Down
52 changes: 33 additions & 19 deletions lib/filterx/expr-variable.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
typedef struct _FilterXVariableExpr
{
FilterXExpr super;
FilterXObject *variable_name;
FilterXVariableType variable_type;
NVHandle handle;
guint32 declared:1, handle_is_macro:1;
FilterXObject *variable_name;
guint32 handle_is_macro:1;
} FilterXVariableExpr;

static FilterXObject *
Expand All @@ -61,7 +62,7 @@ _pull_variable_from_message(FilterXVariableExpr *self, FilterXEvalContext *conte
static void
_whiteout_variable(FilterXVariableExpr *self, FilterXEvalContext *context)
{
filterx_scope_register_variable(context->scope, self->handle, NULL);
filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, NULL);
}

static FilterXObject *
Expand All @@ -82,12 +83,12 @@ _eval(FilterXExpr *s)
return value;
}

if (!filterx_variable_handle_is_floating(self->handle))
if (filterx_variable_handle_is_message_tied(self->handle))
{
FilterXObject *msg_ref = _pull_variable_from_message(self, context, context->msgs[0]);
FilterXObject *msg_ref = _pull_variable_from_message(self, context, context->msg);
if(!msg_ref)
return NULL;
filterx_scope_register_variable(context->scope, self->handle, msg_ref);
filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, msg_ref);
return msg_ref;
}

Expand All @@ -103,7 +104,7 @@ _update_repr(FilterXExpr *s, FilterXObject *new_repr)
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

g_assert(variable != NULL);
filterx_variable_set_value(variable, new_repr);
filterx_variable_set_value(variable, new_repr, FALSE);
}

static gboolean
Expand All @@ -113,20 +114,23 @@ _assign(FilterXExpr *s, FilterXObject *new_value)
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

if (self->handle_is_macro)
{
filterx_eval_push_error("Macro based variable cannot be changed", &self->super, self->variable_name);
return FALSE;
}

if (!variable)
{
/* NOTE: we pass NULL as initial_value to make sure the new variable
* is considered changed due to the assignment */

if (self->declared)
variable = filterx_scope_register_declared_variable(scope, self->handle, NULL);
else
variable = filterx_scope_register_variable(scope, self->handle, NULL);
variable = filterx_scope_register_variable(scope, self->variable_type, self->handle, NULL);
}

/* this only clones mutable objects */
new_value = filterx_object_clone(new_value);
filterx_variable_set_value(variable, new_value);
filterx_variable_set_value(variable, new_value, TRUE);
filterx_object_unref(new_value);
return TRUE;
}
Expand All @@ -142,14 +146,21 @@ _isset(FilterXExpr *s)
return filterx_variable_is_set(variable);

FilterXEvalContext *context = filterx_eval_get_context();
LogMessage *msg = context->msgs[0];
LogMessage *msg = context->msg;
return log_msg_is_value_set(msg, self->handle);
}

static gboolean
_unset(FilterXExpr *s)
{
FilterXVariableExpr *self = (FilterXVariableExpr *) s;

if (self->handle_is_macro)
{
filterx_eval_push_error("Macro based variable cannot be changed", &self->super, self->variable_name);
return FALSE;
}

FilterXEvalContext *context = filterx_eval_get_context();

FilterXVariable *variable = filterx_scope_lookup_variable(context->scope, self->handle);
Expand All @@ -159,7 +170,7 @@ _unset(FilterXExpr *s)
return TRUE;
}

LogMessage *msg = context->msgs[0];
LogMessage *msg = context->msg;
if (log_msg_is_value_set(msg, self->handle))
_whiteout_variable(self, context);

Expand All @@ -176,7 +187,7 @@ _free(FilterXExpr *s)
}

static FilterXExpr *
filterx_variable_expr_new(FilterXString *name, FilterXVariableType type)
filterx_variable_expr_new(FilterXString *name, FilterXVariableType variable_type)
{
FilterXVariableExpr *self = g_new0(FilterXVariableExpr, 1);

Expand All @@ -188,9 +199,10 @@ filterx_variable_expr_new(FilterXString *name, FilterXVariableType type)
self->super.is_set = _isset;
self->super.unset = _unset;

self->variable_type = variable_type;
self->variable_name = (FilterXObject *) name;
self->handle = filterx_map_varname_to_handle(filterx_string_get_value_ref(self->variable_name, NULL), type);
if (type == FX_VAR_MESSAGE)
self->handle = filterx_map_varname_to_handle(filterx_string_get_value_ref(self->variable_name, NULL), variable_type);
if (variable_type == FX_VAR_MESSAGE_TIED)
self->handle_is_macro = log_msg_is_handle_macro(filterx_variable_handle_to_nv_handle(self->handle));

/* NOTE: name borrows the string value from the string object */
Expand All @@ -202,7 +214,7 @@ filterx_variable_expr_new(FilterXString *name, FilterXVariableType type)
FilterXExpr *
filterx_msg_variable_expr_new(FilterXString *name)
{
return filterx_variable_expr_new(name, FX_VAR_MESSAGE);
return filterx_variable_expr_new(name, FX_VAR_MESSAGE_TIED);
}

FilterXExpr *
Expand All @@ -217,5 +229,7 @@ filterx_variable_expr_declare(FilterXExpr *s)
FilterXVariableExpr *self = (FilterXVariableExpr *) s;

g_assert(s->eval == _eval);
self->declared = TRUE;
/* we can only declare a floating variable */
g_assert(self->variable_type == FX_VAR_FLOATING);
self->variable_type = FX_VAR_DECLARED_FLOATING;
}
7 changes: 3 additions & 4 deletions lib/filterx/filterx-eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,8 @@ filterx_format_last_error_location(void)
}

FilterXEvalResult
filterx_eval_exec(FilterXEvalContext *context, FilterXExpr *expr, LogMessage *msg)
filterx_eval_exec(FilterXEvalContext *context, FilterXExpr *expr)
{
context->msgs = &msg;
context->num_msg = 1;
FilterXEvalResult result = FXE_FAILURE;

FilterXObject *res = filterx_expr_eval(expr);
Expand All @@ -147,7 +145,7 @@ filterx_eval_exec(FilterXEvalContext *context, FilterXExpr *expr, LogMessage *ms
}

void
filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context)
filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context, LogMessage *msg)
{
FilterXScope *scope;

Expand All @@ -158,6 +156,7 @@ filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previ
filterx_scope_make_writable(&scope);

memset(context, 0, sizeof(*context));
context->msg = msg;
context->template_eval_options = DEFAULT_TEMPLATE_EVAL_OPTIONS;
context->scope = scope;

Expand Down
7 changes: 3 additions & 4 deletions lib/filterx/filterx-eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ typedef enum _FilterXEvalControl
typedef struct _FilterXEvalContext FilterXEvalContext;
struct _FilterXEvalContext
{
LogMessage **msgs;
gint num_msg;
LogMessage *msg;
FilterXScope *scope;
FilterXError error;
LogTemplateEvalOptions template_eval_options;
Expand All @@ -62,14 +61,14 @@ FilterXScope *filterx_eval_get_scope(void);
void filterx_eval_push_error(const gchar *message, FilterXExpr *expr, FilterXObject *object);
void filterx_eval_push_error_info(const gchar *message, FilterXExpr *expr, gchar *info, gboolean free_info);
void filterx_eval_set_context(FilterXEvalContext *context);
FilterXEvalResult filterx_eval_exec(FilterXEvalContext *context, FilterXExpr *expr, LogMessage *msg);
FilterXEvalResult filterx_eval_exec(FilterXEvalContext *context, FilterXExpr *expr);
const gchar *filterx_eval_get_last_error(void);
EVTTAG *filterx_format_last_error(void);
EVTTAG *filterx_format_last_error_location(void);
void filterx_eval_clear_errors(void);
EVTTAG *filterx_format_eval_result(FilterXEvalResult result);

void filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context);
void filterx_eval_init_context(FilterXEvalContext *context, FilterXEvalContext *previous_context, LogMessage *msg);
void filterx_eval_deinit_context(FilterXEvalContext *context);

static inline void
Expand Down
4 changes: 2 additions & 2 deletions lib/filterx/filterx-pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o
FilterXEvalResult eval_res;

path_options = log_path_options_chain(&local_path_options, path_options);
filterx_eval_init_context(&eval_context, path_options->filterx_context);
filterx_eval_init_context(&eval_context, path_options->filterx_context, msg);

if (filterx_scope_has_log_msg_changes(eval_context.scope))
filterx_scope_invalidate_log_msg_cache(eval_context.scope);
Expand All @@ -79,7 +79,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o
evt_tag_msg_reference(msg));

NVTable *payload = nv_table_ref(msg->payload);
eval_res = filterx_eval_exec(&eval_context, self->block, msg);
eval_res = filterx_eval_exec(&eval_context, self->block);

msg_trace("<<<<<< filterx rule evaluation result",
filterx_format_eval_result(eval_res),
Expand Down
33 changes: 9 additions & 24 deletions lib/filterx/filterx-scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle)

static FilterXVariable *
_register_variable(FilterXScope *self,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value)
{
Expand All @@ -129,9 +130,7 @@ _register_variable(FilterXScope *self,
* it was a new value */

filterx_variable_set_generation(v_slot, self->generation);
filterx_variable_set_value(v_slot, initial_value);
/* consider this to be unset just as an initial registration is */
filterx_variable_unassign(v_slot);
filterx_variable_set_value(v_slot, initial_value, FALSE);
}
return v_slot;
}
Expand All @@ -142,41 +141,27 @@ _register_variable(FilterXScope *self,


FilterXVariable v;
filterx_variable_init_instance(&v, handle, initial_value, self->generation);
filterx_variable_init_instance(&v, variable_type, handle, initial_value, self->generation);
g_array_insert_val(self->variables, v_index, v);

return &g_array_index(self->variables, FilterXVariable, v_index);
}

FilterXVariable *
filterx_scope_register_variable(FilterXScope *self,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value)
{
FilterXVariable *v = _register_variable(self, handle, initial_value);
filterx_variable_set_declared(v, FALSE);
FilterXVariable *v = _register_variable(self, variable_type, handle, initial_value);

/* the scope needs to be synced with the message if it holds a
* message-tied variable (e.g. $MSG) */
if (!filterx_variable_handle_is_floating(handle))
if (filterx_variable_handle_is_message_tied(handle))
self->syncable = TRUE;
return v;
}

FilterXVariable *
filterx_scope_register_declared_variable(FilterXScope *self,
FilterXVariableHandle handle,
FilterXObject *initial_value)

{
g_assert(filterx_variable_handle_is_floating(handle));

FilterXVariable *v = _register_variable(self, handle, initial_value);
filterx_variable_set_declared(v, TRUE);

return v;
}

gboolean
filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, gpointer user_data)
{
Expand Down Expand Up @@ -284,7 +269,7 @@ filterx_scope_new(void)

g_atomic_counter_set(&self->ref_cnt, 1);
self->variables = g_array_sized_new(FALSE, TRUE, sizeof(FilterXVariable), 16);
g_array_set_clear_func(self->variables, (GDestroyNotify) filterx_variable_free);
g_array_set_clear_func(self->variables, (GDestroyNotify) filterx_variable_clear);
return self;
}

Expand All @@ -297,7 +282,7 @@ filterx_scope_clone(FilterXScope *other)
{
FilterXVariable *v = &g_array_index(other->variables, FilterXVariable, src_index);

if (filterx_variable_is_declared(v) || !filterx_variable_is_floating(v))
if (filterx_variable_is_declared(v) || filterx_variable_is_message_tied(v))
{
g_array_append_val(self->variables, *v);
FilterXVariable *v_clone = &g_array_index(self->variables, FilterXVariable, dst_index);
Expand Down Expand Up @@ -391,7 +376,7 @@ filterx_scope_invalidate_log_msg_cache(FilterXScope *self)
if (!filterx_variable_is_floating(v) && self->syncable)
{
/* skip this variable */
filterx_variable_free(v);
filterx_variable_clear(v);
}
else
{
Expand Down
4 changes: 1 addition & 3 deletions lib/filterx/filterx-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ void filterx_scope_sync(FilterXScope *self, LogMessage *msg);

FilterXVariable *filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle);
FilterXVariable *filterx_scope_register_variable(FilterXScope *self,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value);
FilterXVariable *filterx_scope_register_declared_variable(FilterXScope *self,
FilterXVariableHandle handle,
FilterXObject *initial_value);
gboolean filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, gpointer user_data);
void filterx_scope_invalidate_log_msg_cache(FilterXScope *self);

Expand Down
15 changes: 9 additions & 6 deletions lib/filterx/filterx-variable.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,32 @@
FilterXVariableHandle
filterx_map_varname_to_handle(const gchar *name, FilterXVariableType type)
{
if (type == FX_VAR_MESSAGE)
if (type == FX_VAR_MESSAGE_TIED)
name++;

NVHandle nv_handle = log_msg_get_value_handle(name);

if (type == FX_VAR_MESSAGE)
if (type == FX_VAR_MESSAGE_TIED)
return (FilterXVariableHandle) nv_handle;
return (FilterXVariableHandle) nv_handle | FILTERX_HANDLE_FLOATING_BIT;
}

void
filterx_variable_free(FilterXVariable *v)
filterx_variable_clear(FilterXVariable *v)
{
filterx_object_unref(v->value);
}

void
filterx_variable_init_instance(FilterXVariable *v, FilterXVariableHandle handle,
FilterXObject *initial_value, guint32 generation)
filterx_variable_init_instance(FilterXVariable *v,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value,
guint32 generation)
{
v->handle = handle;
v->variable_type = variable_type;
v->assigned = FALSE;
v->declared = FALSE;
v->generation = generation;
v->value = filterx_object_ref(initial_value);
}
Loading

0 comments on commit ca7e054

Please sign in to comment.