Skip to content

Commit

Permalink
Add ConnectThing method to bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya committed Apr 24, 2024
1 parent 3d962fa commit 19ab222
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 5 deletions.
18 changes: 18 additions & 0 deletions bootstrap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return lm.svc.RemoveChannelHandler(ctx, id)
}

func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", channelID),
slog.String("thing_id", thingID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Connect thing handler failed to complete successfully", args...)
return
}
lm.logger.Info("Connect thing handler completed successfully", args...)
}(time.Now())

return lm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
Expand Down
10 changes: 10 additions & 0 deletions bootstrap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string
return mm.svc.RemoveChannelHandler(ctx, id)
}

// ConnectThingHandler instruments ConnectThingHandler method with metrics.
func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing_handler").Add(1)
mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

// DisconnectThingHandler instruments DisconnectThingHandler method with metrics.
func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
Expand Down
4 changes: 4 additions & 0 deletions bootstrap/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type ConfigRepository interface {
// RemoveChannel removes channel with the given ID.
RemoveChannel(ctx context.Context, id string) error

// ConnectHandler changes state of the Config when the corresponding Thing is
// connected to the Channel.
ConnectThing(ctx context.Context, channelID, thingID string) error

// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
DisconnectThing(ctx context.Context, channelID, thingID string) error
Expand Down
15 changes: 13 additions & 2 deletions bootstrap/events/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const (
thingRemove = "thing.remove"
thingConnect = "policy.create"
thingDisconnect = "policy.delete"

channelPrefix = "group."
Expand Down Expand Up @@ -42,6 +43,9 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
err = es.svc.ConnectThingHandler(ctx, cte.channelID, cte.thingID)
case thingDisconnect:
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
Expand Down Expand Up @@ -87,8 +91,15 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent {
}
}

func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
return disconnectEvent{
func decodeConnectThing(event map[string]interface{}) connectionEvent {
return connectionEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
}
}

func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
return connectionEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
}
Expand Down
14 changes: 14 additions & 0 deletions bootstrap/events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -276,6 +277,19 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
return val, nil
}

type connectThingEvent struct {
thingID string
channelID string
}

func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.thingID,
"channel_id": cte.channelID,
"operation": thingConnect,
}, nil
}

type disconnectThingEvent struct {
thingID string
channelID string
Expand Down
13 changes: 13 additions & 0 deletions bootstrap/events/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra
return es.Publish(ctx, ev)
}

func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}

ev := connectThingEvent{
thingID,
channelID,
}

return es.Publish(ctx, ev)
}

func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil {
return err
Expand Down
86 changes: 84 additions & 2 deletions bootstrap/events/producer/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"

channelPrefix = "group."
Expand Down Expand Up @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) {
}
}

func TestConnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

svc, boot, _, _ := newService(t, redisURL)

err = redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

cases := []struct {
desc string
channelID string
thingID string
err error
event map[string]interface{}
}{
{
desc: "connect thing handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
{
desc: "add non-existing channel handler",
channelID: "unknown",
err: nil,
event: nil,
},
{
desc: "add channel handler with empty ID",
channelID: "",
err: nil,
event: nil,
},
{
desc: "add channel handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
}

lastID := "0"
for _, tc := range cases {
svcCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

streams := redisClient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{streamID, lastID},
Count: 1,
Block: time.Second,
}).Val()

var event map[string]interface{}
if len(streams) > 0 && len(streams[0].Messages) > 0 {
msg := streams[0].Messages[0]
event = msg.Values
event["timestamp"] = msg.ID
lastID = msg.ID
}

test(t, tc.event, event, tc.desc)
svcCall.Unset()
}
}

func TestDisconnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
Expand Down Expand Up @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) {

lastID := "0"
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
svcCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))

Expand All @@ -1116,7 +1198,7 @@ func TestDisconnectThingHandler(t *testing.T) {
}

test(t, tc.event, event, tc.desc)
repoCall.Unset()
svcCall.Unset()
}
}

Expand Down
18 changes: 18 additions & 0 deletions bootstrap/mocks/configs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions bootstrap/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions bootstrap/postgres/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
errSaveConnections = errors.New("failed to insert connections to database")
errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database")
errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database")
errConnectThing = errors.New("failed to connect thing in bootstrap configuration in database")
errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database")
)

Expand Down Expand Up @@ -451,6 +452,15 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error {
return nil
}

func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID); err != nil {
return errors.Wrap(errConnectThing, err)
}
return nil
}

func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
Expand Down
13 changes: 12 additions & 1 deletion bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
errRemoveConfig = errors.New("failed to remove bootstrap configuration")
errRemoveChannel = errors.New("failed to remove channel")
errCreateThing = errors.New("failed to create thing")
errConnectThing = errors.New("failed to connect thing")
errDisconnectThing = errors.New("failed to disconnect thing")
errCheckChannels = errors.New("failed to check if channels exists")
errConnectionChannels = errors.New("failed to check channels connections")
Expand Down Expand Up @@ -94,7 +95,10 @@ type Service interface {
// RemoveChannelHandler removes Channel with id received from an event.
RemoveChannelHandler(ctx context.Context, id string) error

// DisconnectHandler changes state of the Config when connect/disconnect event occurs.
// ConnectHandler changes state of the Config when connect event occurs.
ConnectThingHandler(ctx context.Context, channelID, thingID string) error

// DisconnectHandler changes state of the Config when disconnect event occurs.
DisconnectThingHandler(ctx context.Context, channelID, thingID string) error
}

Expand Down Expand Up @@ -371,6 +375,13 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string)
return nil
}

func (bs bootstrapService) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.ConnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errConnectThing, err)
}
return nil
}

func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errDisconnectThing, err)
Expand Down
11 changes: 11 additions & 0 deletions bootstrap/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return tm.svc.RemoveChannelHandler(ctx, id)
}

// ConnectThingHandler traces the "ConnectThingHandler" operation of the wrapped bootstrap.Service.
func (tm *tracingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
ctx, span := tm.tracer.Start(ctx, "svc_connect_thing_handler", trace.WithAttributes(
attribute.String("channel_id", channelID),
attribute.String("thing_id", thingID),
))
defer span.End()

return tm.svc.ConnectThingHandler(ctx, channelID, thingID)
}

// DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service.
func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes(
Expand Down

0 comments on commit 19ab222

Please sign in to comment.