Skip to content

Commit

Permalink
Refactor: revert to thingID and channelID
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>

Refactor: revert to thingID and channelID

Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya committed May 20, 2024
1 parent 001b8af commit 3f8243e
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 34 deletions.
4 changes: 2 additions & 2 deletions bootstrap/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ type ConfigRepository interface {

// ConnectHandler changes state of the Config when the corresponding Thing is
// connected to the Channel.
ConnectThing(ctx context.Context, mgChannel, mgThing string) error
ConnectThing(ctx context.Context, channelID, thingID string) error

// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
DisconnectThing(ctx context.Context, mgChannel, mgThing string) error
DisconnectThing(ctx context.Context, channelID, thingID string) error
}
4 changes: 2 additions & 2 deletions bootstrap/events/consumer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ type updateChannelEvent struct {

// Connection event is either connect or disconnect event.
type connectionEvent struct {
mgThing []string
mgChannel string
thingID []string
channelID string
}
16 changes: 8 additions & 8 deletions bootstrap/events/consumer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
for _, mgThing := range cte.mgThing {
if err = es.svc.ConnectThingHandler(ctx, cte.mgChannel, mgThing); err != nil {
for _, thingID := range cte.thingID {
if err = es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil {
return err
}
}
case thingDisconnect:
dte := decodeDisconnectThing(msg)
for _, mgThing := range dte.mgThing {
if err = es.svc.DisconnectThingHandler(ctx, dte.mgChannel, mgThing); err != nil {
for _, thingID := range dte.thingID {
if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil {
return err
}
}
Expand Down Expand Up @@ -105,8 +105,8 @@ func decodeConnectThing(event map[string]interface{}) connectionEvent {
}

return connectionEvent{
mgChannel: read(event, "group_id", ""),
mgThing: ReadStringSlice(event, "member_ids"),
channelID: read(event, "group_id", ""),
thingID: ReadStringSlice(event, "member_ids"),
}
}

Expand All @@ -115,8 +115,8 @@ func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
return connectionEvent{}
}
return connectionEvent{
mgChannel: read(event, "group_id", ""),
mgThing: ReadStringSlice(event, "member_ids"),
channelID: read(event, "group_id", ""),
thingID: ReadStringSlice(event, "member_ids"),
}
}

Expand Down
16 changes: 8 additions & 8 deletions bootstrap/events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,27 +278,27 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
}

type connectThingEvent struct {
mgThing string
mgChannel string
thingID string
channelID string
}

func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.mgThing,
"channel_id": cte.mgChannel,
"thing_id": cte.thingID,
"channel_id": cte.channelID,
"operation": thingConnect,
}, nil
}

type disconnectThingEvent struct {
mgThing string
mgChannel string
thingID string
channelID string
}

func (dte disconnectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": dte.mgThing,
"channel_id": dte.mgChannel,
"thing_id": dte.thingID,
"channel_id": dte.channelID,
"operation": thingDisconnect,
}, nil
}
8 changes: 4 additions & 4 deletions bootstrap/events/producer/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingI
}

ev := connectThingEvent{
mgThing: thingID,
mgChannel: channelID,
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
Expand All @@ -226,8 +226,8 @@ func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thi
}

ev := disconnectThingEvent{
mgThing: thingID,
mgChannel: channelID,
thingID: thingID,
channelID: channelID,
}

return es.Publish(ctx, ev)
Expand Down
8 changes: 4 additions & 4 deletions bootstrap/postgres/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,19 +464,19 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error {
return nil
}

func (cr configRepository) ConnectThing(ctx context.Context, mgChannel, mgThing string) error {
func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, mgThing, mgChannel); err != nil {
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID); err != nil {
return errors.Wrap(errConnectThing, err)
}
return nil
}

func (cr configRepository) DisconnectThing(ctx context.Context, mgChannel, mgThing string) error {
func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, mgThing, mgChannel); err != nil {
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil {
return errors.Wrap(errDisconnectThing, err)
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ type Service interface {
RemoveChannelHandler(ctx context.Context, id string) error

// ConnectHandler changes state of the Config to active when connect event occurs.
ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error
ConnectThingHandler(ctx context.Context, channelID, ThingID string) error

// DisconnectHandler changes state of the Config to inactive when disconnect event occurs.
DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error
DisconnectThingHandler(ctx context.Context, channelID, ThingID string) error
}

// ConfigReader is used to parse Config into format which will be encoded
Expand Down Expand Up @@ -377,15 +377,15 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string)
return nil
}

func (bs bootstrapService) ConnectThingHandler(ctx context.Context, mgChannel, mgThing string) error {
if err := bs.configs.ConnectThing(ctx, mgChannel, mgThing); err != nil {
func (bs bootstrapService) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.ConnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errConnectThing, err)
}
return nil
}

func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, mgChannel, mgThing string) error {
if err := bs.configs.DisconnectThing(ctx, mgChannel, mgThing); err != nil {
func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errDisconnectThing, err)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
Expand Down Expand Up @@ -187,4 +188,5 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
mvdan.cc/gofumpt v0.6.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -826,3 +826,5 @@ gotest.tools/v3 v3.3.0/go.mod h1:Mcr9QNxkg0uMvy/YElmo4SpXgJKWgQvYrT7Kw5RzJ1A=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
mvdan.cc/gofumpt v0.6.0 h1:G3QvahNDmpD+Aek/bNOLrFR2XC6ZAdo62dZu65gmwGo=
mvdan.cc/gofumpt v0.6.0/go.mod h1:4L0wf+kgIPZtcCWXynNS2e6bhmj73umwnuXSZarixzA=

0 comments on commit 3f8243e

Please sign in to comment.