Skip to content

Commit

Permalink
Refactor: StatsD code removal (#48)
Browse files Browse the repository at this point in the history
* removed redundant statsd code from meteor

* fixed a unit test case

* refactored in attempt to resolve lint error

* reverting to keep a list of monitors for extensibility

* formatting to keep older code as it was

* recordPlugin method mock method call not needed now

* removed list as only otel is being supported now

---------

Co-authored-by: Sumeet Kumar Rai <[email protected]>
  • Loading branch information
sumslim and Sumeet Kumar Rai authored Dec 15, 2023
1 parent a147080 commit 4bf1b95
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 419 deletions.
13 changes: 5 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Agent struct {
extractorFactory *registry.ExtractorFactory
processorFactory *registry.ProcessorFactory
sinkFactory *registry.SinkFactory
monitor []Monitor
monitor Monitor
logger log.Logger
retrier *retrier
stopOnSinkError bool
Expand Down Expand Up @@ -279,8 +279,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
}

retryNotification := func(e error, d time.Duration) {
for _, mt := range r.monitor {
mt.RecordSinkRetryCount(ctx, pluginInfo)
if r.monitor != nil {
r.monitor.RecordSinkRetryCount(ctx, pluginInfo)
}

r.logger.Warn(
Expand All @@ -302,9 +302,6 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
)

pluginInfo.Success = err == nil
for _, mt := range r.monitor {
mt.RecordPlugin(ctx, pluginInfo) // this can be deleted when statsd is removed
}
if err != nil {
// once it reaches here, it means that the retry has been exhausted and still got error
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
Expand All @@ -328,8 +325,8 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
}

func (r *Agent) logAndRecordMetrics(ctx context.Context, run Run) {
for _, monitor := range r.monitor {
monitor.RecordRun(ctx, run)
if r.monitor != nil {
r.monitor.RecordRun(ctx, run)
}

if run.Success {
Expand Down
58 changes: 29 additions & 29 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: registry.NewSinkFactory(),
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
Expand Down Expand Up @@ -483,15 +483,15 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.True(t, run.Success)
Expand Down Expand Up @@ -533,7 +533,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand All @@ -542,7 +542,7 @@ func TestAgentRun(t *testing.T) {
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: true,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})

run := r.Run(ctx, validRecipe)
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand All @@ -594,7 +594,7 @@ func TestAgentRun(t *testing.T) {
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: false,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})

run := r.Run(ctx, validRecipe)
Expand Down Expand Up @@ -649,15 +649,15 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -706,14 +706,14 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
Logger: utils.Logger,
TimerFn: timerFn,
})
Expand Down Expand Up @@ -761,15 +761,15 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -815,7 +815,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
monitor.On("RecordSinkRetryCount", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
defer monitor.AssertExpectations(t)

Expand All @@ -824,7 +824,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", utils.OfTypeContext(), mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo")).Maybe()
monitor.On("RecordSinkRetryCount", utils.OfTypeContext(), mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

Expand All @@ -884,7 +884,7 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
MaxRetries: 5,
RetryInitialInterval: 10 * time.Second,
})
Expand Down Expand Up @@ -1057,15 +1057,15 @@ func TestAgentRunMultiple(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mockCtx, mock.AnythingOfType("agent.Run"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo"))
monitor.On("RecordPlugin", mockCtx, mock.AnythingOfType("agent.PluginInfo")).Maybe()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{monitor},
Monitor: monitor,
})
runs := r.RunMultiple(ctx, recipeList)

Expand Down Expand Up @@ -1152,7 +1152,7 @@ func TestValidate(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: []agent.Monitor{newMockMonitor()},
Monitor: newMockMonitor(),
})

var expectedErrs []error
Expand Down
2 changes: 1 addition & 1 deletion agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
ExtractorFactory *registry.ExtractorFactory
ProcessorFactory *registry.ProcessorFactory
SinkFactory *registry.SinkFactory
Monitor []Monitor
Monitor Monitor
Logger log.Logger
MaxRetries int
RetryInitialInterval time.Duration
Expand Down
21 changes: 2 additions & 19 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,7 @@ func RunCmd() *cobra.Command {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

var mts []agent.Monitor
if cfg.StatsdEnabled {
mt, err := newStatsdMonitor(cfg)
if err != nil {
return err
}

mts = append(mts, mt)

}
var mts agent.Monitor

if cfg.OtelEnabled {
doneOtlp, err := metrics.InitOtel(ctx, cfg, lg, Version)
Expand All @@ -88,7 +79,7 @@ func RunCmd() *cobra.Command {
}
defer doneOtlp()

mts = append(mts, metrics.NewOtelMonitor())
mts = metrics.NewOtelMonitor()
}

runner := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -156,11 +147,3 @@ func RunCmd() *cobra.Command {

return cmd
}

func newStatsdMonitor(cfg config.Config) (*metrics.StatsdMonitor, error) {
client, err := metrics.NewStatsdClient(cfg.StatsdHost)
if err != nil {
return nil, err
}
return metrics.NewStatsdMonitor(client, cfg.AppName), nil
}
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ type Config struct {
MaxRetries int `mapstructure:"MAX_RETRIES" default:"5"`
RetryInitialIntervalSeconds int `mapstructure:"RETRY_INITIAL_INTERVAL_SECONDS" default:"5"`
StopOnSinkError bool `mapstructure:"STOP_ON_SINK_ERROR" default:"false"`
StatsdEnabled bool `mapstructure:"STATSD_ENABLED" default:"false"`
StatsdHost string `mapstructure:"STATSD_HOST" default:"localhost:8125"`
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`
Expand Down
4 changes: 0 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ func TestLoad(t *testing.T) {
expected: config.Config{
AppName: "meteor",
LogLevel: "info",
StatsdEnabled: false,
StatsdHost: "localhost:8125",
OtelEnabled: false,
OtelCollectorAddr: "localhost:4317",
OtelTraceSampleProbability: 1,
Expand All @@ -43,8 +41,6 @@ func TestLoad(t *testing.T) {
expected: config.Config{
AppName: "meteor",
LogLevel: "info",
StatsdEnabled: false,
StatsdHost: "localhost:8125",
OtelEnabled: false,
OtelCollectorAddr: "localhost:4317",
OtelTraceSampleProbability: 1,
Expand Down
2 changes: 0 additions & 2 deletions config/meteor.yaml.sample
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
LOG_LEVEL: info
STATSD_ENABLED: false
STATSD_HOST: "localhost:8125"
MAX_RETRIES: 5
RETRY_INITIAL_INTERVAL_SECONDS: 5
STOP_ON_SINK_ERROR: false
Expand Down
2 changes: 1 addition & 1 deletion config/testdata/invalid-config.yaml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
STATSD_ENABLED: not-a-boolean
OTEL_ENABLED: not-a-boolean
3 changes: 0 additions & 3 deletions config/testdata/valid-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
LOG_LEVEL: info
STATSD_ENABLED: false
STATSD_HOST: "localhost:8125"
STATSD_PREFIX: meteor
MAX_RETRIES: 5
RETRY_INITIAL_INTERVAL_SECONDS: 5
STOP_ON_SINK_ERROR: false
Loading

0 comments on commit 4bf1b95

Please sign in to comment.