Skip to content

Commit

Permalink
Add json_extract plugin (#590)
Browse files Browse the repository at this point in the history
* Add json_extract plugin

* Rework with jx

* Fix genFields for benchs

* gen-doc

* rework
  • Loading branch information
kirillov6 authored Mar 27, 2024
1 parent a35f6d6 commit e570031
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ TBD: throughput on production servers.

**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)

**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [convert_utf8_bytes](plugin/action/convert_utf8_bytes/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [json_extract](plugin/action/json_extract/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)

**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)

Expand Down
1 change: 1 addition & 0 deletions _sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- [join_template](plugin/action/join_template/README.md)
- [json_decode](plugin/action/json_decode/README.md)
- [json_encode](plugin/action/json_encode/README.md)
- [json_extract](plugin/action/json_extract/README.md)
- [keep_fields](plugin/action/keep_fields/README.md)
- [mask](plugin/action/mask/README.md)
- [modify](plugin/action/modify/README.md)
Expand Down
1 change: 1 addition & 0 deletions cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/join_template"
_ "github.com/ozontech/file.d/plugin/action/json_decode"
_ "github.com/ozontech/file.d/plugin/action/json_encode"
_ "github.com/ozontech/file.d/plugin/action/json_extract"
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
_ "github.com/ozontech/file.d/plugin/action/mask"
_ "github.com/ozontech/file.d/plugin/action/modify"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cespare/xxhash/v2 v2.2.0
github.com/euank/go-kmsg-parser v2.0.0+incompatible
github.com/go-faster/jx v1.1.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
github.com/go-ini/ini v1.62.0 h1:7VJT/ZXjzqSrvtraFp4ONq80hTcRQth1c9ZnQ3uNQvU=
github.com/go-ini/ini v1.62.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo=
Expand Down Expand Up @@ -468,6 +470,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
5 changes: 5 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\


[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
> If extracted field already exists in the event root, it will be overridden.

[More details...](plugin/action/json_extract/README.md)
## keep_fields
It keeps the list of the event fields and removes others.

Expand Down
5 changes: 5 additions & 0 deletions plugin/action/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\


[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
> If extracted field already exists in the event root, it will be overridden.

[More details...](plugin/action/json_extract/README.md)
## keep_fields
It keeps the list of the event fields and removes others.

Expand Down
11 changes: 11 additions & 0 deletions plugin/action/json_extract/README.idoc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# JSON extract plugin
@introduction

### Examples
@examples

### Benchmarks
@benchmarks

### Config params
@config-params|description
58 changes: 58 additions & 0 deletions plugin/action/json_extract/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# JSON extract plugin
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
> If extracted field already exists in the event root, it will be overridden.
### Examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: json_extract
field: log
extract_field: error.code
...
```
The original event:
```json
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z"
}
```
The resulting event:
```json
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z",
"code": 2
}
```

### Benchmarks
Performance comparison of `json_extract` and `json_decode` plugins.
`json_extract` on average 3 times faster than `json_decode`.

| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 129 | 33 | 176 |
| 309 | 264 | 520 |
| 2109 | 2263 | 6778 |
| 10909 | 11289 | 32205 |
| 21909 | 23277 | 62819 |

### Config params
**`field`** *`cfg.FieldSelector`* *`required`*

The event field from which to extract. Must be a string.

<br>

**`extract_field`** *`cfg.FieldSelector`* *`required`*

Field to extract.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
167 changes: 167 additions & 0 deletions plugin/action/json_extract/json_extract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package json_extract

import (
"bytes"

"github.com/go-faster/jx"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
insaneJSON "github.com/vitkovskii/insane-json"
)

/*{ introduction
It extracts a field from JSON-encoded event field and adds extracted field to the event root.
> If extracted field already exists in the event root, it will be overridden.
}*/

/*{ examples
```yaml
pipelines:
example_pipeline:
...
actions:
- type: json_extract
field: log
extract_field: error.code
...
```
The original event:
```json
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z"
}
```
The resulting event:
```json
{
"log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error\":{\"code\":2,\"args\":[]}}",
"time": "2024-03-01T10:49:28.263317941Z",
"code": 2
}
```
}*/

/*{ benchmarks
Performance comparison of `json_extract` and `json_decode` plugins.
`json_extract` on average 3 times faster than `json_decode`.
| json (length) | json_extract (time ns) | json_decode (time ns) |
|---------------|------------------------|-----------------------|
| 129 | 33 | 176 |
| 309 | 264 | 520 |
| 2109 | 2263 | 6778 |
| 10909 | 11289 | 32205 |
| 21909 | 23277 | 62819 |
}*/

type Plugin struct {
config *Config
decoder *jx.Decoder
}

// ! config-params
// ^ config-params
type Config struct {
// > @3@4@5@6
// >
// > The event field from which to extract. Must be a string.
Field cfg.FieldSelector `json:"field" parse:"selector" required:"true"` // *
Field_ []string

// > @3@4@5@6
// >
// > Field to extract.
ExtractField cfg.FieldSelector `json:"extract_field" parse:"selector" required:"true"` // *
ExtractField_ []string
}

func init() {
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
Type: "json_extract",
Factory: factory,
})
}

func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.decoder = &jx.Decoder{}
}

func (p *Plugin) Stop() {}

func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
jsonNode := event.Root.Dig(p.config.Field_...)
if jsonNode == nil {
return pipeline.ActionPass
}

p.decoder.ResetBytes(jsonNode.AsBytes())
extract(event.Root, p.decoder, p.config.ExtractField_, 0, false)
return pipeline.ActionPass
}

// extract extracts field from decoder and adds it to the root.
// `skipAddField` flag is required for proper benchmarking.
func extract(root *insaneJSON.Root, d *jx.Decoder, field []string, depth int, skipAddField bool) {
objIter, err := d.ObjIter()
if err != nil {
return
}

for objIter.Next() {
if bytes.Equal(objIter.Key(), pipeline.StringToByteUnsafe(field[depth])) {
if depth == len(field)-1 { // add field
if skipAddField {
_ = d.Skip()
} else {
addField(root, field[depth], d)
}
} else { // go deep
raw, err := d.Raw()
if err != nil {
break
}
d.ResetBytes(raw)
extract(root, d, field, depth+1, skipAddField)
}
break
} else if err = d.Skip(); err != nil {
break
}
}
}

func addField(root *insaneJSON.Root, field string, d *jx.Decoder) {
switch d.Next() {
case jx.Number:
num, _ := d.Num()
intVal, err := num.Int64()
if err == nil {
root.AddFieldNoAlloc(root, field).MutateToInt64(intVal)
} else {
floatVal, err := num.Float64()
if err == nil {
root.AddFieldNoAlloc(root, field).MutateToFloat(floatVal)
}
}
case jx.String:
s, _ := d.StrBytes()
root.AddFieldNoAlloc(root, field).MutateToBytesCopy(root, s)
case jx.Null:
root.AddFieldNoAlloc(root, field).MutateToNull()
case jx.Bool:
b, _ := d.Bool()
root.AddFieldNoAlloc(root, field).MutateToBool(b)
case jx.Object, jx.Array:
raw, _ := d.Raw()
root.AddFieldNoAlloc(root, field).MutateToJSON(root, raw.String())
default:
_ = d.Skip()
}
}
Loading

0 comments on commit e570031

Please sign in to comment.