From e3892460286d614a8283a979f56a8ab7c48515ef Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Wed, 16 Oct 2019 15:33:12 +0300 Subject: [PATCH 1/3] added custom saga correlation capabilties --- docs/SAGA.md | 13 ++++ examples/vacation_app/go.sum | 1 - gbus/abstractions.go | 36 ++++++++++ gbus/saga/def.go | 13 ++-- gbus/saga/glue.go | 47 +++++++++--- go.mod | 11 ++- go.sum | 19 +++++ tests/custom_saga_locator_test.go | 114 ++++++++++++++++++++++++++++++ tests/saga_test.go | 41 +++++++++++ 9 files changed, 277 insertions(+), 18 deletions(-) create mode 100644 tests/custom_saga_locator_test.go diff --git a/docs/SAGA.md b/docs/SAGA.md index de39ca0..2c5a6c0 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -270,3 +270,16 @@ to this: sagaInvocation.ReplyToInitiator(invocation.Ctx(), msg) } ``` + + +### Routing messages to a saga instance + +By default grabbit correlates a message to the correct saga instance when a handler replies to an incoming message. +There are many cases in which you would like to interact with a saga instance and send it messages not in the context of replying to a specific message originating from the saga. + +The way to do this is have the saga implement the gbus.CustomeSagaCorrelator interface +gbus.CustomeSagaCorrelator allows saga developers to instruct grabbit to rout messages to teh saga instance acording to some custom key that may be carried as part of message payloads. + +See the [following test](https://github.com/wework/grabbit/blob/master/tests/saga_test.go#L396) as an example. + + diff --git a/examples/vacation_app/go.sum b/examples/vacation_app/go.sum index c737917..d92fcca 100644 --- a/examples/vacation_app/go.sum +++ b/examples/vacation_app/go.sum @@ -92,7 +92,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 7b7e43b..0b7902a 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -282,3 +282,39 @@ type Logged interface { SetLogger(entry logrus.FieldLogger) Log() logrus.FieldLogger } + +//CustomeSagaCorrelator allows saga instances to control the way messages get correlated to the saga instance +type CustomeSagaCorrelator interface { + /* + GenCustomCorrelationID generate a unique ID that will serve as the unique identifier for the saga instance + grabbit will call this function after creating a new instance and invoking the saga handler that handles the + the message that starts up the saga. + */ + GenCustomCorrelationID() string + /* + CorrelationID provides grabbit with a custom function hinting grabbit how to correlate incoming messages + to the specific saga instance. + + Example: + + func (c *CustomSagaLocatorSaga) CorrelationID() func(message *gbus.BusMessage) string { + + return func(message *gbus.BusMessage) string { + switch message.Payload.(type) { + case *AddToOrderCommand: + return message.Payload.(*AddToOrderCommand).OrderID + case *CompleteOrderCommand: + return message.Payload.(*CompleteOrderCommand).OrderID + default: + return "" + } + } + } + + The above instructs grabbit to correlate incoming messages of type *AddToOrderCommand and *CompleteOrderCommand + according to their OrderID value and return an empty string if the incoming message doesn't match any of the above. + if a saga instance implements the CustomeSagaCorrelator interface it will first try to correlate the message to a saga + instance by the value returned by the custom correlation function and in case the returned value is an empty string grabbit will fallback and correlate the message to a saga instance according to BusMessage.SagaCorrelationID field + */ + CorrelationID() func(*BusMessage) string +} diff --git a/gbus/saga/def.go b/gbus/saga/def.go index 14a8792..96ead77 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -19,12 +19,13 @@ type MsgToFuncPair struct { //Def defines a saga type type Def struct { - glue *Glue - sagaType reflect.Type - startedBy []string - lock *sync.Mutex - sagaConfFns []gbus.SagaConfFn - msgToFunc []*MsgToFuncPair + glue *Glue + sagaType reflect.Type + startedBy []string + lock *sync.Mutex + sagaConfFns []gbus.SagaConfFn + msgToFunc []*MsgToFuncPair + customCorrelation func(*gbus.BusMessage) string } //HandleMessage implements HandlerRegister interface diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6007b85..3040fd9 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -71,6 +71,9 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { startedBy: fqnsFromMessages(saga.StartedBy()), msgToFunc: make([]*MsgToFuncPair, 0), lock: &sync.Mutex{}} + if correlator, ok := saga.(gbus.CustomeSagaCorrelator); ok { + def.customCorrelation = correlator.CorrelationID() + } saga.RegisterAllHandlers(def) imsm.sagaDefs = append(imsm.sagaDefs, def) @@ -108,13 +111,21 @@ func (imsm *Glue) handleNewSaga(def *Def, invocation gbus.Invocation, message *g newInstance.StartedByRPCID = message.RPCID newInstance.StartedByMessageID = message.ID - imsm.Log(). - WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). - Info("created new saga") if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } + /* + assign the new saga id only after the invocation as we assume that the value of the custom saga id + is based on some value on the incoming message + */ + if def.customCorrelation != nil { + customCorrelator := newInstance.UnderlyingInstance.(gbus.CustomeSagaCorrelator) + newInstance.ID = customCorrelator.GenCustomCorrelationID() + } + imsm.Log(). + WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). + Info("created new saga") if !newInstance.isComplete() { imsm.Log().WithField("saga_id", newInstance.ID).Info("saving new saga") @@ -156,11 +167,30 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa if startNew { return imsm.handleNewSaga(def, invocation, message) - } else if message.SagaCorrelationID != "" { - instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), message.SagaCorrelationID) + } else if message.Semantics == gbus.CMD { + if message.SagaCorrelationID == "" && def.customCorrelation == nil { + e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name()) + return e + + } + var correlatedSagaID string + + /* + fetch the correlation id from the custom correlation if one exists. + If there is no custom correlator set or if the returned value is an empty string (meaning the custom + correlator doesn't know how to correelate this message) then fallback to the message.SagaCorrelationID + */ + + if def.customCorrelation != nil { + correlatedSagaID = def.customCorrelation(message) + } else { + correlatedSagaID = message.SagaCorrelationID + } + + instance, getErr := imsm.sagaStore.GetSagaByID(invocation.Tx(), correlatedSagaID) if getErr != nil { - imsm.Log().WithError(getErr).WithField("saga_id", message.SagaCorrelationID).Error("failed to fetch saga by id") + imsm.Log().WithError(getErr).WithField("saga_id", correlatedSagaID).Error("failed to fetch saga by id") return getErr } if instance == nil { @@ -173,7 +203,7 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa https://github.com/wework/grabbit/issues/196 */ - imsm.Log().WithField("saga_correlation_id", message.SagaCorrelationID).Warn("message routed with SagaCorrelationID but no saga instance with the same id found") + imsm.Log().WithField("saga_correlation_id", correlatedSagaID).Warn("message routed with SagaCorrelationID but no saga instance with the same id found") return nil } def.configureSaga(instance) @@ -184,9 +214,6 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa return imsm.completeOrUpdateSaga(invocation.Tx(), instance) - } else if message.Semantics == gbus.CMD { - e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name()) - return e } else { imsm.Log().WithFields(logrus.Fields{"saga_type": def.sagaType, "message": msgName}).Info("fetching saga instances by type") diff --git a/go.mod b/go.mod index ff5a3f4..f335be9 100644 --- a/go.mod +++ b/go.mod @@ -6,16 +6,21 @@ require ( github.com/Rican7/retry v0.1.0 github.com/Shopify/sarama v1.24.0 // indirect github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect + github.com/creack/pty v1.1.9 // indirect github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 github.com/eapache/go-resiliency v1.2.0 // indirect github.com/go-sql-driver/mysql v1.4.1 github.com/golang/protobuf v1.3.2 + github.com/hhatto/gocloc v0.3.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect github.com/klauspost/compress v1.8.6 // indirect github.com/klauspost/cpuid v1.2.1 // indirect github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect + github.com/kr/pty v1.1.8 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/lopezator/migrator v0.2.0 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 @@ -28,15 +33,19 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.4.2 + github.com/src-d/enry/v2 v2.1.0 // indirect github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 + github.com/stretchr/objx v0.2.0 // indirect + github.com/stretchr/testify v1.4.0 // indirect go.uber.org/multierr v1.2.0 // indirect golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc // indirect golang.org/x/net v0.0.0-20191009170851-d66e71096ffb // indirect golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f // indirect google.golang.org/appengine v1.6.5 // indirect - gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect + gopkg.in/yaml.v2 v2.2.4 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index f73ddfa..5aa2502 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 h1:QuKWm+/gc4/EuT8SCBAn1qcTh576rg0KoLfi7a0ArMM= github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15/go.mod h1:NBrM4f6cInyw9KSBFONNXzpvPQ/WGige7ON42RICbWM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -56,11 +58,15 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hhatto/gocloc v0.3.0 h1:n3+d1hf8qTIlN3bbhegtP4RSRuSjtv378uLQqgwjQyU= +github.com/hhatto/gocloc v0.3.0/go.mod h1:hLWkKLYoVZDsIVygx3HklyE9kOgrnQCY32pEdbK9F2M= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -76,6 +82,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= @@ -133,13 +140,21 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/src-d/enry/v2 v2.0.0/go.mod h1:qQeCMRwzMF3ckeGr+h0tJLdxXnq+NVZsIDMELj0t028= +github.com/src-d/enry/v2 v2.1.0 h1:z1L8t+B8bh3mmjPkJrgOTnVRpFGmTPJsplHX9wAn6BI= +github.com/src-d/enry/v2 v2.1.0/go.mod h1:qQeCMRwzMF3ckeGr+h0tJLdxXnq+NVZsIDMELj0t028= +github.com/src-d/go-oniguruma v1.1.0/go.mod h1:chVbff8kcVtmrhxtZ3yBVLLquXbzCS6DrxQaAK/CeqM= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/toqueteos/trie v1.0.0 h1:8i6pXxNUXNRAqP246iibb7w/pSFquNTQ+uNfriG7vlk= +github.com/toqueteos/trie v1.0.0/go.mod h1:Ywk48QhEqhU1+DwhMkJ2x7eeGxDHiGkAdc9+0DYcbsM= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= @@ -184,6 +199,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= @@ -201,6 +217,9 @@ gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/toqueteos/substring.v1 v1.0.2 h1:urLqCeMm6x/eTuQa1oZerNw8N1KNOIp5hD5kGL7lFsE= +gopkg.in/toqueteos/substring.v1 v1.0.2/go.mod h1:Eb2Z1UYehlVK8LYW2WBVR2rwbujsz3aX8XDrM1vbNew= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/tests/custom_saga_locator_test.go b/tests/custom_saga_locator_test.go new file mode 100644 index 0000000..b51c843 --- /dev/null +++ b/tests/custom_saga_locator_test.go @@ -0,0 +1,114 @@ +package tests + +import ( + "github.com/wework/grabbit/gbus" +) + +/*** messages that the saga handles */ +type InitiateOrderCommand struct { + OrderID string +} + +func (InitiateOrderCommand) SchemaName() string { + return "InitiateOrderCommand" +} + +type InitiateOrderResponse struct { + OrderID string +} + +func (InitiateOrderResponse) SchemaName() string { + return "InitiateOrderResponse" +} + +type AddToOrderCommand struct { + OrderID string +} + +func (AddToOrderCommand) SchemaName() string { + return "AddToOrderCommand" +} + +type AddToOrderResponse struct { + OrderID string +} + +func (AddToOrderResponse) SchemaName() string { + return "AddToOrderResponse" +} + +type CompleteOrderCommand struct { + OrderID string +} + +func (CompleteOrderCommand) SchemaName() string { + return "CompleteOrderCommand" +} + +type CompleteOrderResponse struct { + OrderID string +} + +func (CompleteOrderResponse) SchemaName() string { + return "CompleteOrderResponse" +} + +/*******************************************************/ + +var _ gbus.CustomeSagaCorrelator = &CustomSagaLocatorSaga{} + +type CustomSagaLocatorSaga struct { + OrderID string + Completed bool +} + +func (*CustomSagaLocatorSaga) StartedBy() []gbus.Message { + starters := make([]gbus.Message, 0) + return append(starters, InitiateOrderCommand{}) +} + +func (c *CustomSagaLocatorSaga) IsComplete() bool { + return c.Completed +} + +func (c *CustomSagaLocatorSaga) New() gbus.Saga { + return &CustomSagaLocatorSaga{} +} + +func (c *CustomSagaLocatorSaga) RegisterAllHandlers(register gbus.HandlerRegister) { + register.HandleMessage(InitiateOrderCommand{}, c.HandleInitiateOrderCommand) + register.HandleMessage(AddToOrderCommand{}, c.HandleAddToOrderCommand) + register.HandleMessage(CompleteOrderCommand{}, c.HandleCompleteOrderCommand) +} + +func (c *CustomSagaLocatorSaga) HandleInitiateOrderCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + c.OrderID = message.Payload.(*InitiateOrderCommand).OrderID + return invocation.Reply(invocation.Ctx(), gbus.NewBusMessage(InitiateOrderResponse{})) +} + +func (c *CustomSagaLocatorSaga) HandleAddToOrderCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + return invocation.Reply(invocation.Ctx(), gbus.NewBusMessage(AddToOrderResponse{})) +} + +func (c *CustomSagaLocatorSaga) HandleCompleteOrderCommand(invocation gbus.Invocation, message *gbus.BusMessage) error { + c.Completed = true + return invocation.Reply(invocation.Ctx(), gbus.NewBusMessage(CompleteOrderResponse{})) +} + +func (c *CustomSagaLocatorSaga) GenCustomCorrelationID() string { + return c.OrderID +} + +func (c *CustomSagaLocatorSaga) CorrelationID() func(message *gbus.BusMessage) string { + + return func(message *gbus.BusMessage) string { + switch message.Payload.(type) { + case *AddToOrderCommand: + return message.Payload.(*AddToOrderCommand).OrderID + case *CompleteOrderCommand: + return message.Payload.(*CompleteOrderCommand).OrderID + default: + return "" + } + } +} diff --git a/tests/saga_test.go b/tests/saga_test.go index 9aacc42..283a86d 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "log" "reflect" + "sync" "testing" "time" @@ -392,6 +393,46 @@ func TestSagaConfFunctions(t *testing.T) { } +func TestCustomSagaCorrelation(t *testing.T) { + + client := createNamedBusForTest(testSvc1) + server := createNamedBusForTest(testSvc2) + server.RegisterSaga(&CustomSagaLocatorSaga{}) + + var wg sync.WaitGroup + wg.Add(3) + client.HandleMessage(InitiateOrderResponse{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + wg.Done() + return nil + }) + + client.HandleMessage(AddToOrderResponse{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + wg.Done() + return nil + }) + + client.HandleMessage(CompleteOrderResponse{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + wg.Done() + return nil + }) + + client.Start() + defer client.Shutdown() + + server.Start() + defer server.Shutdown() + + orderID := "someOrderID" + client.Send(context.Background(), testSvc2, gbus.NewBusMessage(InitiateOrderCommand{OrderID: orderID})) + client.Send(context.Background(), testSvc2, gbus.NewBusMessage(AddToOrderCommand{OrderID: orderID})) + client.Send(context.Background(), testSvc2, gbus.NewBusMessage(CompleteOrderCommand{OrderID: orderID})) + + wg.Wait() +} + /*Test Sagas*/ type SagaA struct { From 31b788811a1d2fa383b72a65b5abe3eff61cebb2 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Fri, 18 Oct 2019 17:40:11 +0300 Subject: [PATCH 2/3] refactored to a simpler correlator function --- docs/SAGA.md | 2 +- gbus/abstractions.go | 52 ++++++++++++++----------------- gbus/saga/def.go | 2 +- gbus/saga/glue.go | 21 ++++++------- tests/custom_saga_locator_test.go | 14 +++++---- 5 files changed, 44 insertions(+), 47 deletions(-) diff --git a/docs/SAGA.md b/docs/SAGA.md index 2c5a6c0..0667b1c 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -278,7 +278,7 @@ By default grabbit correlates a message to the correct saga instance when a hand There are many cases in which you would like to interact with a saga instance and send it messages not in the context of replying to a specific message originating from the saga. The way to do this is have the saga implement the gbus.CustomeSagaCorrelator interface -gbus.CustomeSagaCorrelator allows saga developers to instruct grabbit to rout messages to teh saga instance acording to some custom key that may be carried as part of message payloads. +gbus.CustomeSagaCorrelator allows saga developers to instruct grabbit to rout messages to the saga instance acording to some custom key that may be carried as part of message payloads. See the [following test](https://github.com/wework/grabbit/blob/master/tests/saga_test.go#L396) as an example. diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 0b7902a..77b8578 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -286,35 +286,31 @@ type Logged interface { //CustomeSagaCorrelator allows saga instances to control the way messages get correlated to the saga instance type CustomeSagaCorrelator interface { /* - GenCustomCorrelationID generate a unique ID that will serve as the unique identifier for the saga instance - grabbit will call this function after creating a new instance and invoking the saga handler that handles the - the message that starts up the saga. - */ - GenCustomCorrelationID() string - /* - CorrelationID provides grabbit with a custom function hinting grabbit how to correlate incoming messages - to the specific saga instance. - - Example: - - func (c *CustomSagaLocatorSaga) CorrelationID() func(message *gbus.BusMessage) string { - - return func(message *gbus.BusMessage) string { - switch message.Payload.(type) { - case *AddToOrderCommand: - return message.Payload.(*AddToOrderCommand).OrderID - case *CompleteOrderCommand: - return message.Payload.(*CompleteOrderCommand).OrderID - default: - return "" - } - } + CorrelationFn provides grabbit with a custom function controling which value of the incoming messages + to use as the correlation key. + + Example: + + func (c *CustomSagaLocatorSaga) Correlator() func(message gbus.Message) (string, bool) { + + return func(message gbus.Message) (string, bool) { + switch message.(type) { + case *InitiateOrderCommand: + return message.(*InitiateOrderCommand).OrderID, true + case *AddToOrderCommand: + return message.(*AddToOrderCommand).OrderID, true + case *CompleteOrderCommand: + return message.(*CompleteOrderCommand).OrderID, true + default: + return "", false } + } + } - The above instructs grabbit to correlate incoming messages of type *AddToOrderCommand and *CompleteOrderCommand - according to their OrderID value and return an empty string if the incoming message doesn't match any of the above. - if a saga instance implements the CustomeSagaCorrelator interface it will first try to correlate the message to a saga - instance by the value returned by the custom correlation function and in case the returned value is an empty string grabbit will fallback and correlate the message to a saga instance according to BusMessage.SagaCorrelationID field + The above instructs grabbit to correlate incoming messages of type *InitiateOrderCommand, *AddToOrderCommand and *CompleteOrderCommand + according to their OrderID value and return an empty string and false value if the incoming message doesn't match any of the above. + if a saga instance implements the CustomeSagaCorrelator interface it will first try to correlate the message to a saga + instance by the value returned by the custom correlation function and in case the returned value if it can and will fallback and correlate the message to a saga instance according to BusMessage.SagaCorrelationID field */ - CorrelationID() func(*BusMessage) string + Correlator() func(Message) (string, bool) } diff --git a/gbus/saga/def.go b/gbus/saga/def.go index 96ead77..8123767 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -25,7 +25,7 @@ type Def struct { lock *sync.Mutex sagaConfFns []gbus.SagaConfFn msgToFunc []*MsgToFuncPair - customCorrelation func(*gbus.BusMessage) string + customCorrelation func(gbus.Message) (string, bool) } //HandleMessage implements HandlerRegister interface diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 3040fd9..70fdc6d 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -72,7 +72,9 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { msgToFunc: make([]*MsgToFuncPair, 0), lock: &sync.Mutex{}} if correlator, ok := saga.(gbus.CustomeSagaCorrelator); ok { - def.customCorrelation = correlator.CorrelationID() + def.customCorrelation = correlator.Correlator() + } else { + def.customCorrelation = func(message gbus.Message) (string, bool) { return "", false } } saga.RegisterAllHandlers(def) @@ -111,18 +113,15 @@ func (imsm *Glue) handleNewSaga(def *Def, invocation gbus.Invocation, message *g newInstance.StartedByRPCID = message.RPCID newInstance.StartedByMessageID = message.ID + if overrideID, canCorrelate := def.customCorrelation(message.Payload); canCorrelate { + newInstance.ID = overrideID + } + if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } - /* - assign the new saga id only after the invocation as we assume that the value of the custom saga id - is based on some value on the incoming message - */ - if def.customCorrelation != nil { - customCorrelator := newInstance.UnderlyingInstance.(gbus.CustomeSagaCorrelator) - newInstance.ID = customCorrelator.GenCustomCorrelationID() - } + imsm.Log(). WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). Info("created new saga") @@ -181,8 +180,8 @@ func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessa correlator doesn't know how to correelate this message) then fallback to the message.SagaCorrelationID */ - if def.customCorrelation != nil { - correlatedSagaID = def.customCorrelation(message) + if overrideID, canCorrelate := def.customCorrelation(message.Payload); canCorrelate { + correlatedSagaID = overrideID } else { correlatedSagaID = message.SagaCorrelationID } diff --git a/tests/custom_saga_locator_test.go b/tests/custom_saga_locator_test.go index b51c843..34697ce 100644 --- a/tests/custom_saga_locator_test.go +++ b/tests/custom_saga_locator_test.go @@ -99,16 +99,18 @@ func (c *CustomSagaLocatorSaga) GenCustomCorrelationID() string { return c.OrderID } -func (c *CustomSagaLocatorSaga) CorrelationID() func(message *gbus.BusMessage) string { +func (c *CustomSagaLocatorSaga) Correlator() func(message gbus.Message) (string, bool) { - return func(message *gbus.BusMessage) string { - switch message.Payload.(type) { + return func(message gbus.Message) (string, bool) { + switch message.(type) { + case *InitiateOrderCommand: + return message.(*InitiateOrderCommand).OrderID, true case *AddToOrderCommand: - return message.Payload.(*AddToOrderCommand).OrderID + return message.(*AddToOrderCommand).OrderID, true case *CompleteOrderCommand: - return message.Payload.(*CompleteOrderCommand).OrderID + return message.(*CompleteOrderCommand).OrderID, true default: - return "" + return "", false } } } From 19fe1118a5517b465d64549abc97ac50e23b1d84 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Fri, 18 Oct 2019 17:52:00 +0300 Subject: [PATCH 3/3] fixing linting --- gbus/abstractions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 77b8578..158e849 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -286,7 +286,7 @@ type Logged interface { //CustomeSagaCorrelator allows saga instances to control the way messages get correlated to the saga instance type CustomeSagaCorrelator interface { /* - CorrelationFn provides grabbit with a custom function controling which value of the incoming messages + Correlator provides grabbit with a custom function controlling which value of the incoming messages to use as the correlation key. Example: