diff --git a/cmd/run.go b/cmd/run.go index f43b405..46092bd 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -53,6 +53,7 @@ func init() { runCmd.Flags().Int("minimiser-workers", 1, "how many minimiser workers to start (0 means same as GOMAXPROCS)") runCmd.Flags().Bool("disable-mqtt", false, "disable MQTT message sending") runCmd.Flags().String("mqtt-signing-key-file", "dtm-mqtt-signer-key.pem", "ECSDSA key used for signing MQTT messages") + runCmd.Flags().String("mqtt-signing-key-id", "key1", "ID (used as `kid` in JWS) when signing MQTT messages") runCmd.Flags().String("mqtt-client-key-file", "dtm-mqtt-client-key.pem", "ECSDSA client key used for authenticating to MQTT bus") runCmd.Flags().String("mqtt-client-cert-file", "dtm-mqtt-client.pem", "ECSDSA client cert used for authenticating to MQTT bus") runCmd.Flags().String("mqtt-server", "127.0.0.1:8883", "MQTT server we will publish events to") diff --git a/pkg/runner/mqtt.go b/pkg/runner/mqtt.go index 6f5ec04..1ef736f 100644 --- a/pkg/runner/mqtt.go +++ b/pkg/runner/mqtt.go @@ -1,7 +1,6 @@ package runner import ( - "crypto/ecdsa" "crypto/tls" "crypto/x509" "fmt" @@ -11,6 +10,7 @@ import ( "github.com/eclipse/paho.golang/paho" "github.com/eclipse/paho.golang/paho/log" "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jwk" "github.com/lestrrat-go/jwx/v2/jws" ) @@ -49,7 +49,7 @@ func (dtm *dnstapMinimiser) newAutoPahoClientConfig(caCertPool *x509.CertPool, s } -func (dtm *dnstapMinimiser) runAutoPaho(cm *autopaho.ConnectionManager, topic string, mqttSigningKey *ecdsa.PrivateKey) { +func (dtm *dnstapMinimiser) runAutoPaho(cm *autopaho.ConnectionManager, topic string, mqttJWK jwk.Key) { dtm.autopahoWg.Add(1) defer dtm.autopahoWg.Done() for { @@ -69,7 +69,7 @@ func (dtm *dnstapMinimiser) runAutoPaho(cm *autopaho.ConnectionManager, topic st return } - signedMsg, err := jws.Sign(unsignedMsg, jws.WithJSON(), jws.WithKey(jwa.ES256, mqttSigningKey)) + signedMsg, err := jws.Sign(unsignedMsg, jws.WithJSON(), jws.WithKey(jwa.ES256, mqttJWK)) if err != nil { dtm.log.Error("runAutoPaho: failed to created JWS message", "error", err) } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 44eb350..7cbeb5f 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -36,6 +36,7 @@ import ( "github.com/fsnotify/fsnotify" _ "github.com/grafana/pyroscope-go/godeltaprof/http/pprof" lru "github.com/hashicorp/golang-lru/v2" + "github.com/lestrrat-go/jwx/v2/jwk" "github.com/miekg/dns" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -500,8 +501,20 @@ func (dtm *dnstapMinimiser) setupMQTT() { // Setup channel for reading messages to publish dtm.mqttPubCh = make(chan []byte, 100) + mqttJWK, err := jwk.FromRaw(mqttSigningKey) + if err != nil { + dtm.log.Error("unable to create MQTT JWK key", "error", err) + os.Exit(1) + } + + err = mqttJWK.Set(jwk.KeyIDKey, viper.GetString("mqtt-signing-key-id")) + if err != nil { + dtm.log.Error("unable to set MQTT JWK `kid`", "error", err) + os.Exit(1) + } + // Connect to the broker - this will return immediately after initiating the connection process - go dtm.runAutoPaho(autopahoCm, viper.GetString("mqtt-topic"), mqttSigningKey) + go dtm.runAutoPaho(autopahoCm, viper.GetString("mqtt-topic"), mqttJWK) } func Run() {