Skip to content

Commit

Permalink
feat: seperated reset to allow for more generic handlign
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 0753b4b commit 0b4ef05
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
9 changes: 9 additions & 0 deletions cmd/go-common-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func main() {
readBusses = append(readBusses, bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg2, msg))
readBusses = append(readBusses, bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg2, msg))

var svc characterbus.Service
_ = svc
svc = characterbus.NewService(nil, readBusses[0])

var checker bus.MessageBusReader[bus.BusMessage[any]]
_ = checker
checker = bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg1, msg)

Check failure on line 49 in cmd/go-common-service/main.go

View workflow job for this annotation

GitHub Actions / Test

cannot use bus.NewKafkaMessageBusReader([]config.ServerAddress{…}, cg1, msg) (value of type bus.MessageBusReader[characterbus.Message]) as bus.MessageBusReader[bus.BusMessage[any]] value in assignment: bus.MessageBusReader[characterbus.Message] does not implement bus.MessageBusReader[bus.BusMessage[any]] (wrong type for method FetchMessage)
// checker = svc.GetReader()

for _, b := range readBusses {
go func() {
failCount := 0
Expand Down
6 changes: 5 additions & 1 deletion pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ type BusModelMessage[T any] interface {
WasDeleted() bool
}

type Resettable interface {
Reset(context.Context) error
}

type MessageBusReader[T BusMessage[any]] interface {
Resettable
GetMessageType() BusMessageType
GetGroup() string
Reset(context.Context) error
FetchMessage(context.Context) (*T, error)
ProcessSucceeded(context.Context) error
ProcessFailed() error
Expand Down
5 changes: 3 additions & 2 deletions pkg/bus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type BusProcessor[T BusMessage[any]] interface {
StartProcessing(ctx context.Context)
StopProcessing()
IsProcessing() bool
GetReader() MessageBusReader[T]
GetResetter() Resettable
}

var (
Expand All @@ -30,7 +30,8 @@ type DefaultBusProcessor[T BusModelMessage[any]] struct {
isProcessing bool
}

func (bp *DefaultBusProcessor[T]) GetReader() MessageBusReader[T] {
// GetResetter implements BusProcessor.
func (bp *DefaultBusProcessor[T]) GetResetter() Resettable {
return bp.Reader
}

Expand Down
18 changes: 7 additions & 11 deletions pkg/srv/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type busService struct {
pb.UnimplementedBusServiceServer
ctx Context

readers map[bus.BusMessageType]bus.MessageBusReader[bus.BusMessage[any]]
readerResetters map[bus.BusMessageType]bus.Resettable
writerCallbacks map[bus.BusMessageType]WriterResetCallback
}

Expand All @@ -42,7 +42,7 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget)
// Reset all buses
var err error
builder := strings.Builder{}
for name, reader := range b.readers {
for name, reader := range b.readerResetters {
err = errors.Join(err, reader.Reset(ctx))
builder.WriteString(string(name))
builder.WriteString(", ")
Expand All @@ -53,12 +53,12 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget)

str := builder.String()
return &pb.ResetBusResponse{
Message: fmt.Sprintf("Reset %d buses: %s", len(b.readers), str[:len(str)-2]),
Message: fmt.Sprintf("Reset %d buses: %s", len(b.readerResetters), str[:len(str)-2]),
}, nil
}

// Reset a specific bus
bus, ok := b.readers[bus.BusMessageType(request.GetType())]
bus, ok := b.readerResetters[bus.BusMessageType(request.GetType())]
if !ok {
return nil, status.Errorf(codes.NotFound, ErrBusNotFound.Error())
}
Expand All @@ -69,7 +69,7 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget)
}

return &pb.ResetBusResponse{
Message: fmt.Sprintf("Reset 1 bus: %s", bus.GetMessageType()),
Message: fmt.Sprintf("Reset 1 bus: %s", request.GetType()),
}, nil
}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (b *busService) ResetWriterBus(ctx context.Context, request *pb.BusTarget)
func NewBusServiceServer(
ctx context.Context,
srvCtx Context,
readers []bus.MessageBusReader[bus.BusMessage[any]],
readerResetters map[bus.BusMessageType]bus.Resettable,
writerCallbacks map[bus.BusMessageType]WriterResetCallback,
) (*busService, error) {
err := srvCtx.CreateRoles(ctx, &BusRoles)
Expand All @@ -122,11 +122,7 @@ func NewBusServiceServer(
service := &busService{
ctx: srvCtx,
writerCallbacks: writerCallbacks,
readers: make(map[bus.BusMessageType]bus.MessageBusReader[bus.BusMessage[any]], len(readers)),
}

for _, reader := range readers {
service.readers[reader.GetMessageType()] = reader
readerResetters: readerResetters,
}

return service, nil
Expand Down

0 comments on commit 0b4ef05

Please sign in to comment.