Skip to content

Commit

Permalink
[#18] MessageHandler interface enhancements
Browse files Browse the repository at this point in the history
Fixed PR review comments.
- CloudConnectionInfo renamed to RemoteConnectionInfo
- SharedAccessKey struct removed, not really needed (SharedAccessKeyName is not applicable for Azure IoT Hub device connection string.
- SharedAccessKey is now just a []byte field.

Signed-off-by: Stoyan Zoubev <[email protected]>
  • Loading branch information
stoyan-zoubev committed Oct 14, 2022
1 parent 0c50d98 commit 97a892b
Show file tree
Hide file tree
Showing 14 changed files with 28 additions and 49 deletions.
4 changes: 2 additions & 2 deletions cmd/azure-connector/app/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func startRouter(
azureSub := connector.NewSubscriber(azureClient, connector.QosAtMostOnce, false, logger, nil)
mosquittoSub := connector.NewSubscriber(cloudClient, connector.QosAtLeastOnce, false, router.Logger(), nil)

routingbus.TelemetryBus(router, azurePub, mosquittoSub, &connSettings.CloudConnectionInfo, telemetryHandlers)
routingbus.TelemetryBus(router, azurePub, mosquittoSub, &connSettings.RemoteConnectionInfo, telemetryHandlers)

cloudPub := connector.NewPublisher(cloudClient, connector.QosAtLeastOnce, router.Logger(), nil)
routingbus.CommandBus(router, cloudPub, azureSub, &connSettings.CloudConnectionInfo, commandHandlers)
routingbus.CommandBus(router, cloudPub, azureSub, &connSettings.RemoteConnectionInfo, commandHandlers)

go func() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
30 changes: 10 additions & 20 deletions config/connection_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,29 @@ import (
)

const (
provisioningJSONConfig = "provisioning.json" // TODO: why with the same name as default flag value?
hostNameSuffix = ".azure-devices.net"
propertyKeyHostName = "HostName"
propertyKeyDeviceID = "DeviceId"
propertyKeySharedAccessKey = "SharedAccessKey"
propertyKeySharedAccessKeyName = "SharedAccessKeyName"
provisioningJSONConfig = "provisioning.json" // TODO: why with the same name as default flag value?
hostNameSuffix = ".azure-devices.net"
propertyKeyHostName = "HostName"
propertyKeyDeviceID = "DeviceId"
propertyKeySharedAccessKey = "SharedAccessKey"
)

// CloudConnectionInfo contains properties related to the cloud connection that may not be known at the start of the connector.
type CloudConnectionInfo struct {
// RemoteConnectionInfo contains properties related to the cloud connection that may not be known at the start of the connector.
type RemoteConnectionInfo struct {
HubName string
HostName string
DeviceID string
}

// SharedAccessKey contains the shared access key for generating SAS token for device authentication.
type SharedAccessKey struct {
SharedAccessKeyName string
SharedAccessKeyDecoded []byte
}

// AzureConnectionSettings contains the configuration data for establishing connection to the Azure IoT Hub.
type AzureConnectionSettings struct {
CloudConnectionInfo
RemoteConnectionInfo

DeviceCert string
DeviceKey string
TokenValidity time.Duration

*SharedAccessKey
SharedAccessKey []byte
}

// PrepareAzureConnectionSettings prepares the configuration data for establishing connection to the Azure IoT Hub, allowing usage of IDScopeProvider.
Expand Down Expand Up @@ -220,10 +213,7 @@ func CreateAzureSASTokenConnectionSettings(
if sharedAccessKeyDecoded, err = base64.StdEncoding.DecodeString(sharedAccessKey); err != nil {
return nil, errors.New("the SharedAccessKey is not base64 encoded")
}
connSettings.SharedAccessKey = &SharedAccessKey{
SharedAccessKeyName: connStringProperties[propertyKeySharedAccessKeyName],
SharedAccessKeyDecoded: sharedAccessKeyDecoded,
}
connSettings.SharedAccessKey = sharedAccessKeyDecoded

if tokenValidity, err := ParseSASTokenValidity(settings.SASTokenValidity); err != nil {
logger.Warn("The default SAS token validity period will be set.", err, nil)
Expand Down
3 changes: 1 addition & 2 deletions config/connection_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestCreateTokenConnectionSettings(t *testing.T) {
assert.Equal(t, "dummy-device", connSettings.DeviceID)
assert.Equal(t, "dummy-hub.azure-devices.net", connSettings.HostName)
assert.Equal(t, "dummy-hub", connSettings.HubName)
assert.Equal(t, "", connSettings.SharedAccessKey.SharedAccessKeyName)
assert.Equal(t, decodedSharedAccessKey, connSettings.SharedAccessKey.SharedAccessKeyDecoded)
assert.Equal(t, decodedSharedAccessKey, connSettings.SharedAccessKey)
assert.Equal(t, time.Hour, connSettings.TokenValidity)
assert.Equal(t, "", connSettings.DeviceCert)
assert.Equal(t, "", connSettings.DeviceKey)
Expand Down
8 changes: 3 additions & 5 deletions config/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ func TestCreateAzureClientNoCacert(t *testing.T) {
defer goleak.VerifyNone(t)

decodedAccessKey, _ := base64.StdEncoding.DecodeString("x7HrdC+URzEneFam9ZKa0Ke7nvsDwiuJptzFkgs8JWA=")
accessKey := &config.SharedAccessKey{
SharedAccessKeyDecoded: decodedAccessKey,
}

settings := &config.AzureSettings{}
connSettings := &config.AzureConnectionSettings{
CloudConnectionInfo: config.CloudConnectionInfo{
RemoteConnectionInfo: config.RemoteConnectionInfo{
DeviceID: "dummy-device",
HostName: "dummy-hub.azure-devices.net",
HubName: "dummy-hub",
},
SharedAccessKey: accessKey,
SharedAccessKey: decodedAccessKey,
}

logger := logger.NewLogger(log.New(io.Discard, "", log.Ldate), logger.INFO)
Expand Down
13 changes: 3 additions & 10 deletions config/sas.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,28 @@ type SharedAccessSignature struct {
Sr string
Sig string
Se time.Time
Skn string
}

// GenerateSASToken generates the SAS token for device authentication.
func GenerateSASToken(connSettings *AzureConnectionSettings) *SharedAccessSignature {
return newSharedAccessSignature(connSettings.HostName,
connSettings.SharedAccessKeyName,
connSettings.SharedAccessKey.SharedAccessKeyDecoded,
connSettings.SharedAccessKey,
Now().Add(connSettings.TokenValidity))
}

func sasTokenToString(sas *SharedAccessSignature) string {
s := "SharedAccessSignature " +
return "SharedAccessSignature " +
"sr=" + url.QueryEscape(sas.Sr) +
"&sig=" + url.QueryEscape(sas.Sig) +
"&se=" + url.QueryEscape(strconv.FormatInt(sas.Se.Unix(), 10))
if sas.Skn != "" {
s += "&skn=" + url.QueryEscape(sas.Skn)
}
return s
}

func newSharedAccessSignature(resource, policy string, decodedKey []byte, expiry time.Time) *SharedAccessSignature {
func newSharedAccessSignature(resource string, decodedKey []byte, expiry time.Time) *SharedAccessSignature {
sig := messageKeySignature(resource, decodedKey, expiry)
return &SharedAccessSignature{
Sr: resource,
Sig: sig,
Se: expiry,
Skn: policy,
}
}

Expand Down
1 change: 0 additions & 1 deletion config/sas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestGenerateSASToken(t *testing.T) {

sasToken := config.GenerateSASToken(connSettings)

assert.Equal(t, "", sasToken.Skn)
assert.Equal(t, "dummy-hub.azure-devices.net", sasToken.Sr)
assert.Equal(t, "2021-01-01 01:00:00 +0000 UTC", sasToken.Se.String())
assert.Equal(t, "ifZm2I0YKRkwc8Pc49e0qKSsu3l3FbxoWZRqGBtXtng=", sasToken.Sig)
Expand Down
2 changes: 1 addition & 1 deletion routing/bus/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type commandBusHandler struct {
func CommandBus(router *message.Router,
mosquittoPub message.Publisher,
azureSub message.Subscriber,
connInfo *config.CloudConnectionInfo,
connInfo *config.RemoteConnectionInfo,
commandHandlers []handlers.CommandHandler,
) {
//Azure IoT Hub -> Message bus -> Mosquitto Broker -> Gateway
Expand Down
2 changes: 1 addition & 1 deletion routing/bus/internal/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type dummyMessageHandler struct {
handleErr error
}

func (h *dummyMessageHandler) Init(connInfo *config.CloudConnectionInfo) error {
func (h *dummyMessageHandler) Init(connInfo *config.RemoteConnectionInfo) error {
return h.initErr
}

Expand Down
2 changes: 1 addition & 1 deletion routing/bus/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TelemetryBus(
router *message.Router,
azurePub message.Publisher,
mosquittoSub message.Subscriber,
connInfo *config.CloudConnectionInfo,
connInfo *config.RemoteConnectionInfo,
telemetryHandlers []handlers.TelemetryHandler,
) {
//Gateway -> Mosquitto Broker -> Message bus -> Azure IoT Hub
Expand Down
4 changes: 2 additions & 2 deletions routing/bus/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func TestTelemetryMessageHandlerInitializationError(t *testing.T) {
test.AssertNoRouterHandlers(t, router)
}

func setupTestRouter(deviceID string) (*message.Router, *config.CloudConnectionInfo) {
connInfo := &config.CloudConnectionInfo{
func setupTestRouter(deviceID string) (*message.Router, *config.RemoteConnectionInfo) {
connInfo := &config.RemoteConnectionInfo{
DeviceID: deviceID,
}
router, _ := message.NewRouter(message.RouterConfig{}, watermill.NopLogger{})
Expand Down
2 changes: 1 addition & 1 deletion routing/message/handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

// MessageHandler represents the internal interface for implementing a message handler.
type messageHandler interface {
Init(connInfo *config.CloudConnectionInfo) error
Init(connInfo *config.RemoteConnectionInfo) error
HandleMessage(message *message.Message) ([]*message.Message, error)
Name() string
}
Expand Down
2 changes: 1 addition & 1 deletion routing/message/handlers/passthrough/command_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func CreateCommandHandler(topic string) handlers.CommandHandler {
}

// Init does nothing.
func (h *commandHandler) Init(connInfo *config.CloudConnectionInfo) error {
func (h *commandHandler) Init(connInfo *config.RemoteConnectionInfo) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion routing/message/handlers/passthrough/telemetry_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func CreateTelemetryHandler(topics string) handlers.TelemetryHandler {
}

// Init gets the device ID that is needed for the message forwarding towards Azure IoT Hub.
func (h *telemetryHandler) Init(connInfo *config.CloudConnectionInfo) error {
func (h *telemetryHandler) Init(connInfo *config.RemoteConnectionInfo) error {
h.deviceID = connInfo.DeviceID
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestCreateTelemetryHandler(t *testing.T) {

func TestHandleTelemetryMessage(t *testing.T) {
handler := CreateTelemetryHandler("telemetry_topic")
require.NoError(t, handler.Init(&config.CloudConnectionInfo{DeviceID: "dummy_device"}))
require.NoError(t, handler.Init(&config.RemoteConnectionInfo{DeviceID: "dummy_device"}))

payload := "dummy_message"
outgoingMessages, err := handler.HandleMessage(&message.Message{Payload: []byte(payload)})
Expand Down

0 comments on commit 97a892b

Please sign in to comment.