From 2c3753c04ba67f69247ff15cf9427089f42489f4 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 13:51:24 +0100 Subject: [PATCH 01/12] remove handler groups from _examples/basic/5-cqrs-protobuf and polish it a bit --- .../5-cqrs-protobuf/.validate_example.yml | 4 +- _examples/basic/5-cqrs-protobuf/README.md | 29 +++++ _examples/basic/5-cqrs-protobuf/go.mod | 2 +- _examples/basic/5-cqrs-protobuf/go.sum | 4 + _examples/basic/5-cqrs-protobuf/main.go | 104 ++++++++++-------- .../{events.pb.go => messages.pb.go} | 0 .../events.proto => proto/messages.proto} | 0 7 files changed, 94 insertions(+), 49 deletions(-) rename _examples/basic/5-cqrs-protobuf/{events.pb.go => messages.pb.go} (100%) rename _examples/basic/5-cqrs-protobuf/{inputs/events.proto => proto/messages.proto} (100%) diff --git a/_examples/basic/5-cqrs-protobuf/.validate_example.yml b/_examples/basic/5-cqrs-protobuf/.validate_example.yml index 20dce4535..b8ae907e4 100644 --- a/_examples/basic/5-cqrs-protobuf/.validate_example.yml +++ b/_examples/basic/5-cqrs-protobuf/.validate_example.yml @@ -1,4 +1,6 @@ validation_cmd: "docker compose up" teardown_cmd: "docker compose down" timeout: 120 -expected_output: "beers ordered to room 3" +expected_outputs: + - "beers ordered to room \\d+" + - "Already booked rooms for \\$\\d{2,}" diff --git a/_examples/basic/5-cqrs-protobuf/README.md b/_examples/basic/5-cqrs-protobuf/README.md index f1e6e0891..e8ad853e3 100644 --- a/_examples/basic/5-cqrs-protobuf/README.md +++ b/_examples/basic/5-cqrs-protobuf/README.md @@ -6,6 +6,35 @@ Detailed documentation for CQRS can be found in Watermill's docs: [http://waterm ![CQRS Event Storming](https://threedots.tech/watermill-io/cqrs-example-storming.png) +```mermaid +sequenceDiagram + participant M as Main + participant CB as CommandBus + participant BRH as BookRoomHandler + participant EB as EventBus + participant OBRB as OrderBeerOnRoomBooked + participant OBH as OrderBeerHandler + participant BFR as BookingsFinancialReport + + Note over M,BFR: Commands use AMQP queue, Events use AMQP pub/sub + + M->>CB: Send(BookRoom)
topic: commands.BookRoom + CB->>BRH: Handle(BookRoom) + + BRH->>EB: Publish(RoomBooked)
topic: events.RoomBooked + + par Process RoomBooked Event + EB->>OBRB: Handle(RoomBooked) + OBRB->>CB: Send(OrderBeer)
topic: commands.OrderBeer + CB->>OBH: Handle(OrderBeer) + OBH->>EB: Publish(BeerOrdered)
topic: events.BeerOrdered + + EB->>BFR: Handle(RoomBooked) + Note over BFR: Updates financial report + end +``` + + ## Running ```bash diff --git a/_examples/basic/5-cqrs-protobuf/go.mod b/_examples/basic/5-cqrs-protobuf/go.mod index 406956feb..72e057665 100644 --- a/_examples/basic/5-cqrs-protobuf/go.mod +++ b/_examples/basic/5-cqrs-protobuf/go.mod @@ -1,7 +1,7 @@ module main.go require ( - github.com/ThreeDotsLabs/watermill v1.3.7 + github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339 github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0 github.com/golang/protobuf v1.5.4 ) diff --git a/_examples/basic/5-cqrs-protobuf/go.sum b/_examples/basic/5-cqrs-protobuf/go.sum index 8b9e98ea5..27ff1c96f 100644 --- a/_examples/basic/5-cqrs-protobuf/go.sum +++ b/_examples/basic/5-cqrs-protobuf/go.sum @@ -1,5 +1,9 @@ github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o= github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill v1.4.1 h1:gjP6yZH+otMPjV0KsV07pl9TeMm9UQV/gqiuiuG5Drs= +github.com/ThreeDotsLabs/watermill v1.4.1/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339 h1:Q6joJUSSelwcxHEngdu3PGy4UYqk7BMXKC3Rzer3Xuk= +github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0 h1:r5idq2qkd3M345iv3C3zAX+lFlEu7iW8QESNnuuv4eY= github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0/go.mod h1:+8tCh6VCuBcQWhfETCwzRINKQ1uyeg9moH3h7jMKxQk= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= diff --git a/_examples/basic/5-cqrs-protobuf/main.go b/_examples/basic/5-cqrs-protobuf/main.go index b29ce7a20..48ca3bd0b 100644 --- a/_examples/basic/5-cqrs-protobuf/main.go +++ b/_examples/basic/5-cqrs-protobuf/main.go @@ -3,12 +3,12 @@ package main import ( "context" "fmt" - "log" + "log/slog" "math/rand" "sync" "time" - "github.com/golang/protobuf/ptypes" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp" @@ -29,12 +29,12 @@ func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error { // some random price, in production you probably will calculate in wiser way price := (rand.Int63n(40) + 1) * 10 - log.Printf( - "Booked %s for %s from %s to %s", - cmd.RoomId, - cmd.GuestName, - time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)), - time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)), + slog.Info( + "Booked room", + "room_id", cmd.RoomId, + "guest_name", cmd.GuestName, + "start_date", time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)), + "end_date", time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)), ) // RoomBooked will be handled by OrderBeerOnRoomBooked event handler, @@ -90,7 +90,7 @@ func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error { return err } - log.Printf("%d beers ordered to room %s", cmd.Count, cmd.RoomId) + slog.Info(fmt.Sprintf("%d beers ordered to room %s", cmd.Count, cmd.RoomId)) return nil } @@ -123,15 +123,23 @@ func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) b.totalCharge += event.Price - fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge) + slog.Info(">>> Already booked rooms for $%d\n", b.totalCharge) return nil } var amqpAddress = "amqp://guest:guest@rabbitmq:5672/" func main() { - logger := watermill.NewStdLogger(false, false) - cqrsMarshaler := cqrs.ProtobufMarshaler{} + logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{ + slog.LevelInfo: slog.LevelDebug, + }) + + cqrsMarshaler := cqrs.ProtobufMarshaler{ + // It will generate topic names based on the event/command type. + // So for example, for "RoomBooked" name will be "RoomBooked". + // It's later used to generate topic names. + GenerateName: cqrs.StructName, + } // You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/ // Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/ @@ -168,8 +176,7 @@ func main() { commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{ GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { - // we are using queue RabbitMQ config, so we need to have topic per command type - return params.CommandName, nil + return generateCommandsTopic(params.CommandName), nil }, OnSend: func(params cqrs.CommandBusOnSendParams) error { logger.Info("Sending command", watermill.LogFields{ @@ -191,8 +198,7 @@ func main() { router, cqrs.CommandProcessorConfig{ GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { - // we are using queue RabbitMQ config, so we need to have topic per command type - return params.CommandName, nil + return generateCommandsTopic(params.CommandName), nil }, SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { // we can reuse subscriber, because all commands have separated topics @@ -221,11 +227,7 @@ func main() { eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { - // because we are using PubSub RabbitMQ config, we can use one topic for all events - return "events", nil - - // we can also use topic per event type - // return params.EventName, nil + return generateEventsTopic(params.EventName), nil }, OnPublish: func(params cqrs.OnEventSendParams) error { @@ -245,22 +247,22 @@ func main() { panic(err) } - eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig( + eventProcessor, err := cqrs.NewEventProcessorWithConfig( router, - cqrs.EventGroupProcessorConfig{ - GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { - return "events", nil + cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return generateEventsTopic(params.EventName), nil }, - SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { config := amqp.NewDurablePubSubConfig( amqpAddress, - amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName), + amqp.GenerateQueueNameTopicNameWithSuffix(params.HandlerName), ) return amqp.NewSubscriber(config, logger) }, - OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error { + OnHandle: func(params cqrs.EventProcessorOnHandleParams) error { start := time.Now() err := params.Handler.Handle(params.Message.Context(), params.Event) @@ -290,16 +292,23 @@ func main() { panic(err) } - err = eventProcessor.AddHandlersGroup( - "events", - cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle), - cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle), - cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error { - logger.Info("Beer ordered", watermill.LogFields{ - "room_id": event.RoomId, - }) - return nil - }), + err = eventProcessor.AddHandlers( + cqrs.NewEventHandler( + "OrderBeerOnRoomBooked", + OrderBeerOnRoomBooked{commandBus}.Handle), + + cqrs.NewEventHandler( + "LogBeerOrdered", + func(ctx context.Context, event *BeerOrdered) error { + logger.Info("Beer ordered", watermill.LogFields{ + "room_id": event.RoomId, + }) + return nil + }), + cqrs.NewEventHandler( + "BookingsFinancialReport", + NewBookingsFinancialReport().Handle, + ), ) if err != nil { panic(err) @@ -319,15 +328,8 @@ func publishCommands(commandBus *cqrs.CommandBus) func() { for { i++ - startDate, err := ptypes.TimestampProto(time.Now()) - if err != nil { - panic(err) - } - - endDate, err := ptypes.TimestampProto(time.Now().Add(time.Hour * 24 * 3)) - if err != nil { - panic(err) - } + startDate := timestamppb.New(time.Now()) + endDate := timestamppb.New(time.Now().Add(time.Hour * 24 * 3)) bookRoomCmd := &BookRoom{ RoomId: fmt.Sprintf("%d", i), @@ -342,3 +344,11 @@ func publishCommands(commandBus *cqrs.CommandBus) func() { time.Sleep(time.Second) } } + +func generateEventsTopic(eventName string) string { + return "events." + eventName +} + +func generateCommandsTopic(commandName string) string { + return "commands." + commandName +} diff --git a/_examples/basic/5-cqrs-protobuf/events.pb.go b/_examples/basic/5-cqrs-protobuf/messages.pb.go similarity index 100% rename from _examples/basic/5-cqrs-protobuf/events.pb.go rename to _examples/basic/5-cqrs-protobuf/messages.pb.go diff --git a/_examples/basic/5-cqrs-protobuf/inputs/events.proto b/_examples/basic/5-cqrs-protobuf/proto/messages.proto similarity index 100% rename from _examples/basic/5-cqrs-protobuf/inputs/events.proto rename to _examples/basic/5-cqrs-protobuf/proto/messages.proto From 9a2ba52b62a5db3af0c05b2af237a5b8180f294f Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 14:48:20 +0100 Subject: [PATCH 02/12] [validate-examples] add support for multiple expected outputs --- dev/validate-examples/main.go | 46 +++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/dev/validate-examples/main.go b/dev/validate-examples/main.go index 37c77ff83..d22b84995 100644 --- a/dev/validate-examples/main.go +++ b/dev/validate-examples/main.go @@ -16,10 +16,11 @@ import ( ) type Config struct { - ValidationCmd string `yaml:"validation_cmd"` - TeardownCmd string `yaml:"teardown_cmd"` - Timeout int `yaml:"timeout"` - ExpectedOutput string `yaml:"expected_output"` + ValidationCmd string `yaml:"validation_cmd"` + TeardownCmd string `yaml:"teardown_cmd"` + Timeout int `yaml:"timeout"` + ExpectedOutput string `yaml:"expected_output"` + ExpectedOutputs []string `yaml:"expected_outputs"` } func (c *Config) LoadFrom(path string) error { @@ -35,8 +36,20 @@ func (c *Config) LoadFrom(path string) error { } func main() { - walkErr := filepath.Walk("../../", func(exampleConfig string, f os.FileInfo, _ error) error { - matches, _ := filepath.Match(".validate_example*.yml", f.Name()) + path := "../../_examples/" + + if len(os.Args) > 1 { + path = filepath.Join(path, os.Args[1]) + } + + walkErr := filepath.Walk(path, func(exampleConfig string, f os.FileInfo, _ error) error { + if f == nil { + return nil + } + matches, err := filepath.Match(".validate_example*.yml", f.Name()) + if err != nil { + return fmt.Errorf("could not match file, err: %w", err) + } if !matches { return nil } @@ -45,7 +58,7 @@ func main() { fmt.Printf("validating %s\n", exampleDirectory) - err := validate(exampleConfig) + err = validate(exampleConfig) if err != nil { return fmt.Errorf("validation for %s failed, err: %v", exampleDirectory, err) } @@ -67,9 +80,14 @@ func validate(path string) error { dirName := filepath.Base(filepath.Dir(path)) + expectedOutputs := config.ExpectedOutputs + if config.ExpectedOutput != "" { + expectedOutputs = append(expectedOutputs, config.ExpectedOutput) + } + fmt.Print("\n\n") fmt.Println("Validating example:", dirName) - fmt.Println("Waiting for output: ", color.GreenString(config.ExpectedOutput)) + fmt.Println("Waiting for output: ", color.GreenString(fmt.Sprintf("%+q", expectedOutputs))) cmdAndArgs := strings.Fields(config.ValidationCmd) validationCmd := exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...) @@ -114,12 +132,20 @@ func validate(path string) error { go readLines(stdout, lines) go readLines(stderr, lines) + outputsFound := map[int]struct{}{} + go func() { for line := range lines { fmt.Printf("[%s] > %s\n", color.CyanString(dirName), line) - ok, _ := regexp.MatchString(config.ExpectedOutput, line) - if ok { + for num, output := range expectedOutputs { + ok, _ := regexp.MatchString(output, line) + if ok { + outputsFound[num] = struct{}{} + } + } + + if len(outputsFound) == len(expectedOutputs) { success <- true return } From 2629bc948b5d813a07283e7eda5298623e889cb1 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 14:50:01 +0100 Subject: [PATCH 03/12] better docs for cqrs.StructName and cqrs.FullyQualifiedStructName --- components/cqrs/name.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/components/cqrs/name.go b/components/cqrs/name.go index bdeb393ab..7bb5882a3 100644 --- a/components/cqrs/name.go +++ b/components/cqrs/name.go @@ -6,6 +6,13 @@ import ( ) // FullyQualifiedStructName name returns object name in format [package].[type name]. +// For example, for the struct: +// +// package events +// type UserCreated struct {} +// +// it will return "events.UserCreated". +// // It ignores if the value is a pointer or not. func FullyQualifiedStructName(v interface{}) string { s := fmt.Sprintf("%T", v) @@ -15,6 +22,13 @@ func FullyQualifiedStructName(v interface{}) string { } // StructName name returns struct name in format [type name]. +// For example, for the struct: +// +// package events +// type UserCreated struct {} +// +// it will return "UserCreated". +// // It ignores if the value is a pointer or not. func StructName(v interface{}) string { segments := strings.Split(fmt.Sprintf("%T", v), ".") From b49271fc9fce2a1c0aada1ee4400a314c285e4cb Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 15:41:22 +0100 Subject: [PATCH 04/12] added CQRS ordered events example --- .../basic/5-cqrs-protobuf/docker-compose.yml | 1 + _examples/basic/5-cqrs-protobuf/main.go | 10 +- _examples/basic/5-cqrs-protobuf/makefile | 4 + .../.validate_example.yml | 10 + .../basic/6-cqrs-ordered-events/README.md | 13 + .../basic/6-cqrs-ordered-events/activity.go | 89 +++ .../6-cqrs-ordered-events/docker-compose.yml | 43 ++ _examples/basic/6-cqrs-ordered-events/go.mod | 43 ++ _examples/basic/6-cqrs-ordered-events/go.sum | 130 ++++ _examples/basic/6-cqrs-ordered-events/main.go | 264 +++++++ .../basic/6-cqrs-ordered-events/makefile | 4 + .../basic/6-cqrs-ordered-events/message.go | 56 ++ .../6-cqrs-ordered-events/messages.pb.go | 654 ++++++++++++++++++ .../proto/messages.proto | 53 ++ .../6-cqrs-ordered-events/subscribers.go | 68 ++ 15 files changed, 1437 insertions(+), 5 deletions(-) create mode 100644 _examples/basic/5-cqrs-protobuf/makefile create mode 100644 _examples/basic/6-cqrs-ordered-events/.validate_example.yml create mode 100644 _examples/basic/6-cqrs-ordered-events/README.md create mode 100644 _examples/basic/6-cqrs-ordered-events/activity.go create mode 100644 _examples/basic/6-cqrs-ordered-events/docker-compose.yml create mode 100644 _examples/basic/6-cqrs-ordered-events/go.mod create mode 100644 _examples/basic/6-cqrs-ordered-events/go.sum create mode 100644 _examples/basic/6-cqrs-ordered-events/main.go create mode 100644 _examples/basic/6-cqrs-ordered-events/makefile create mode 100644 _examples/basic/6-cqrs-ordered-events/message.go create mode 100644 _examples/basic/6-cqrs-ordered-events/messages.pb.go create mode 100644 _examples/basic/6-cqrs-ordered-events/proto/messages.proto create mode 100644 _examples/basic/6-cqrs-ordered-events/subscribers.go diff --git a/_examples/basic/5-cqrs-protobuf/docker-compose.yml b/_examples/basic/5-cqrs-protobuf/docker-compose.yml index f01a2a3e2..09ec5450e 100644 --- a/_examples/basic/5-cqrs-protobuf/docker-compose.yml +++ b/_examples/basic/5-cqrs-protobuf/docker-compose.yml @@ -17,3 +17,4 @@ services: rabbitmq: image: rabbitmq:3.7 restart: unless-stopped + attach: false diff --git a/_examples/basic/5-cqrs-protobuf/main.go b/_examples/basic/5-cqrs-protobuf/main.go index 48ca3bd0b..217ed1c9d 100644 --- a/_examples/basic/5-cqrs-protobuf/main.go +++ b/_examples/basic/5-cqrs-protobuf/main.go @@ -8,13 +8,12 @@ import ( "sync" "time" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "google.golang.org/protobuf/types/known/timestamppb" ) // BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked. @@ -295,8 +294,8 @@ func main() { err = eventProcessor.AddHandlers( cqrs.NewEventHandler( "OrderBeerOnRoomBooked", - OrderBeerOnRoomBooked{commandBus}.Handle), - + OrderBeerOnRoomBooked{commandBus}.Handle, + ), cqrs.NewEventHandler( "LogBeerOrdered", func(ctx context.Context, event *BeerOrdered) error { @@ -304,7 +303,8 @@ func main() { "room_id": event.RoomId, }) return nil - }), + }, + ), cqrs.NewEventHandler( "BookingsFinancialReport", NewBookingsFinancialReport().Handle, diff --git a/_examples/basic/5-cqrs-protobuf/makefile b/_examples/basic/5-cqrs-protobuf/makefile new file mode 100644 index 000000000..b071206af --- /dev/null +++ b/_examples/basic/5-cqrs-protobuf/makefile @@ -0,0 +1,4 @@ +.PHONY: proto + +proto: + protoc --proto_path=proto --go_out=. --go_opt=paths=source_relative messages.proto diff --git a/_examples/basic/6-cqrs-ordered-events/.validate_example.yml b/_examples/basic/6-cqrs-ordered-events/.validate_example.yml new file mode 100644 index 000000000..f021a2513 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/.validate_example.yml @@ -0,0 +1,10 @@ +validation_cmd: "docker compose up" +teardown_cmd: "docker compose down" +timeout: 120 +expected_outputs: + - "Subscriber added subscriber_id" + - "Subscriber updated subscriber_id" + - "Subscriber removed subscriber_id" + - "\\[ACTIVITY\\] activity_type=SUBSCRIBED" + - "\\[ACTIVITY\\] activity_type=UNSUBSCRIBED" + - "\\[ACTIVITY\\] activity_type=EMAIL_UPDATED" diff --git a/_examples/basic/6-cqrs-ordered-events/README.md b/_examples/basic/6-cqrs-ordered-events/README.md new file mode 100644 index 000000000..3bd984aad --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/README.md @@ -0,0 +1,13 @@ +# Example Golang CQRS application - ordered events + +This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) component. + +Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs). + +This example, uses event groups to maintain order + +## Running + +```bash +docker-compose up +``` diff --git a/_examples/basic/6-cqrs-ordered-events/activity.go b/_examples/basic/6-cqrs-ordered-events/activity.go new file mode 100644 index 000000000..5b97065ba --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/activity.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" +) + +// ActivityEntry represents a single event in the timeline +type ActivityEntry struct { + Timestamp time.Time + SubscriberID string + ActivityType string + Details string +} + +// ActivityTimelineReadModel maintains a chronological log of all subscription-related events +type ActivityTimelineReadModel struct { + activities []ActivityEntry + lock sync.RWMutex +} + +func NewActivityTimelineModel() *ActivityTimelineReadModel { + return &ActivityTimelineReadModel{ + activities: make([]ActivityEntry, 0), + } +} + +// OnSubscribed handles subscription events +func (m *ActivityTimelineReadModel) OnSubscribed(ctx context.Context, event *SubscriberSubscribed) error { + m.lock.Lock() + defer m.lock.Unlock() + + entry := ActivityEntry{ + Timestamp: time.Now(), + SubscriberID: event.SubscriberId, + ActivityType: "SUBSCRIBED", + Details: fmt.Sprintf("Subscribed with email: %s", event.Email), + } + + m.activities = append(m.activities, entry) + m.logActivity(entry) + return nil +} + +// OnUnsubscribed handles unsubscription events +func (m *ActivityTimelineReadModel) OnUnsubscribed(ctx context.Context, event *SubscriberUnsubscribed) error { + m.lock.Lock() + defer m.lock.Unlock() + + entry := ActivityEntry{ + Timestamp: time.Now(), + SubscriberID: event.SubscriberId, + ActivityType: "UNSUBSCRIBED", + Details: "Subscriber unsubscribed", + } + + m.activities = append(m.activities, entry) + m.logActivity(entry) + return nil +} + +// OnEmailUpdated handles email update events +func (m *ActivityTimelineReadModel) OnEmailUpdated(ctx context.Context, event *SubscriberEmailUpdated) error { + m.lock.Lock() + defer m.lock.Unlock() + + entry := ActivityEntry{ + Timestamp: time.Now(), + SubscriberID: event.SubscriberId, + ActivityType: "EMAIL_UPDATED", + Details: fmt.Sprintf("Email updated to: %s", event.NewEmail), + } + + m.activities = append(m.activities, entry) + m.logActivity(entry) + return nil +} + +func (m *ActivityTimelineReadModel) logActivity(entry ActivityEntry) { + slog.Info( + "[ACTIVITY]", + "activity_type", entry.ActivityType, + "subscriber_id", entry.SubscriberID, + "details", entry.Details, + ) +} diff --git a/_examples/basic/6-cqrs-ordered-events/docker-compose.yml b/_examples/basic/6-cqrs-ordered-events/docker-compose.yml new file mode 100644 index 000000000..dfbd8cca1 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/docker-compose.yml @@ -0,0 +1,43 @@ +services: + golang: + image: golang:1.23 + restart: unless-stopped + ports: + - 8080:8080 + depends_on: + - kafka + - zookeeper + links: + - kafka + - zookeeper + volumes: + - .:/app + - $GOPATH/pkg/mod:/go/pkg/mod + working_dir: /app + command: go run . + + zookeeper: + container_name: zk + attach: false + image: confluentinc/cp-zookeeper:7.7.1 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 2181:2181 + + kafka: + container_name: kafka + attach: false + image: confluentinc/cp-kafka:7.7.1 + depends_on: + - zookeeper + ports: + - 9093:9093 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zk:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 \ No newline at end of file diff --git a/_examples/basic/6-cqrs-ordered-events/go.mod b/_examples/basic/6-cqrs-ordered-events/go.mod new file mode 100644 index 000000000..69d1a33e8 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/go.mod @@ -0,0 +1,43 @@ +module main.go + +require ( + github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c + github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5 + google.golang.org/protobuf v1.34.2 +) + +require ( + github.com/IBM/sarama v1.43.3 // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/sony/gobreaker v1.0.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.28.0 // indirect +) + +go 1.21 diff --git a/_examples/basic/6-cqrs-ordered-events/go.sum b/_examples/basic/6-cqrs-ordered-events/go.sum new file mode 100644 index 000000000..35c7c441f --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/go.sum @@ -0,0 +1,130 @@ +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= +github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c h1:YucRnlvVXDydpiKp88znVBjcAYDIpmjKtHxixEAEJ38= +github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c/go.mod h1:fO+fo0wGp1+dKQGkoPdLGIHXxwmYj+kitAXJDk/6l0Y= +github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5 h1:ud+4txnRgtr3kZXfXZ5+C7kVQEvsLc5HSNUEa0g+X1Q= +github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5/go.mod h1:t4o+4A6GB+XC8WL3DandhzPwd265zQuyWMQC/I+WIOU= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0 h1:R2zQhFwSCyyd7L43igYjDrH0wkC/i+QBPELuY0HOu84= +github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0/go.mod h1:2MqLKYJfjs3UriXXF9Fd0Qmh/lhxi/6tHXkqtXxyIHc= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/basic/6-cqrs-ordered-events/main.go b/_examples/basic/6-cqrs-ordered-events/main.go new file mode 100644 index 000000000..99d25f5ac --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/main.go @@ -0,0 +1,264 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" +) + +var amqpAddress = "amqp://guest:guest@rabbitmq:5672/" + +func main() { + slog.SetLogLoggerLevel(slog.LevelDebug) + + logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{ + slog.LevelInfo: slog.LevelDebug, + }) + + // We are decorating ProtobufMarshaler to add extra metadata to the message. + cqrsMarshaler := CqrsMarshalerDecorator{ + cqrs.ProtobufMarshaler{ + // It will generate topic names based on the event/command type. + // So for example, for "RoomBooked" name will be "RoomBooked". + GenerateName: cqrs.StructName, + }, + } + + watermillLogger := watermill.NewSlogLoggerWithLevelMapping( + slog.With("watermill", true), + map[slog.Level]slog.Level{ + slog.LevelInfo: slog.LevelDebug, + }, + ) + + // This marshaler converts Watermill messages to Kafka messages. + // We are using it to add partition key to the Kafka message. + kafkaMarshaler := kafka.NewWithPartitioningMarshaler(GenerateKafkaPartitionKey) + + // You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/ + publisher, err := kafka.NewPublisher( + kafka.PublisherConfig{ + Brokers: []string{"kafka:9092"}, + Marshaler: kafkaMarshaler, + }, + watermillLogger, + ) + if err != nil { + panic(err) + } + + // CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/ + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + panic(err) + } + + // Simple middleware which will recover panics from event or command handlers. + // More about router middlewares you can find in the documentation: + // https://watermill.io/docs/messages-router/#middleware + // + // List of available middlewares you can find in message/router/middleware. + router.AddMiddleware(middleware.Recoverer) + router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc { + return func(msg *message.Message) ([]*message.Message, error) { + slog.Debug("Received message", "metadata", msg.Metadata) + return h(msg) + } + }) + + commandBus, err := cqrs.NewCommandBusWithConfig(publisher, cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + // We are using one topic for all commands to maintain the order of commands. + return "commands", nil + }, + Marshaler: cqrsMarshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + // We are using one topic for all events to maintain the order of events. + return "events", nil + }, + Marshaler: cqrsMarshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig( + router, + cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return "commands", nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return kafka.NewSubscriber( + kafka.SubscriberConfig{ + Brokers: []string{"kafka:9092"}, + ConsumerGroup: params.HandlerName, + Unmarshaler: kafkaMarshaler, + }, + watermillLogger, + ) + }, + Marshaler: cqrsMarshaler, + Logger: logger, + }, + ) + if err != nil { + panic(err) + } + + eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig( + router, + cqrs.EventGroupProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) { + return "events", nil + }, + SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return kafka.NewSubscriber( + kafka.SubscriberConfig{ + Brokers: []string{"kafka:9092"}, + ConsumerGroup: params.EventGroupName, + Unmarshaler: kafkaMarshaler, + }, + watermillLogger, + ) + }, + Marshaler: cqrsMarshaler, + Logger: logger, + }, + ) + if err != nil { + panic(err) + } + + err = commandProcessor.AddHandlers( + cqrs.NewCommandHandler("SubscribeHandler", SubscribeHandler{eventBus}.Handle), + cqrs.NewCommandHandler("UnsubscribeHandler", UnsubscribeHandler{eventBus}.Handle), + cqrs.NewCommandHandler("UpdateEmailHandler", UpdateEmailHandler{eventBus}.Handle), + ) + if err != nil { + panic(err) + } + + subscribersReadModel := NewSubscriberReadModel() + + // All messages from this group will have one subscription. + // When message arrives, Watermill will match it with the correct handler. + err = eventProcessor.AddHandlersGroup( + "SubscriberReadModel", + cqrs.NewGroupEventHandler(subscribersReadModel.OnSubscribed), + cqrs.NewGroupEventHandler(subscribersReadModel.OnUnsubscribed), + cqrs.NewGroupEventHandler(subscribersReadModel.OnEmailUpdated), + ) + if err != nil { + panic(err) + } + + activityReadModel := NewActivityTimelineModel() + + // All messages from this group will have one subscription. + // When message arrives, Watermill will match it with the correct handler. + err = eventProcessor.AddHandlersGroup( + "ActivityTimelineReadModel", + cqrs.NewGroupEventHandler(activityReadModel.OnSubscribed), + cqrs.NewGroupEventHandler(activityReadModel.OnUnsubscribed), + cqrs.NewGroupEventHandler(activityReadModel.OnEmailUpdated), + ) + if err != nil { + panic(err) + } + + slog.Info("Starting service") + + go simulateTraffic(commandBus) + + if err := router.Run(context.Background()); err != nil { + panic(err) + } +} + +func simulateTraffic(commandBus *cqrs.CommandBus) { + for i := 0; ; i++ { + subscriberID := watermill.NewUUID() + + err := commandBus.Send(context.Background(), &Subscribe{ + Metadata: GenerateMessageMetadata(subscriberID), + SubscriberId: subscriberID, + Email: fmt.Sprintf("user%d@example.com", i), + }) + if err != nil { + slog.Error("Error sending Subscribe command", "err", err) + } + time.Sleep(time.Millisecond * 500) + + err = commandBus.Send(context.Background(), &UpdateEmail{ + Metadata: GenerateMessageMetadata(subscriberID), + SubscriberId: subscriberID, + NewEmail: fmt.Sprintf("updated%d@example.com", i), + }) + if err != nil { + slog.Error("Error sending UpdateEmail command", "err", err) + } + time.Sleep(time.Millisecond * 500) + + if i%3 == 0 { + err = commandBus.Send(context.Background(), &Unsubscribe{ + Metadata: GenerateMessageMetadata(subscriberID), + SubscriberId: subscriberID, + }) + if err != nil { + slog.Error("Error sending Unsubscribe command", "err", err) + } + } + time.Sleep(time.Millisecond * 500) + } +} + +type SubscribeHandler struct { + eventBus *cqrs.EventBus +} + +func (h SubscribeHandler) Handle(ctx context.Context, cmd *Subscribe) error { + return h.eventBus.Publish(ctx, &SubscriberSubscribed{ + Metadata: GenerateMessageMetadata(cmd.SubscriberId), + SubscriberId: cmd.SubscriberId, + Email: cmd.Email, + }) +} + +type UnsubscribeHandler struct { + eventBus *cqrs.EventBus +} + +func (h UnsubscribeHandler) Handle(ctx context.Context, cmd *Unsubscribe) error { + return h.eventBus.Publish(ctx, &SubscriberUnsubscribed{ + Metadata: GenerateMessageMetadata(cmd.SubscriberId), + SubscriberId: cmd.SubscriberId, + }) +} + +type UpdateEmailHandler struct { + eventBus *cqrs.EventBus +} + +func (h UpdateEmailHandler) Handle(ctx context.Context, cmd *UpdateEmail) error { + return h.eventBus.Publish(ctx, &SubscriberEmailUpdated{ + Metadata: GenerateMessageMetadata(cmd.SubscriberId), + SubscriberId: cmd.SubscriberId, + NewEmail: cmd.NewEmail, + }) +} diff --git a/_examples/basic/6-cqrs-ordered-events/makefile b/_examples/basic/6-cqrs-ordered-events/makefile new file mode 100644 index 000000000..b071206af --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/makefile @@ -0,0 +1,4 @@ +.PHONY: proto + +proto: + protoc --proto_path=proto --go_out=. --go_opt=paths=source_relative messages.proto diff --git a/_examples/basic/6-cqrs-ordered-events/message.go b/_examples/basic/6-cqrs-ordered-events/message.go new file mode 100644 index 000000000..3c644b632 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/message.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func GenerateMessageMetadata(partitionKey string) *MessageMetadata { + return &MessageMetadata{ + PartitionKey: partitionKey, + CreatedAt: timestamppb.Now(), + } +} + +type CqrsMarshalerDecorator struct { + cqrs.ProtobufMarshaler +} + +const PartitionKeyMetadataField = "partition_key" + +func (c CqrsMarshalerDecorator) Marshal(v interface{}) (*message.Message, error) { + msg, err := c.ProtobufMarshaler.Marshal(v) + if err != nil { + return nil, err + } + + pm, ok := v.(ProtoMessage) + if !ok { + return nil, fmt.Errorf("%T does not implement ProtoMessage and can't be marshaled", v) + } + + metadata := pm.GetMetadata() + if metadata == nil { + return nil, fmt.Errorf("%T.GetMetadata returned nil", v) + } + + msg.Metadata.Set(PartitionKeyMetadataField, metadata.PartitionKey) + msg.Metadata.Set("created_at", metadata.CreatedAt.AsTime().String()) + + return msg, nil +} + +type ProtoMessage interface { + GetMetadata() *MessageMetadata +} + +// GenerateKafkaPartitionKey is a function that generates a partition key for Kafka messages. +func GenerateKafkaPartitionKey(topic string, msg *message.Message) (string, error) { + slog.Debug("Setting partition key", "topic", topic, "msg_metadata", msg.Metadata) + + return msg.Metadata.Get(PartitionKeyMetadataField), nil +} diff --git a/_examples/basic/6-cqrs-ordered-events/messages.pb.go b/_examples/basic/6-cqrs-ordered-events/messages.pb.go new file mode 100644 index 000000000..519ec4128 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/messages.pb.go @@ -0,0 +1,654 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.4 +// source: messages.proto + +package main + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MessageMetadata struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PartitionKey string `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"` + CreatedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` +} + +func (x *MessageMetadata) Reset() { + *x = MessageMetadata{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MessageMetadata) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MessageMetadata) ProtoMessage() {} + +func (x *MessageMetadata) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MessageMetadata.ProtoReflect.Descriptor instead. +func (*MessageMetadata) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{0} +} + +func (x *MessageMetadata) GetPartitionKey() string { + if x != nil { + return x.PartitionKey + } + return "" +} + +func (x *MessageMetadata) GetCreatedAt() *timestamppb.Timestamp { + if x != nil { + return x.CreatedAt + } + return nil +} + +// Commands +type Subscribe struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` + Email string `protobuf:"bytes,3,opt,name=email,proto3" json:"email,omitempty"` +} + +func (x *Subscribe) Reset() { + *x = Subscribe{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Subscribe) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Subscribe) ProtoMessage() {} + +func (x *Subscribe) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Subscribe.ProtoReflect.Descriptor instead. +func (*Subscribe) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{1} +} + +func (x *Subscribe) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *Subscribe) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +func (x *Subscribe) GetEmail() string { + if x != nil { + return x.Email + } + return "" +} + +type Unsubscribe struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` +} + +func (x *Unsubscribe) Reset() { + *x = Unsubscribe{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Unsubscribe) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Unsubscribe) ProtoMessage() {} + +func (x *Unsubscribe) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Unsubscribe.ProtoReflect.Descriptor instead. +func (*Unsubscribe) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{2} +} + +func (x *Unsubscribe) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *Unsubscribe) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +type UpdateEmail struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` + NewEmail string `protobuf:"bytes,3,opt,name=new_email,json=newEmail,proto3" json:"new_email,omitempty"` +} + +func (x *UpdateEmail) Reset() { + *x = UpdateEmail{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateEmail) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateEmail) ProtoMessage() {} + +func (x *UpdateEmail) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateEmail.ProtoReflect.Descriptor instead. +func (*UpdateEmail) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{3} +} + +func (x *UpdateEmail) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *UpdateEmail) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +func (x *UpdateEmail) GetNewEmail() string { + if x != nil { + return x.NewEmail + } + return "" +} + +// Events +type SubscriberSubscribed struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` + Email string `protobuf:"bytes,3,opt,name=email,proto3" json:"email,omitempty"` +} + +func (x *SubscriberSubscribed) Reset() { + *x = SubscriberSubscribed{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriberSubscribed) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriberSubscribed) ProtoMessage() {} + +func (x *SubscriberSubscribed) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriberSubscribed.ProtoReflect.Descriptor instead. +func (*SubscriberSubscribed) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{4} +} + +func (x *SubscriberSubscribed) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *SubscriberSubscribed) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +func (x *SubscriberSubscribed) GetEmail() string { + if x != nil { + return x.Email + } + return "" +} + +type SubscriberUnsubscribed struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` +} + +func (x *SubscriberUnsubscribed) Reset() { + *x = SubscriberUnsubscribed{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriberUnsubscribed) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriberUnsubscribed) ProtoMessage() {} + +func (x *SubscriberUnsubscribed) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriberUnsubscribed.ProtoReflect.Descriptor instead. +func (*SubscriberUnsubscribed) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{5} +} + +func (x *SubscriberUnsubscribed) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *SubscriberUnsubscribed) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +type SubscriberEmailUpdated struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` + SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` + NewEmail string `protobuf:"bytes,3,opt,name=new_email,json=newEmail,proto3" json:"new_email,omitempty"` +} + +func (x *SubscriberEmailUpdated) Reset() { + *x = SubscriberEmailUpdated{} + if protoimpl.UnsafeEnabled { + mi := &file_messages_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriberEmailUpdated) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriberEmailUpdated) ProtoMessage() {} + +func (x *SubscriberEmailUpdated) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriberEmailUpdated.ProtoReflect.Descriptor instead. +func (*SubscriberEmailUpdated) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{6} +} + +func (x *SubscriberEmailUpdated) GetMetadata() *MessageMetadata { + if x != nil { + return x.Metadata + } + return nil +} + +func (x *SubscriberEmailUpdated) GetSubscriberId() string { + if x != nil { + return x.SubscriberId + } + return "" +} + +func (x *SubscriberEmailUpdated) GetNewEmail() string { + if x != nil { + return x.NewEmail + } + return "" +} + +var File_messages_proto protoreflect.FileDescriptor + +var file_messages_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x04, 0x6d, 0x61, 0x69, 0x6e, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, 0x0f, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12, + 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x79, 0x0a, 0x09, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, + 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x22, 0x65, 0x0a, 0x0b, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x22, 0x82, 0x01, 0x0a, + 0x0b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x12, 0x31, 0x0a, 0x08, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, + 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, + 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x65, 0x6d, 0x61, 0x69, + 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x45, 0x6d, 0x61, 0x69, + 0x6c, 0x22, 0x84, 0x01, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, + 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, + 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x22, 0x70, 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x22, 0x8d, 0x01, 0x0a, 0x16, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, + 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, + 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x6e, 0x65, 0x77, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, + 0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_messages_proto_rawDescOnce sync.Once + file_messages_proto_rawDescData = file_messages_proto_rawDesc +) + +func file_messages_proto_rawDescGZIP() []byte { + file_messages_proto_rawDescOnce.Do(func() { + file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData) + }) + return file_messages_proto_rawDescData +} + +var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_messages_proto_goTypes = []interface{}{ + (*MessageMetadata)(nil), // 0: main.MessageMetadata + (*Subscribe)(nil), // 1: main.Subscribe + (*Unsubscribe)(nil), // 2: main.Unsubscribe + (*UpdateEmail)(nil), // 3: main.UpdateEmail + (*SubscriberSubscribed)(nil), // 4: main.SubscriberSubscribed + (*SubscriberUnsubscribed)(nil), // 5: main.SubscriberUnsubscribed + (*SubscriberEmailUpdated)(nil), // 6: main.SubscriberEmailUpdated + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp +} +var file_messages_proto_depIdxs = []int32{ + 7, // 0: main.MessageMetadata.created_at:type_name -> google.protobuf.Timestamp + 0, // 1: main.Subscribe.metadata:type_name -> main.MessageMetadata + 0, // 2: main.Unsubscribe.metadata:type_name -> main.MessageMetadata + 0, // 3: main.UpdateEmail.metadata:type_name -> main.MessageMetadata + 0, // 4: main.SubscriberSubscribed.metadata:type_name -> main.MessageMetadata + 0, // 5: main.SubscriberUnsubscribed.metadata:type_name -> main.MessageMetadata + 0, // 6: main.SubscriberEmailUpdated.metadata:type_name -> main.MessageMetadata + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_messages_proto_init() } +func file_messages_proto_init() { + if File_messages_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MessageMetadata); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Subscribe); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Unsubscribe); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpdateEmail); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriberSubscribed); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriberUnsubscribed); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_messages_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriberEmailUpdated); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_messages_proto_rawDesc, + NumEnums: 0, + NumMessages: 7, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_messages_proto_goTypes, + DependencyIndexes: file_messages_proto_depIdxs, + MessageInfos: file_messages_proto_msgTypes, + }.Build() + File_messages_proto = out.File + file_messages_proto_rawDesc = nil + file_messages_proto_goTypes = nil + file_messages_proto_depIdxs = nil +} diff --git a/_examples/basic/6-cqrs-ordered-events/proto/messages.proto b/_examples/basic/6-cqrs-ordered-events/proto/messages.proto new file mode 100644 index 000000000..574315f4b --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/proto/messages.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; +package main; + +option go_package = "./main"; + +import "google/protobuf/timestamp.proto"; + +message MessageMetadata { + string partition_key = 1; + google.protobuf.Timestamp created_at = 2; +} + +// Commands +message Subscribe { + MessageMetadata metadata = 1; + + string subscriber_id = 2; + string email = 3; +} + +message Unsubscribe { + MessageMetadata metadata = 1; + + string subscriber_id = 2; +} + +message UpdateEmail { + MessageMetadata metadata = 1; + + string subscriber_id = 2; + string new_email = 3; +} + +// Events +message SubscriberSubscribed { + MessageMetadata metadata = 1; + + string subscriber_id = 2; + string email = 3; +} + +message SubscriberUnsubscribed { + MessageMetadata metadata = 1; + + string subscriber_id = 2; +} + +message SubscriberEmailUpdated { + MessageMetadata metadata = 1; + + string subscriber_id = 2; + string new_email = 3; +} \ No newline at end of file diff --git a/_examples/basic/6-cqrs-ordered-events/subscribers.go b/_examples/basic/6-cqrs-ordered-events/subscribers.go new file mode 100644 index 000000000..48d4e0a45 --- /dev/null +++ b/_examples/basic/6-cqrs-ordered-events/subscribers.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "log/slog" + "sync" +) + +type SubscriberReadModel struct { + subscribers map[string]string // map[subscriberID]email + lock sync.RWMutex +} + +func NewSubscriberReadModel() *SubscriberReadModel { + return &SubscriberReadModel{ + subscribers: make(map[string]string), + } +} + +func (m *SubscriberReadModel) OnSubscribed(ctx context.Context, event *SubscriberSubscribed) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.subscribers[event.SubscriberId] = event.Email + + slog.Info( + "Subscriber added", + "subscriber_id", event.SubscriberId, + "email", event.Email, + ) + + return nil +} + +func (m *SubscriberReadModel) OnUnsubscribed(ctx context.Context, event *SubscriberUnsubscribed) error { + m.lock.Lock() + defer m.lock.Unlock() + + delete(m.subscribers, event.SubscriberId) + + slog.Info( + "Subscriber removed", + "subscriber_id", event.SubscriberId, + ) + + return nil +} + +func (m *SubscriberReadModel) OnEmailUpdated(ctx context.Context, event *SubscriberEmailUpdated) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.subscribers[event.SubscriberId] = event.NewEmail + + slog.Info( + "Subscriber updated", + "subscriber_id", event.SubscriberId, + "email", event.NewEmail, + ) + + return nil +} + +func (m *SubscriberReadModel) GetSubscriberCount() int { + m.lock.RLock() + defer m.lock.RUnlock() + return len(m.subscribers) +} From 8a967012e0dc138d4d802b8f079b92bd1b6e52a8 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 15:48:30 +0100 Subject: [PATCH 05/12] fix output --- _examples/basic/5-cqrs-protobuf/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_examples/basic/5-cqrs-protobuf/main.go b/_examples/basic/5-cqrs-protobuf/main.go index 217ed1c9d..dd2a7c3ad 100644 --- a/_examples/basic/5-cqrs-protobuf/main.go +++ b/_examples/basic/5-cqrs-protobuf/main.go @@ -122,7 +122,7 @@ func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) b.totalCharge += event.Price - slog.Info(">>> Already booked rooms for $%d\n", b.totalCharge) + slog.Info(fmt.Sprintf(">>> Already booked rooms for $%d\n", b.totalCharge)) return nil } From b930331cc4c301935e8ca4f6b06762791e6e7aea Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:15:23 +0100 Subject: [PATCH 06/12] added README for CQRS ordered events example --- .../basic/6-cqrs-ordered-events/README.md | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/_examples/basic/6-cqrs-ordered-events/README.md b/_examples/basic/6-cqrs-ordered-events/README.md index 3bd984aad..9581cc344 100644 --- a/_examples/basic/6-cqrs-ordered-events/README.md +++ b/_examples/basic/6-cqrs-ordered-events/README.md @@ -4,7 +4,24 @@ This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) compon Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs). -This example, uses event groups to maintain order +This example, uses event groups to maintain order of events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/). +We are also using Kafka's partitioning keys to increase processing throughput without losing order of events. + + +## What does this application do? + +This application manages an email subscription system where users can: + +1. Subscribe to receive emails by providing their email address +2. Update their email address after subscribing +3. Unsubscribe from the mailing list + +The system maintains: +- A current list of all active subscribers +- A timeline of all subscription-related activities + +If events won't be ordered, and `SubscriberSubscribed` would arrive after `SubscriberUnsubscribed` event, +the subscriber will be still subscribed. ## Running From 6b0d4f4b4c4bd399a0912e1af60df4540ce09d59 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:18:42 +0100 Subject: [PATCH 07/12] added docs for EventProcessor vs EventGroupProcessor --- docs/content/docs/cqrs.md | 46 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index 4729dc7e6..e0daa0350 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -132,11 +132,52 @@ In the scenario, when we have multiple event types on one topic, you have two op 1. You can set `EventConfig.AckOnUnknownEvent` to true - it will acknowledge all events that are not handled by handler, 2. You can use Event Handler groups mechanism. +**Key differences between `EventProcessor` and `EventGroupProcessor`:** + +1. `EventProcessor`: +- Each handler has its own subscriber +- One handler per event type +- Simple one-to-one matching of events to handlers +2. `EventGroupProcessor`: +- Multiple handlers share a single subscriber +- Can have multiple handlers for the same event type +- Iterates through all handlers in the group +- Message is considered processed if at least one handler processes it +- All handlers in a group receive messages in the same order +- If one handler fails, the message is nacked and will be redelivered to all handlers + +```kroki {type=mermaid} +graph TD + subgraph Individual Handlers + E[Event Bus] --> S1[Subscriber 1] + E --> S2[Subscriber 2] + E --> S3[Subscriber 3] + S1 --> H1[Handler 1] + S2 --> H2[Handler 2] + S3 --> H3[Handler 3] + end + + subgraph Group Handlers + EB[Event Bus] --> SharedSub[Shared Subscriber] + SharedSub --> GH[Handler Group] + GH --> GH1[Handler 1] + GH --> GH2[Handler 2] + GH --> GH3[Handler 3] + end +``` + +{{< callout context="note" title="Note" icon="outline/info-circle" >}} +It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that. + +Please keep in mind that those handlers will be processed within the same message. +If first handler succeeds and the second fails, the message will be re-delivered and the first will be re-executed. +{{< /callout >}} + To use event groups, you need to set `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` options in [`EventConfig`](#event-config). After that, you can use `AddHandlersGroup` on [`EventProcessor`](#event-processor). -{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="eventProcessor.AddHandlersGroup(" last_line_contains="if err != nil {" padding_after="0" %}} +{{% load-snippet-partial file="src-link/_examples/basic/6-cqrs-ordered-events/main.go" first_line_contains="eventProcessor.AddHandlersGroup(" last_line_contains="if err != nil {" padding_after="0" %}} Both `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` receives information about group name in function arguments. @@ -144,7 +185,7 @@ Both `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` recei Since Watermill v1.3 it's possible to use generic handlers for commands and events. It's useful when you have a lot of commands/events and you don't want to create a handler for each of them. -{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="cqrs.NewGroupEventHandler" last_line_contains="})," padding_after="0" %}} +{{% load-snippet-partial file="src-link/_examples/basic/6-cqrs-ordered-events/main.go" first_line_contains="cqrs.NewGroupEventHandler" last_line_contains=")," padding_after="0" %}} Under the hood, it creates EventHandler or CommandHandler implementation. It's available for all kind of handlers. @@ -154,7 +195,6 @@ It's available for all kind of handlers. {{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewEventHandler" last_line_contains="func NewEventHandler" padding_after="0" %}} {{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewGroupEventHandler" last_line_contains="func NewGroupEventHandler" padding_after="0" %}} - ### Building a read model with the event handler {{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="// BookingsFinancialReport is a read model" last_line_contains="func main() {" padding_after="0" %}} From 91673621dc7b4c55b2651ffae8b3f7f18c50f7ed Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:19:17 +0100 Subject: [PATCH 08/12] added DEVELOP.md for docs --- docs/DEVELOP.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 docs/DEVELOP.md diff --git a/docs/DEVELOP.md b/docs/DEVELOP.md new file mode 100644 index 000000000..6c7aaa8a0 --- /dev/null +++ b/docs/DEVELOP.md @@ -0,0 +1,14 @@ +## How to Develop watermill.io docs? + +### Building & running + +```bash +./build.sh +npm run dev +``` + +### Useful resources + +- [Available shortcodes](https://getdoks.org/docs/basics/shortcodes/) +- [Diagrams](https://getdoks.org/docs/built-ins/diagrams/) (we recommend [Mermaid](https://getdoks.org/docs/built-ins/diagrams/#mermaid)) +- [Codeglocks](https://getdoks.org/docs/built-ins/code-blocks/) From d414391ba3e06afc13e03e767575c46af74df05f Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:31:13 +0100 Subject: [PATCH 09/12] improve CQRS event groups docs --- _examples/basic/6-cqrs-ordered-events/README.md | 6 +++++- docs/content/docs/cqrs.md | 15 ++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/_examples/basic/6-cqrs-ordered-events/README.md b/_examples/basic/6-cqrs-ordered-events/README.md index 9581cc344..e90f6fdd5 100644 --- a/_examples/basic/6-cqrs-ordered-events/README.md +++ b/_examples/basic/6-cqrs-ordered-events/README.md @@ -4,7 +4,7 @@ This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) compon Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs). -This example, uses event groups to maintain order of events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/). +This example, uses event groups to keep order for multiple events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/). We are also using Kafka's partitioning keys to increase processing throughput without losing order of events. @@ -23,6 +23,10 @@ The system maintains: If events won't be ordered, and `SubscriberSubscribed` would arrive after `SubscriberUnsubscribed` event, the subscriber will be still subscribed. +## Possible improvements + +You may consider creating sub-topics xxxx + ## Running ```bash diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index e0daa0350..2923f2a2d 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -135,16 +135,14 @@ In the scenario, when we have multiple event types on one topic, you have two op **Key differences between `EventProcessor` and `EventGroupProcessor`:** 1. `EventProcessor`: -- Each handler has its own subscriber +- Each handler has its own subscriber instance - One handler per event type - Simple one-to-one matching of events to handlers + 2. `EventGroupProcessor`: -- Multiple handlers share a single subscriber -- Can have multiple handlers for the same event type -- Iterates through all handlers in the group -- Message is considered processed if at least one handler processes it -- All handlers in a group receive messages in the same order -- If one handler fails, the message is nacked and will be redelivered to all handlers +- Group of handlers share a single subscriber instance (and one consumer group, if such mechanism is supported) +- One handler group can support multiple event types, +- When message arrives to the topic, Watermill will match it to the handler in the group based on event type ```kroki {type=mermaid} graph TD @@ -166,6 +164,9 @@ graph TD end ``` +**Event Handler groups are helpful, when you have multiple event of different types, and you want to maintain order of events.** +Thanks to using one subscriber instance and consumer group, events will be processed in the order they were sent. + {{< callout context="note" title="Note" icon="outline/info-circle" >}} It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that. From e25e8b445bb678fe12a6ebf62a8fc19fc24f5740 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:31:13 +0100 Subject: [PATCH 10/12] improve CQRS event groups docs --- _examples/basic/6-cqrs-ordered-events/README.md | 2 +- docs/content/docs/cqrs.md | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/_examples/basic/6-cqrs-ordered-events/README.md b/_examples/basic/6-cqrs-ordered-events/README.md index 9581cc344..4474d2354 100644 --- a/_examples/basic/6-cqrs-ordered-events/README.md +++ b/_examples/basic/6-cqrs-ordered-events/README.md @@ -4,7 +4,7 @@ This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) compon Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs). -This example, uses event groups to maintain order of events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/). +This example, uses event groups to keep order for multiple events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/). We are also using Kafka's partitioning keys to increase processing throughput without losing order of events. diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index e0daa0350..2923f2a2d 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -135,16 +135,14 @@ In the scenario, when we have multiple event types on one topic, you have two op **Key differences between `EventProcessor` and `EventGroupProcessor`:** 1. `EventProcessor`: -- Each handler has its own subscriber +- Each handler has its own subscriber instance - One handler per event type - Simple one-to-one matching of events to handlers + 2. `EventGroupProcessor`: -- Multiple handlers share a single subscriber -- Can have multiple handlers for the same event type -- Iterates through all handlers in the group -- Message is considered processed if at least one handler processes it -- All handlers in a group receive messages in the same order -- If one handler fails, the message is nacked and will be redelivered to all handlers +- Group of handlers share a single subscriber instance (and one consumer group, if such mechanism is supported) +- One handler group can support multiple event types, +- When message arrives to the topic, Watermill will match it to the handler in the group based on event type ```kroki {type=mermaid} graph TD @@ -166,6 +164,9 @@ graph TD end ``` +**Event Handler groups are helpful, when you have multiple event of different types, and you want to maintain order of events.** +Thanks to using one subscriber instance and consumer group, events will be processed in the order they were sent. + {{< callout context="note" title="Note" icon="outline/info-circle" >}} It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that. From 244492e5880d6f27cb06eef58a02891a6e892982 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 16:48:10 +0100 Subject: [PATCH 11/12] added example link for event groups --- docs/content/docs/cqrs.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index 2923f2a2d..cc6c03024 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -182,6 +182,8 @@ After that, you can use `AddHandlersGroup` on [`EventProcessor`](#event-processo Both `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` receives information about group name in function arguments. +You can see a fully working example with event groups in our [examples](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/6-cqrs-ordered-events/). + ### Generic handlers Since Watermill v1.3 it's possible to use generic handlers for commands and events. It's useful when you have a lot of commands/events and you don't want to create a handler for each of them. From 7ae8f0232f895bb3a2f3c9add3916f5064c6f419 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Mon, 16 Dec 2024 17:43:30 +0100 Subject: [PATCH 12/12] Update docs/content/docs/cqrs.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Miłosz Smółka --- docs/content/docs/cqrs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md index cc6c03024..5c01e1517 100644 --- a/docs/content/docs/cqrs.md +++ b/docs/content/docs/cqrs.md @@ -164,7 +164,7 @@ graph TD end ``` -**Event Handler groups are helpful, when you have multiple event of different types, and you want to maintain order of events.** +**Event Handler groups are helpful when you have multiple event types on one topic and you want to maintain order of events.** Thanks to using one subscriber instance and consumer group, events will be processed in the order they were sent. {{< callout context="note" title="Note" icon="outline/info-circle" >}}