Skip to content

Commit

Permalink
refactor: use Transformer.ProcessBatchWith in parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzej-stencel committed Dec 16, 2024
1 parent d8704ab commit 5734fcc
Show file tree
Hide file tree
Showing 12 changed files with 12 additions and 70 deletions.
6 changes: 1 addition & 5 deletions pkg/stanza/operator/parser/container/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry of Container logs
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/csv/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -29,11 +28,7 @@ type Parser struct {
type parseFunc func(any) (any, error)

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry for csv.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package json // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -23,11 +22,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry for JSON.
Expand Down
6 changes: 1 addition & 5 deletions pkg/stanza/operator/parser/jsonarray/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ type Parser struct {
type parseFunc func(any) (any, error)

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry for json array.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/keyvalue/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package keyvalue // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
Expand All @@ -21,11 +20,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry for key value pairs.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/regex/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package regex // import "github.com/open-telemetry/opentelemetry-collector-contr

import (
"context"
"errors"
"fmt"
"regexp"

Expand All @@ -28,11 +27,7 @@ func (p *Parser) Stop() error {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry for regex.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/scope/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package scope // import "github.com/open-telemetry/opentelemetry-collector-contr

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -18,11 +17,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse logger name from an entry.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/severity/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package severity // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -18,11 +17,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse severity from an entry.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/syslog/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package syslog // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"time"
Expand Down Expand Up @@ -38,11 +37,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry field as syslog.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/time/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package time // import "github.com/open-telemetry/opentelemetry-collector-contri

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -18,11 +17,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse time from an entry.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/trace/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package trace // import "github.com/open-telemetry/opentelemetry-collector-contr

import (
"context"
"errors"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand All @@ -18,11 +17,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse traces from an entry.
Expand Down
7 changes: 1 addition & 6 deletions pkg/stanza/operator/parser/uri/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package uri // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/featuregate"
Expand Down Expand Up @@ -34,11 +33,7 @@ type Parser struct {
}

func (p *Parser) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
var errs []error
for i := range entries {
errs = append(errs, p.Process(ctx, entries[i]))
}
return errors.Join(errs...)
return p.ProcessBatchWith(ctx, entries, p.Process)
}

// Process will parse an entry.
Expand Down

0 comments on commit 5734fcc

Please sign in to comment.