Skip to content

Commit

Permalink
feat: updated mapbus and implemented processor to include getting the…
Browse files Browse the repository at this point in the history
… underlying reader
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent ca6aa99 commit 4f9eea1
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 102 deletions.
6 changes: 3 additions & 3 deletions pkg/bus/character/characterbus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/bus"
)

type Service interface {
bus.BusProcessor
type Service[T bus.BusMessage[any]] interface {
bus.BusProcessor[T]
GetCharacters(ctx context.Context) (*Characters, error)
GetCharacterById(ctx context.Context, characterId string) (*Character, error)
DoesOwnCharacter(ctx context.Context, characterId, ownerId string) (bool, error)
Expand All @@ -20,7 +20,7 @@ type service struct {
func NewService(
repo Repository,
characterBus bus.MessageBusReader[Message],
) Service {
) Service[Message] {
return &service{
DefaultBusProcessor: bus.DefaultBusProcessor[Message]{
Reader: characterBus,
Expand Down
6 changes: 3 additions & 3 deletions pkg/bus/gameserver/dimensionbus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/bus"
)

type Service interface {
bus.BusProcessor
type Service[T bus.BusMessage[any]] interface {
bus.BusProcessor[T]
GetDimensions(ctx context.Context) (*Dimensions, error)
GetDimensionById(ctx context.Context, dimensionId string) (*Dimension, error)
}
Expand All @@ -19,7 +19,7 @@ type service struct {
func NewService(
repo Repository,
dimensionBus bus.MessageBusReader[Message],
) Service {
) Service[Message] {
return &service{
DefaultBusProcessor: bus.DefaultBusProcessor[Message]{
Reader: dimensionBus,
Expand Down
4 changes: 4 additions & 0 deletions pkg/bus/gameserver/mapbus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (m Message) GetType() bus.BusMessageType {
func (m Message) GetId() string {
return m.Id
}

func (m Message) WasDeleted() bool {
return m.Deleted
}
10 changes: 5 additions & 5 deletions pkg/bus/gameserver/mapbus/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package mapbus

import (
"context"

"github.com/ShatteredRealms/go-common-service/pkg/bus"
)

type Repository interface {
GetMapById(ctx context.Context, dimensionId string) (*Map, error)

GetMaps(ctx context.Context) (*Maps, error)
bus.BusMessageRepository[Message]

CreateMap(ctx context.Context, dimensionId string) (*Map, error)
GetById(ctx context.Context, dimensionId string) (*Map, error)

DeleteMap(ctx context.Context, dimensionId string) (*Map, error)
GetAll(ctx context.Context) (*Maps, error)
}
38 changes: 20 additions & 18 deletions pkg/bus/gameserver/mapbus/repository_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/srospan"
"go.opentelemetry.io/otel/trace"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type postgresRepository struct {
Expand All @@ -18,44 +17,47 @@ func NewPostgresRepository(db *gorm.DB) Repository {
return &postgresRepository{gormdb: db}
}

// CreateMap implements MapRepository.
func (p *postgresRepository) CreateMap(ctx context.Context, mId string) (m *Map, _ error) {
updateSpanWithMap(ctx, mId)
m.Id = mId
return m, p.db(ctx).Create(m).Error
// Save implements MapRepository.
func (p *postgresRepository) Save(ctx context.Context, msg Message) error {
m := Map{
Id: msg.Id,
}

updateSpanWithMap(ctx, m.Id)
return p.db(ctx).Save(&m).Error
}

// DeleteMap implements MapRepository.
func (p *postgresRepository) DeleteMap(ctx context.Context, mId string) (m *Map, _ error) {
updateSpanWithMap(ctx, mId)
return m, p.db(ctx).Clauses(clause.Returning{}).Delete(m, "id = ?", mId).Error
// Delete implements MapRepository.
func (p *postgresRepository) Delete(ctx context.Context, mapId string) error {
updateSpanWithMap(ctx, mapId)
return p.db(ctx).Delete(&Map{}, "id = ?", mapId).Error
}

// GetMapById implements MapRepository.
func (p *postgresRepository) GetMapById(ctx context.Context, mId string) (m *Map, _ error) {
result := p.db(ctx).Where("id = ?", mId).Find(&m)
// GetById implements MapRepository.
func (p *postgresRepository) GetById(ctx context.Context, mapId string) (m *Map, _ error) {
result := p.db(ctx).First(&m, "id = ?", mapId)
if result.Error != nil {
return nil, result.Error
}
if result.RowsAffected == 0 {
return nil, nil
}
updateSpanWithMap(ctx, mId)
updateSpanWithMap(ctx, mapId)
return m, nil
}

// GetMaps implements MapRepository.
func (p *postgresRepository) GetMaps(ctx context.Context) (maps *Maps, _ error) {
// GetAll implements MapRepository.
func (p *postgresRepository) GetAll(ctx context.Context) (maps *Maps, _ error) {
return maps, p.db(ctx).Find(maps).Error
}

func (p *postgresRepository) db(ctx context.Context) *gorm.DB {
return p.gormdb.WithContext(ctx)
}

func updateSpanWithMap(ctx context.Context, mId string) {
func updateSpanWithMap(ctx context.Context, mapId string) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
srospan.MapId(mId),
srospan.MapId(mapId),
)
}
84 changes: 13 additions & 71 deletions pkg/bus/gameserver/mapbus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,96 +2,38 @@ package mapbus

import (
"context"
"sync"

"github.com/ShatteredRealms/go-common-service/pkg/bus"
"github.com/ShatteredRealms/go-common-service/pkg/log"
)

type Service interface {
type Service[T bus.BusMessage[any]] interface {
bus.BusProcessor[T]
GetMaps(ctx context.Context) (*Maps, error)
GetMapById(ctx context.Context, mId string) (*Map, error)
StartProcessingBus(ctx context.Context)
StopProcessingBus()
GetMapById(ctx context.Context, mapId string) (*Map, error)
}

type service struct {
repo Repository
mBus bus.MessageBusReader[Message]
isProcessing bool
concurrentErrCount int

mu sync.Mutex
bus.DefaultBusProcessor[Message]
}

func NewService(
repo Repository,
mBus bus.MessageBusReader[Message],
) Service {
mapBus bus.MessageBusReader[Message],
) Service[Message] {
return &service{
repo: repo,
mBus: mBus,
DefaultBusProcessor: bus.DefaultBusProcessor[Message]{
Reader: mapBus,
Repo: repo,
},
}
}

// StartProcessingBus implements MapService.
func (d *service) StartProcessingBus(ctx context.Context) {
d.mu.Lock()
if d.isProcessing {
d.mu.Unlock()
return
}

d.mu.Lock()
d.isProcessing = true
d.mu.Unlock()

go func() {
for d.isProcessing && d.concurrentErrCount < 5 {
msg, err := d.mBus.FetchMessage(ctx)
if err != nil {
log.Logger.WithContext(ctx).Errorf("unable to fetch map message: %v", err)
continue
}

if msg.Deleted {
_, err = d.repo.DeleteMap(ctx, msg.Id)
if err != nil {
log.Logger.WithContext(ctx).Errorf(
"unable to delete map %s: %v", msg.Id, err)
d.mBus.ProcessFailed()
d.concurrentErrCount++
} else {
d.mBus.ProcessSucceeded(ctx)
d.concurrentErrCount = 0
}
} else {
d.repo.CreateMap(ctx, msg.Id)
if err != nil {
log.Logger.WithContext(ctx).Errorf(
"unable to save map %s: %v", msg.Id, err)
d.mBus.ProcessFailed()
d.concurrentErrCount++
} else {
d.mBus.ProcessSucceeded(ctx)
d.concurrentErrCount = 0
}
}
}
}()
}

// StopProcessingBus implements MapService.
func (d *service) StopProcessingBus() {
d.isProcessing = false
}

// GetMapById implements MapService.
func (d *service) GetMapById(ctx context.Context, mId string) (*Map, error) {
return d.repo.GetMapById(ctx, mId)
func (d *service) GetMapById(ctx context.Context, mapId string) (*Map, error) {
return d.Repo.(Repository).GetById(ctx, mapId)
}

// GetMaps implements MapService.
func (d *service) GetMaps(ctx context.Context) (*Maps, error) {
return d.repo.GetMaps(ctx)
return d.Repo.(Repository).GetAll(ctx)
}
7 changes: 6 additions & 1 deletion pkg/bus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/log"
)

type BusProcessor interface {
type BusProcessor[T BusMessage[any]] interface {
StartProcessing(ctx context.Context)
StopProcessing()
IsProcessing() bool
GetReader() MessageBusReader[T]
}

var (
Expand All @@ -29,6 +30,10 @@ type DefaultBusProcessor[T BusModelMessage[any]] struct {
isProcessing bool
}

func (bp *DefaultBusProcessor[T]) GetReader() MessageBusReader[T] {
return bp.Reader
}

func (bp *DefaultBusProcessor[T]) IsProcessing() bool {
return bp.isProcessing
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bus/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var _ = Describe("Bus processor", func() {
var (
testRepo TestBusMessageRepository
testBus TestingBus
bp bus.BusProcessor
bp bus.BusProcessor[TestBusMessage]
)
BeforeEach(func() {
testRepo = TestBusMessageRepository{}
Expand Down

0 comments on commit 4f9eea1

Please sign in to comment.