From 62ea24b541f06e8caf4d78a089cada383515bd78 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Wed, 3 Jul 2024 20:53:30 +0800 Subject: [PATCH] Add `parse_ints` config in json parser to support parsing int or float properly (#33699) **Description:** expose json iterator config in json parser **Link to tracking Issue:** Fixes #33696 **Testing:** added. **Documentation:** updated --- .chloggen/json_parser_number_data_type.yaml | 27 ++++ pkg/stanza/docs/operators/json_parser.md | 21 +-- pkg/stanza/operator/parser/json/config.go | 3 + .../operator/parser/json/config_test.go | 8 ++ pkg/stanza/operator/parser/json/parser.go | 61 ++++++++- .../operator/parser/json/parser_test.go | 127 ++++++++++++++++++ .../operator/parser/json/testdata/config.yaml | 3 + 7 files changed, 237 insertions(+), 13 deletions(-) create mode 100644 .chloggen/json_parser_number_data_type.yaml diff --git a/.chloggen/json_parser_number_data_type.yaml b/.chloggen/json_parser_number_data_type.yaml new file mode 100644 index 000000000000..a6cb16ebad41 --- /dev/null +++ b/.chloggen/json_parser_number_data_type.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `parse_ints` config in json parser to support parsing int or float properly + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/docs/operators/json_parser.md b/pkg/stanza/docs/operators/json_parser.md index f173c337ed15..2f8f2c2286bd 100644 --- a/pkg/stanza/docs/operators/json_parser.md +++ b/pkg/stanza/docs/operators/json_parser.md @@ -4,16 +4,17 @@ The `json_parser` operator parses the string-type field selected by `parse_from` ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `json_parser` | A unique identifier for the operator. | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | -| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. | -| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. | -| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). | -| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | -| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. | -| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `json_parser` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. | +| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). | +| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | +| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. | +| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. | +| `parse_ints` | `false` | Numbers like `int` and `float` are parsed as `float64` by default. When `parse_ints` is enabled, numbers are parsed as `json.Number` and then converted to `int64` or `float64` based on the value. However, this also introduces additional overhead. | ### Embedded Operations diff --git a/pkg/stanza/operator/parser/json/config.go b/pkg/stanza/operator/parser/json/config.go index bb45edb402b9..e7fe465dbc4a 100644 --- a/pkg/stanza/operator/parser/json/config.go +++ b/pkg/stanza/operator/parser/json/config.go @@ -31,6 +31,8 @@ func NewConfigWithID(operatorID string) *Config { // Config is the configuration of a JSON parser operator. type Config struct { helper.ParserConfig `mapstructure:",squash"` + + ParseInts bool `mapstructure:"parse_ints"` } // Build will build a JSON parser operator. @@ -42,5 +44,6 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error return &Parser{ ParserOperator: parserOperator, + parseInts: c.ParseInts, }, nil } diff --git a/pkg/stanza/operator/parser/json/config_test.go b/pkg/stanza/operator/parser/json/config_test.go index 1add671e730a..ef95d04dce74 100644 --- a/pkg/stanza/operator/parser/json/config_test.go +++ b/pkg/stanza/operator/parser/json/config_test.go @@ -110,6 +110,14 @@ func TestConfig(t *testing.T) { return p }(), }, + { + Name: "parse_ints", + Expect: func() *Config { + p := NewConfig() + p.ParseInts = true + return p + }(), + }, }, }.Run(t) } diff --git a/pkg/stanza/operator/parser/json/parser.go b/pkg/stanza/operator/parser/json/parser.go index 7d626f97a007..be9aad224c4a 100644 --- a/pkg/stanza/operator/parser/json/parser.go +++ b/pkg/stanza/operator/parser/json/parser.go @@ -6,6 +6,7 @@ package json // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" "fmt" + "strings" "github.com/goccy/go-json" @@ -16,6 +17,8 @@ import ( // Parser is an operator that parses JSON. type Parser struct { helper.ParserOperator + + parseInts bool } // Process will parse an entry for JSON. @@ -28,12 +31,64 @@ func (p *Parser) parse(value any) (any, error) { var parsedValue map[string]any switch m := value.(type) { case string: - err := json.Unmarshal([]byte(m), &parsedValue) - if err != nil { - return nil, err + // when parseInts is disabled, `int` and `float` will be parsed as `float64`. + // when it is enabled, they will be parsed as `json.Number`, later the parser + // will convert them to `int` or `float64` according to the field type. + if p.parseInts { + d := json.NewDecoder(strings.NewReader(m)) + d.UseNumber() + err := d.Decode(&parsedValue) + if err != nil { + return nil, err + } + convertNumbers(parsedValue) + } else { + err := json.Unmarshal([]byte(m), &parsedValue) + if err != nil { + return nil, err + } } default: return nil, fmt.Errorf("type %T cannot be parsed as JSON", value) } + return parsedValue, nil } + +func convertNumbers(parsedValue map[string]any) { + for k, v := range parsedValue { + switch t := v.(type) { + case json.Number: + parsedValue[k] = convertNumber(t) + case map[string]any: + convertNumbers(t) + case []any: + convertNumbersArray(t) + } + } +} + +func convertNumbersArray(arr []any) { + for i, v := range arr { + switch t := v.(type) { + case json.Number: + arr[i] = convertNumber(t) + case map[string]any: + convertNumbers(t) + case []any: + convertNumbersArray(t) + } + } +} + +func convertNumber(value json.Number) any { + i64, err := value.Int64() + if err == nil { + return i64 + } + f64, err := value.Float64() + if err == nil { + return f64 + } + return value.String() +} diff --git a/pkg/stanza/operator/parser/json/parser_test.go b/pkg/stanza/operator/parser/json/parser_test.go index f27d1fdd8113..f9efe32a01c2 100644 --- a/pkg/stanza/operator/parser/json/parser_test.go +++ b/pkg/stanza/operator/parser/json/parser_test.go @@ -141,6 +141,116 @@ func TestParser(t *testing.T) { ScopeName: "logger", }, }, + { + "parse_ints_disabled", + func(_ *Config) {}, + &entry.Entry{ + Body: `{"int":1,"float":1.0}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": float64(1), + "float": float64(1), + }, + Body: `{"int":1,"float":1.0}`, + }, + }, + { + "parse_ints_simple", + func(p *Config) { + p.ParseInts = true + }, + &entry.Entry{ + Body: `{"int":1,"float":1.0}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": int64(1), + "float": float64(1), + }, + Body: `{"int":1,"float":1.0}`, + }, + }, + { + "parse_ints_nested", + func(p *Config) { + p.ParseInts = true + }, + &entry.Entry{ + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0}}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": int64(1), + "float": float64(1), + "nested": map[string]any{ + "int": int64(2), + "float": float64(2), + }, + }, + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0}}`, + }, + }, + { + "parse_ints_arrays", + func(p *Config) { + p.ParseInts = true + }, + &entry.Entry{ + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0},"array":[1,2]}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": int64(1), + "float": float64(1), + "nested": map[string]any{ + "int": int64(2), + "float": float64(2), + }, + "array": []any{int64(1), int64(2)}, + }, + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0},"array":[1,2]}`, + }, + }, + { + "parse_ints_mixed_arrays", + func(p *Config) { + p.ParseInts = true + }, + &entry.Entry{ + Body: `{"int":1,"float":1.0,"mixed_array":[1,1.5,2]}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": int64(1), + "float": float64(1), + "mixed_array": []any{int64(1), float64(1.5), int64(2)}, + }, + Body: `{"int":1,"float":1.0,"mixed_array":[1,1.5,2]}`, + }, + }, + { + "parse_ints_nested_arrays", + func(p *Config) { + p.ParseInts = true + }, + &entry.Entry{ + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0,"array":[1,2]},"array":[3,4]}`, + }, + &entry.Entry{ + Attributes: map[string]any{ + "int": int64(1), + "float": float64(1), + "nested": map[string]any{ + "int": int64(2), + "float": float64(2), + "array": []any{int64(1), int64(2)}, + }, + "array": []any{int64(3), int64(4)}, + }, + Body: `{"int":1,"float":1.0,"nested":{"int":2,"float":2.0,"array":[1,2]},"array":[3,4]}`, + }, + }, } for _, tc := range cases { @@ -176,6 +286,23 @@ func BenchmarkProcess(b *testing.B) { parser, err := cfg.Build(componenttest.NewNopTelemetrySettings()) require.NoError(b, err) + benchmarkOperator(b, parser) +} + +func BenchmarkProcessParseInts(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + cfg := NewConfig() + cfg.ParseInts = true + + parser, err := cfg.Build(componenttest.NewNopTelemetrySettings()) + require.NoError(b, err) + + benchmarkOperator(b, parser) +} + +func benchmarkOperator(b *testing.B, parser operator.Operator) { body, err := os.ReadFile(filepath.Join("testdata", "testdata.json")) require.NoError(b, err) diff --git a/pkg/stanza/operator/parser/json/testdata/config.yaml b/pkg/stanza/operator/parser/json/testdata/config.yaml index 4de9e105952f..8b2bbb4f2aa0 100644 --- a/pkg/stanza/operator/parser/json/testdata/config.yaml +++ b/pkg/stanza/operator/parser/json/testdata/config.yaml @@ -37,3 +37,6 @@ timestamp: parse_from: body.timestamp_field layout_type: strptime layout: '%Y-%m-%d' +parse_ints: + type: json_parser + parse_ints: true