diff --git a/bootstrap/api/logging.go b/bootstrap/api/logging.go index eb92d7a45e..a9a5f25466 100644 --- a/bootstrap/api/logging.go +++ b/bootstrap/api/logging.go @@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string return lm.svc.RemoveChannelHandler(ctx, id) } +func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.String("channel_id", channelID), + slog.String("thing_id", thingID), + } + if err != nil { + args = append(args, slog.Any("error", err)) + lm.logger.Warn("Connect thing handler failed to complete successfully", args...) + return + } + lm.logger.Info("Connect thing handler completed successfully", args...) + }(time.Now()) + + return lm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { args := []any{ diff --git a/bootstrap/api/metrics.go b/bootstrap/api/metrics.go index 8137635e3c..9e42a57c18 100644 --- a/bootstrap/api/metrics.go +++ b/bootstrap/api/metrics.go @@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string return mm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler instruments ConnectThingHandler method with metrics. +func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { + defer func(begin time.Time) { + mm.counter.With("method", "connect_thing_handler").Add(1) + mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler instruments DisconnectThingHandler method with metrics. func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) { defer func(begin time.Time) { diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 6b7df9b42e..e6a5050743 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -112,7 +112,11 @@ type ConfigRepository interface { // RemoveChannel removes channel with the given ID. RemoveChannel(ctx context.Context, id string) error + // ConnectHandler changes state of the Config when the corresponding Thing is + // connected to the Channel. + ConnectThing(ctx context.Context, mgChannel, mgThing string) error + // DisconnectHandler changes state of the Config when the corresponding Thing is // disconnected from the Channel. - DisconnectThing(ctx context.Context, channelID, thingID string) error + DisconnectThing(ctx context.Context, mgChannel, mgThing string) error } diff --git a/bootstrap/events/consumer/events.go b/bootstrap/events/consumer/events.go index e5f26a6158..7ec78f6d00 100644 --- a/bootstrap/events/consumer/events.go +++ b/bootstrap/events/consumer/events.go @@ -19,6 +19,6 @@ type updateChannelEvent struct { // Connection event is either connect or disconnect event. type connectionEvent struct { - thingID string - channelID string + mgThing []string + mgChannel string } diff --git a/bootstrap/events/consumer/streams.go b/bootstrap/events/consumer/streams.go index 10d8ff77eb..55bf33f0f1 100644 --- a/bootstrap/events/consumer/streams.go +++ b/bootstrap/events/consumer/streams.go @@ -14,7 +14,8 @@ import ( const ( thingRemove = "thing.remove" - thingDisconnect = "policy.delete" + thingConnect = "group.assign" + thingDisconnect = "group.unassign" channelPrefix = "group." channelUpdate = channelPrefix + "update" @@ -42,9 +43,20 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { case thingRemove: rte := decodeRemoveThing(msg) err = es.svc.RemoveConfigHandler(ctx, rte.id) + case thingConnect: + cte := decodeConnectThing(msg) + for _, mgThing := range cte.mgThing { + if err = es.svc.ConnectThingHandler(ctx, cte.mgChannel, mgThing); err != nil { + return err + } + } case thingDisconnect: dte := decodeDisconnectThing(msg) - err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID) + for _, mgThing := range dte.mgThing { + if err = es.svc.DisconnectThingHandler(ctx, dte.mgChannel, mgThing); err != nil { + return err + } + } case channelUpdate: uce := decodeUpdateChannel(msg) err = es.handleUpdateChannel(ctx, uce) @@ -87,10 +99,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent { } } -func decodeDisconnectThing(event map[string]interface{}) disconnectEvent { - return disconnectEvent{ - channelID: read(event, "chan_id", ""), - thingID: read(event, "thing_id", ""), +func decodeConnectThing(event map[string]interface{}) connectionEvent { + if event["memberKind"] != "things" && event["relation"] != "group" { + return connectionEvent{} + } + + return connectionEvent{ + mgChannel: read(event, "group_id", ""), + mgThing: ReadStringSlice(event, "member_ids"), + } +} + +func decodeDisconnectThing(event map[string]interface{}) connectionEvent { + if event["memberKind"] != "things" && event["relation"] != "group" { + return connectionEvent{} + } + return connectionEvent{ + mgChannel: read(event, "group_id", ""), + mgThing: ReadStringSlice(event, "member_ids"), } } @@ -114,6 +140,25 @@ func read(event map[string]interface{}, key, def string) string { return val } +// ReadStringSlice reads string slice from event map. +// If value is not a string slice, returns empty slice. +func ReadStringSlice(event map[string]interface{}, key string) []string { + var res []string + + vals, ok := event[key].([]interface{}) + if !ok { + return res + } + + for _, v := range vals { + if s, ok := v.(string); ok { + res = append(res, s) + } + } + + return res +} + func readTime(event map[string]interface{}, key string, def time.Time) time.Time { val, ok := event[key].(time.Time) if !ok { diff --git a/bootstrap/events/producer/events.go b/bootstrap/events/producer/events.go index ed15704a4d..15c0a05cc0 100644 --- a/bootstrap/events/producer/events.go +++ b/bootstrap/events/producer/events.go @@ -23,6 +23,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -276,15 +277,28 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) { return val, nil } +type connectThingEvent struct { + mgThing string + mgChannel string +} + +func (cte connectThingEvent) Encode() (map[string]interface{}, error) { + return map[string]interface{}{ + "thing_id": cte.mgThing, + "channel_id": cte.mgChannel, + "operation": thingConnect, + }, nil +} + type disconnectThingEvent struct { - thingID string - channelID string + mgThing string + mgChannel string } func (dte disconnectThingEvent) Encode() (map[string]interface{}, error) { return map[string]interface{}{ - "thing_id": dte.thingID, - "channel_id": dte.channelID, + "thing_id": dte.mgThing, + "channel_id": dte.mgChannel, "operation": thingDisconnect, }, nil } diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 5765137758..62f1c7fac8 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra return es.Publish(ctx, ev) } +func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil { + return err + } + + ev := connectThingEvent{ + mgThing: thingID, + mgChannel: channelID, + } + + return es.Publish(ctx, ev) +} + func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil { return err } ev := disconnectThingEvent{ - thingID, - channelID, + mgThing: thingID, + mgChannel: channelID, } return es.Publish(ctx, ev) diff --git a/bootstrap/events/producer/streams_test.go b/bootstrap/events/producer/streams_test.go index 175c79d365..a97d1010bb 100644 --- a/bootstrap/events/producer/streams_test.go +++ b/bootstrap/events/producer/streams_test.go @@ -48,6 +48,7 @@ const ( thingBootstrap = thingPrefix + "bootstrap" thingStateChange = thingPrefix + "change_state" thingUpdateConnections = thingPrefix + "update_connections" + thingConnect = thingPrefix + "connect" thingDisconnect = thingPrefix + "disconnect" channelPrefix = "group." @@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) { } } +func TestConnectThingHandler(t *testing.T) { + err := redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + svc, boot, _, _ := newService(t, redisURL) + + err = redisClient.FlushAll(context.Background()).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + + cases := []struct { + desc string + channelID string + thingID string + err error + event map[string]interface{} + }{ + { + desc: "connect thing handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "add non-existing channel handler", + channelID: "unknown", + err: nil, + event: nil, + }, + { + desc: "add channel handler with empty ID", + channelID: "", + err: nil, + event: nil, + }, + { + desc: "add channel handler successfully", + channelID: channel.ID, + thingID: "1", + err: nil, + event: map[string]interface{}{ + "channel_id": channel.ID, + "thing_id": "1", + "operation": thingConnect, + "timestamp": time.Now().UnixNano(), + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + lastID := "0" + for _, tc := range cases { + svcCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + + streams := redisClient.XRead(context.Background(), &redis.XReadArgs{ + Streams: []string{streamID, lastID}, + Count: 1, + Block: time.Second, + }).Val() + + var event map[string]interface{} + if len(streams) > 0 && len(streams[0].Messages) > 0 { + msg := streams[0].Messages[0] + event = msg.Values + event["timestamp"] = msg.ID + lastID = msg.ID + } + + test(t, tc.event, event, tc.desc) + svcCall.Unset() + } +} + func TestDisconnectThingHandler(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) { lastID := "0" for _, tc := range cases { - repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + svcCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err) err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -1116,7 +1198,7 @@ func TestDisconnectThingHandler(t *testing.T) { } test(t, tc.event, event, tc.desc) - repoCall.Unset() + svcCall.Unset() } } diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 6345bff34e..478589c8e4 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -35,9 +35,27 @@ func (_m *ConfigRepository) ChangeState(ctx context.Context, owner string, id st return r0 } -// DisconnectThing provides a mock function with given fields: ctx, channelID, thingID -func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID string, thingID string) error { - ret := _m.Called(ctx, channelID, thingID) +// ConnectThing provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *ConfigRepository) ConnectThing(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) + + if len(ret) == 0 { + panic("no return value specified for ConnectThing") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, mgChannel, mgThing) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DisconnectThing provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *ConfigRepository) DisconnectThing(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) if len(ret) == 0 { panic("no return value specified for DisconnectThing") @@ -45,7 +63,7 @@ func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID strin var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, channelID, thingID) + r0 = rf(ctx, mgChannel, mgThing) } else { r0 = ret.Error(0) } diff --git a/bootstrap/mocks/service.go b/bootstrap/mocks/service.go index d312b15f1e..0c7a6b7ddc 100644 --- a/bootstrap/mocks/service.go +++ b/bootstrap/mocks/service.go @@ -91,9 +91,27 @@ func (_m *Service) ChangeState(ctx context.Context, token string, id string, sta return r0 } -// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, thingID -func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, thingID string) error { - ret := _m.Called(ctx, channelID, thingID) +// ConnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *Service) ConnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) + + if len(ret) == 0 { + panic("no return value specified for ConnectThingHandler") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, mgChannel, mgThing) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DisconnectThingHandler provides a mock function with given fields: ctx, mgChannel, mgThing +func (_m *Service) DisconnectThingHandler(ctx context.Context, mgChannel string, mgThing string) error { + ret := _m.Called(ctx, mgChannel, mgThing) if len(ret) == 0 { panic("no return value specified for DisconnectThingHandler") @@ -101,7 +119,7 @@ func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, channelID, thingID) + r0 = rf(ctx, mgChannel, mgThing) } else { r0 = ret.Error(0) } diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index 998f2a28aa..0a1e2f68cf 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -28,6 +28,7 @@ var ( errSaveConnections = errors.New("failed to insert connections to database") errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database") errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database") + errConnectThing = errors.New("failed to connect thing in bootstrap configuration in database") errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database") ) @@ -59,27 +60,33 @@ func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsCo dbcfg := toDBConfig(cfg) if _, err := tx.NamedExec(q, dbcfg); err != nil { - e := err if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation { - e = repoerr.ErrConflict + err = repoerr.ErrConflict } - cr.rollback("Failed to insert a Config", tx) - return "", errors.Wrap(repoerr.ErrCreateEntity, e) + if errRollback := cr.rollback("failed to insert a Config", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } } if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil { - cr.rollback("Failed to insert Channels", tx) + if errRollback := cr.rollback("failed to insert Channels", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", errors.Wrap(errSaveChannels, err) } if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { - cr.rollback("Failed to insert connections", tx) + if errRollback := cr.rollback("failed to insert connections", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", errors.Wrap(errSaveConnections, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config save", tx) + if errRollback := cr.rollback("failed to commit Config save", tx); errRollback != nil { + return "", errors.Wrap(err, errRollback) + } return "", err } @@ -314,7 +321,9 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri } if err := insertChannels(ctx, owner, channels, tx); err != nil { - cr.rollback("Failed to insert Channels during the update", tx) + if rollbackErr := cr.rollback("failed to insert Channels during the update", tx); rollbackErr != nil { + return err + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -324,12 +333,16 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri return repoerr.ErrNotFound } } - cr.rollback("Failed to update connections during the update", tx) + if errRollback := cr.rollback("failed to update connections during the update", tx); errRollback != nil { + return errors.Wrap(err, errRollback) + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } if err := tx.Commit(); err != nil { - cr.rollback("Failed to commit Config update", tx) + if errRollback := cr.rollback("failed to commit Config update", tx); errRollback != nil { + return errors.Wrap(err, errRollback) + } return errors.Wrap(repoerr.ErrUpdateEntity, err) } @@ -451,10 +464,19 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { return nil } -func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { +func (cr configRepository) ConnectThing(ctx context.Context, mgChannel, mgThing string) error { + q := `UPDATE configs SET state = $1 WHERE EXISTS ( + SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, mgThing, mgChannel); err != nil { + return errors.Wrap(errConnectThing, err) + } + return nil +} + +func (cr configRepository) DisconnectThing(ctx context.Context, mgChannel, mgThing string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, mgThing, mgChannel); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil @@ -483,10 +505,13 @@ func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (s return fmt.Sprintf(template, f), params } -func (cr configRepository) rollback(content string, tx *sqlx.Tx) { +func (cr configRepository) rollback(content string, tx *sqlx.Tx) error { + errMsg := errors.New(content) if err := tx.Rollback(); err != nil { - cr.log.Error(fmt.Sprintf("Failed to rollback due to %s", err)) + errRollback := errors.New("failed to rollback") + return errors.Wrap(errMsg, errors.Wrap(errRollback, err)) } + return errMsg } func insertChannels(_ context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 81e3d85256..60ee50389a 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -675,6 +675,31 @@ func TestRemoveChannel(t *testing.T) { assert.NotContains(t, cfg.Channels, c.Channels[0], fmt.Sprintf("expected to remove channel %s from %s", c.Channels[0], cfg.Channels)) } +func TestConnectThing(t *testing.T) { + repo := postgres.NewConfigRepository(db, testLog) + err := deleteChannels(context.Background(), repo) + require.Nil(t, err, "Channels cleanup expected to succeed.") + + c := config + // Use UUID to prevent conflicts. + uid, err := uuid.NewV4() + assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err)) + c.ThingKey = uid.String() + c.ThingID = uid.String() + c.ExternalID = uid.String() + c.ExternalKey = uid.String() + saved, err := repo.Save(context.Background(), c, channels) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + + err = repo.ConnectThing(context.Background(), c.Channels[0].ID, saved) + fmt.Print("state", config.State) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) + assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) + assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg)) +} + func TestDisconnectThing(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) err := deleteChannels(context.Background(), repo) @@ -696,7 +721,7 @@ func TestDisconnectThing(t *testing.T) { cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected ti be inactive when a connection is removed from %s", cfg)) + assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected to be inactive when a connection is removed from %s", cfg)) } func deleteChannels(ctx context.Context, repo bootstrap.ConfigRepository) error { diff --git a/bootstrap/service.go b/bootstrap/service.go index 1214e944c4..7fd09ff8e6 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -41,6 +41,7 @@ var ( errRemoveConfig = errors.New("failed to remove bootstrap configuration") errRemoveChannel = errors.New("failed to remove channel") errCreateThing = errors.New("failed to create thing") + errConnectThing = errors.New("failed to connect thing") errDisconnectThing = errors.New("failed to disconnect thing") errCheckChannels = errors.New("failed to check if channels exists") errConnectionChannels = errors.New("failed to check channels connections") @@ -96,8 +97,11 @@ type Service interface { // RemoveChannelHandler removes Channel with id received from an event. RemoveChannelHandler(ctx context.Context, id string) error - // DisconnectHandler changes state of the Config when connect/disconnect event occurs. - DisconnectThingHandler(ctx context.Context, channelID, thingID string) error + // ConnectHandler changes state of the Config to active when connect event occurs. + ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error + + // DisconnectHandler changes state of the Config to inactive when disconnect event occurs. + DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error } // ConfigReader is used to parse Config into format which will be encoded @@ -373,8 +377,15 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) return nil } -func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { - if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { +func (bs bootstrapService) ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { + if err := bs.configs.ConnectThing(ctx, mgChannel, mgThing); err != nil { + return errors.Wrap(errConnectThing, err) + } + return nil +} + +func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error { + if err := bs.configs.DisconnectThing(ctx, mgChannel, mgThing); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil diff --git a/bootstrap/service_test.go b/bootstrap/service_test.go index fe8aa290d1..28b93bd003 100644 --- a/bootstrap/service_test.go +++ b/bootstrap/service_test.go @@ -909,6 +909,49 @@ func TestRemoveCoinfigHandler(t *testing.T) { } } +func TestConnectThingsHandler(t *testing.T) { + svc, boot, auth, sdk := newService() + repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) + repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil) + repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil) + repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil) + repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil) + saved, err := svc.Add(context.Background(), validToken, config) + assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) + repoCall.Unset() + repoCall1.Unset() + repoCall2.Unset() + repoCall3.Unset() + repoCall4.Unset() + + cases := []struct { + desc string + thingID string + channelID string + err error + }{ + { + desc: "connect", + channelID: channel.ID, + thingID: saved.ThingID, + err: nil, + }, + { + desc: "connect connected", + channelID: channel.ID, + thingID: saved.ThingID, + err: nil, + }, + } + + for _, tc := range cases { + repoCall := boot.On("ConnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err) + err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + repoCall.Unset() + } +} + func TestDisconnectThingsHandler(t *testing.T) { svc, boot, auth, sdk := newService() repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil) diff --git a/bootstrap/tracing/tracing.go b/bootstrap/tracing/tracing.go index 6d867aa39e..852a3f088b 100644 --- a/bootstrap/tracing/tracing.go +++ b/bootstrap/tracing/tracing.go @@ -158,6 +158,17 @@ func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string return tm.svc.RemoveChannelHandler(ctx, id) } +// ConnectThingHandler traces the "ConnectThingHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) error { + ctx, span := tm.tracer.Start(ctx, "svc_connect_thing_handler", trace.WithAttributes( + attribute.String("channel_id", channelID), + attribute.String("thing_id", thingID), + )) + defer span.End() + + return tm.svc.ConnectThingHandler(ctx, channelID, thingID) +} + // DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service. func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes( diff --git a/internal/groups/events/events.go b/internal/groups/events/events.go index d15a2a1caa..f35153c5c1 100644 --- a/internal/groups/events/events.go +++ b/internal/groups/events/events.go @@ -39,14 +39,18 @@ var ( ) type assignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge assignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupAssign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } @@ -54,14 +58,18 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) { } type unassignEvent struct { - memberIDs []string - groupID string + memberIDs []string + relation string + memberKind string + groupID string } func (cge unassignEvent) Encode() (map[string]interface{}, error) { val := map[string]interface{}{ "operation": groupUnassign, "member_ids": cge.memberIDs, + "relation": cge.relation, + "memberKind": cge.memberKind, "group_id": cge.groupID, } diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 10610d6686..9fa5fede07 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -145,8 +145,10 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe } event := assignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil { @@ -162,8 +164,10 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem } event := unassignEvent{ - groupID: groupID, - memberIDs: memberIDs, + groupID: groupID, + relation: relation, + memberKind: memberKind, + memberIDs: memberIDs, } if err := es.Publish(ctx, event); err != nil {