From 0b4ef055fe626c31d479040006d66a7a3596fc73 Mon Sep 17 00:00:00 2001 From: Wil Simpson Date: Wed, 11 Dec 2024 16:52:47 -0500 Subject: [PATCH] feat: seperated reset to allow for more generic handlign --- cmd/go-common-service/main.go | 9 +++++++++ pkg/bus/bus.go | 6 +++++- pkg/bus/processor.go | 5 +++-- pkg/srv/bus.go | 18 +++++++----------- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/cmd/go-common-service/main.go b/cmd/go-common-service/main.go index 43209f5..927f33f 100644 --- a/cmd/go-common-service/main.go +++ b/cmd/go-common-service/main.go @@ -40,6 +40,15 @@ func main() { readBusses = append(readBusses, bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg2, msg)) readBusses = append(readBusses, bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg2, msg)) + var svc characterbus.Service + _ = svc + svc = characterbus.NewService(nil, readBusses[0]) + + var checker bus.MessageBusReader[bus.BusMessage[any]] + _ = checker + checker = bus.NewKafkaMessageBusReader([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, cg1, msg) + // checker = svc.GetReader() + for _, b := range readBusses { go func() { failCount := 0 diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 28dd439..018dd30 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -14,10 +14,14 @@ type BusModelMessage[T any] interface { WasDeleted() bool } +type Resettable interface { + Reset(context.Context) error +} + type MessageBusReader[T BusMessage[any]] interface { + Resettable GetMessageType() BusMessageType GetGroup() string - Reset(context.Context) error FetchMessage(context.Context) (*T, error) ProcessSucceeded(context.Context) error ProcessFailed() error diff --git a/pkg/bus/processor.go b/pkg/bus/processor.go index e2e3f22..7204643 100644 --- a/pkg/bus/processor.go +++ b/pkg/bus/processor.go @@ -13,7 +13,7 @@ type BusProcessor[T BusMessage[any]] interface { StartProcessing(ctx context.Context) StopProcessing() IsProcessing() bool - GetReader() MessageBusReader[T] + GetResetter() Resettable } var ( @@ -30,7 +30,8 @@ type DefaultBusProcessor[T BusModelMessage[any]] struct { isProcessing bool } -func (bp *DefaultBusProcessor[T]) GetReader() MessageBusReader[T] { +// GetResetter implements BusProcessor. +func (bp *DefaultBusProcessor[T]) GetResetter() Resettable { return bp.Reader } diff --git a/pkg/srv/bus.go b/pkg/srv/bus.go index e10594e..cff9443 100644 --- a/pkg/srv/bus.go +++ b/pkg/srv/bus.go @@ -20,7 +20,7 @@ type busService struct { pb.UnimplementedBusServiceServer ctx Context - readers map[bus.BusMessageType]bus.MessageBusReader[bus.BusMessage[any]] + readerResetters map[bus.BusMessageType]bus.Resettable writerCallbacks map[bus.BusMessageType]WriterResetCallback } @@ -42,7 +42,7 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget) // Reset all buses var err error builder := strings.Builder{} - for name, reader := range b.readers { + for name, reader := range b.readerResetters { err = errors.Join(err, reader.Reset(ctx)) builder.WriteString(string(name)) builder.WriteString(", ") @@ -53,12 +53,12 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget) str := builder.String() return &pb.ResetBusResponse{ - Message: fmt.Sprintf("Reset %d buses: %s", len(b.readers), str[:len(str)-2]), + Message: fmt.Sprintf("Reset %d buses: %s", len(b.readerResetters), str[:len(str)-2]), }, nil } // Reset a specific bus - bus, ok := b.readers[bus.BusMessageType(request.GetType())] + bus, ok := b.readerResetters[bus.BusMessageType(request.GetType())] if !ok { return nil, status.Errorf(codes.NotFound, ErrBusNotFound.Error()) } @@ -69,7 +69,7 @@ func (b *busService) ResetReaderBus(ctx context.Context, request *pb.BusTarget) } return &pb.ResetBusResponse{ - Message: fmt.Sprintf("Reset 1 bus: %s", bus.GetMessageType()), + Message: fmt.Sprintf("Reset 1 bus: %s", request.GetType()), }, nil } @@ -111,7 +111,7 @@ func (b *busService) ResetWriterBus(ctx context.Context, request *pb.BusTarget) func NewBusServiceServer( ctx context.Context, srvCtx Context, - readers []bus.MessageBusReader[bus.BusMessage[any]], + readerResetters map[bus.BusMessageType]bus.Resettable, writerCallbacks map[bus.BusMessageType]WriterResetCallback, ) (*busService, error) { err := srvCtx.CreateRoles(ctx, &BusRoles) @@ -122,11 +122,7 @@ func NewBusServiceServer( service := &busService{ ctx: srvCtx, writerCallbacks: writerCallbacks, - readers: make(map[bus.BusMessageType]bus.MessageBusReader[bus.BusMessage[any]], len(readers)), - } - - for _, reader := range readers { - service.readers[reader.GetMessageType()] = reader + readerResetters: readerResetters, } return service, nil