From 1ed6c099a80f4e0f2d7f46d95532b5042884e486 Mon Sep 17 00:00:00 2001 From: Markus Fensterer Date: Tue, 9 Jul 2024 14:36:31 +0200 Subject: [PATCH 1/5] avoid starvation by configuring max. number of retries of an event (backoff is exponential in nsq by default) and increase concurrency by processing at most two events at a time --- internal/bmc/nsq.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 6079572..3096b04 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -45,6 +45,8 @@ func (b *BMCService) InitConsumer() error { MinVersion: tls.VersionTLS12, } config.TlsV1 = true + config.MaxAttempts = 3 + config.MaxInFlight = 2 consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) if err != nil { @@ -52,7 +54,6 @@ func (b *BMCService) InitConsumer() error { } consumer.SetLogger(nsqLogger{log: b.log}, nsqMapLevel(b.log)) - consumer.AddHandler(b) err = consumer.ConnectToNSQD(b.mqAddress) From 88ca1405cf8107b51be74e7db159423d8086cf6d Mon Sep 17 00:00:00 2001 From: Markus Fensterer Date: Tue, 9 Jul 2024 14:56:44 +0200 Subject: [PATCH 2/5] more logging --- internal/bmc/nsq.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 3096b04..7906641 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -45,7 +45,6 @@ func (b *BMCService) InitConsumer() error { MinVersion: tls.VersionTLS12, } config.TlsV1 = true - config.MaxAttempts = 3 config.MaxInFlight = 2 consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) @@ -71,7 +70,11 @@ func (b *BMCService) HandleMessage(message *nsq.Message) error { return err } - b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event) + if message.attempt > 3 { + b.log.Warn("ignoring message because of multiple failed attempts", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempt", message.attempt) + return nil + } + b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempt", message.attempt) if event.Cmd.IPMI == nil { return fmt.Errorf("event does not contain ipmi details:%v", event) From e7bf557cff510bd13408f0c4c02b59f034f1ad10 Mon Sep 17 00:00:00 2001 From: Markus Fensterer Date: Tue, 9 Jul 2024 15:01:58 +0200 Subject: [PATCH 3/5] fixes --- internal/bmc/nsq.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 7906641..76d63e7 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -70,11 +70,11 @@ func (b *BMCService) HandleMessage(message *nsq.Message) error { return err } - if message.attempt > 3 { - b.log.Warn("ignoring message because of multiple failed attempts", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempt", message.attempt) + if message.Attempts > 3 { + b.log.Warn("ignoring message because of multiple failed attempts", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempts", message.Attempts) return nil } - b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempt", message.attempt) + b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempts", message.Attempts) if event.Cmd.IPMI == nil { return fmt.Errorf("event does not contain ipmi details:%v", event) From 1acc7265ba8c886d679ffaf09f4a1130211d97be Mon Sep 17 00:00:00 2001 From: Markus Fensterer Date: Tue, 9 Jul 2024 15:54:29 +0200 Subject: [PATCH 4/5] increase max in flight --- internal/bmc/nsq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 76d63e7..eb48b8e 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -45,7 +45,7 @@ func (b *BMCService) InitConsumer() error { MinVersion: tls.VersionTLS12, } config.TlsV1 = true - config.MaxInFlight = 2 + config.MaxInFlight = 100 consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) if err != nil { From 26ccf303a4dc78ed711e7cffa498de7b367294be Mon Sep 17 00:00:00 2001 From: Gerrit Date: Thu, 22 Aug 2024 12:25:11 +0200 Subject: [PATCH 5/5] Further tweaking (#67) --- internal/bmc/nsq.go | 28 ++++++++++++++++++++++------ internal/bmc/nsq_logger.go | 13 +++++++------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index eb48b8e..f9383fa 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "os" + "time" "github.com/metal-stack/go-hal" "github.com/nsqio/go-nsq" @@ -45,7 +46,26 @@ func (b *BMCService) InitConsumer() error { MinVersion: tls.VersionTLS12, } config.TlsV1 = true - config.MaxInFlight = 100 + + // Deadlines for network reads and writes + config.ReadTimeout = 10 * time.Second + config.WriteTimeout = 10 * time.Second + + // Duration of time between heartbeats. This must be less than ReadTimeout + config.HeartbeatInterval = 5 * time.Second + + // Maximum duration when REQueueing (for doubling of deferred requeue) + config.MaxRequeueDelay = 5 * time.Second + config.DefaultRequeueDelay = 3 * time.Second + + // Maximum amount of time to backoff when processing fails 0 == no backoff + config.MaxBackoffDuration = 0 * time.Second // no need for backing off, just requeue + + // Maximum number of times this consumer will attempt to process a message before giving up + config.MaxAttempts = 2 // we do not try very often, if it doesn't work it's probably for a reason + + // Maximum number of messages to allow in flight (concurrency knob) + config.MaxInFlight = 10 // handling 10 machines in parallel should be enough consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) if err != nil { @@ -70,11 +90,7 @@ func (b *BMCService) HandleMessage(message *nsq.Message) error { return err } - if message.Attempts > 3 { - b.log.Warn("ignoring message because of multiple failed attempts", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempts", message.Attempts) - return nil - } - b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event, "attempts", message.Attempts) + b.log.Info("got message from nsq", "topic", b.machineTopic, "event", event, "attempt", message.Attempts) if event.Cmd.IPMI == nil { return fmt.Errorf("event does not contain ipmi details:%v", event) diff --git a/internal/bmc/nsq_logger.go b/internal/bmc/nsq_logger.go index 09c659e..cd61194 100644 --- a/internal/bmc/nsq_logger.go +++ b/internal/bmc/nsq_logger.go @@ -18,17 +18,18 @@ func (n nsqLogger) Output(calldepth int, s string) error { func nsqMapLevel(log *slog.Logger) nsq.LogLevel { ctx := context.Background() - if log.Enabled(ctx, slog.LevelDebug) { - return nsq.LogLevelDebug - } - if log.Enabled(ctx, slog.LevelInfo) { - return nsq.LogLevelInfo - } if log.Enabled(ctx, slog.LevelError) { return nsq.LogLevelError } if log.Enabled(ctx, slog.LevelWarn) { return nsq.LogLevelWarning } + if log.Enabled(ctx, slog.LevelInfo) { + return nsq.LogLevelInfo + } + if log.Enabled(ctx, slog.LevelDebug) { + return nsq.LogLevelDebug + } + return nsq.LogLevelInfo }