Skip to content

Commit

Permalink
Add ConnectThing method to bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>

Add ConnectThing method to bootstrap

Signed-off-by: JeffMboya <[email protected]>

Consume ThingConnect in bootstrap

Signed-off-by: JeffMboya <[email protected]>

Consume ThingConnect in bootstrap

Signed-off-by: JeffMboya <[email protected]>

Consume ThingConnect in bootstrap

Signed-off-by: JeffMboya <[email protected]>

Consume ThingConnect event

Signed-off-by: JeffMboya <[email protected]>

Consume ThingsConnect event

Signed-off-by: JeffMboya <[email protected]>

Consume ThingsConnect event

Signed-off-by: JeffMboya <[email protected]>

Consume ThingsConnect event

Signed-off-by: JeffMboya <[email protected]>

Implement ReadStringSlice

Signed-off-by: JeffMboya <[email protected]>

Add memberKind and relation checks

Signed-off-by: JeffMboya <[email protected]>

Add memberKind and relation checks

Signed-off-by: JeffMboya <[email protected]>

Add memberKind and relation checks

Signed-off-by: JeffMboya <[email protected]>

Add memberKind and relation checks

Signed-off-by: JeffMboya <[email protected]>

Add memberKind and relation checks

Signed-off-by: JeffMboya <[email protected]>

Add TestDisconnectThing to configs_test

Signed-off-by: JeffMboya <[email protected]>

Update mocks

Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya authored and dborovcanin committed May 15, 2024
1 parent 3e8e534 commit 87860ef
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 52 deletions.
18 changes: 18 additions & 0 deletions bootstrap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return lm.svc.RemoveChannelHandler(ctx, id)
}

func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", channelID),
slog.String("thing_id", thingID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Connect thing handler failed to complete successfully", args...)
return
}
lm.logger.Info("Connect thing handler completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
Expand Down
10 changes: 10 additions & 0 deletions bootstrap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string
return mm.svc.RemoveChannelHandler(ctx, id)
}

// ConnectThingHandler instruments ConnectThingHandler method with metrics.
func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing_handler").Add(1)
mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

// DisconnectThingHandler instruments DisconnectThingHandler method with metrics.
func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
Expand Down
6 changes: 5 additions & 1 deletion bootstrap/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ type ConfigRepository interface {
// RemoveChannel removes channel with the given ID.
RemoveChannel(ctx context.Context, id string) error

// ConnectHandler changes state of the Config when the corresponding Thing is
// connected to the Channel.
ConnectThing(ctx context.Context, mgChannel, mgThing string) error

// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
DisconnectThing(ctx context.Context, channelID, thingID string) error
DisconnectThing(ctx context.Context, mgChannel, mgThing string) error
}
4 changes: 2 additions & 2 deletions bootstrap/events/consumer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ type updateChannelEvent struct {

// Connection event is either connect or disconnect event.
type connectionEvent struct {
thingID string
channelID string
mgThing []string
mgChannel string
}
57 changes: 51 additions & 6 deletions bootstrap/events/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (

const (
thingRemove = "thing.remove"
thingDisconnect = "policy.delete"
thingConnect = "group.assign"
thingDisconnect = "group.unassign"

channelPrefix = "group."
channelUpdate = channelPrefix + "update"
Expand Down Expand Up @@ -42,9 +43,20 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
for _, mgThing := range cte.mgThing {
if err = es.svc.ConnectThingHandler(ctx, cte.mgChannel, mgThing); err != nil {
return err
}
}
case thingDisconnect:
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
for _, mgThing := range dte.mgThing {
if err = es.svc.DisconnectThingHandler(ctx, dte.mgChannel, mgThing); err != nil {
return err
}
}
case channelUpdate:
uce := decodeUpdateChannel(msg)
err = es.handleUpdateChannel(ctx, uce)
Expand Down Expand Up @@ -87,10 +99,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent {
}
}

func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
return disconnectEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
func decodeConnectThing(event map[string]interface{}) connectionEvent {
if event["memberKind"] != "things" && event["relation"] != "group" {
return connectionEvent{}
}

return connectionEvent{
mgChannel: read(event, "group_id", ""),
mgThing: ReadStringSlice(event, "member_ids"),
}
}

func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
if event["memberKind"] != "things" && event["relation"] != "group" {
return connectionEvent{}
}
return connectionEvent{
mgChannel: read(event, "group_id", ""),
mgThing: ReadStringSlice(event, "member_ids"),
}
}

Expand All @@ -114,6 +140,25 @@ func read(event map[string]interface{}, key, def string) string {
return val
}

// ReadStringSlice reads string slice from event map.
// If value is not a string slice, returns empty slice.
func ReadStringSlice(event map[string]interface{}, key string) []string {
var res []string

vals, ok := event[key].([]interface{})
if !ok {
return res
}

for _, v := range vals {
if s, ok := v.(string); ok {
res = append(res, s)
}
}

return res
}

func readTime(event map[string]interface{}, key string, def time.Time) time.Time {
val, ok := event[key].(time.Time)
if !ok {
Expand Down
22 changes: 18 additions & 4 deletions bootstrap/events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -276,15 +277,28 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
return val, nil
}

type connectThingEvent struct {
mgThing string
mgChannel string
}

func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.mgThing,
"channel_id": cte.mgChannel,
"operation": thingConnect,
}, nil
}

type disconnectThingEvent struct {
thingID string
channelID string
mgThing string
mgChannel string
}

func (dte disconnectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": dte.thingID,
"channel_id": dte.channelID,
"thing_id": dte.mgThing,
"channel_id": dte.mgChannel,
"operation": thingDisconnect,
}, nil
}
17 changes: 15 additions & 2 deletions bootstrap/events/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra
return es.Publish(ctx, ev)
}

func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := connectThingEvent{
mgThing: thingID,
mgChannel: channelID,
}

return es.Publish(ctx, ev)
}

func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := disconnectThingEvent{
thingID,
channelID,
mgThing: thingID,
mgChannel: channelID,
}

return es.Publish(ctx, ev)
Expand Down
86 changes: 84 additions & 2 deletions bootstrap/events/producer/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) {
}
}

func TestConnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

svc, boot, _, _ := newService(t, redisURL)

err = redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

cases := []struct {
desc string
channelID string
thingID string
err error
event map[string]interface{}
}{
{
desc: "connect thing handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
{
desc: "add non-existing channel handler",
channelID: "unknown",
err: nil,
event: nil,
},
{
desc: "add channel handler with empty ID",
channelID: "",
err: nil,
event: nil,
},
{
desc: "add channel handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
}

lastID := "0"
for _, tc := range cases {
svcCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

streams := redisClient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{streamID, lastID},
Count: 1,
Block: time.Second,
}).Val()

var event map[string]interface{}
if len(streams) > 0 && len(streams[0].Messages) > 0 {
msg := streams[0].Messages[0]
event = msg.Values
event["timestamp"] = msg.ID
lastID = msg.ID
}

test(t, tc.event, event, tc.desc)
svcCall.Unset()
}
}

func TestDisconnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
Expand Down Expand Up @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) {

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
svcCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

Expand All @@ -1116,7 +1198,7 @@ func TestDisconnectThingHandler(t *testing.T) {
}

test(t, tc.event, event, tc.desc)
repoCall.Unset()
svcCall.Unset()
}
}

Expand Down
26 changes: 22 additions & 4 deletions bootstrap/mocks/configs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 87860ef

Please sign in to comment.