Skip to content

Commit

Permalink
[libbeat][processor][append] Update the append processor (elastic#33731)
Browse files Browse the repository at this point in the history
* Updated the name of the processor. Resolved the issues in the docs. Removed TODO from code.

Co-authored-by: Craig MacKenzie <[email protected]>
  • Loading branch information
vinit-chauhan and cmacknz authored Nov 29, 2022
1 parent 678d5b0 commit ad5cbba
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
14 changes: 7 additions & 7 deletions libbeat/processors/actions/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ type appendProcessorConfig struct {
IgnoreMissing bool `config:"ignore_missing"`
IgnoreEmptyValues bool `config:"ignore_empty_values"`
FailOnError bool `config:"fail_on_error"`
AllowDuplicate bool `config:"allow_duplicate"` //TODO: Add functionality to remove duplicate
AllowDuplicate bool `config:"allow_duplicate"`
}

func init() {
processors.RegisterPlugin("append_processor",
processors.RegisterPlugin("append",
checks.ConfigChecked(NewAppendProcessor,
checks.RequireFields("target_field"),
),
)
jsprocessor.RegisterPlugin("AppendProcessor", NewAppendProcessor)
}

// NewAppendProcessor returns a new append_processor processor.
// NewAppendProcessor returns a new append processor.
func NewAppendProcessor(c *conf.C) (processors.Processor, error) {
config := appendProcessorConfig{
IgnoreMissing: false,
Expand All @@ -67,7 +67,7 @@ func NewAppendProcessor(c *conf.C) (processors.Processor, error) {

f := &appendProcessor{
config: config,
logger: logp.NewLogger("append_processor"),
logger: logp.NewLogger("append"),
}
return f, nil
}
Expand All @@ -80,12 +80,12 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) {

err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event)
if err != nil {
errMsg := fmt.Errorf("failed to append fields in append_processor processor: %w", err)
errMsg := fmt.Errorf("failed to append fields in append processor: %w", err)
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event = backup
if _, err := event.PutValue("error.message", errMsg.Error()); err != nil {
return nil, fmt.Errorf("failed to append fields in append_processor processor: %w", err)
return nil, fmt.Errorf("failed to append fields in append processor: %w", err)
}
return event, err
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (f *appendProcessor) appendValues(target string, fields []string, values []
}

func (f *appendProcessor) String() string {
return "append_processor=" + fmt.Sprintf("%+v", f.config.TargetField)
return "append=" + fmt.Sprintf("%+v", f.config.TargetField)
}

// this function will remove all the empty strings and nil values from the array
Expand Down
24 changes: 23 additions & 1 deletion libbeat/processors/actions/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,28 @@ func Test_appendProcessor_appendValues(t *testing.T) {
},
wantErr: false,
},
{
description: "processor with no fields or values",
args: args{
target: "target",
event: &beat.Event{
Meta: mapstr.M{},
Fields: mapstr.M{
"field": "value",
},
},
},
fields: fields{
logger: log,
config: appendProcessorConfig{
IgnoreEmptyValues: false,
IgnoreMissing: false,
AllowDuplicate: true,
FailOnError: true,
},
},
wantErr: false,
},
{
description: "append value in the arrays from an unknown field",
args: args{
Expand Down Expand Up @@ -361,7 +383,7 @@ func Test_appendProcessor_Run(t *testing.T) {
Meta: mapstr.M{},
Fields: mapstr.M{
"error": mapstr.M{
"message": "failed to append fields in append_processor processor: could not fetch value for key: missing-field, Error: key not found",
"message": "failed to append fields in append processor: could not fetch value for key: missing-field, Error: key not found",
},
},
},
Expand Down
8 changes: 4 additions & 4 deletions libbeat/processors/actions/docs/append.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
=== Append Processor

++++
<titleabbrev>append_processor</titleabbrev>
<titleabbrev>append</titleabbrev>
++++

The `append_processor` processor appends one or more values to an existing array if the target field already exists and it is an array. COnverts a scaler to an array and appends one or more values to it if the field exists and it is a scaler. Here the values can either be one or more static values or one or more values from the fields listed under 'fields' key.
The `append` processor appends one or more values to an existing array if the target field already exists and it is an array. Converts a scaler to an array and appends one or more values to it if the field exists and it is a scaler. Here the values can either be one or more static values or one or more values from the fields listed under 'fields' key.

`target_field`:: The field in which you want to append the data.
`fields`:: () List of fields from which you want to copy data from. If the value is of a concrete type it will be appended directly to the target.
`fields`:: (Optional) List of fields from which you want to copy data from. If the value is of a concrete type it will be appended directly to the target.
However, if the value is an array, all the elements of the array are pushed individually to the target field.
`values`:: (Optional) List of static values you want to append to target field.
`ignore_empty_values`:: (Optional) If set to `true`, all the `""` and `nil` are omitted from being appended to the target field.
Expand All @@ -29,7 +29,7 @@ processors:
- decode_json_fields:
fields: message
target: ""
- append_processor:
- append:
target_field: my-target-field
fields:
- concrete.field
Expand Down

0 comments on commit ad5cbba

Please sign in to comment.