Skip to content

Commit

Permalink
feat: added reset function for bus reader
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 17f9a4d commit 22b7bce
Show file tree
Hide file tree
Showing 17 changed files with 1,357 additions and 218 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ time=$(shell date +%s)

PROTO_DIR=$(ROOT_DIR)/api

PROTO_FILES = "$(PROTO_DIR)/health.proto" "$(PROTO_DIR)/sro/globals.proto"
PROTO_FILES = $(shell find "$(PROTO_DIR)/sro" -name '*.proto')

MOCK_INTERFACES = $(shell egrep -rl --include="*.go" "type (\w*) interface {" $(ROOT_DIR)/pkg | sed "s/.go$$//")

Expand Down
23 changes: 23 additions & 0 deletions api/sro/bus.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";
package sro;
option go_package = "github.com/ShatteredRealms/go-common-service/pkg/pb;pb";

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

service BusService {
rpc ResetReaderBus(BusTarget) returns (ResetBusResponse) {
option (google.api.http) = {
get : "/v1/bus/reset/reader"
};
}
rpc ResetWriterBus(BusTarget) returns (ResetBusResponse) {
option (google.api.http) = {
get : "/v1/bus/reset/writer"
};
}
}

message BusTarget { string type = 1; }

message ResetBusResponse { string message = 1; }
File renamed without changes.
80 changes: 59 additions & 21 deletions cmd/go-common-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@ import (
"io"
"os"
"os/signal"
"sync"
"time"

"github.com/ShatteredRealms/go-common-service/pkg/bus"
"github.com/ShatteredRealms/go-common-service/pkg/bus/character/characterbus"
"github.com/ShatteredRealms/go-common-service/pkg/config"
"github.com/ShatteredRealms/go-common-service/pkg/log"
"github.com/go-faker/faker/v4"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/trace"
)

var (
resetBus = true
sendMessages = false
)

func main() {

log.Logger.Level = logrus.InfoLevel
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
Expand All @@ -36,55 +45,84 @@ func main() {
failCount := 0
maxFailCount := 0
for ctx.Err() == nil {
msg, err := b.FetchMessage(ctx)
inCtx := context.Background()
msg, err := b.FetchMessage(inCtx)
if err != nil {
if !errors.Is(err, io.EOF) {
log.Logger.Errorf("Error fetching message: %v", err)
log.Logger.Errorf("Error for group %s fetching message: %v", b.GetGroup(), err)
}
continue
}

if failCount < maxFailCount {
failCount++
log.Logger.Infof("Failing to process message: %v", msg)
log.Logger.Infof("Failing for group %s to process and message: %v", b.GetGroup(), msg)
err := b.ProcessFailed()
if err != nil {
log.Logger.Errorf("Failed to mark %v as failed: %v", msg, err)
log.Logger.Errorf("Failed for group %s to mark %v as failed: %v", b.GetGroup(), msg, err)
}
continue
}

failCount = 0
log.Logger.Infof("Succeeding to process message: %v", msg)
err = b.ProcessSucceeded(ctx)
log.Logger.Infof("Succeeding for group %s to process message: %v", b.GetGroup(), msg)
err = b.ProcessSucceeded(inCtx)
if err != nil {
log.Logger.Errorf("Failed to mark %v as succeeded: %v", msg, err)
log.Logger.Errorf("Failed for group %s to mark %v as succeeded: %v", b.GetGroup(), msg, err)
}
}
}()
}

// writeBus := bus.NewKafkaMessageBusWriter([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, msg)
// ticker := time.NewTicker(5 * time.Second)
// tracer := tp.Tracer("main")
if resetBus {
log.Logger.Info("Resetting bus")
bus := readBusses[0]
err := bus.Reset(ctx)
if err != nil {
log.Logger.Errorf("Error resetting bus: %v", err)
}
log.Logger.Info("Resetting bus complete")
}

writeBus := bus.NewKafkaMessageBusWriter([]config.ServerAddress{{Host: "localhost", Port: "29092"}}, msg)
ticker := time.NewTicker(5 * time.Second)
tracer := tp.Tracer("main")
if !sendMessages {
ticker.Stop()
}
for {
select {
// case <-ticker.C:
// ctx, span := tracer.Start(ctx, "publish-message")
// newMsg := bus.CharacterCreatedMessage{ID: faker.UUIDHyphenated()}
//
// log.Logger.Infof("Publishing message (%s)", newMsg.GetId())
// writeBus.Publish(ctx, newMsg)
// span.End()
case <-ticker.C:
ctx, span := tracer.Start(ctx, "publish-message")
newMsg := characterbus.Message{
Id: faker.UUIDHyphenated(),
OwnerId: faker.UUIDHyphenated(),
DimensionId: faker.UUIDHyphenated(),
MapId: faker.UUIDHyphenated(),
Deleted: false,
}

log.Logger.Infof("Publishing message (%s)", newMsg.GetId())
writeBus.Publish(ctx, newMsg)
span.End()

case <-ctx.Done():
log.Logger.Info("Shut down requested by user")

wg := sync.WaitGroup{}
for _, b := range readBusses {
err := b.Close()
if err != nil {
log.Logger.Errorf("Error shutting down bus: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
err := b.Close()
if err != nil {
log.Logger.Errorf("Error shutting down bus: %v", err)
}
log.Logger.Infof("Bus %s shut down complete", b.GetMessageType())
}()
}

wg.Wait()
log.Logger.Info("Shut down complete")
return
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ type BusModelMessage[T any] interface {
}

type MessageBusReader[T BusMessage[any]] interface {
GetMessageType() BusMessageType
GetGroup() string
Reset(context.Context) error
FetchMessage(context.Context) (*T, error)
ProcessSucceeded(context.Context) error
ProcessFailed() error
Close() error
}

type MessageBusWriter[T BusMessage[any]] interface {
GetMessageType() BusMessageType
Publish(context.Context, T) error
Close() error
}
148 changes: 0 additions & 148 deletions pkg/bus/bus_kafka.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package bus

import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"sync"

"github.com/ShatteredRealms/go-common-service/pkg/config"
"github.com/ShatteredRealms/go-common-service/pkg/log"
"github.com/segmentio/kafka-go"
)

var (
Expand All @@ -21,149 +15,7 @@ var (
type kafkaBus[T BusMessage[any]] struct {
brokers config.ServerAddresses
topic string
}

// MessageBusReader is for reading messages from the message bus synchronously.
type kafkaBusReader[T BusMessage[any]] struct {
*kafkaBus[T]
groupId string
Reader *kafka.Reader
currentMessage *kafka.Message
}

// MessageBusWriter is for writing message asynchronously to the message bus.
type kafkaBusWriter[T BusMessage[any]] struct {
*kafkaBus[T]
Writer *kafka.Writer

mu sync.Mutex
wg sync.WaitGroup
}

// Publish implements MessageBus.
func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error {
k.mu.Lock()
if k.Writer == nil {
k.Writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: k.brokers.Addresses(),
Topic: k.topic,
Balancer: &kafka.LeastBytes{},
Async: true,
Logger: kafka.LoggerFunc(log.Logger.Tracef),
})
k.Writer.AllowAutoTopicCreation = true
}
k.mu.Unlock()

k.wg.Add(1)
defer k.wg.Done()
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(msg)
if err != nil {
return fmt.Errorf("%w: %w", ErrSerializeMessage, err)
}

err = k.Writer.WriteMessages(ctx, kafka.Message{
Key: []byte(msg.GetId()),
Value: buf.Bytes(),
})
if err != nil {
return fmt.Errorf("%w: %w", ErrSendingMessage, err)
}

return nil
}

// ReceiveMessages implements MessageBus.
func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
if k.currentMessage != nil {
return nil, errors.New("message already fetched")
}
k.currentMessage = new(kafka.Message)

if k.Reader == nil {
k.Reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: k.brokers.Addresses(),
Topic: k.topic,
GroupID: k.groupId,
MinBytes: 1,
MaxBytes: 10e3,
Logger: kafka.LoggerFunc(log.Logger.Tracef),
})
}
var err error
*k.currentMessage, err = k.Reader.FetchMessage(ctx)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrSerializeMessage, err)
}

var data T
dec := gob.NewDecoder(bytes.NewReader(k.currentMessage.Value))
if err := dec.Decode(&data); err != nil {
return nil, fmt.Errorf("%w: %w", ErrSerializeMessage, err)
}

return &data, nil
}

func (k *kafkaBusReader[T]) ProcessSucceeded(ctx context.Context) error {
if k.currentMessage == nil {
return errors.New("message not fetched")
}
if k.Reader != nil {
err := k.Reader.CommitMessages(ctx, *k.currentMessage)
k.currentMessage = nil
return err
}
return errors.New("reader not initialized")
}

func (k *kafkaBusReader[T]) ProcessFailed() error {
if k.Reader == nil {
return errors.New("reader not initialized")
}
if k.currentMessage == nil {
return errors.New("message not fetched")
}
err := k.Close()
k.currentMessage = nil
return err
}

func (k *kafkaBusReader[T]) Close() error {
if k.Reader != nil {
err := k.Reader.Close()
k.Reader = nil
return err
}
return nil
}

func (k *kafkaBusWriter[T]) Close() error {
k.wg.Wait()
if k.Writer != nil {
err := k.Writer.Close()
k.Writer = nil
return err
}
return nil
}

func NewKafkaMessageBusReader[T BusMessage[any]](brokers config.ServerAddresses, groupId string, msg T) MessageBusReader[T] {
return &kafkaBusReader[T]{
kafkaBus: &kafkaBus[T]{
brokers: brokers,
topic: string(msg.GetType()),
},
groupId: groupId,
}
}

func NewKafkaMessageBusWriter[T BusMessage[any]](brokers config.ServerAddresses, msg T) MessageBusWriter[T] {
return &kafkaBusWriter[T]{
kafkaBus: &kafkaBus[T]{
brokers: brokers,
topic: string(msg.GetType()),
},
}
}
Loading

0 comments on commit 22b7bce

Please sign in to comment.