Skip to content

Commit

Permalink
feat: added sink batch size config for sink concurrency (#57)
Browse files Browse the repository at this point in the history
* increased default batch size

* fixed unit test

* made sink batch size configurable

* resolved comments

---------

Co-authored-by: Sumeet Rai <[email protected]>
  • Loading branch information
sumslim and Sumeet Rai authored Apr 18, 2024
1 parent 18820c3 commit 8f3ed09
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 5 deletions.
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/pkg/errors"
)

const defaultBatchSize = 1

// TimerFn of function type
type TimerFn func() func() int

Expand All @@ -32,6 +30,7 @@ type Agent struct {
retrier *retrier
stopOnSinkError bool
timerFn TimerFn
sinkBatchSize int
}

// NewAgent returns an Agent with plugin factories.
Expand All @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent {
logger: config.Logger,
retrier: retrier,
timerFn: timerFn,
sinkBatchSize: config.SinkBatchSize,
}
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s

r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName)
return nil
}, defaultBatchSize)
}, r.sinkBatchSize)

stream.onClose(func() {
if err := sink.Close(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) {
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
SinkBatchSize: 1,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down
1 change: 1 addition & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type Config struct {
RetryInitialInterval time.Duration
StopOnSinkError bool
TimerFn TimerFn
SinkBatchSize int
}
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command {
MaxRetries: cfg.MaxRetries,
RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second,
StopOnSinkError: cfg.StopOnSinkError,
SinkBatchSize: cfg.SinkBatchSize,
})

recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0])
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`
SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"`
}

func Load(configFile string) (Config, error) {
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) {
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
StopOnSinkError: false,
SinkBatchSize: 1,
},
},
{
Expand All @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) {
OtelTraceSampleProbability: 1,
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
SinkBatchSize: 1,
},
expectedErr: "",
},
Expand Down
3 changes: 2 additions & 1 deletion config/meteor.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false
APP_NAME: meteor
OTEL_ENABLED: false
OTEL_COLLECTOR_ADDR: "localhost:4317"
OTEL_TRACE_SAMPLE_PROBABILITY: 1
OTEL_TRACE_SAMPLE_PROBABILITY: 1
SINK_BATCH_SIZE: 10
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.1.0
google.golang.org/api v0.114.0
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea
google.golang.org/grpc v1.55.0
Expand Down Expand Up @@ -221,7 +222,6 @@ require (
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down

0 comments on commit 8f3ed09

Please sign in to comment.