-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueries.go
83 lines (59 loc) · 1.87 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cqrs
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/ahmetb/go-linq/v3"
)
type IQueryHandler[TQuery any, TResponse any] interface {
Handle(ctx context.Context, query TQuery) (TResponse, error)
}
var queryHandlers map[reflect.Type]interface{}
func init() {
queryHandlers = make(map[reflect.Type]interface{})
}
func RegisterQueryHandler[TQuery any, TResponse any](handler IQueryHandler[TQuery, TResponse]) error {
var query TQuery
queryType := reflect.TypeOf(query)
_, found := queryHandlers[queryType]
if found {
msg := fmt.Sprintf("handler for query of type %s is already registered", queryType.String())
return errors.New(msg)
}
queryHandlers[queryType] = handler
return nil
}
func Request[TQuery any, TResponse any](ctx context.Context, query TQuery) (TResponse, error) {
queryType := reflect.TypeOf(query)
h, found := queryHandlers[queryType]
if !found {
msg := fmt.Sprintf("no handler registered for query %T", query)
return *new(TResponse), errors.New(msg)
}
handler, casted := h.(IQueryHandler[TQuery, TResponse])
if !casted {
msg := fmt.Sprintf("handler of type %T is not assignable for query of type %T and response of type %T", handler, query, *new(TResponse))
return *new(TResponse), errors.New(msg)
}
if len(queryBehaviors) <= 0 {
return handler.Handle(ctx, query)
}
sortedBehaviors := sortBehaviors(queryBehaviors)
queryHandle := func() (interface{}, error) {
return handler.Handle(ctx, query)
}
aggregatedPipeline := linq.From(sortedBehaviors).AggregateWithSeedT(queryHandle, func(next NextFunc, b IBehavior) NextFunc {
var nextFunc NextFunc = func() (interface{}, error) {
return b.Handle(ctx, query, next)
}
return nextFunc
})
pipeline := aggregatedPipeline.(NextFunc)
res, err := pipeline()
response, casted := res.(TResponse)
if !casted {
return *new(TResponse), err
}
return response, err
}