diff --git a/agent/agent.go b/agent/agent.go index 27f8f1d0..c0fce8e5 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -17,8 +17,6 @@ import ( "github.com/pkg/errors" ) -const defaultBatchSize = 1 - // TimerFn of function type type TimerFn func() func() int @@ -32,6 +30,7 @@ type Agent struct { retrier *retrier stopOnSinkError bool timerFn TimerFn + sinkBatchSize int } // NewAgent returns an Agent with plugin factories. @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent { logger: config.Logger, retrier: retrier, timerFn: timerFn, + sinkBatchSize: config.SinkBatchSize, } } @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName) return nil - }, defaultBatchSize) + }, r.sinkBatchSize) stream.onClose(func() { if err := sink.Close(); err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index fef9434e..a77ab7ea 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) { Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time + SinkBatchSize: 1, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) diff --git a/agent/config.go b/agent/config.go index 1b6e8caf..c7d6d9b7 100644 --- a/agent/config.go +++ b/agent/config.go @@ -17,4 +17,5 @@ type Config struct { RetryInitialInterval time.Duration StopOnSinkError bool TimerFn TimerFn + SinkBatchSize int } diff --git a/cmd/run.go b/cmd/run.go index 479bcd13..9d5b1386 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command { MaxRetries: cfg.MaxRetries, RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second, StopOnSinkError: cfg.StopOnSinkError, + SinkBatchSize: cfg.SinkBatchSize, }) recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0]) diff --git a/config/config.go b/config/config.go index edd7892f..2a511e33 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type Config struct { OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"` OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"` OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"` + SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"` } func Load(configFile string) (Config, error) { diff --git a/config/config_test.go b/config/config_test.go index 56d28055..e062b169 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) { MaxRetries: 5, RetryInitialIntervalSeconds: 5, StopOnSinkError: false, + SinkBatchSize: 1, }, }, { @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) { OtelTraceSampleProbability: 1, MaxRetries: 5, RetryInitialIntervalSeconds: 5, + SinkBatchSize: 1, }, expectedErr: "", }, diff --git a/config/meteor.yaml.sample b/config/meteor.yaml.sample index f660860b..f811df6f 100644 --- a/config/meteor.yaml.sample +++ b/config/meteor.yaml.sample @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false APP_NAME: meteor OTEL_ENABLED: false OTEL_COLLECTOR_ADDR: "localhost:4317" -OTEL_TRACE_SAMPLE_PROBABILITY: 1 \ No newline at end of file +OTEL_TRACE_SAMPLE_PROBABILITY: 1 +SINK_BATCH_SIZE: 10 \ No newline at end of file diff --git a/go.mod b/go.mod index 97ba9245..4e91b103 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 golang.org/x/oauth2 v0.6.0 + golang.org/x/sync v0.1.0 google.golang.org/api v0.114.0 google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea google.golang.org/grpc v1.55.0 @@ -221,7 +222,6 @@ require ( golang.org/x/crypto v0.8.0 // indirect golang.org/x/mod v0.9.0 // indirect golang.org/x/net v0.9.0 // indirect - golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect