Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sumeet Rai committed Apr 16, 2024
1 parent b76f798 commit b2006c3
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 15 deletions.
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Agent struct {
retrier *retrier
stopOnSinkError bool
timerFn TimerFn
batchSize int
sinkBatchSize int
}

// NewAgent returns an Agent with plugin factories.
Expand All @@ -52,7 +52,7 @@ func NewAgent(config Config) *Agent {
logger: config.Logger,
retrier: retrier,
timerFn: timerFn,
batchSize: config.BatchSize,
sinkBatchSize: config.SinkBatchSize,
}
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s

r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName)
return nil
}, r.batchSize)
}, r.sinkBatchSize)

stream.onClose(func() {
if err := sink.Close(); err != nil {
Expand Down
8 changes: 2 additions & 6 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,11 +728,6 @@ func TestAgentRun(t *testing.T) {
data := []models.Record{
models.NewRecord(&v1beta2.Asset{}),
}
batchedData := []models.Record{
models.NewRecord(&v1beta2.Asset{}),
models.NewRecord(&v1beta2.Asset{}),
models.NewRecord(&v1beta2.Asset{}),
}

extr := mocks.NewExtractor()
extr.SetEmit(data)
Expand All @@ -755,7 +750,7 @@ func TestAgentRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, buildPluginConfig(validRecipe.Sinks[0])).Return(nil).Once()
sink.On("Sink", mockCtx, batchedData).Return(nil)
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)

Expand All @@ -777,6 +772,7 @@ func TestAgentRun(t *testing.T) {
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
SinkBatchSize: 1,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down
2 changes: 1 addition & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ type Config struct {
RetryInitialInterval time.Duration
StopOnSinkError bool
TimerFn TimerFn
BatchSize int
SinkBatchSize int
}
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func RunCmd() *cobra.Command {
MaxRetries: cfg.MaxRetries,
RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second,
StopOnSinkError: cfg.StopOnSinkError,
BatchSize: cfg.BatchSize,
SinkBatchSize: cfg.SinkBatchSize,
})

recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0])
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Config struct {
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`
BatchSize int `mapstructure:"BATCH_SIZE" default:"1"`
SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"`
}

func Load(configFile string) (Config, error) {
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestLoad(t *testing.T) {
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
StopOnSinkError: false,
BatchSize: 1,
SinkBatchSize: 1,
},
},
{
Expand All @@ -47,7 +47,7 @@ func TestLoad(t *testing.T) {
OtelTraceSampleProbability: 1,
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
BatchSize: 1,
SinkBatchSize: 1,
},
expectedErr: "",
},
Expand Down
2 changes: 1 addition & 1 deletion config/meteor.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ APP_NAME: meteor
OTEL_ENABLED: false
OTEL_COLLECTOR_ADDR: "localhost:4317"
OTEL_TRACE_SAMPLE_PROBABILITY: 1
BATCH_SIZE: 10
SINK_BATCH_SIZE: 10

0 comments on commit b2006c3

Please sign in to comment.