Skip to content

Commit

Permalink
refactor: introduce Transformer.ProcessBatchWith to reduce code dup…
Browse files Browse the repository at this point in the history
…lication
  • Loading branch information
andrzej-stencel committed Dec 16, 2024
1 parent de66483 commit d8704ab
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 65 deletions.
12 changes: 12 additions & 0 deletions pkg/stanza/operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package helper // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
goerrors "errors"
"fmt"

"github.com/expr-lang/expr/vm"
Expand Down Expand Up @@ -75,6 +76,14 @@ func (t *TransformerOperator) CanProcess() bool {
return true
}

func (t *TransformerOperator) ProcessBatchWith(ctx context.Context, entries []*entry.Entry, process ProcessFunction) error {
var errs []error
for i := range entries {
errs = append(errs, process(ctx, entries[i]))
}
return goerrors.Join(errs...)
}

// ProcessWith will process an entry with a transform function.
func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) error {
// Short circuit if the "if" condition does not match
Expand Down Expand Up @@ -125,6 +134,9 @@ func (t *TransformerOperator) Skip(_ context.Context, entry *entry.Entry) (bool,
return !matches.(bool), nil
}

// ProcessFunction is a function that processes an entry.
type ProcessFunction = func(context.Context, *entry.Entry) error

// TransformFunction is function that transforms an entry.
type TransformFunction = func(*entry.Entry) error

Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/add/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package add // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -25,11 +24,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a add transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/assignkeys/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package assignkeys // import "github.com/open-telemetry/opentelemetry-collector-

import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand All @@ -19,11 +18,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with AssignKeys transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/copy/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package copy // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand All @@ -20,11 +19,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a copy transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/filter/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package filter // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"context"
"crypto/rand"
"errors"
"math/big"

"github.com/expr-lang/expr/vm"
Expand All @@ -24,11 +23,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will drop incoming entries that match the filter expression
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/flatten/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package flatten // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
goerrors "errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand All @@ -25,11 +24,7 @@ type Transformer[T interface {
}

func (t *Transformer[T]) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return goerrors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a flatten transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/move/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package move // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand All @@ -20,11 +19,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a move transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/noop/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package noop // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -17,11 +16,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.WriteBatch(ctx, entries)
}

// Process will forward the entry to the next output without any alterations.
Expand Down
6 changes: 1 addition & 5 deletions pkg/stanza/operator/transformer/recombine/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ func (t *Transformer) Stop() error {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

func (t *Transformer) Process(ctx context.Context, e *entry.Entry) error {
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/remove/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package remove // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
Expand All @@ -19,11 +18,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a remove transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/retain/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package retain // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -21,11 +20,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will process an entry with a retain transformation.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/transformer/unquote/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package unquote // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"fmt"
"strconv"

Expand All @@ -20,11 +19,7 @@ type Transformer struct {
}

func (t *Transformer) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, t.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return t.ProcessBatchWith(ctx, entries, t.Process)
}

// Process will unquote a string
Expand Down

0 comments on commit d8704ab

Please sign in to comment.