diff --git a/pkg/bus/character/characterbus/service.go b/pkg/bus/character/characterbus/service.go index 9125a8e..bae45de 100644 --- a/pkg/bus/character/characterbus/service.go +++ b/pkg/bus/character/characterbus/service.go @@ -6,8 +6,8 @@ import ( "github.com/ShatteredRealms/go-common-service/pkg/bus" ) -type Service interface { - bus.BusProcessor +type Service[T bus.BusMessage[any]] interface { + bus.BusProcessor[T] GetCharacters(ctx context.Context) (*Characters, error) GetCharacterById(ctx context.Context, characterId string) (*Character, error) DoesOwnCharacter(ctx context.Context, characterId, ownerId string) (bool, error) @@ -20,7 +20,7 @@ type service struct { func NewService( repo Repository, characterBus bus.MessageBusReader[Message], -) Service { +) Service[Message] { return &service{ DefaultBusProcessor: bus.DefaultBusProcessor[Message]{ Reader: characterBus, diff --git a/pkg/bus/gameserver/dimensionbus/service.go b/pkg/bus/gameserver/dimensionbus/service.go index a5fafb7..7d99be4 100644 --- a/pkg/bus/gameserver/dimensionbus/service.go +++ b/pkg/bus/gameserver/dimensionbus/service.go @@ -6,8 +6,8 @@ import ( "github.com/ShatteredRealms/go-common-service/pkg/bus" ) -type Service interface { - bus.BusProcessor +type Service[T bus.BusMessage[any]] interface { + bus.BusProcessor[T] GetDimensions(ctx context.Context) (*Dimensions, error) GetDimensionById(ctx context.Context, dimensionId string) (*Dimension, error) } @@ -19,7 +19,7 @@ type service struct { func NewService( repo Repository, dimensionBus bus.MessageBusReader[Message], -) Service { +) Service[Message] { return &service{ DefaultBusProcessor: bus.DefaultBusProcessor[Message]{ Reader: dimensionBus, diff --git a/pkg/bus/gameserver/mapbus/message.go b/pkg/bus/gameserver/mapbus/message.go index 303d0f9..11130a5 100644 --- a/pkg/bus/gameserver/mapbus/message.go +++ b/pkg/bus/gameserver/mapbus/message.go @@ -24,3 +24,7 @@ func (m Message) GetType() bus.BusMessageType { func (m Message) GetId() string { return m.Id } + +func (m Message) WasDeleted() bool { + return m.Deleted +} diff --git a/pkg/bus/gameserver/mapbus/repository.go b/pkg/bus/gameserver/mapbus/repository.go index 7147a3c..11505a1 100644 --- a/pkg/bus/gameserver/mapbus/repository.go +++ b/pkg/bus/gameserver/mapbus/repository.go @@ -2,14 +2,14 @@ package mapbus import ( "context" + + "github.com/ShatteredRealms/go-common-service/pkg/bus" ) type Repository interface { - GetMapById(ctx context.Context, dimensionId string) (*Map, error) - - GetMaps(ctx context.Context) (*Maps, error) + bus.BusMessageRepository[Message] - CreateMap(ctx context.Context, dimensionId string) (*Map, error) + GetById(ctx context.Context, dimensionId string) (*Map, error) - DeleteMap(ctx context.Context, dimensionId string) (*Map, error) + GetAll(ctx context.Context) (*Maps, error) } diff --git a/pkg/bus/gameserver/mapbus/repository_postgres.go b/pkg/bus/gameserver/mapbus/repository_postgres.go index 16343f2..9ef96ce 100644 --- a/pkg/bus/gameserver/mapbus/repository_postgres.go +++ b/pkg/bus/gameserver/mapbus/repository_postgres.go @@ -6,7 +6,6 @@ import ( "github.com/ShatteredRealms/go-common-service/pkg/srospan" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" - "gorm.io/gorm/clause" ) type postgresRepository struct { @@ -18,34 +17,37 @@ func NewPostgresRepository(db *gorm.DB) Repository { return &postgresRepository{gormdb: db} } -// CreateMap implements MapRepository. -func (p *postgresRepository) CreateMap(ctx context.Context, mId string) (m *Map, _ error) { - updateSpanWithMap(ctx, mId) - m.Id = mId - return m, p.db(ctx).Create(m).Error +// Save implements MapRepository. +func (p *postgresRepository) Save(ctx context.Context, msg Message) error { + m := Map{ + Id: msg.Id, + } + + updateSpanWithMap(ctx, m.Id) + return p.db(ctx).Save(&m).Error } -// DeleteMap implements MapRepository. -func (p *postgresRepository) DeleteMap(ctx context.Context, mId string) (m *Map, _ error) { - updateSpanWithMap(ctx, mId) - return m, p.db(ctx).Clauses(clause.Returning{}).Delete(m, "id = ?", mId).Error +// Delete implements MapRepository. +func (p *postgresRepository) Delete(ctx context.Context, mapId string) error { + updateSpanWithMap(ctx, mapId) + return p.db(ctx).Delete(&Map{}, "id = ?", mapId).Error } -// GetMapById implements MapRepository. -func (p *postgresRepository) GetMapById(ctx context.Context, mId string) (m *Map, _ error) { - result := p.db(ctx).Where("id = ?", mId).Find(&m) +// GetById implements MapRepository. +func (p *postgresRepository) GetById(ctx context.Context, mapId string) (m *Map, _ error) { + result := p.db(ctx).First(&m, "id = ?", mapId) if result.Error != nil { return nil, result.Error } if result.RowsAffected == 0 { return nil, nil } - updateSpanWithMap(ctx, mId) + updateSpanWithMap(ctx, mapId) return m, nil } -// GetMaps implements MapRepository. -func (p *postgresRepository) GetMaps(ctx context.Context) (maps *Maps, _ error) { +// GetAll implements MapRepository. +func (p *postgresRepository) GetAll(ctx context.Context) (maps *Maps, _ error) { return maps, p.db(ctx).Find(maps).Error } @@ -53,9 +55,9 @@ func (p *postgresRepository) db(ctx context.Context) *gorm.DB { return p.gormdb.WithContext(ctx) } -func updateSpanWithMap(ctx context.Context, mId string) { +func updateSpanWithMap(ctx context.Context, mapId string) { span := trace.SpanFromContext(ctx) span.SetAttributes( - srospan.MapId(mId), + srospan.MapId(mapId), ) } diff --git a/pkg/bus/gameserver/mapbus/service.go b/pkg/bus/gameserver/mapbus/service.go index 19d531f..5608d39 100644 --- a/pkg/bus/gameserver/mapbus/service.go +++ b/pkg/bus/gameserver/mapbus/service.go @@ -2,96 +2,38 @@ package mapbus import ( "context" - "sync" "github.com/ShatteredRealms/go-common-service/pkg/bus" - "github.com/ShatteredRealms/go-common-service/pkg/log" ) -type Service interface { +type Service[T bus.BusMessage[any]] interface { + bus.BusProcessor[T] GetMaps(ctx context.Context) (*Maps, error) - GetMapById(ctx context.Context, mId string) (*Map, error) - StartProcessingBus(ctx context.Context) - StopProcessingBus() + GetMapById(ctx context.Context, mapId string) (*Map, error) } type service struct { - repo Repository - mBus bus.MessageBusReader[Message] - isProcessing bool - concurrentErrCount int - - mu sync.Mutex + bus.DefaultBusProcessor[Message] } func NewService( repo Repository, - mBus bus.MessageBusReader[Message], -) Service { + mapBus bus.MessageBusReader[Message], +) Service[Message] { return &service{ - repo: repo, - mBus: mBus, + DefaultBusProcessor: bus.DefaultBusProcessor[Message]{ + Reader: mapBus, + Repo: repo, + }, } } -// StartProcessingBus implements MapService. -func (d *service) StartProcessingBus(ctx context.Context) { - d.mu.Lock() - if d.isProcessing { - d.mu.Unlock() - return - } - - d.mu.Lock() - d.isProcessing = true - d.mu.Unlock() - - go func() { - for d.isProcessing && d.concurrentErrCount < 5 { - msg, err := d.mBus.FetchMessage(ctx) - if err != nil { - log.Logger.WithContext(ctx).Errorf("unable to fetch map message: %v", err) - continue - } - - if msg.Deleted { - _, err = d.repo.DeleteMap(ctx, msg.Id) - if err != nil { - log.Logger.WithContext(ctx).Errorf( - "unable to delete map %s: %v", msg.Id, err) - d.mBus.ProcessFailed() - d.concurrentErrCount++ - } else { - d.mBus.ProcessSucceeded(ctx) - d.concurrentErrCount = 0 - } - } else { - d.repo.CreateMap(ctx, msg.Id) - if err != nil { - log.Logger.WithContext(ctx).Errorf( - "unable to save map %s: %v", msg.Id, err) - d.mBus.ProcessFailed() - d.concurrentErrCount++ - } else { - d.mBus.ProcessSucceeded(ctx) - d.concurrentErrCount = 0 - } - } - } - }() -} - -// StopProcessingBus implements MapService. -func (d *service) StopProcessingBus() { - d.isProcessing = false -} - // GetMapById implements MapService. -func (d *service) GetMapById(ctx context.Context, mId string) (*Map, error) { - return d.repo.GetMapById(ctx, mId) +func (d *service) GetMapById(ctx context.Context, mapId string) (*Map, error) { + return d.Repo.(Repository).GetById(ctx, mapId) } // GetMaps implements MapService. func (d *service) GetMaps(ctx context.Context) (*Maps, error) { - return d.repo.GetMaps(ctx) + return d.Repo.(Repository).GetAll(ctx) } diff --git a/pkg/bus/processor.go b/pkg/bus/processor.go index bb9dcd7..e2e3f22 100644 --- a/pkg/bus/processor.go +++ b/pkg/bus/processor.go @@ -9,10 +9,11 @@ import ( "github.com/ShatteredRealms/go-common-service/pkg/log" ) -type BusProcessor interface { +type BusProcessor[T BusMessage[any]] interface { StartProcessing(ctx context.Context) StopProcessing() IsProcessing() bool + GetReader() MessageBusReader[T] } var ( @@ -29,6 +30,10 @@ type DefaultBusProcessor[T BusModelMessage[any]] struct { isProcessing bool } +func (bp *DefaultBusProcessor[T]) GetReader() MessageBusReader[T] { + return bp.Reader +} + func (bp *DefaultBusProcessor[T]) IsProcessing() bool { return bp.isProcessing } diff --git a/pkg/bus/processor_test.go b/pkg/bus/processor_test.go index 6d96524..cac73f5 100644 --- a/pkg/bus/processor_test.go +++ b/pkg/bus/processor_test.go @@ -18,7 +18,7 @@ var _ = Describe("Bus processor", func() { var ( testRepo TestBusMessageRepository testBus TestingBus - bp bus.BusProcessor + bp bus.BusProcessor[TestBusMessage] ) BeforeEach(func() { testRepo = TestBusMessageRepository{}