Skip to content

Commit

Permalink
Merge pull request axoflow#343 from bshifter/filterx-parse-leef2
Browse files Browse the repository at this point in the history
Filterx parse leef 2.0
  • Loading branch information
alltilla authored Nov 8, 2024
2 parents e4f1723 + 6ba689f commit bfc93a7
Show file tree
Hide file tree
Showing 15 changed files with 681 additions and 239 deletions.
18 changes: 12 additions & 6 deletions lib/scanner/csv-scanner/csv-scanner.c
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,17 @@ _switch_to_next_column(CSVScanner *self)
g_assert_not_reached();
}

/*
 * Take the whole remaining input as the current column value.
 *
 * After _parse_left_whitespace() has run, the rest of the source string is
 * copied into the current value and the read position is moved forward by
 * the copied length.  The scanner is then switched to
 * CSV_STATE_GREEDY_COLUMN and the value is passed through
 * _translate_value().
 *
 * Always returns TRUE.
 */
gboolean
csv_scanner_take_rest(CSVScanner *self)
{
  _parse_left_whitespace(self);

  /* g_string_assign() hands back the string it was given */
  GString *rest = g_string_assign(self->current_value, self->src);
  self->src += rest->len;

  self->state = CSV_STATE_GREEDY_COLUMN;
  _translate_value(self);

  return TRUE;
}

gboolean
csv_scanner_scan_next(CSVScanner *self)
{
Expand All @@ -493,12 +504,7 @@ csv_scanner_scan_next(CSVScanner *self)

if (_is_last_column(self) && (self->options->flags & CSV_SCANNER_GREEDY))
{
_parse_left_whitespace(self);
g_string_assign(self->current_value, self->src);
self->src += self->current_value->len;
self->state = CSV_STATE_GREEDY_COLUMN;
_translate_value(self);
return TRUE;
return csv_scanner_take_rest(self);
}
else if (self->src[0] == 0)
{
Expand Down
2 changes: 2 additions & 0 deletions lib/scanner/csv-scanner/csv-scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ gchar *csv_scanner_dup_current_value(CSVScanner *self);
void csv_scanner_init(CSVScanner *pstate, CSVScannerOptions *options, const gchar *input);
void csv_scanner_deinit(CSVScanner *pstate);

gboolean csv_scanner_take_rest(CSVScanner *self);

#endif
4 changes: 3 additions & 1 deletion modules/cef/event-format-parser-cfg.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@
#include "filterx/filterx-object.h"

typedef struct _FilterXFunctionEventFormatParser FilterXFunctionEventFormatParser;
typedef struct _EventParserContext EventParserContext;

typedef FilterXObject *(*FieldParser)(FilterXFunctionEventFormatParser *parser, const gchar *value, gint value_len,
typedef FilterXObject *(*FieldParser)(EventParserContext *ctx, const gchar *value, gint value_len,
GError **error,
gpointer user_data);

/* Describes a single header field of an event format. */
typedef struct _Field
{
/* key under which the parsed column value is stored in the result dict */
const gchar *name;
/* parser for the column value; when NULL the default string parser is used */
FieldParser field_parser;
/* when TRUE, a column that does not match this field (without raising an
 * error) is retried against the next field instead */
gboolean optional;
} Field;

typedef struct _Header
Expand Down
148 changes: 115 additions & 33 deletions modules/cef/event-format-parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,24 @@ event_format_parser_error_quark(void)
}

Field
field(FilterXFunctionEventFormatParser *self, int index)
field_by_index(FilterXFunctionEventFormatParser *self, int index)
{
g_assert(index >= 0 && index < self->config.header.num_fields);
return self->config.header.fields[index];
}

static FilterXObject *
parse_default(FilterXFunctionEventFormatParser *self, const gchar *value, gint value_len, GError **error,
parse_default(EventParserContext *ctx, const gchar *value, gint value_len, GError **error,
gpointer user_data)
{
return filterx_string_new(value, value_len);
}

FilterXObject *
parse_version(FilterXFunctionEventFormatParser *self, const gchar *value, gint value_len, GError **error,
parse_version(EventParserContext *ctx, const gchar *value, gint value_len, GError **error,
gpointer user_data)
{
const gchar *log_signature = self->config.signature;
const gchar *log_signature = ctx->parser->config.signature;
gchar *colon_pos = memchr(value, ':', value_len);
if (!colon_pos || colon_pos == value)
{
Expand Down Expand Up @@ -114,16 +114,15 @@ _unescape_value_separators(KVScanner *self)
return TRUE;
}


FilterXObject *
parse_extensions(FilterXFunctionEventFormatParser *self, const gchar *input, gint input_len, GError **error,
parse_extensions(EventParserContext *ctx, const gchar *input, gint input_len, GError **error,
gpointer user_data)
{
FilterXObject *fillable = (FilterXObject *)user_data;
FilterXObject *output = filterx_object_create_dict(fillable);

KVScanner kv_scanner;
kv_scanner_init(&kv_scanner, self->config.extensions.value_separator, self->config.extensions.pair_separator, FALSE);
kv_scanner_init(&kv_scanner, ctx->kv_parser_value_separator, ctx->kv_parser_pair_separator, FALSE);
kv_scanner_set_transform_value(&kv_scanner, _unescape_value_separators);
kv_scanner_input(&kv_scanner, input);
while (kv_scanner_scan_next(&kv_scanner))
Expand All @@ -143,60 +142,100 @@ parse_extensions(FilterXFunctionEventFormatParser *self, const gchar *input, gin
}

static inline gboolean
_fill_object_col(FilterXFunctionEventFormatParser *self, gint64 index, const gchar *input, gint input_len,
FilterXObject *fillable,
GError **error)
_match_field_to_column(EventParserContext *ctx, Field *field, const gchar *input, gint input_len,
FilterXObject *fillable,
GError **error)
{
Field f = field(self, index);
FilterXObject *key = filterx_string_new(f.name, -1);
FilterXObject *val = NULL;

if (!f.field_parser)
val = parse_default(self, input, input_len, error, fillable);
if (!field->field_parser)
val = parse_default(ctx, input, input_len, error, fillable);
else
val = f.field_parser(self, input, input_len, error, fillable);
val = field->field_parser(ctx, input, input_len, error, fillable);

gboolean ok = FALSE;
if (!*error)
ok = filterx_object_set_subscript(fillable, key, &val);
if (!*error && val)
{
FilterXObject *key = filterx_string_new(field->name, -1);
ok = filterx_object_set_subscript(fillable, key, &val);
filterx_object_unref(key);
}

filterx_object_unref(val);
filterx_object_unref(key);
return ok;
}

/*
 * Match the scanner's current column against the current header field.
 *
 * If the field does not accept the column, no error has been raised and the
 * field is optional, the same column is retried against the following
 * field(s).  Returns FALSE only when the fields run out during that retry;
 * otherwise the column counter is advanced and TRUE is returned, so callers
 * must also inspect *error.
 */
static gboolean
_parse_column(EventParserContext *ctx, FilterXObject *fillable, GError **error)
{
  const gchar *column = csv_scanner_get_current_value(ctx->csv_scanner);
  gint column_len = csv_scanner_get_current_value_len(ctx->csv_scanner);

  Field candidate = field_by_index(ctx->parser, ctx->field_index);

  for (;;)
    {
      if (_match_field_to_column(ctx, &candidate, column, column_len, fillable, error))
        break;

      /* only a silent mismatch on an optional field lets us skip ahead */
      if (*error || !candidate.optional)
        break;

      ctx->field_index++;
      if (ctx->field_index >= ctx->num_fields)
        return FALSE;

      candidate = field_by_index(ctx->parser, ctx->field_index);
    }

  ctx->column_index++;
  return TRUE;
}

/*
 * Build the per-invocation parsing context.
 *
 * The key-value separators come from the function arguments when they were
 * given (non-NULL pair separator, non-zero value separator) and fall back to
 * the format's configured extension separators otherwise.  The pair
 * separator is copied into the context's fixed-size buffer.
 */
static EventParserContext
_new_context(FilterXFunctionEventFormatParser *self, CSVScanner *csv_scanner)
{
  gchar value_separator = self->kv_value_separator;
  if (value_separator == 0)
    value_separator = self->config.extensions.value_separator;

  const gchar *pair_separator = self->kv_pair_separator;
  if (!pair_separator)
    pair_separator = self->config.extensions.pair_separator;

  /* members not named here (e.g. column_index) are zero-initialized */
  EventParserContext ctx =
  {
    .parser = self,
    .num_fields = self->config.header.num_fields,
    .field_index = 0,
    .csv_scanner = csv_scanner,
    .flags = 0,
    .kv_parser_value_separator = value_separator,
  };

  g_strlcpy(ctx.kv_parser_pair_separator, pair_separator, EVENT_FORMAT_PARSER_PAIR_SEPARATOR_MAX_LEN);

  return ctx;
}

static gboolean
parse(FilterXFunctionEventFormatParser *self, const gchar *log, gsize len, FilterXObject *fillable, GError **error)
{
gboolean ok = FALSE;
gsize num_fields = self->config.header.num_fields;

CSVScanner csv_scanner;
csv_scanner_init(&csv_scanner, &self->csv_opts, log);

guint64 i = 0;
EventParserContext ctx = _new_context(self, &csv_scanner);

while (csv_scanner_scan_next(&csv_scanner))
{
if (i >= num_fields)
if (ctx.field_index >= ctx.num_fields)
break;
ok = _parse_column(&ctx, fillable, error);
if(!ok || *error)
goto exit;
ctx.field_index++;
}

const gchar *input = csv_scanner_get_current_value(&csv_scanner);
gint input_len = csv_scanner_get_current_value_len(&csv_scanner);

ok = _fill_object_col(self, i, input, input_len, fillable, error);
if (ctx.field_index <= ctx.num_fields - 1)
{
csv_scanner_take_rest(&csv_scanner);
ok = _parse_column(&ctx, fillable, error);
if(!ok || *error)
goto exit;

i++;
}

if (i < self->csv_opts.expected_columns)
if (ctx.column_index < ctx.num_fields-1)
{
g_set_error(error, EVENT_FORMAT_PARSER_ERROR, EVENT_FORMAT_PARSER_ERR_MISSING_COLUMNS,
EVENT_FORMAT_PARSER_ERR_MISSING_COLUMNS_MSG, i, self->config.header.num_fields);
EVENT_FORMAT_PARSER_ERR_MISSING_COLUMNS_MSG, ctx.field_index, ctx.num_fields);
}


exit:
csv_scanner_deinit(&csv_scanner);

Expand Down Expand Up @@ -245,9 +284,9 @@ _free(FilterXExpr *s)
{
FilterXFunctionEventFormatParser *self = (FilterXFunctionEventFormatParser *) s;
filterx_expr_unref(self->msg);
g_free(self->kv_pair_separator);
csv_scanner_options_clean(&self->csv_opts);
filterx_generator_function_free_method(&self->super);

}

static FilterXExpr *
Expand All @@ -264,6 +303,48 @@ _extract_msg_expr(FilterXFunctionArgs *args, GError **error)
return msg_expr;
}

/*
 * Read the optional named arguments of the parser function.
 *
 * pair_separator:  must be a non-empty literal string that fits into the
 *                  context's separator buffer; a copy is stored in
 *                  self->kv_pair_separator.
 * value_separator: must be a non-empty literal string; its first character
 *                  is stored in self->kv_value_separator.
 *
 * Returns TRUE on success.  On invalid input sets *error and returns FALSE.
 */
static gboolean
_extract_optional_args(FilterXFunctionEventFormatParser *self, FilterXFunctionArgs *args, GError **error)
{
  gboolean given;
  gsize length;

  const gchar *pair_sep =
    filterx_function_args_get_named_literal_string(args, EVENT_FORMAT_PARSER_ARG_NAME_PAIR_SEPARATOR, &length,
                                                   &given);
  if (given)
    {
      if (!pair_sep || length < 1)
        {
          g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
                      EVENT_FORMAT_PARSER_ERR_EMPTY_STRING, EVENT_FORMAT_PARSER_ARG_NAME_PAIR_SEPARATOR);
          return FALSE;
        }

      /* one byte of the buffer is reserved for the terminating NUL */
      if (length >= EVENT_FORMAT_PARSER_PAIR_SEPARATOR_MAX_LEN)
        {
          g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
                      EVENT_FORMAT_PARSER_ERR_SEPARATOR_MAX_LENGTH_EXCEEDED,
                      EVENT_FORMAT_PARSER_ARG_NAME_PAIR_SEPARATOR);
          return FALSE;
        }

      self->kv_pair_separator = g_strdup(pair_sep);
    }

  const gchar *value_sep =
    filterx_function_args_get_named_literal_string(args, EVENT_FORMAT_PARSER_ARG_NAME_VALUE_SEPARATOR, &length,
                                                   &given);
  if (given)
    {
      if (!value_sep || length < 1)
        {
          g_set_error(error, FILTERX_FUNCTION_ERROR, FILTERX_FUNCTION_ERROR_CTOR_FAIL,
                      EVENT_FORMAT_PARSER_ERR_EMPTY_STRING, EVENT_FORMAT_PARSER_ARG_NAME_VALUE_SEPARATOR);
          return FALSE;
        }

      /* only the first character is used as the value separator */
      self->kv_value_separator = value_sep[0];
    }

  return TRUE;
}

static gboolean
_extract_args(FilterXFunctionEventFormatParser *self, FilterXFunctionArgs *args, GError **error)
{
Expand All @@ -279,6 +360,9 @@ _extract_args(FilterXFunctionEventFormatParser *self, FilterXFunctionArgs *args,
if (!self->msg)
return FALSE;

if (!_extract_optional_args(self, args, error))
return FALSE;

return TRUE;
}

Expand All @@ -290,8 +374,6 @@ _set_config(FilterXFunctionEventFormatParser *self, Config *cfg)
csv_scanner_options_set_delimiters(&self->csv_opts, cfg->header.delimiters);
csv_scanner_options_set_quote_pairs(&self->csv_opts, "");
csv_scanner_options_set_dialect(&self->csv_opts, CSV_SCANNER_ESCAPE_UNQUOTED_DELIMITER);
csv_scanner_options_set_expected_columns(&self->csv_opts, cfg->header.num_fields);
self->csv_opts.flags |= CSV_SCANNER_GREEDY;
}

gboolean
Expand Down
27 changes: 24 additions & 3 deletions modules/cef/event-format-parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@
#define EVENT_FORMAT_PARSER_ERR_LOG_SIGN_DIFFERS_MSG "the log signature differs. actual:%s expected:%s"
/* Both arguments are guint64 counters, so format them with GLib's 64-bit
 * conversion macro; "%ld" is signed and only 32 bits wide on some platforms. */
#define EVENT_FORMAT_PARSER_ERR_MISSING_COLUMNS_MSG "not enough header columns provided. actual:%" G_GUINT64_FORMAT " expected:%" G_GUINT64_FORMAT
#define EVENT_FORMAT_PARSER_ERR_NOT_STRING_INPUT_MSG "input argument must be string"
#define EVENT_FORMAT_PARSER_ERR_EMPTY_STRING "%s must be a non-empty string literal"
#define EVENT_FORMAT_PARSER_ERR_SEPARATOR_MAX_LENGTH_EXCEEDED "%s max length exceeded"

#define EVENT_FORMAT_PARSER_ERROR event_format_parser_error_quark()
GQuark event_format_parser_error_quark(void);

/* Size in bytes of the pair separator buffer, terminating NUL included:
 * a user supplied pair separator may be at most one byte shorter than this. */
#define EVENT_FORMAT_PARSER_PAIR_SEPARATOR_MAX_LEN 0x05

#define EVENT_FORMAT_PARSER_ARG_NAME_PAIR_SEPARATOR "pair_separator"
#define EVENT_FORMAT_PARSER_ARG_NAME_VALUE_SEPARATOR "value_separator"

enum EventFormatParserError
{
EVENT_FORMAT_PARSER_ERR_NO_LOG_SIGN,
Expand All @@ -57,15 +64,29 @@ struct _FilterXFunctionEventFormatParser
FilterXExpr *msg;
CSVScannerOptions csv_opts;
Config config;
gchar *kv_pair_separator;
gchar kv_value_separator;
};

/* State carried through the parsing of a single message. */
struct _EventParserContext
{
/* the parser function instance this context was created for */
FilterXFunctionEventFormatParser *parser;
/* number of header fields in the parser's configuration */
guint64 num_fields;
/* index of the header field currently being matched */
guint64 field_index;
/* number of input columns processed so far */
guint64 column_index;
/* scanner that yields the header columns of the message */
CSVScanner *csv_scanner;
/* initialized to 0 when the context is created */
guint64 flags;
/* NUL-terminated pair separator used when parsing the extensions */
gchar kv_parser_pair_separator[EVENT_FORMAT_PARSER_PAIR_SEPARATOR_MAX_LEN];
/* value separator used when parsing the extensions */
gchar kv_parser_value_separator;
};

gboolean filterx_function_parser_init_instance(FilterXFunctionEventFormatParser *s, const gchar *fn_name,
FilterXFunctionArgs *args, Config *cfg, GError **error);

FilterXObject *parse_version(FilterXFunctionEventFormatParser *parser, const gchar *value, gint value_len,
FilterXObject *parse_version(EventParserContext *ctx, const gchar *value, gint value_len,
GError **error,
gpointer user_data);
FilterXObject *parse_extensions(FilterXFunctionEventFormatParser *parser, const gchar *value, gint value_len,
FilterXObject *parse_extensions(EventParserContext *ctx, const gchar *value, gint value_len,
GError **error,
gpointer user_data);

Expand All @@ -74,7 +95,7 @@ static inline void append_error_message(GError **error, const char *extra_info)
if (error == NULL || *error == NULL)
return;

gchar *new_message = g_strdup_printf("%s: %s", (*error)->message, extra_info);
gchar *new_message = g_strdup_printf("%s %s", (*error)->message, extra_info);
GError *new_error = g_error_new((*error)->domain, (*error)->code, "%s", new_message);

g_error_free(*error);
Expand Down
4 changes: 3 additions & 1 deletion modules/cef/filterx-func-parse-cef.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "plugin.h"
#include "filterx/expr-function.h"

#define FILTERX_FUNC_PARSE_CEF_USAGE "Usage: parse_cef(str)"
/* Usage text for parse_cef(): the message string, followed by the optional
 * named separator arguments, both of which take string literals. */
#define FILTERX_FUNC_PARSE_CEF_USAGE "Usage: parse_cef(str, " \
  EVENT_FORMAT_PARSER_ARG_NAME_PAIR_SEPARATOR "=string, " \
  EVENT_FORMAT_PARSER_ARG_NAME_VALUE_SEPARATOR "=string)"

FILTERX_GENERATOR_FUNCTION_DECLARE(parse_cef);

Expand Down
Loading

0 comments on commit bfc93a7

Please sign in to comment.