From 64658a6a7360bb9bdf23d1cd7bedd5790931a73b Mon Sep 17 00:00:00 2001 From: Gerrit Date: Tue, 5 Sep 2023 15:16:38 +0200 Subject: [PATCH] Connect to NSQ directly (#39) --- .github/workflows/docker.yaml | 1 + go.mod | 4 +- go.sum | 10 -- internal/bmc/bmc.go | 41 +++---- internal/bmc/console.go | 11 +- internal/bmc/nsq.go | 192 ++++++++++++++++++--------------- internal/bmc/nsq_zap_logger.go | 34 ++++++ main.go | 12 +-- pkg/config/config.go | 13 +-- 9 files changed, 175 insertions(+), 143 deletions(-) create mode 100644 internal/bmc/nsq_zap_logger.go diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index 797c841..57c47bf 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -35,6 +35,7 @@ jobs: uses: actions/setup-go@v4 with: go-version: '1.21.x' + cache: false - name: Lint uses: golangci/golangci-lint-action@v3 diff --git a/go.mod b/go.mod index f511b9f..929ef16 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/metal-stack/go-hal v0.4.3 github.com/metal-stack/metal-go v0.23.2 - github.com/metal-stack/metal-lib v0.13.3 github.com/metal-stack/v v1.0.3 + github.com/nsqio/go-nsq v1.1.0 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.25.0 golang.org/x/crypto v0.12.0 @@ -50,9 +50,9 @@ require ( github.com/lestrrat-go/jwx v1.2.26 // indirect github.com/lestrrat-go/option v1.0.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/metal-stack/metal-lib v0.13.3 // indirect github.com/metal-stack/security v0.6.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/nsqio/go-nsq v1.1.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index 496bdf6..392e287 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,6 @@ github.com/avast/retry-go/v4 v4.5.0 h1:QoRAZZ90cj5oni2Lsgl2GW8mNTnUCnmpx/iKpwVis github.com/avast/retry-go/v4 v4.5.0/go.mod h1:7hLEXp0oku2Nir2xBAsg0PTphp9z71bN5Aq1fboC3+I= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ= -github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= -github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e h1:mWOqoK5jV13ChKf/aF3plwQ96laasTJgZi4f1aSOu+M= -github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/coreos/go-oidc/v3 v3.6.0 h1:AKVxfYw1Gmkn/w96z0DbT/B/xFnzTd3MkZvWLjF4n/o= github.com/coreos/go-oidc/v3 v3.6.0/go.mod h1:ZpHUsHBucTUj6WOkrP4E20UPynbLZzhTQ1XKCXkxyPc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -127,8 +123,6 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -184,12 +178,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q= -github.com/nsqio/go-diskqueue v1.1.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI= github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= -github.com/nsqio/nsq v1.2.1 h1:ZVjANYLnX1vPLmuSNCOdiw4nNPnzWgAC4t8wFhznMqU= -github.com/nsqio/nsq v1.2.1/go.mod h1:vXbwehoIygyVoX44oLFaN7MA0xrmudeuborDpMPiLTY= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= diff --git a/internal/bmc/bmc.go b/internal/bmc/bmc.go index 1408459..ba9b13a 100644 --- a/internal/bmc/bmc.go +++ b/internal/bmc/bmc.go @@ -9,6 +9,7 @@ import ( "github.com/metal-stack/go-hal" "github.com/metal-stack/go-hal/connect" halzap "github.com/metal-stack/go-hal/pkg/logger/zap" + "github.com/metal-stack/metal-bmc/pkg/config" "go.uber.org/zap" ) @@ -16,33 +17,25 @@ import ( type BMCService struct { log *zap.SugaredLogger // NSQ related config options - mqAddress string - mqCACertFile string - mqClientCertFile string - mqLogLevel string - machineTopic string - machineTopicTTL time.Duration + mqAddress string + mqCACertFile string + mqClientCertFile string + mqClientCertKeyFile string + mqLogLevel string + machineTopic string + machineTopicTTL time.Duration } -type Config struct { - Log *zap.SugaredLogger - MQAddress string - MQCACertFile string - MQClientCertFile string - MQLogLevel string - MachineTopic string - MachineTopicTTL time.Duration -} - -func New(c Config) *BMCService { +func New(log *zap.SugaredLogger, c *config.Config) *BMCService { b := &BMCService{ - log: c.Log, - mqAddress: c.MQAddress, - mqCACertFile: c.MQCACertFile, - mqClientCertFile: c.MQClientCertFile, - mqLogLevel: c.MQLogLevel, - machineTopic: c.MachineTopic, - machineTopicTTL: c.MachineTopicTTL, + log: log, + mqAddress: c.MQAddress, + mqCACertFile: c.MQCACertFile, + mqClientCertFile: c.MQClientCertFile, + mqClientCertKeyFile: c.MQClientCertKeyFile, + mqLogLevel: c.MQLogLevel, + machineTopic: c.MachineTopic, + machineTopicTTL: c.MachineTopicTTL, } return b } diff --git a/internal/bmc/console.go b/internal/bmc/console.go index e3e18ba..bfa669b 100644 --- a/internal/bmc/console.go +++ b/internal/bmc/console.go @@ -12,6 +12,7 @@ import ( "github.com/metal-stack/go-hal/connect" halzap "github.com/metal-stack/go-hal/pkg/logger/zap" + "github.com/metal-stack/metal-bmc/pkg/config" metalgo "github.com/metal-stack/metal-go" "github.com/metal-stack/metal-go/api/client/machine" @@ -28,9 +29,9 @@ type console struct { client metalgo.Client } -func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certFile, keyFile string, port int) (*console, error) { +func NewConsole(log *zap.SugaredLogger, client metalgo.Client, c config.Config) (*console, error) { - caCert, err := os.ReadFile(caCertFile) + caCert, err := os.ReadFile(c.ConsoleCACertFile) if err != nil { return nil, fmt.Errorf("failed to load cert: %w", err) } @@ -38,7 +39,7 @@ func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certF caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) - cert, err := tls.LoadX509KeyPair(certFile, keyFile) + cert, err := tls.LoadX509KeyPair(c.ConsoleCertFile, c.ConsoleKeyFile) if err != nil { return nil, err } @@ -50,7 +51,7 @@ func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certF MinVersion: tls.VersionTLS13, } - bb, err := os.ReadFile(keyFile) + bb, err := os.ReadFile(c.ConsoleKeyFile) if err != nil { return nil, fmt.Errorf("failed to load ssh server key:%w", err) } @@ -62,7 +63,7 @@ func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certF return &console{ log: log, tlsConfig: tlsConfig, - port: port, + port: c.ConsolePort, hostKey: hostKey, client: client, }, nil diff --git a/internal/bmc/nsq.go b/internal/bmc/nsq.go index 4a36695..048ab29 100644 --- a/internal/bmc/nsq.go +++ b/internal/bmc/nsq.go @@ -1,108 +1,128 @@ package bmc import ( + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" - "strings" - "time" + "os" "github.com/metal-stack/go-hal" + "github.com/nsqio/go-nsq" +) - "github.com/metal-stack/metal-lib/bus" +const ( + mqChannel = "core" ) -// timeout for the nsq handler methods -const receiverHandlerTimeout = 15 * time.Second - -func mapLogLevel(level string) bus.Level { - switch strings.ToLower(level) { - case "debug": - return bus.Debug - case "info": - return bus.Info - case "warn", "warning": - return bus.Warning - case "error": - return bus.Error - default: - return bus.Warning +func (b *BMCService) InitConsumer() error { + caCertRaw, err := os.ReadFile(b.mqCACertFile) + if err != nil { + return fmt.Errorf("failed to read ca cert: %w", err) } -} -func (b *BMCService) timeoutHandler(err bus.TimeoutError) error { - b.log.Errorw("timeout processing event", "event", err.Event()) - return nil -} + caCertPool, err := x509.SystemCertPool() + if err != nil { + return err + } -func (b *BMCService) InitConsumer() error { - tlsCfg := &bus.TLSConfig{ - CACertFile: b.mqCACertFile, - ClientCertFile: b.mqClientCertFile, + ok := caCertPool.AppendCertsFromPEM(caCertRaw) + if !ok { + return fmt.Errorf("unable to add ca to cert pool") } - c, err := bus.NewConsumer(b.log.Desugar(), tlsCfg, b.mqAddress) + + cert, err := tls.LoadX509KeyPair(b.mqClientCertFile, b.mqClientCertKeyFile) if err != nil { return err } - err = c.With(bus.LogLevel(mapLogLevel(b.mqLogLevel))). - MustRegister(b.machineTopic, "core"). - Consume(MachineEvent{}, func(message interface{}) error { - event := message.(*MachineEvent) - b.log.Debugw("got message", "topic", b.machineTopic, "channel", "core", "event", event) + config := nsq.NewConfig() + config.TlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientCAs: caCertPool, + RootCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, + MinVersion: tls.VersionTLS12, + } + config.TlsV1 = true - if event.Cmd.IPMI == nil { - return fmt.Errorf("event does not contain ipmi details:%v", event) - } - outBand, err := b.outBand(event.Cmd.IPMI) - if err != nil { - b.log.Errorw("error creating outband connection", "error", err) - return err - } + consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config) + if err != nil { + return err + } - switch event.Type { - case Delete: - err := outBand.BootFrom(hal.BootTargetPXE) - if err != nil { - return err - } - return outBand.PowerReset() - case Command: - switch event.Cmd.Command { - case MachineOnCmd: - return outBand.PowerOn() - case MachineOffCmd: - return outBand.PowerOff() - case MachineResetCmd: - return outBand.PowerReset() - case MachineCycleCmd: - return outBand.PowerCycle() - case MachineBiosCmd: - return outBand.BootFrom(hal.BootTargetBIOS) - case MachineDiskCmd: - return outBand.BootFrom(hal.BootTargetDisk) - case MachinePxeCmd: - return outBand.BootFrom(hal.BootTargetPXE) - case MachineReinstallCmd: - err := outBand.BootFrom(hal.BootTargetPXE) - if err != nil { - return err - } - return outBand.PowerCycle() - case ChassisIdentifyLEDOnCmd: - return outBand.IdentifyLEDOn() - case ChassisIdentifyLEDOffCmd: - return outBand.IdentifyLEDOff() - case UpdateFirmwareCmd: - return b.UpdateFirmware(outBand, event) - default: - b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) - } - case Create, Update: - fallthrough - default: - b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) - } - return nil - }, 5, bus.Timeout(receiverHandlerTimeout, b.timeoutHandler), bus.TTL(b.machineTopicTTL)) + consumer.SetLogger(nsqZapLogger{log: b.log}, nsqMapLevel(b.log)) + + consumer.AddHandler(b) + + err = consumer.ConnectToNSQD(b.mqAddress) + if err != nil { + return err + } return err } + +func (b *BMCService) HandleMessage(message *nsq.Message) error { + var event MachineEvent + err := json.Unmarshal(message.Body, &event) + if err != nil { + return err + } + + b.log.Debugw("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event) + + if event.Cmd.IPMI == nil { + return fmt.Errorf("event does not contain ipmi details:%v", event) + } + outBand, err := b.outBand(event.Cmd.IPMI) + if err != nil { + b.log.Errorw("error creating outband connection", "error", err) + return err + } + + switch event.Type { + case Delete: + err := outBand.BootFrom(hal.BootTargetPXE) + if err != nil { + return err + } + return outBand.PowerReset() + case Command: + switch event.Cmd.Command { + case MachineOnCmd: + return outBand.PowerOn() + case MachineOffCmd: + return outBand.PowerOff() + case MachineResetCmd: + return outBand.PowerReset() + case MachineCycleCmd: + return outBand.PowerCycle() + case MachineBiosCmd: + return outBand.BootFrom(hal.BootTargetBIOS) + case MachineDiskCmd: + return outBand.BootFrom(hal.BootTargetDisk) + case MachinePxeCmd: + return outBand.BootFrom(hal.BootTargetPXE) + case MachineReinstallCmd: + err := outBand.BootFrom(hal.BootTargetPXE) + if err != nil { + return err + } + return outBand.PowerCycle() + case ChassisIdentifyLEDOnCmd: + return outBand.IdentifyLEDOn() + case ChassisIdentifyLEDOffCmd: + return outBand.IdentifyLEDOff() + case UpdateFirmwareCmd: + return b.UpdateFirmware(outBand, &event) + default: + b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event) + } + case Create, Update: + fallthrough + default: + b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event) + } + return nil +} diff --git a/internal/bmc/nsq_zap_logger.go b/internal/bmc/nsq_zap_logger.go new file mode 100644 index 0000000..3412dd4 --- /dev/null +++ b/internal/bmc/nsq_zap_logger.go @@ -0,0 +1,34 @@ +package bmc + +import ( + "github.com/nsqio/go-nsq" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type nsqZapLogger struct { + log *zap.SugaredLogger +} + +func (n nsqZapLogger) Output(calldepth int, s string) error { + n.log.Infow(s) + return nil +} + +func nsqMapLevel(log *zap.SugaredLogger) nsq.LogLevel { + switch log.Desugar().Level() { + case zapcore.DebugLevel, zapcore.DPanicLevel, zapcore.InvalidLevel: + return nsq.LogLevelDebug + case zapcore.InfoLevel: + return nsq.LogLevelInfo + case zapcore.WarnLevel: + return nsq.LogLevelWarning + case zapcore.ErrorLevel, zapcore.FatalLevel: + return nsq.LogLevelError + case zapcore.PanicLevel: + return nsq.LogLevelMax + default: + return nsq.LogLevelDebug + } +} diff --git a/main.go b/main.go index 02d4046..b345dde 100644 --- a/main.go +++ b/main.go @@ -50,15 +50,7 @@ func main() { } // BMC Events via NSQ - b := bmc.New(bmc.Config{ - Log: log, - MQAddress: cfg.MQAddress, - MQCACertFile: cfg.MQCACertFile, - MQClientCertFile: cfg.MQClientCertFile, - MQLogLevel: cfg.MQLogLevel, - MachineTopic: cfg.MachineTopic, - MachineTopicTTL: cfg.MachineTopicTTL, - }) + b := bmc.New(log, &cfg) err = b.InitConsumer() if err != nil { @@ -66,7 +58,7 @@ func main() { } // BMC Console access - console, err := bmc.NewConsole(log, client, cfg.ConsoleCACertFile, cfg.ConsoleCertFile, cfg.ConsoleKeyFile, cfg.ConsolePort) + console, err := bmc.NewConsole(log, client, cfg) if err != nil { log.Fatalw("unable to create bmc console", "error", err) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 2ca3970..cc896e9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,12 +23,13 @@ type Config struct { AllowedCidrs []string `required:"false" default:"0.0.0.0/0" desc:"filters dhcp leases" split_words:"true"` // NSQ connection parameters - MQAddress string `required:"false" default:"localhost:4161" desc:"set the MQ server address" envconfig:"mq_address"` - MQCACertFile string `required:"false" default:"" desc:"the CA certificate file for verifying MQ certificate" envconfig:"mq_ca_cert_file"` - MQClientCertFile string `required:"false" default:"" desc:"the client certificate file for accessing MQ" envconfig:"mq_client_cert_file"` - MQLogLevel string `required:"false" default:"warn" desc:"sets the MQ loglevel (debug, info, warn, error)" envconfig:"mq_loglevel"` - MachineTopic string `required:"false" default:"machine" desc:"set the machine topic name" split_words:"true"` - MachineTopicTTL time.Duration `required:"false" default:"30s" desc:"sets the TTL for MachineTopic" envconfig:"machine_topic_ttl"` + MQAddress string `required:"false" default:"localhost:4150" desc:"set the nsqd server address" envconfig:"mq_address"` + MQCACertFile string `required:"false" default:"" desc:"the CA certificate file for verifying MQ certificate" envconfig:"mq_ca_cert_file"` + MQClientCertFile string `required:"false" default:"" desc:"the client certificate file for accessing MQ" envconfig:"mq_client_cert_file"` + MQClientCertKeyFile string `required:"false" default:"" desc:"the client certificate key file for accessing MQ" envconfig:"mq_client_cert_key_file"` + MQLogLevel string `required:"false" default:"warn" desc:"sets the MQ loglevel (debug, info, warn, error)" envconfig:"mq_loglevel"` + MachineTopic string `required:"false" default:"machine" desc:"set the machine topic name" split_words:"true"` + MachineTopicTTL time.Duration `required:"false" default:"30s" desc:"sets the TTL for MachineTopic" envconfig:"machine_topic_ttl"` // Console Proxy parameters ConsolePort int `required:"false" default:"3333" desc:"defines the port where to listen for incoming console connections from metal-console" envconfig:"console_port"`