Skip to content

Commit

Permalink
Connect to NSQ directly (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 authored Sep 5, 2023
1 parent 543ab3c commit 64658a6
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 143 deletions.
1 change: 1 addition & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: '1.21.x'
cache: false

- name: Lint
uses: golangci/golangci-lint-action@v3
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/metal-stack/go-hal v0.4.3
github.com/metal-stack/metal-go v0.23.2
github.com/metal-stack/metal-lib v0.13.3
github.com/metal-stack/v v1.0.3
github.com/nsqio/go-nsq v1.1.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.25.0
golang.org/x/crypto v0.12.0
Expand Down Expand Up @@ -50,9 +50,9 @@ require (
github.com/lestrrat-go/jwx v1.2.26 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/metal-stack/metal-lib v0.13.3 // indirect
github.com/metal-stack/security v0.6.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nsqio/go-nsq v1.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
10 changes: 0 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ github.com/avast/retry-go/v4 v4.5.0 h1:QoRAZZ90cj5oni2Lsgl2GW8mNTnUCnmpx/iKpwVis
github.com/avast/retry-go/v4 v4.5.0/go.mod h1:7hLEXp0oku2Nir2xBAsg0PTphp9z71bN5Aq1fboC3+I=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e h1:mWOqoK5jV13ChKf/aF3plwQ96laasTJgZi4f1aSOu+M=
github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/coreos/go-oidc/v3 v3.6.0 h1:AKVxfYw1Gmkn/w96z0DbT/B/xFnzTd3MkZvWLjF4n/o=
github.com/coreos/go-oidc/v3 v3.6.0/go.mod h1:ZpHUsHBucTUj6WOkrP4E20UPynbLZzhTQ1XKCXkxyPc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -127,8 +123,6 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
Expand Down Expand Up @@ -184,12 +178,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsqio/go-diskqueue v1.1.0 h1:r0dJ0DMXT3+2mOq+79cvCjnhoBxyGC2S9O+OjQrpe4Q=
github.com/nsqio/go-diskqueue v1.1.0/go.mod h1:INuJIxl4ayUsyoNtHL5+9MFPDfSZ0zY93hNY6vhBRsI=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/nsqio/nsq v1.2.1 h1:ZVjANYLnX1vPLmuSNCOdiw4nNPnzWgAC4t8wFhznMqU=
github.com/nsqio/nsq v1.2.1/go.mod h1:vXbwehoIygyVoX44oLFaN7MA0xrmudeuborDpMPiLTY=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
Expand Down
41 changes: 17 additions & 24 deletions internal/bmc/bmc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,33 @@ import (
"github.com/metal-stack/go-hal"
"github.com/metal-stack/go-hal/connect"
halzap "github.com/metal-stack/go-hal/pkg/logger/zap"
"github.com/metal-stack/metal-bmc/pkg/config"

"go.uber.org/zap"
)

type BMCService struct {
log *zap.SugaredLogger
// NSQ related config options
mqAddress string
mqCACertFile string
mqClientCertFile string
mqLogLevel string
machineTopic string
machineTopicTTL time.Duration
mqAddress string
mqCACertFile string
mqClientCertFile string
mqClientCertKeyFile string
mqLogLevel string
machineTopic string
machineTopicTTL time.Duration
}

type Config struct {
Log *zap.SugaredLogger
MQAddress string
MQCACertFile string
MQClientCertFile string
MQLogLevel string
MachineTopic string
MachineTopicTTL time.Duration
}

func New(c Config) *BMCService {
func New(log *zap.SugaredLogger, c *config.Config) *BMCService {
b := &BMCService{
log: c.Log,
mqAddress: c.MQAddress,
mqCACertFile: c.MQCACertFile,
mqClientCertFile: c.MQClientCertFile,
mqLogLevel: c.MQLogLevel,
machineTopic: c.MachineTopic,
machineTopicTTL: c.MachineTopicTTL,
log: log,
mqAddress: c.MQAddress,
mqCACertFile: c.MQCACertFile,
mqClientCertFile: c.MQClientCertFile,
mqClientCertKeyFile: c.MQClientCertKeyFile,
mqLogLevel: c.MQLogLevel,
machineTopic: c.MachineTopic,
machineTopicTTL: c.MachineTopicTTL,
}
return b
}
Expand Down
11 changes: 6 additions & 5 deletions internal/bmc/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/metal-stack/go-hal/connect"
halzap "github.com/metal-stack/go-hal/pkg/logger/zap"
"github.com/metal-stack/metal-bmc/pkg/config"
metalgo "github.com/metal-stack/metal-go"
"github.com/metal-stack/metal-go/api/client/machine"

Expand All @@ -28,17 +29,17 @@ type console struct {
client metalgo.Client
}

func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certFile, keyFile string, port int) (*console, error) {
func NewConsole(log *zap.SugaredLogger, client metalgo.Client, c config.Config) (*console, error) {

caCert, err := os.ReadFile(caCertFile)
caCert, err := os.ReadFile(c.ConsoleCACertFile)
if err != nil {
return nil, fmt.Errorf("failed to load cert: %w", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

cert, err := tls.LoadX509KeyPair(certFile, keyFile)
cert, err := tls.LoadX509KeyPair(c.ConsoleCertFile, c.ConsoleKeyFile)
if err != nil {
return nil, err
}
Expand All @@ -50,7 +51,7 @@ func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certF
MinVersion: tls.VersionTLS13,
}

bb, err := os.ReadFile(keyFile)
bb, err := os.ReadFile(c.ConsoleKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load ssh server key:%w", err)
}
Expand All @@ -62,7 +63,7 @@ func NewConsole(log *zap.SugaredLogger, client metalgo.Client, caCertFile, certF
return &console{
log: log,
tlsConfig: tlsConfig,
port: port,
port: c.ConsolePort,
hostKey: hostKey,
client: client,
}, nil
Expand Down
192 changes: 106 additions & 86 deletions internal/bmc/nsq.go
Original file line number Diff line number Diff line change
@@ -1,108 +1,128 @@
package bmc

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"strings"
"time"
"os"

"github.com/metal-stack/go-hal"
"github.com/nsqio/go-nsq"
)

"github.com/metal-stack/metal-lib/bus"
const (
mqChannel = "core"
)

// timeout for the nsq handler methods
const receiverHandlerTimeout = 15 * time.Second

func mapLogLevel(level string) bus.Level {
switch strings.ToLower(level) {
case "debug":
return bus.Debug
case "info":
return bus.Info
case "warn", "warning":
return bus.Warning
case "error":
return bus.Error
default:
return bus.Warning
func (b *BMCService) InitConsumer() error {
caCertRaw, err := os.ReadFile(b.mqCACertFile)
if err != nil {
return fmt.Errorf("failed to read ca cert: %w", err)
}
}

func (b *BMCService) timeoutHandler(err bus.TimeoutError) error {
b.log.Errorw("timeout processing event", "event", err.Event())
return nil
}
caCertPool, err := x509.SystemCertPool()
if err != nil {
return err
}

func (b *BMCService) InitConsumer() error {
tlsCfg := &bus.TLSConfig{
CACertFile: b.mqCACertFile,
ClientCertFile: b.mqClientCertFile,
ok := caCertPool.AppendCertsFromPEM(caCertRaw)
if !ok {
return fmt.Errorf("unable to add ca to cert pool")
}
c, err := bus.NewConsumer(b.log.Desugar(), tlsCfg, b.mqAddress)

cert, err := tls.LoadX509KeyPair(b.mqClientCertFile, b.mqClientCertKeyFile)
if err != nil {
return err
}

err = c.With(bus.LogLevel(mapLogLevel(b.mqLogLevel))).
MustRegister(b.machineTopic, "core").
Consume(MachineEvent{}, func(message interface{}) error {
event := message.(*MachineEvent)
b.log.Debugw("got message", "topic", b.machineTopic, "channel", "core", "event", event)
config := nsq.NewConfig()
config.TlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ClientCAs: caCertPool,
RootCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS12,
}
config.TlsV1 = true

if event.Cmd.IPMI == nil {
return fmt.Errorf("event does not contain ipmi details:%v", event)
}
outBand, err := b.outBand(event.Cmd.IPMI)
if err != nil {
b.log.Errorw("error creating outband connection", "error", err)
return err
}
consumer, err := nsq.NewConsumer(b.machineTopic, mqChannel, config)
if err != nil {
return err
}

switch event.Type {
case Delete:
err := outBand.BootFrom(hal.BootTargetPXE)
if err != nil {
return err
}
return outBand.PowerReset()
case Command:
switch event.Cmd.Command {
case MachineOnCmd:
return outBand.PowerOn()
case MachineOffCmd:
return outBand.PowerOff()
case MachineResetCmd:
return outBand.PowerReset()
case MachineCycleCmd:
return outBand.PowerCycle()
case MachineBiosCmd:
return outBand.BootFrom(hal.BootTargetBIOS)
case MachineDiskCmd:
return outBand.BootFrom(hal.BootTargetDisk)
case MachinePxeCmd:
return outBand.BootFrom(hal.BootTargetPXE)
case MachineReinstallCmd:
err := outBand.BootFrom(hal.BootTargetPXE)
if err != nil {
return err
}
return outBand.PowerCycle()
case ChassisIdentifyLEDOnCmd:
return outBand.IdentifyLEDOn()
case ChassisIdentifyLEDOffCmd:
return outBand.IdentifyLEDOff()
case UpdateFirmwareCmd:
return b.UpdateFirmware(outBand, event)
default:
b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event)
}
case Create, Update:
fallthrough
default:
b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event)
}
return nil
}, 5, bus.Timeout(receiverHandlerTimeout, b.timeoutHandler), bus.TTL(b.machineTopicTTL))
consumer.SetLogger(nsqZapLogger{log: b.log}, nsqMapLevel(b.log))

consumer.AddHandler(b)

err = consumer.ConnectToNSQD(b.mqAddress)
if err != nil {
return err
}

return err
}

func (b *BMCService) HandleMessage(message *nsq.Message) error {
var event MachineEvent
err := json.Unmarshal(message.Body, &event)
if err != nil {
return err
}

b.log.Debugw("got message", "topic", b.machineTopic, "channel", mqChannel, "event", event)

if event.Cmd.IPMI == nil {
return fmt.Errorf("event does not contain ipmi details:%v", event)
}
outBand, err := b.outBand(event.Cmd.IPMI)
if err != nil {
b.log.Errorw("error creating outband connection", "error", err)
return err
}

switch event.Type {
case Delete:
err := outBand.BootFrom(hal.BootTargetPXE)
if err != nil {
return err
}
return outBand.PowerReset()
case Command:
switch event.Cmd.Command {
case MachineOnCmd:
return outBand.PowerOn()
case MachineOffCmd:
return outBand.PowerOff()
case MachineResetCmd:
return outBand.PowerReset()
case MachineCycleCmd:
return outBand.PowerCycle()
case MachineBiosCmd:
return outBand.BootFrom(hal.BootTargetBIOS)
case MachineDiskCmd:
return outBand.BootFrom(hal.BootTargetDisk)
case MachinePxeCmd:
return outBand.BootFrom(hal.BootTargetPXE)
case MachineReinstallCmd:
err := outBand.BootFrom(hal.BootTargetPXE)
if err != nil {
return err
}
return outBand.PowerCycle()
case ChassisIdentifyLEDOnCmd:
return outBand.IdentifyLEDOn()
case ChassisIdentifyLEDOffCmd:
return outBand.IdentifyLEDOff()
case UpdateFirmwareCmd:
return b.UpdateFirmware(outBand, &event)
default:
b.log.Errorw("unhandled command", "topic", b.machineTopic, "channel", "core", "event", event)
}
case Create, Update:
fallthrough
default:
b.log.Warnw("unhandled event", "topic", b.machineTopic, "channel", "core", "event", event)
}
return nil
}
Loading

0 comments on commit 64658a6

Please sign in to comment.