diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 664c835..28dd439 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -9,8 +9,6 @@ type BusMessage[T any] interface { GetId() string } -type MessageTransformer[T BusMessage[any]] func(any) T - type BusModelMessage[T any] interface { BusMessage[T] WasDeleted() bool @@ -29,6 +27,6 @@ type MessageBusReader[T BusMessage[any]] interface { type MessageBusWriter[T BusMessage[any]] interface { GetMessageType() BusMessageType Publish(context.Context, T) error - PublishMany(context.Context, []any, MessageTransformer[T]) error + PublishMany(context.Context, []T) error Close() error } diff --git a/pkg/bus/bus_kafka_writer.go b/pkg/bus/bus_kafka_writer.go index 0bbfe33..14991db 100644 --- a/pkg/bus/bus_kafka_writer.go +++ b/pkg/bus/bus_kafka_writer.go @@ -43,19 +43,10 @@ func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error { return nil } -func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []any, transformer MessageTransformer[T]) error { - if len(msgs) == 0 { - return nil - } - +func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error { k.wg.Add(1) defer k.wg.Done() - _, ok := msgs[0].(T) - if transformer == nil && !ok { - return fmt.Errorf("Expecting %T but got %T", msgs[0], msgs) - } - k.setupWriter() messages := make([]kafka.Message, len(msgs)) @@ -66,22 +57,9 @@ func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []any, transfo for chunk := range slices.Chunk(msgs, runtime.NumCPU()) { wg.Add(1) - go func(chunk []any) { + go func(chunk []T) { defer wg.Done() - var ok bool - for _, genericMsg := range chunk { - var msg T - if transformer != nil { - msg = transformer(msg) - } else { - msg = genericMsg.(T) - if !ok { - errsMu.Lock() - errs = errors.Join(errs, fmt.Errorf("Expecting %T but got %T", msg, genericMsg)) - errsMu.Unlock() - return - } - } + for _, msg := range chunk { val, err := k.encodeMessage(msg) if err != nil { errsMu.Lock()