Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageHandler interface enhancements #30

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/azure-connector/app/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func startRouter(
settings *azurecfg.AzureSettings,
connSettings *azurecfg.AzureConnectionSettings,
statusPub message.Publisher,
telemetryHandlers []handlers.MessageHandler,
commandHandlers []handlers.MessageHandler,
telemetryHandlers []handlers.TelemetryHandler,
commandHandlers []handlers.CommandHandler,
done chan bool,
logger logger.Logger,
) (*message.Router, error) {
Expand Down Expand Up @@ -71,10 +71,10 @@ func startRouter(
azureSub := connector.NewSubscriber(azureClient, connector.QosAtMostOnce, false, logger, nil)
mosquittoSub := connector.NewSubscriber(cloudClient, connector.QosAtLeastOnce, false, router.Logger(), nil)

routingbus.TelemetryBus(router, azurePub, mosquittoSub, settings, connSettings, telemetryHandlers)
routingbus.TelemetryBus(router, azurePub, mosquittoSub, &connSettings.RemoteConnectionInfo, telemetryHandlers)

cloudPub := connector.NewPublisher(cloudClient, connector.QosAtLeastOnce, router.Logger(), nil)
routingbus.CommandBus(router, cloudPub, azureSub, settings, connSettings, commandHandlers)
routingbus.CommandBus(router, cloudPub, azureSub, &connSettings.RemoteConnectionInfo, commandHandlers)

go func() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -163,7 +163,7 @@ func startRouter(
}

// MainLoop is the main loop of the application
func MainLoop(settings *azurecfg.AzureSettings, log logger.Logger, idScopeProvider azurecfg.IDScopeProvider, telemetryHandlers []handlers.MessageHandler, commandHandlers []handlers.MessageHandler) error {
func MainLoop(settings *azurecfg.AzureSettings, log logger.Logger, idScopeProvider azurecfg.IDScopeProvider, telemetryHandlers []handlers.TelemetryHandler, commandHandlers []handlers.CommandHandler) error {
localClient, err := config.CreateLocalConnection(&settings.LocalConnectionSettings, log)
if err != nil {
return errors.Wrap(err, "cannot create mosquitto connection")
Expand Down
12 changes: 7 additions & 5 deletions cmd/azure-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
logger.Infof("Starting azure connector %s", version)
flags.ConfigCheck(logger, *fConfigFile)

if err := app.MainLoop(settings, logger, nil, telemetryHandlers(), commandHandlers()); err != nil {
if err := app.MainLoop(settings, logger, nil, telemetryHandlers(settings), commandHandlers(settings)); err != nil {
logger.Error("Init failure", err, nil)

loggerOut.Close()
Expand All @@ -80,10 +80,12 @@ func main() {
}
}

func telemetryHandlers() []handlers.MessageHandler {
return []handlers.MessageHandler{passthrough.CreateTelemetryHandler()}
func telemetryHandlers(settings *azurecfg.AzureSettings) []handlers.TelemetryHandler {
passthroughHandler := passthrough.CreateTelemetryHandler(settings.PassthroughTelemetryTopics)
return []handlers.TelemetryHandler{passthroughHandler}
}

func commandHandlers() []handlers.MessageHandler {
return []handlers.MessageHandler{passthrough.CreateCommandHandler()}
func commandHandlers(settings *azurecfg.AzureSettings) []handlers.CommandHandler {
passthroughHandler := passthrough.CreateCommandHandler(settings.PassthroughCommandTopic)
return []handlers.CommandHandler{passthroughHandler}
}
33 changes: 14 additions & 19 deletions config/connection_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,29 @@ import (
)

const (
provisioningJSONConfig = "provisioning.json" // TODO: why with the same name as default flag value?
hostNameSuffix = ".azure-devices.net"
propertyKeyHostName = "HostName"
propertyKeyDeviceID = "DeviceId"
propertyKeySharedAccessKey = "SharedAccessKey"
propertyKeySharedAccessKeyName = "SharedAccessKeyName"
provisioningJSONConfig = "provisioning.json"
hostNameSuffix = ".azure-devices.net"
propertyKeyHostName = "HostName"
propertyKeyDeviceID = "DeviceId"
propertyKeySharedAccessKey = "SharedAccessKey"
)

// SharedAccessKey contains the shared access key for generating SAS token for device authentication.
type SharedAccessKey struct {
SharedAccessKeyName string
SharedAccessKeyDecoded []byte
// RemoteConnectionInfo contains properties related to the remote connection that may not be known at the start of the connector.
type RemoteConnectionInfo struct {
HubName string
HostName string
DeviceID string
}

// AzureConnectionSettings contains the configuration data for establishing connection to the Azure IoT Hub.
type AzureConnectionSettings struct {
HubName string
HostName string
DeviceID string
RemoteConnectionInfo

DeviceCert string
DeviceKey string
TokenValidity time.Duration

*SharedAccessKey
SharedAccessKey []byte
}

// PrepareAzureConnectionSettings prepares the configuration data for establishing connection to the Azure IoT Hub, allowing usage of IDScopeProvider.
Expand Down Expand Up @@ -214,10 +213,7 @@ func CreateAzureSASTokenConnectionSettings(
if sharedAccessKeyDecoded, err = base64.StdEncoding.DecodeString(sharedAccessKey); err != nil {
return nil, errors.New("the SharedAccessKey is not base64 encoded")
}
connSettings.SharedAccessKey = &SharedAccessKey{
SharedAccessKeyName: connStringProperties[propertyKeySharedAccessKeyName],
SharedAccessKeyDecoded: sharedAccessKeyDecoded,
}
connSettings.SharedAccessKey = sharedAccessKeyDecoded

if tokenValidity, err := ParseSASTokenValidity(settings.SASTokenValidity); err != nil {
logger.Warn("The default SAS token validity period will be set.", err, nil)
Expand Down Expand Up @@ -262,7 +258,6 @@ func attachCertificateInfo(connSettings *AzureConnectionSettings, certFileReader
return err
}
}

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions config/connection_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestCreateTokenConnectionSettings(t *testing.T) {
assert.Equal(t, "dummy-device", connSettings.DeviceID)
assert.Equal(t, "dummy-hub.azure-devices.net", connSettings.HostName)
assert.Equal(t, "dummy-hub", connSettings.HubName)
assert.Equal(t, "", connSettings.SharedAccessKey.SharedAccessKeyName)
assert.Equal(t, decodedSharedAccessKey, connSettings.SharedAccessKey.SharedAccessKeyDecoded)
assert.Equal(t, decodedSharedAccessKey, connSettings.SharedAccessKey)
assert.Equal(t, time.Hour, connSettings.TokenValidity)
assert.Equal(t, "", connSettings.DeviceCert)
assert.Equal(t, "", connSettings.DeviceKey)
Expand Down
14 changes: 7 additions & 7 deletions config/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestCreateAzureClientNoCacert(t *testing.T) {
defer goleak.VerifyNone(t)

decodedAccessKey, _ := base64.StdEncoding.DecodeString("x7HrdC+URzEneFam9ZKa0Ke7nvsDwiuJptzFkgs8JWA=")
accessKey := &config.SharedAccessKey{
SharedAccessKeyDecoded: decodedAccessKey,
}

settings := &config.AzureSettings{}
connSettings := &config.AzureConnectionSettings{
DeviceID: "dummy-device",
HostName: "dummy-hub.azure-devices.net",
HubName: "dummy-hub",
SharedAccessKey: accessKey,
RemoteConnectionInfo: config.RemoteConnectionInfo{
DeviceID: "dummy-device",
HostName: "dummy-hub.azure-devices.net",
HubName: "dummy-hub",
},
SharedAccessKey: decodedAccessKey,
}

logger := logger.NewLogger(log.New(io.Discard, "", log.Ldate), logger.INFO)
Expand Down
13 changes: 3 additions & 10 deletions config/sas.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,28 @@ type SharedAccessSignature struct {
Sr string
Sig string
Se time.Time
Skn string
}

// GenerateSASToken generates the SAS token for device authentication.
func GenerateSASToken(connSettings *AzureConnectionSettings) *SharedAccessSignature {
return newSharedAccessSignature(connSettings.HostName,
connSettings.SharedAccessKeyName,
connSettings.SharedAccessKey.SharedAccessKeyDecoded,
connSettings.SharedAccessKey,
Now().Add(connSettings.TokenValidity))
}

func sasTokenToString(sas *SharedAccessSignature) string {
s := "SharedAccessSignature " +
return "SharedAccessSignature " +
"sr=" + url.QueryEscape(sas.Sr) +
"&sig=" + url.QueryEscape(sas.Sig) +
"&se=" + url.QueryEscape(strconv.FormatInt(sas.Se.Unix(), 10))
if sas.Skn != "" {
s += "&skn=" + url.QueryEscape(sas.Skn)
}
return s
}

func newSharedAccessSignature(resource, policy string, decodedKey []byte, expiry time.Time) *SharedAccessSignature {
func newSharedAccessSignature(resource string, decodedKey []byte, expiry time.Time) *SharedAccessSignature {
sig := messageKeySignature(resource, decodedKey, expiry)
return &SharedAccessSignature{
Sr: resource,
Sig: sig,
Se: expiry,
Skn: policy,
}
}

Expand Down
1 change: 0 additions & 1 deletion config/sas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestGenerateSASToken(t *testing.T) {

sasToken := config.GenerateSASToken(connSettings)

assert.Equal(t, "", sasToken.Skn)
assert.Equal(t, "dummy-hub.azure-devices.net", sasToken.Sr)
assert.Equal(t, "2021-01-01 01:00:00 +0000 UTC", sasToken.Se.String())
assert.Equal(t, "ifZm2I0YKRkwc8Pc49e0qKSsu3l3FbxoWZRqGBtXtng=", sasToken.Sig)
Expand Down
13 changes: 6 additions & 7 deletions routing/bus/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,23 @@ const (

type commandBusHandler struct {
logger watermill.LoggerAdapter
commandHandlers []handlers.MessageHandler
commandHandlers []handlers.CommandHandler
}

// CommandBus creates the cloud message bus for processing the C2D messages from the Azure IoT Hub device.
func CommandBus(router *message.Router,
mosquittoPub message.Publisher,
azureSub message.Subscriber,
settings *config.AzureSettings,
connSettings *config.AzureConnectionSettings,
commandHandlers []handlers.MessageHandler,
connInfo *config.RemoteConnectionInfo,
commandHandlers []handlers.CommandHandler,
) {
//Azure IoT Hub -> Message bus -> Mosquitto Broker -> Gateway
initCommandHandlers := []handlers.MessageHandler{}
initCommandHandlers := []handlers.CommandHandler{}
commandBusHandler := &commandBusHandler{
logger: router.Logger(),
}
for _, commandHandler := range commandHandlers {
if err := commandHandler.Init(settings, connSettings); err != nil {
if err := commandHandler.Init(connInfo); err != nil {
logFields := watermill.LogFields{"handler_name": commandHandler.Name()}
router.Logger().Error("skipping command handler that cannot be initialized", err, logFields)
continue
Expand All @@ -56,7 +55,7 @@ func CommandBus(router *message.Router,
}
commandBusHandler.commandHandlers = initCommandHandlers
router.AddHandler(commandHandlerName,
routing.CreateRemoteCloudTopic(connSettings.DeviceID),
routing.CreateRemoteCloudTopic(connInfo.DeviceID),
azureSub,
connector.TopicEmpty,
mosquittoPub,
Expand Down
48 changes: 17 additions & 31 deletions routing/bus/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,17 @@ package bus

import (
"errors"
"io"
"log"
"reflect"
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"

"github.com/eclipse-kanto/azure-connector/config"
"github.com/eclipse-kanto/azure-connector/routing"
test "github.com/eclipse-kanto/azure-connector/routing/bus/internal/testing"
"github.com/eclipse-kanto/azure-connector/routing/message/handlers"

conn "github.com/eclipse-kanto/suite-connector/connector"
"github.com/eclipse-kanto/suite-connector/logger"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -41,15 +37,10 @@ const (
)

func TestRegisterCommandMessageHandler(t *testing.T) {
settings := &config.AzureSettings{
ConnectionString: "HostName=dummy-hub.azure-devices.net;DeviceId=dummy-device;SharedAccessKey=dGVzdGF6dXJlc2hhcmVkYWNjZXNza2V5",
}
logger := logger.NewLogger(log.New(io.Discard, "", log.Ldate), logger.INFO)
connSettings, _ := config.PrepareAzureConnectionSettings(settings, nil, logger)
router, _ := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})

commandHandlers := []handlers.MessageHandler{}
CommandBus(router, conn.NullPublisher(), test.NewDummySubscriber(), settings, connSettings, commandHandlers)
router, connInfo := setupTestRouter("dummy-device")

commandHandlers := []handlers.CommandHandler{}
CommandBus(router, conn.NullPublisher(), test.NewDummySubscriber(), connInfo, commandHandlers)
refRouterPtr := reflect.ValueOf(router)
refRouter := reflect.Indirect(refRouterPtr)
refHandlers := refRouter.FieldByName(fieldHandlers)
Expand All @@ -59,16 +50,11 @@ func TestRegisterCommandMessageHandler(t *testing.T) {
}

func TestRegisterCommandMessageHandlerInitializationError(t *testing.T) {
settings := &config.AzureSettings{
ConnectionString: "HostName=dummy-hub.azure-devices.net;DeviceId=dummy-device;SharedAccessKey=dGVzdGF6dXJlc2hhcmVkYWNjZXNza2V5",
}
logger := logger.NewLogger(log.New(io.Discard, "", log.Ldate), logger.INFO)
connSettings, _ := config.PrepareAzureConnectionSettings(settings, nil, logger)
router, _ := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})

commandHandler := test.NewDummyMessageHandler(testCommandHandlerName, []string{commandName}, errors.New(""))
commandHandlers := []handlers.MessageHandler{commandHandler}
CommandBus(router, conn.NullPublisher(), test.NewDummySubscriber(), settings, connSettings, commandHandlers)
router, connInfo := setupTestRouter("dummy-device")

commandHandler := test.NewDummyCommandHandler(testCommandHandlerName, errors.New(""), nil)
commandHandlers := []handlers.CommandHandler{commandHandler}
CommandBus(router, conn.NullPublisher(), test.NewDummySubscriber(), connInfo, commandHandlers)
refRouterPtr := reflect.ValueOf(router)
refRouter := reflect.Indirect(refRouterPtr)
refHandlers := refRouter.FieldByName(fieldHandlers)
Expand All @@ -92,10 +78,10 @@ func TestNoCommandHandlerForMessage(t *testing.T) {
}

func TestFirstValidCommandMessageHandler(t *testing.T) {
commandHandler1 := test.NewDummyFailureHandler(testCommandHandlerName+"_1", nil, errors.New(""))
commandHandler2 := test.NewDummyMessageHandler(testCommandHandlerName+"_2", nil, nil)
commandHandler3 := test.NewDummyFailureHandler(testCommandHandlerName+"_3", nil, errors.New(""))
commandHandlers := []handlers.MessageHandler{commandHandler1, commandHandler2, commandHandler3}
commandHandler1 := test.NewDummyCommandHandler(testCommandHandlerName+"_1", nil, errors.New(""))
commandHandler2 := test.NewDummyCommandHandler(testCommandHandlerName+"_2", nil, nil)
commandHandler3 := test.NewDummyCommandHandler(testCommandHandlerName+"_3", nil, errors.New(""))
commandHandlers := []handlers.CommandHandler{commandHandler1, commandHandler2, commandHandler3}

busHandler := &commandBusHandler{logger: watermill.NopLogger{}, commandHandlers: commandHandlers}

Expand All @@ -107,10 +93,10 @@ func TestFirstValidCommandMessageHandler(t *testing.T) {
}

func TestMultipleCommandMessageHandlers(t *testing.T) {
commandHandler1 := test.NewDummyMessageHandler(testCommandHandlerName+"_1", nil, nil)
commandHandler2 := test.NewDummyMessageHandler(testCommandHandlerName+"_2", nil, nil)
commandHandler3 := test.NewDummyMessageHandler(testCommandHandlerName+"_3", nil, nil)
commandHandlers := []handlers.MessageHandler{commandHandler1, commandHandler2, commandHandler3}
commandHandler1 := test.NewDummyCommandHandler(testCommandHandlerName+"_1", nil, nil)
commandHandler2 := test.NewDummyCommandHandler(testCommandHandlerName+"_2", nil, nil)
commandHandler3 := test.NewDummyCommandHandler(testCommandHandlerName+"_3", nil, nil)
commandHandlers := []handlers.CommandHandler{commandHandler1, commandHandler2, commandHandler3}

busHandler := &commandBusHandler{logger: watermill.NopLogger{}, commandHandlers: commandHandlers}

Expand Down
20 changes: 10 additions & 10 deletions routing/bus/internal/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func NewDummySubscriber() message.Subscriber {

type dummyMessageHandler struct {
handleName string
topics []string
topics string
initErr error
handleErr error
}

func (h *dummyMessageHandler) Init(settings *config.AzureSettings, connSettings *config.AzureConnectionSettings) error {
func (h *dummyMessageHandler) Init(connInfo *config.RemoteConnectionInfo) error {
return h.initErr
}

Expand All @@ -69,25 +69,25 @@ func (h *dummyMessageHandler) Name() string {
return h.handleName
}

func (h *dummyMessageHandler) Topics() []string {
func (h *dummyMessageHandler) Topics() string {
return h.topics
}

// NewDummyMessageHandler instantiates a new dummy Watermill message handler.
func NewDummyMessageHandler(handlerName string, topics []string, initErr error) handlers.MessageHandler {
// NewDummyCommandHandler instantiates a new dummy command handler.
func NewDummyCommandHandler(handlerName string, initErr error, handleErr error) handlers.CommandHandler {
return &dummyMessageHandler{
handleName: handlerName,
topics: topics,
initErr: initErr,
handleErr: handleErr,
}
}

// NewDummyFailureHandler instantiates a new dummy Watermill message handler that can return error on message handling.
func NewDummyFailureHandler(handlerName string, topics []string, handleErr error) handlers.MessageHandler {
// NewDummyTelemetryHandler instantiates a new dummy telemetry handler.
func NewDummyTelemetryHandler(handlerName string, topic string, initErr error) handlers.TelemetryHandler {
return &dummyMessageHandler{
handleName: handlerName,
topics: topics,
handleErr: handleErr,
topics: topic,
initErr: initErr,
}
}

Expand Down
Loading