From d61b35330f24b2c68984d773c9a1bfd40c783f8c Mon Sep 17 00:00:00 2001 From: Ivan Sushkov Date: Fri, 23 Feb 2024 13:12:07 +0700 Subject: [PATCH] The mediator pattern implementation --- .../seedwork/mediator/global.go | 90 +++++++ .../seedwork/mediator/global_test.go | 217 +++++++++++++++ .../seedwork/mediator/mediator.go | 4 - .../seedwork/mediator/reflection.go | 52 ++++ .../seedwork/mediator/untyped.go | 114 ++++++++ .../seedwork/mediator/untyped_test.go | 251 ++++++++++++++++++ 6 files changed, 724 insertions(+), 4 deletions(-) create mode 100644 grade/internal/infrastructure/seedwork/mediator/global.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/global_test.go delete mode 100644 grade/internal/infrastructure/seedwork/mediator/mediator.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/reflection.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/untyped.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/untyped_test.go diff --git a/grade/internal/infrastructure/seedwork/mediator/global.go b/grade/internal/infrastructure/seedwork/mediator/global.go new file mode 100644 index 00000000..379ed9be --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/global.go @@ -0,0 +1,90 @@ +package mediator + +import ( + "context" + "reflect" + "sync" + + "github.com/emacsway/grade/grade/internal/domain/seedwork/disposable" + "github.com/hashicorp/go-multierror" +) + +var ( + hLock = sync.RWMutex{} + sLock = sync.RWMutex{} + + handlers = map[reflect.Type]reflect.Value{} + subscribers = map[reflect.Type]map[reflect.Value]struct{}{} +) + +func Send[T any](ctx context.Context, command T) error { + hLock.RLock() + defer hLock.RUnlock() + + commandType := reflect.TypeOf(command) + if handler, found := handlers[commandType]; found { + return call(handler, ctx, command) + } + + return nil +} + +func Register[T any](command T, handler func(context.Context, T) error) disposable.Disposable { + hLock.Lock() + defer hLock.Unlock() + + commandType := reflect.TypeOf(command) + handlers[commandType] = reflect.ValueOf(handler) + + return disposable.NewDisposable(func() { + Unregister(command) + }) +} + +func Unregister[T any](command T) { + hLock.Lock() + defer hLock.Unlock() + + commandType := reflect.TypeOf(command) + delete(handlers, commandType) +} + +func Subscribe[E any](event E, handler func(context.Context, E) error) disposable.Disposable { + sLock.Lock() + defer sLock.Unlock() + + eventType := reflect.TypeOf(event) + if _, found := subscribers[eventType]; !found { + subscribers[eventType] = map[reflect.Value]struct{}{} + } + + handlerValue := reflect.ValueOf(handler) + subscribers[eventType][handlerValue] = struct{}{} + + return disposable.NewDisposable(func() { + Unsubscribe(event, handler) + }) +} + +func Unsubscribe[E any](event E, handler func(context.Context, E) error) { + sLock.Lock() + defer sLock.Unlock() + + eventType := reflect.TypeOf(event) + handlerValue := reflect.ValueOf(handler) + + delete(subscribers[eventType], handlerValue) +} + +func Publish[E any](ctx context.Context, event E) error { + sLock.RLock() + defer sLock.RUnlock() + + var errs error + eventType := reflect.TypeOf(event) + for handler := range subscribers[eventType] { + errs = multierror.Append(errs, call(handler, ctx, event)) + } + + return errs +} diff --git a/grade/internal/infrastructure/seedwork/mediator/global_test.go b/grade/internal/infrastructure/seedwork/mediator/global_test.go new file mode 100644 index 00000000..c56c9145 --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/global_test.go @@ -0,0 +1,217 @@ +package mediator + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/assert" +) + +type eventHandler struct { + counter int +} + +func (e *eventHandler) Handle(ctx context.Context, t Event) error { + e.counter += 1 + return nil +} + +func TestGlobalMediator(t *testing.T) { + + tests := []struct { + name string + + assertion func(t *testing.T) + }{ + { + name: "test_publish", + + assertion: func(t *testing.T) { + times := 0 + handler := func(ctx context.Context, t Event) error { + times++ + return nil + } + + times2 := 0 + handler2 := func(ctx context.Context, t Event) error { + times2++ + return nil + } + + Subscribe(Event{}, handler) + Subscribe(Event{}, handler2) + + _ = Publish(context.Background(), Event{}) + assert.Equal(t, 1, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_struct_subscriber", + + assertion: func(t *testing.T) { + + handler := eventHandler{} + Subscribe(Event{}, handler.Handle) + + _ = Publish(context.Background(), Event{}) + assert.Equal(t, 1, handler.counter) + }, + }, + + { + name: "test_unsubscribe", + assertion: func(t *testing.T) { + + times := 0 + handler := func(ctx context.Context, e Event) error { + times++ + + return nil + } + + times2 := 0 + handler2 := func(ctx context.Context, e Event) error { + times2++ + + return nil + } + + Subscribe(Event{}, handler) + Subscribe(Event{}, handler2) + Unsubscribe(Event{}, handler) + + _ = Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_disposable_event", + assertion: func(t *testing.T) { + + times := 0 + handler := func(ctx context.Context, e Event) error { + times++ + return nil + } + + times2 := 0 + handler2 := func(ctx context.Context, e Event) error { + times2++ + return nil + } + + Subscribe(Event{}, handler2) + + disposable := Subscribe(Event{}, handler) + disposable.Dispose() + + _ = Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_send", + assertion: func(t *testing.T) { + times := 0 + handler := func(ctx context.Context, e Command) error { + times++ + return nil + } + + Register(Command{}, handler) + _ = Send(context.Background(), Command{}) + + assert.Equal(t, 1, times) + }, + }, + + { + name: "test_unregister", + assertion: func(t *testing.T) { + times := 0 + handler := func(ctx context.Context, e Command) error { + times++ + return nil + } + + Register(Command{}, handler) + Unregister(Command{}) + + _ = Send(context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_disposable_command", + assertion: func(t *testing.T) { + times := 0 + handler := func(ctx context.Context, e Command) error { + times++ + return nil + } + + disposable := Register[Command](Command{}, handler) + disposable.Dispose() + + _ = Send[Command](context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_returning_errors", + assertion: func(t *testing.T) { + handlerError := errors.New("") + + handler := func(ctx context.Context, e Command) error { + return handlerError + } + + handler2 := func(ctx context.Context, e Event) error { + return handlerError + } + + handler3 := func(ctx context.Context, e Event) error { + return handlerError + } + + Register(Command{}, handler) + + Subscribe(Event{}, handler2) + Subscribe(Event{}, handler3) + + var errs error + errs = multierror.Append(errs, handlerError, handlerError) + assert.Equal(t, errs, Publish(context.Background(), Event{})) + + assert.Equal(t, handlerError, Send(context.Background(), Command{})) + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + handlers = map[reflect.Type]reflect.Value{} + subscribers = map[reflect.Type]map[reflect.Value]struct{}{} + + tt.assertion(t) + }) + } +} diff --git a/grade/internal/infrastructure/seedwork/mediator/mediator.go b/grade/internal/infrastructure/seedwork/mediator/mediator.go deleted file mode 100644 index 82c8be80..00000000 --- a/grade/internal/infrastructure/seedwork/mediator/mediator.go +++ /dev/null @@ -1,4 +0,0 @@ -package mediator - -type MediatorImp struct { -} diff --git a/grade/internal/infrastructure/seedwork/mediator/reflection.go b/grade/internal/infrastructure/seedwork/mediator/reflection.go new file mode 100644 index 00000000..a0c64c5c --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/reflection.go @@ -0,0 +1,52 @@ +package mediator + +import ( + "context" + "reflect" +) + +var ctxInterface = reflect.TypeOf((*context.Context)(nil)).Elem() + +// call - вызывает обработчик +func call(callable reflect.Value, args ...any) error { + in := make([]reflect.Value, 0, len(args)) + + for _, arg := range args { + in = append(in, reflect.ValueOf(arg)) + } + + result := callable.Call(in) + + if len(result) != 1 { + return nil + } + + if err, ok := result[0].Interface().(error); ok { + return err + } + + return nil +} + +// compareWithHandlerSignature - проверяет, что handler имеет правильную сигнатуру +func compareWithHandlerSignature(initiator any, handler any) error { + handlerType := reflect.TypeOf(handler) + if handlerType.Kind() != reflect.Func { + return ErrNonCallableHandler + } + + if handlerType.NumIn() < 2 { + return ErrUnsuitableHandlerSignature + } + + if !handlerType.In(0).Implements(ctxInterface) { + return ErrUnsuitableHandlerSignature + } + + initiatorType := reflect.TypeOf(initiator) + if handlerType.In(1) != initiatorType { + return ErrUnsuitableHandlerSignature + } + + return nil +} diff --git a/grade/internal/infrastructure/seedwork/mediator/untyped.go b/grade/internal/infrastructure/seedwork/mediator/untyped.go new file mode 100644 index 00000000..0192c76e --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/untyped.go @@ -0,0 +1,114 @@ +package mediator + +import ( + "context" + "errors" + "reflect" + "sync" + + "github.com/emacsway/grade/grade/internal/domain/seedwork/disposable" + "github.com/hashicorp/go-multierror" +) + +var ( + ErrNonCallableHandler = errors.New("passed handler not callable") + ErrUnsuitableHandlerSignature = errors.New("passed handler has unsuitable signature") +) + +type RefUntypedMediator struct { + hLock sync.RWMutex + handlers map[reflect.Type]reflect.Value + + sLock sync.RWMutex + subscribers map[reflect.Type]map[reflect.Value]struct{} +} + +func NewRefUntypedMediator() *RefUntypedMediator { + return &RefUntypedMediator{ + hLock: sync.RWMutex{}, + handlers: map[reflect.Type]reflect.Value{}, + + sLock: sync.RWMutex{}, + subscribers: map[reflect.Type]map[reflect.Value]struct{}{}, + } +} + +func (m *RefUntypedMediator) Send(ctx context.Context, command any) error { + m.hLock.RLock() + defer m.hLock.RUnlock() + + commandType := reflect.TypeOf(command) + if handler, found := m.handlers[commandType]; found { + return call(handler, ctx, command) + } + + return nil +} + +func (m *RefUntypedMediator) Register(command any, handler any) (disposable.Disposable, error) { + m.hLock.Lock() + defer m.hLock.Unlock() + + if err := compareWithHandlerSignature(command, handler); err != nil { + return nil, err + } + + commandType := reflect.TypeOf(command) + m.handlers[commandType] = reflect.ValueOf(handler) + + return disposable.NewDisposable(func() { + m.Unregister(command) + }), nil +} + +func (m *RefUntypedMediator) Unregister(command any) { + m.hLock.Lock() + defer m.hLock.Unlock() + + commandType := reflect.TypeOf(command) + delete(m.handlers, commandType) +} + +func (m *RefUntypedMediator) Subscribe(event any, handler any) (disposable.Disposable, error) { + m.sLock.Lock() + defer m.sLock.Unlock() + + if err := compareWithHandlerSignature(event, handler); err != nil { + return nil, err + } + + valueType := reflect.TypeOf(event) + if _, found := m.subscribers[valueType]; !found { + m.subscribers[valueType] = map[reflect.Value]struct{}{} + } + + handlerValue := reflect.ValueOf(handler) + m.subscribers[valueType][handlerValue] = struct{}{} + + return disposable.NewDisposable(func() { + m.Unsubscribe(event, handler) + }), nil +} + +func (m *RefUntypedMediator) Unsubscribe(event any, handler any) { + m.sLock.Lock() + defer m.sLock.Unlock() + + eventType := reflect.TypeOf(event) + handlerValue := reflect.ValueOf(handler) + + delete(m.subscribers[eventType], handlerValue) +} + +func (m *RefUntypedMediator) Publish(ctx context.Context, event any) error { + m.sLock.RLock() + defer m.sLock.RUnlock() + + var errs error + eventType := reflect.TypeOf(event) + for handler, _ := range m.subscribers[eventType] { + errs = multierror.Append(errs, call(handler, ctx, event)) + } + + return errs +} diff --git a/grade/internal/infrastructure/seedwork/mediator/untyped_test.go b/grade/internal/infrastructure/seedwork/mediator/untyped_test.go new file mode 100644 index 00000000..629045d5 --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/untyped_test.go @@ -0,0 +1,251 @@ +package mediator + +import ( + "context" + "errors" + "testing" + + "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/assert" +) + +type ( + Event struct { + name string + } + Command struct { + name string + } +) + +func TestMediator(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + + assertion func(t *testing.T, m *RefUntypedMediator) + }{ + { + name: "test_publish", + + assertion: func(t *testing.T, m *RefUntypedMediator) { + counter := 0 + handler := func(ctx context.Context, t Event) { + counter++ + } + + _, err := m.Subscribe(Event{}, handler) + assert.NoError(t, err) + + _ = m.Publish(context.Background(), Event{}) + assert.Equal(t, 1, counter) + }, + }, + + { + name: "test_unsubscribe", + assertion: func(t *testing.T, m *RefUntypedMediator) { + + times := 0 + handler := func(ctx context.Context, e Event) { + times++ + } + + times2 := 0 + handler2 := func(ctx context.Context, e Event) { + times2++ + } + + _, err := m.Subscribe(Event{}, handler) + assert.NoError(t, err) + + _, err = m.Subscribe(Event{}, handler2) + assert.NoError(t, err) + + m.Unsubscribe(Event{}, handler) + _ = m.Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_disposable_event", + assertion: func(t *testing.T, m *RefUntypedMediator) { + + times := 0 + handler := func(ctx context.Context, e Event) { + times++ + } + + times2 := 0 + handler2 := func(ctx context.Context, e Event) error { + times2++ + return nil + } + + disposable, err := m.Subscribe(Event{}, handler) + assert.NoError(t, err) + + _, err = m.Subscribe(Event{}, handler2) + assert.NoError(t, err) + + disposable.Dispose() + _ = m.Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_send", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e Command) { + times++ + } + + _, err := m.Register(Command{}, handler) + assert.NoError(t, err) + + _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 1, times) + }, + }, + + { + name: "test_unregister", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e Command) { + times++ + } + + _, err := m.Register(Command{}, handler) + assert.NoError(t, err) + m.Unregister(Command{}) + + _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_disposable_command", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e Command) { + times++ + } + + disposable, err := m.Register(Command{}, handler) + assert.NoError(t, err) + + disposable.Dispose() + _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_unsuitable_params_type_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e Event) { + times++ + } + + _, err := m.Register(Command{}, handler) + assert.Equal(t, ErrUnsuitableHandlerSignature, err) + }, + }, + + { + name: "test_unsuitable_params_count_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e Event, smt any) { + times++ + } + + _, err := m.Register(Command{}, handler) + assert.Equal(t, ErrUnsuitableHandlerSignature, err) + }, + }, + + { + name: "test_unsuitable_typeof_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + handler := 1 + + _, err := m.Subscribe(Command{}, handler) + assert.Equal(t, ErrNonCallableHandler, err) + }, + }, + + { + name: "test_unsuitable_in_args_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + handler := func(a, b int) {} + + _, err := m.Subscribe(Command{}, handler) + assert.Equal(t, ErrUnsuitableHandlerSignature, err) + }, + }, + + { + name: "test_unsuitable_void_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + handler := func() {} + + _, err := m.Register(Command{}, handler) + assert.Equal(t, ErrUnsuitableHandlerSignature, err) + }, + }, + + { + name: "test_returning_errors", + assertion: func(t *testing.T, m *RefUntypedMediator) { + handlerError := errors.New("") + + handler := func(ctx context.Context, e Command) error { + return handlerError + } + + handler2 := func(ctx context.Context, e Event) error { + return handlerError + } + + handler3 := func(ctx context.Context, e Event) error { + return handlerError + } + + m.Register(Command{}, handler) + + m.Subscribe(Event{}, handler2) + m.Subscribe(Event{}, handler3) + + var errs error + errs = multierror.Append(errs, handlerError, handlerError) + assert.Equal(t, errs, m.Publish(context.Background(), Event{})) + + assert.Equal(t, handlerError, m.Send(context.Background(), Command{})) + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + m := NewRefUntypedMediator() + tt.assertion(t, m) + }) + } +}