From 8f3ed09f6339b78d9e443aa282d6c780e9474f9a Mon Sep 17 00:00:00 2001 From: Sumeet Rai <43041062+sumslim@users.noreply.github.com> Date: Thu, 18 Apr 2024 17:06:55 +0530 Subject: [PATCH] feat: added sink batch size config for sink concurrency (#57) * increased default batch size * fixed unit test * made sink batch size configurable * resolved comments --------- Co-authored-by: Sumeet Rai --- agent/agent.go | 6 +++--- agent/agent_test.go | 1 + agent/config.go | 1 + cmd/run.go | 1 + config/config.go | 1 + config/config_test.go | 2 ++ config/meteor.yaml.sample | 3 ++- go.mod | 2 +- 8 files changed, 12 insertions(+), 5 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 27f8f1d00..c0fce8e50 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -17,8 +17,6 @@ import ( "github.com/pkg/errors" ) -const defaultBatchSize = 1 - // TimerFn of function type type TimerFn func() func() int @@ -32,6 +30,7 @@ type Agent struct { retrier *retrier stopOnSinkError bool timerFn TimerFn + sinkBatchSize int } // NewAgent returns an Agent with plugin factories. @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent { logger: config.Logger, retrier: retrier, timerFn: timerFn, + sinkBatchSize: config.SinkBatchSize, } } @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName) return nil - }, defaultBatchSize) + }, r.sinkBatchSize) stream.onClose(func() { if err := sink.Close(); err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index fef9434ed..a77ab7eab 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) { Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time + SinkBatchSize: 1, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) diff --git a/agent/config.go b/agent/config.go index 1b6e8caf8..c7d6d9b7d 100644 --- a/agent/config.go +++ b/agent/config.go @@ -17,4 +17,5 @@ type Config struct { RetryInitialInterval time.Duration StopOnSinkError bool TimerFn TimerFn + SinkBatchSize int } diff --git a/cmd/run.go b/cmd/run.go index 479bcd133..9d5b1386d 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command { MaxRetries: cfg.MaxRetries, RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second, StopOnSinkError: cfg.StopOnSinkError, + SinkBatchSize: cfg.SinkBatchSize, }) recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0]) diff --git a/config/config.go b/config/config.go index edd7892fc..2a511e337 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type Config struct { OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"` OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"` OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"` + SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"` } func Load(configFile string) (Config, error) { diff --git a/config/config_test.go b/config/config_test.go index 56d28055d..e062b169e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) { MaxRetries: 5, RetryInitialIntervalSeconds: 5, StopOnSinkError: false, + SinkBatchSize: 1, }, }, { @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) { OtelTraceSampleProbability: 1, MaxRetries: 5, RetryInitialIntervalSeconds: 5, + SinkBatchSize: 1, }, expectedErr: "", }, diff --git a/config/meteor.yaml.sample b/config/meteor.yaml.sample index f660860ba..f811df6f4 100644 --- a/config/meteor.yaml.sample +++ b/config/meteor.yaml.sample @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false APP_NAME: meteor OTEL_ENABLED: false OTEL_COLLECTOR_ADDR: "localhost:4317" -OTEL_TRACE_SAMPLE_PROBABILITY: 1 \ No newline at end of file +OTEL_TRACE_SAMPLE_PROBABILITY: 1 +SINK_BATCH_SIZE: 10 \ No newline at end of file diff --git a/go.mod b/go.mod index 97ba9245b..4e91b103d 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 golang.org/x/oauth2 v0.6.0 + golang.org/x/sync v0.1.0 google.golang.org/api v0.114.0 google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea google.golang.org/grpc v1.55.0 @@ -221,7 +222,6 @@ require ( golang.org/x/crypto v0.8.0 // indirect golang.org/x/mod v0.9.0 // indirect golang.org/x/net v0.9.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect