-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathregistry.go
103 lines (82 loc) · 1.85 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package saga
import (
"sync"
)
func NewRegistry[Tx TxContext](orchestrator Orchestrator[Tx]) *Registry[Tx] {
return &Registry[Tx]{
sagas: make([]Saga[Session, Tx], 0),
mutex: sync.Mutex{},
orchestrator: orchestrator,
}
}
func RegisterSagaTo[S Session, Tx TxContext](r *Registry[Tx], s Saga[S, Tx]) error {
if s.Name() == "" {
return ErrRegisterInvalidSaga
}
if r.HasSaga(s.Name()) {
return ErrRegisterInvalidSaga
}
r.mutex.Lock()
defer r.mutex.Unlock()
r.sagas = append(r.sagas, convertSaga(s))
return nil
}
type Registry[Tx TxContext] struct {
sagas []Saga[Session, Tx]
mutex sync.Mutex
orchestrator Orchestrator[Tx]
}
func (r *Registry[Tx]) consumeMessage(packet messagePacket) error {
sessID := packet.Payload().SessionID()
type Orchestrations func() error
orchestrations := make([]Orchestrations, 0)
func() {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, s := range r.sagas {
if s.hasPublishedSaga(sessID) {
f := func() error {
return r.orchestrator.Orchestrate(s, packet)
}
orchestrations = append(orchestrations, f)
}
}
}()
for _, f := range orchestrations {
if err := f(); err != nil {
return err
}
}
return nil
}
func (r *Registry[Tx]) StartSaga(sagaName string, sessionArgs map[string]interface{}) error {
if sagaName == "" {
return ErrInvalidSagaStart
}
if sessionArgs == nil {
return ErrInvalidSagaStart
}
r.mutex.Lock()
defer r.mutex.Unlock()
var target *Saga[Session, Tx]
for _, s := range r.sagas {
if s.Name() == sagaName {
target = &s
break
}
}
if target == nil {
return ErrSagaNotFound
}
return r.orchestrator.StartSaga(*target, sessionArgs)
}
func (r *Registry[Tx]) HasSaga(sagaName string) bool {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, s := range r.sagas {
if s.Name() == sagaName {
return true
}
}
return false
}