diff --git a/etl/controller/pulsemaintainer.go b/etl/controller/pulsemaintainer.go index 80369888..67efb7f5 100644 --- a/etl/controller/pulsemaintainer.go +++ b/etl/controller/pulsemaintainer.go @@ -26,10 +26,9 @@ func (c *Controller) pulseMaintainer(ctx context.Context) { select { case <-ctx.Done(): return - default: - time.Sleep(time.Second * time.Duration(c.cfg.PulsePeriod)) + case <-time.After(time.Second * time.Duration(c.cfg.PulsePeriod)): + eraseJetDropRegister(ctx, c, log) } - eraseJetDropRegister(ctx, c, log) } } @@ -82,8 +81,7 @@ func (c *Controller) pulseSequence(ctx context.Context) { select { case <-ctx.Done(): return - default: - time.Sleep(time.Second * time.Duration(c.cfg.SequentialPeriod)) + case <-time.After(time.Second * time.Duration(c.cfg.SequentialPeriod)): } var err error var nextSequential models.Pulse diff --git a/etl/controller/pulsemaintainer_test.go b/etl/controller/pulsemaintainer_test.go index 908d8e49..f6ba5026 100644 --- a/etl/controller/pulsemaintainer_test.go +++ b/etl/controller/pulsemaintainer_test.go @@ -240,7 +240,7 @@ func TestController_pulseSequence_Start_NoMissedData(t *testing.T) { } func TestController_pulseMaintainer_Start_PulsesCompleteAndNot(t *testing.T) { - var cfg = configuration.Controller{PulsePeriod: 0, ReloadPeriod: 10, ReloadCleanPeriod: 1, SequentialPeriod: 0} + var cfg = configuration.Controller{PulsePeriod: 1, ReloadPeriod: 10, ReloadCleanPeriod: 1, SequentialPeriod: 10} extractor := mock.NewJetDropsExtractorMock(t) @@ -289,9 +289,9 @@ func TestController_pulseMaintainer_Start_PulsesCompleteAndNot(t *testing.T) { // sequential is 1000000, pulses in db: [1000000, 1000020], expect loading data from 1000000 to 1000010 // sequential is 1000000, pulses in db: [1000000, 1000020], expect don't load already loaded data // wait ReloadPeriod seconds -// sequential is 1000000, pulses in db: [1000000, 1000020], expect loading data from 1000000 to 1000020 +// sequential is 1000000, pulses in db: [1000000, 1000020], expect loading data from 1000000 to 1000010 func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { - var cfg = configuration.Controller{PulsePeriod: 0, ReloadPeriod: 2, ReloadCleanPeriod: 1, SequentialPeriod: 0} + var cfg = configuration.Controller{PulsePeriod: 0, ReloadPeriod: 2, ReloadCleanPeriod: 1, SequentialPeriod: 1} extractor := mock.NewJetDropsExtractorMock(t) @@ -310,7 +310,7 @@ func TestController_pulseSequence_ReloadPeriodExpired(t *testing.T) { require.Equal(t, int64(1000000), fromPulseNumber) require.Equal(t, int64(1000010), toPulseNumber) if extractor.LoadJetDropsBeforeCounter() > 2 { - require.Fail(t, "LoadJetDrops was called more than once") + require.Fail(t, "LoadJetDrops was called more than twice") } wg.Done() return nil