diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 6079572..f9383fa 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "os" + "time" "github.com/metal-stack/go-hal" "github.com/nsqio/go-nsq" @@ -46,13 +47,32 @@ func (b *BMCService) InitConsumer() error { } config.TlsV1 = true + // Deadlines for network reads and writes + config.ReadTimeout = 10 * time.Second + config.WriteTimeout = 10 * time.Second + + // Duration of time between heartbeats. This must be less than ReadTimeout + config.HeartbeatInterval = 5 * time.Second + + // Maximum duration when REQueueing (for doubling of deferred requeue) + config.MaxRequeueDelay = 5 * time.Second + config.DefaultRequeueDelay = 3 * time.Second + + // Maximum amount of time to backoff when processing fails 0 == no backoff + config.MaxBackoffDuration = 0 * time.Second // no need for backing off, just requeue + + // Maximum number of times this consumer will attempt to process a message before giving up + config.MaxAttempts = 2 // we do not try very often, if it doesn't work it's probably for a reason + + // Maximum number of messages to allow in flight (concurrency knob) + config.MaxInFlight = 10 // handling 10 machines in parallel should be enough + consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) if err != nil { return err } consumer.SetLogger(nsqLogger{log: b.log}, nsqMapLevel(b.log)) - consumer.AddHandler(b) err = consumer.ConnectToNSQD(b.mqAddress) @@ -70,7 +90,7 @@ func (b *BMCService) HandleMessage(message *nsq.Message) error { return err } - b.log.Debug("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event) + b.log.Info("got message from nsq", "topic", b.machineTopic, "event", event, "attempt", message.Attempts) if event.Cmd.IPMI == nil { return fmt.Errorf("event does not contain ipmi details:%v", event) diff --git a/internal/bmc/nsq_logger.go b/internal/bmc/nsq_logger.go index 09c659e..cd61194 100644 --- a/internal/bmc/nsq_logger.go +++ b/internal/bmc/nsq_logger.go @@ -18,17 +18,18 @@ func (n nsqLogger) Output(calldepth int, s string) error { func nsqMapLevel(log *slog.Logger) nsq.LogLevel { ctx := context.Background() - if log.Enabled(ctx, slog.LevelDebug) { - return nsq.LogLevelDebug - } - if log.Enabled(ctx, slog.LevelInfo) { - return nsq.LogLevelInfo - } if log.Enabled(ctx, slog.LevelError) { return nsq.LogLevelError } if log.Enabled(ctx, slog.LevelWarn) { return nsq.LogLevelWarning } + if log.Enabled(ctx, slog.LevelInfo) { + return nsq.LogLevelInfo + } + if log.Enabled(ctx, slog.LevelDebug) { + return nsq.LogLevelDebug + } + return nsq.LogLevelInfo }