diff --git a/_examples/basic/5-cqrs-protobuf/.validate_example.yml b/_examples/basic/5-cqrs-protobuf/.validate_example.yml
index 20dce4535..b8ae907e4 100644
--- a/_examples/basic/5-cqrs-protobuf/.validate_example.yml
+++ b/_examples/basic/5-cqrs-protobuf/.validate_example.yml
@@ -1,4 +1,6 @@
validation_cmd: "docker compose up"
teardown_cmd: "docker compose down"
timeout: 120
-expected_output: "beers ordered to room 3"
+expected_outputs:
+ - "beers ordered to room \\d+"
+ - "Already booked rooms for \\$\\d{2,}"
diff --git a/_examples/basic/5-cqrs-protobuf/README.md b/_examples/basic/5-cqrs-protobuf/README.md
index f1e6e0891..e8ad853e3 100644
--- a/_examples/basic/5-cqrs-protobuf/README.md
+++ b/_examples/basic/5-cqrs-protobuf/README.md
@@ -6,6 +6,35 @@ Detailed documentation for CQRS can be found in Watermill's docs: [http://waterm
![CQRS Event Storming](https://threedots.tech/watermill-io/cqrs-example-storming.png)
+```mermaid
+sequenceDiagram
+ participant M as Main
+ participant CB as CommandBus
+ participant BRH as BookRoomHandler
+ participant EB as EventBus
+ participant OBRB as OrderBeerOnRoomBooked
+ participant OBH as OrderBeerHandler
+ participant BFR as BookingsFinancialReport
+
+ Note over M,BFR: Commands use AMQP queue, Events use AMQP pub/sub
+
+ M->>CB: Send(BookRoom)
topic: commands.BookRoom
+ CB->>BRH: Handle(BookRoom)
+
+ BRH->>EB: Publish(RoomBooked)
topic: events.RoomBooked
+
+ par Process RoomBooked Event
+ EB->>OBRB: Handle(RoomBooked)
+ OBRB->>CB: Send(OrderBeer)
topic: commands.OrderBeer
+ CB->>OBH: Handle(OrderBeer)
+ OBH->>EB: Publish(BeerOrdered)
topic: events.BeerOrdered
+
+ EB->>BFR: Handle(RoomBooked)
+ Note over BFR: Updates financial report
+ end
+```
+
+
## Running
```bash
diff --git a/_examples/basic/5-cqrs-protobuf/docker-compose.yml b/_examples/basic/5-cqrs-protobuf/docker-compose.yml
index f01a2a3e2..09ec5450e 100644
--- a/_examples/basic/5-cqrs-protobuf/docker-compose.yml
+++ b/_examples/basic/5-cqrs-protobuf/docker-compose.yml
@@ -17,3 +17,4 @@ services:
rabbitmq:
image: rabbitmq:3.7
restart: unless-stopped
+ attach: false
diff --git a/_examples/basic/5-cqrs-protobuf/go.mod b/_examples/basic/5-cqrs-protobuf/go.mod
index 406956feb..72e057665 100644
--- a/_examples/basic/5-cqrs-protobuf/go.mod
+++ b/_examples/basic/5-cqrs-protobuf/go.mod
@@ -1,7 +1,7 @@
module main.go
require (
- github.com/ThreeDotsLabs/watermill v1.3.7
+ github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0
github.com/golang/protobuf v1.5.4
)
diff --git a/_examples/basic/5-cqrs-protobuf/go.sum b/_examples/basic/5-cqrs-protobuf/go.sum
index 8b9e98ea5..27ff1c96f 100644
--- a/_examples/basic/5-cqrs-protobuf/go.sum
+++ b/_examples/basic/5-cqrs-protobuf/go.sum
@@ -1,5 +1,9 @@
github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o=
github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
+github.com/ThreeDotsLabs/watermill v1.4.1 h1:gjP6yZH+otMPjV0KsV07pl9TeMm9UQV/gqiuiuG5Drs=
+github.com/ThreeDotsLabs/watermill v1.4.1/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
+github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339 h1:Q6joJUSSelwcxHEngdu3PGy4UYqk7BMXKC3Rzer3Xuk=
+github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216112750-9d5e2da13339/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0 h1:r5idq2qkd3M345iv3C3zAX+lFlEu7iW8QESNnuuv4eY=
github.com/ThreeDotsLabs/watermill-amqp/v3 v3.0.0/go.mod h1:+8tCh6VCuBcQWhfETCwzRINKQ1uyeg9moH3h7jMKxQk=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
diff --git a/_examples/basic/5-cqrs-protobuf/main.go b/_examples/basic/5-cqrs-protobuf/main.go
index b29ce7a20..dd2a7c3ad 100644
--- a/_examples/basic/5-cqrs-protobuf/main.go
+++ b/_examples/basic/5-cqrs-protobuf/main.go
@@ -3,18 +3,17 @@ package main
import (
"context"
"fmt"
- "log"
+ "log/slog"
"math/rand"
"sync"
"time"
- "github.com/golang/protobuf/ptypes"
-
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
// BookRoomHandler is a command handler, which handles BookRoom command and emits RoomBooked.
@@ -29,12 +28,12 @@ func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10
- log.Printf(
- "Booked %s for %s from %s to %s",
- cmd.RoomId,
- cmd.GuestName,
- time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
- time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
+ slog.Info(
+ "Booked room",
+ "room_id", cmd.RoomId,
+ "guest_name", cmd.GuestName,
+ "start_date", time.Unix(cmd.StartDate.Seconds, int64(cmd.StartDate.Nanos)),
+ "end_date", time.Unix(cmd.EndDate.Seconds, int64(cmd.EndDate.Nanos)),
)
// RoomBooked will be handled by OrderBeerOnRoomBooked event handler,
@@ -90,7 +89,7 @@ func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
return err
}
- log.Printf("%d beers ordered to room %s", cmd.Count, cmd.RoomId)
+ slog.Info(fmt.Sprintf("%d beers ordered to room %s", cmd.Count, cmd.RoomId))
return nil
}
@@ -123,15 +122,23 @@ func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked)
b.totalCharge += event.Price
- fmt.Printf(">>> Already booked rooms for $%d\n", b.totalCharge)
+ slog.Info(fmt.Sprintf(">>> Already booked rooms for $%d\n", b.totalCharge))
return nil
}
var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
func main() {
- logger := watermill.NewStdLogger(false, false)
- cqrsMarshaler := cqrs.ProtobufMarshaler{}
+ logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{
+ slog.LevelInfo: slog.LevelDebug,
+ })
+
+ cqrsMarshaler := cqrs.ProtobufMarshaler{
+ // It will generate topic names based on the event/command type.
+ // So for example, for "RoomBooked" name will be "RoomBooked".
+ // It's later used to generate topic names.
+ GenerateName: cqrs.StructName,
+ }
// You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
// Detailed RabbitMQ implementation: https://watermill.io/pubsubs/amqp/
@@ -168,8 +175,7 @@ func main() {
commandBus, err := cqrs.NewCommandBusWithConfig(commandsPublisher, cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
- // we are using queue RabbitMQ config, so we need to have topic per command type
- return params.CommandName, nil
+ return generateCommandsTopic(params.CommandName), nil
},
OnSend: func(params cqrs.CommandBusOnSendParams) error {
logger.Info("Sending command", watermill.LogFields{
@@ -191,8 +197,7 @@ func main() {
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
- // we are using queue RabbitMQ config, so we need to have topic per command type
- return params.CommandName, nil
+ return generateCommandsTopic(params.CommandName), nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
// we can reuse subscriber, because all commands have separated topics
@@ -221,11 +226,7 @@ func main() {
eventBus, err := cqrs.NewEventBusWithConfig(eventsPublisher, cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
- // because we are using PubSub RabbitMQ config, we can use one topic for all events
- return "events", nil
-
- // we can also use topic per event type
- // return params.EventName, nil
+ return generateEventsTopic(params.EventName), nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
@@ -245,22 +246,22 @@ func main() {
panic(err)
}
- eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
+ eventProcessor, err := cqrs.NewEventProcessorWithConfig(
router,
- cqrs.EventGroupProcessorConfig{
- GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
- return "events", nil
+ cqrs.EventProcessorConfig{
+ GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
+ return generateEventsTopic(params.EventName), nil
},
- SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
+ SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
config := amqp.NewDurablePubSubConfig(
amqpAddress,
- amqp.GenerateQueueNameTopicNameWithSuffix(params.EventGroupName),
+ amqp.GenerateQueueNameTopicNameWithSuffix(params.HandlerName),
)
return amqp.NewSubscriber(config, logger)
},
- OnHandle: func(params cqrs.EventGroupProcessorOnHandleParams) error {
+ OnHandle: func(params cqrs.EventProcessorOnHandleParams) error {
start := time.Now()
err := params.Handler.Handle(params.Message.Context(), params.Event)
@@ -290,16 +291,24 @@ func main() {
panic(err)
}
- err = eventProcessor.AddHandlersGroup(
- "events",
- cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle),
- cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle),
- cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
- logger.Info("Beer ordered", watermill.LogFields{
- "room_id": event.RoomId,
- })
- return nil
- }),
+ err = eventProcessor.AddHandlers(
+ cqrs.NewEventHandler(
+ "OrderBeerOnRoomBooked",
+ OrderBeerOnRoomBooked{commandBus}.Handle,
+ ),
+ cqrs.NewEventHandler(
+ "LogBeerOrdered",
+ func(ctx context.Context, event *BeerOrdered) error {
+ logger.Info("Beer ordered", watermill.LogFields{
+ "room_id": event.RoomId,
+ })
+ return nil
+ },
+ ),
+ cqrs.NewEventHandler(
+ "BookingsFinancialReport",
+ NewBookingsFinancialReport().Handle,
+ ),
)
if err != nil {
panic(err)
@@ -319,15 +328,8 @@ func publishCommands(commandBus *cqrs.CommandBus) func() {
for {
i++
- startDate, err := ptypes.TimestampProto(time.Now())
- if err != nil {
- panic(err)
- }
-
- endDate, err := ptypes.TimestampProto(time.Now().Add(time.Hour * 24 * 3))
- if err != nil {
- panic(err)
- }
+ startDate := timestamppb.New(time.Now())
+ endDate := timestamppb.New(time.Now().Add(time.Hour * 24 * 3))
bookRoomCmd := &BookRoom{
RoomId: fmt.Sprintf("%d", i),
@@ -342,3 +344,11 @@ func publishCommands(commandBus *cqrs.CommandBus) func() {
time.Sleep(time.Second)
}
}
+
+func generateEventsTopic(eventName string) string {
+ return "events." + eventName
+}
+
+func generateCommandsTopic(commandName string) string {
+ return "commands." + commandName
+}
diff --git a/_examples/basic/5-cqrs-protobuf/makefile b/_examples/basic/5-cqrs-protobuf/makefile
new file mode 100644
index 000000000..b071206af
--- /dev/null
+++ b/_examples/basic/5-cqrs-protobuf/makefile
@@ -0,0 +1,4 @@
+.PHONY: proto
+
+proto:
+ protoc --proto_path=proto --go_out=. --go_opt=paths=source_relative messages.proto
diff --git a/_examples/basic/5-cqrs-protobuf/events.pb.go b/_examples/basic/5-cqrs-protobuf/messages.pb.go
similarity index 100%
rename from _examples/basic/5-cqrs-protobuf/events.pb.go
rename to _examples/basic/5-cqrs-protobuf/messages.pb.go
diff --git a/_examples/basic/5-cqrs-protobuf/inputs/events.proto b/_examples/basic/5-cqrs-protobuf/proto/messages.proto
similarity index 100%
rename from _examples/basic/5-cqrs-protobuf/inputs/events.proto
rename to _examples/basic/5-cqrs-protobuf/proto/messages.proto
diff --git a/_examples/basic/6-cqrs-ordered-events/.validate_example.yml b/_examples/basic/6-cqrs-ordered-events/.validate_example.yml
new file mode 100644
index 000000000..f021a2513
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/.validate_example.yml
@@ -0,0 +1,10 @@
+validation_cmd: "docker compose up"
+teardown_cmd: "docker compose down"
+timeout: 120
+expected_outputs:
+ - "Subscriber added subscriber_id"
+ - "Subscriber updated subscriber_id"
+ - "Subscriber removed subscriber_id"
+ - "\\[ACTIVITY\\] activity_type=SUBSCRIBED"
+ - "\\[ACTIVITY\\] activity_type=UNSUBSCRIBED"
+ - "\\[ACTIVITY\\] activity_type=EMAIL_UPDATED"
diff --git a/_examples/basic/6-cqrs-ordered-events/README.md b/_examples/basic/6-cqrs-ordered-events/README.md
new file mode 100644
index 000000000..1238cebc6
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/README.md
@@ -0,0 +1,37 @@
+# Example Golang CQRS application - ordered events
+
+This application is using [Watermill CQRS](http://watermill.io/docs/cqrs) component.
+
+Detailed documentation for CQRS can be found in Watermill's docs: [http://watermill.io/docs/cqrs#usage](http://watermill.io/docs/cqrs).
+
+This example, uses event groups to keep order for multiple events. You can read more about them in the [Watermill documentation](https://watermill.io/docs/cqrs/).
+We are also using Kafka's partitioning keys to increase processing throughput without losing order of events.
+
+
+## What does this application do?
+
+This application manages an email subscription system where users can:
+
+1. Subscribe to receive emails by providing their email address
+2. Update their email address after subscribing
+3. Unsubscribe from the mailing list
+
+The system maintains:
+- A current list of all active subscribers
+- A timeline of all subscription-related activities
+
+If events won't be ordered, and `SubscriberSubscribed` would arrive after `SubscriberUnsubscribed` event,
+the subscriber will be still subscribed.
+
+## Possible improvements
+
+In this example, we are using global `events` and `commands` topics.
+You can consider splitting them into smaller topics, for example, per aggregate type.
+
+Thanks to that, you can scale your application horizontally and increase the throughput and processing less events.
+
+## Running
+
+```bash
+docker-compose up
+```
diff --git a/_examples/basic/6-cqrs-ordered-events/activity.go b/_examples/basic/6-cqrs-ordered-events/activity.go
new file mode 100644
index 000000000..5b97065ba
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/activity.go
@@ -0,0 +1,89 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "sync"
+ "time"
+)
+
+// ActivityEntry represents a single event in the timeline
+type ActivityEntry struct {
+ Timestamp time.Time
+ SubscriberID string
+ ActivityType string
+ Details string
+}
+
+// ActivityTimelineReadModel maintains a chronological log of all subscription-related events
+type ActivityTimelineReadModel struct {
+ activities []ActivityEntry
+ lock sync.RWMutex
+}
+
+func NewActivityTimelineModel() *ActivityTimelineReadModel {
+ return &ActivityTimelineReadModel{
+ activities: make([]ActivityEntry, 0),
+ }
+}
+
+// OnSubscribed handles subscription events
+func (m *ActivityTimelineReadModel) OnSubscribed(ctx context.Context, event *SubscriberSubscribed) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ entry := ActivityEntry{
+ Timestamp: time.Now(),
+ SubscriberID: event.SubscriberId,
+ ActivityType: "SUBSCRIBED",
+ Details: fmt.Sprintf("Subscribed with email: %s", event.Email),
+ }
+
+ m.activities = append(m.activities, entry)
+ m.logActivity(entry)
+ return nil
+}
+
+// OnUnsubscribed handles unsubscription events
+func (m *ActivityTimelineReadModel) OnUnsubscribed(ctx context.Context, event *SubscriberUnsubscribed) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ entry := ActivityEntry{
+ Timestamp: time.Now(),
+ SubscriberID: event.SubscriberId,
+ ActivityType: "UNSUBSCRIBED",
+ Details: "Subscriber unsubscribed",
+ }
+
+ m.activities = append(m.activities, entry)
+ m.logActivity(entry)
+ return nil
+}
+
+// OnEmailUpdated handles email update events
+func (m *ActivityTimelineReadModel) OnEmailUpdated(ctx context.Context, event *SubscriberEmailUpdated) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ entry := ActivityEntry{
+ Timestamp: time.Now(),
+ SubscriberID: event.SubscriberId,
+ ActivityType: "EMAIL_UPDATED",
+ Details: fmt.Sprintf("Email updated to: %s", event.NewEmail),
+ }
+
+ m.activities = append(m.activities, entry)
+ m.logActivity(entry)
+ return nil
+}
+
+func (m *ActivityTimelineReadModel) logActivity(entry ActivityEntry) {
+ slog.Info(
+ "[ACTIVITY]",
+ "activity_type", entry.ActivityType,
+ "subscriber_id", entry.SubscriberID,
+ "details", entry.Details,
+ )
+}
diff --git a/_examples/basic/6-cqrs-ordered-events/docker-compose.yml b/_examples/basic/6-cqrs-ordered-events/docker-compose.yml
new file mode 100644
index 000000000..dfbd8cca1
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/docker-compose.yml
@@ -0,0 +1,43 @@
+services:
+ golang:
+ image: golang:1.23
+ restart: unless-stopped
+ ports:
+ - 8080:8080
+ depends_on:
+ - kafka
+ - zookeeper
+ links:
+ - kafka
+ - zookeeper
+ volumes:
+ - .:/app
+ - $GOPATH/pkg/mod:/go/pkg/mod
+ working_dir: /app
+ command: go run .
+
+ zookeeper:
+ container_name: zk
+ attach: false
+ image: confluentinc/cp-zookeeper:7.7.1
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2181:2181
+
+ kafka:
+ container_name: kafka
+ attach: false
+ image: confluentinc/cp-kafka:7.7.1
+ depends_on:
+ - zookeeper
+ ports:
+ - 9093:9093
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zk:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
\ No newline at end of file
diff --git a/_examples/basic/6-cqrs-ordered-events/go.mod b/_examples/basic/6-cqrs-ordered-events/go.mod
new file mode 100644
index 000000000..69d1a33e8
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/go.mod
@@ -0,0 +1,43 @@
+module main.go
+
+require (
+ github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c
+ github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5
+ google.golang.org/protobuf v1.34.2
+)
+
+require (
+ github.com/IBM/sarama v1.43.3 // indirect
+ github.com/cenkalti/backoff/v3 v3.2.2 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0 // indirect
+ github.com/eapache/go-resiliency v1.7.0 // indirect
+ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
+ github.com/eapache/queue v1.1.0 // indirect
+ github.com/go-logr/logr v1.4.2 // indirect
+ github.com/go-logr/stdr v1.2.2 // indirect
+ github.com/golang/snappy v0.0.4 // indirect
+ github.com/google/uuid v1.6.0 // indirect
+ github.com/hashicorp/errwrap v1.1.0 // indirect
+ github.com/hashicorp/go-multierror v1.1.1 // indirect
+ github.com/hashicorp/go-uuid v1.0.3 // indirect
+ github.com/jcmturner/aescts/v2 v2.0.0 // indirect
+ github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
+ github.com/jcmturner/gofork v1.7.6 // indirect
+ github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
+ github.com/jcmturner/rpc/v2 v2.0.3 // indirect
+ github.com/klauspost/compress v1.17.9 // indirect
+ github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
+ github.com/oklog/ulid v1.3.1 // indirect
+ github.com/pierrec/lz4/v4 v4.1.21 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
+ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
+ github.com/sony/gobreaker v1.0.0 // indirect
+ go.opentelemetry.io/otel v1.29.0 // indirect
+ go.opentelemetry.io/otel/metric v1.29.0 // indirect
+ go.opentelemetry.io/otel/trace v1.29.0 // indirect
+ golang.org/x/crypto v0.26.0 // indirect
+ golang.org/x/net v0.28.0 // indirect
+)
+
+go 1.21
diff --git a/_examples/basic/6-cqrs-ordered-events/go.sum b/_examples/basic/6-cqrs-ordered-events/go.sum
new file mode 100644
index 000000000..35c7c441f
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/go.sum
@@ -0,0 +1,130 @@
+github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
+github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
+github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c h1:YucRnlvVXDydpiKp88znVBjcAYDIpmjKtHxixEAEJ38=
+github.com/ThreeDotsLabs/watermill v1.4.2-0.20241216125745-7ab13543158c/go.mod h1:fO+fo0wGp1+dKQGkoPdLGIHXxwmYj+kitAXJDk/6l0Y=
+github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5 h1:ud+4txnRgtr3kZXfXZ5+C7kVQEvsLc5HSNUEa0g+X1Q=
+github.com/ThreeDotsLabs/watermill-kafka/v3 v3.0.5/go.mod h1:t4o+4A6GB+XC8WL3DandhzPwd265zQuyWMQC/I+WIOU=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0 h1:R2zQhFwSCyyd7L43igYjDrH0wkC/i+QBPELuY0HOu84=
+github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0/go.mod h1:2MqLKYJfjs3UriXXF9Fd0Qmh/lhxi/6tHXkqtXxyIHc=
+github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
+github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
+github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
+github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
+github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
+github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
+github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
+github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
+github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
+github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
+github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
+github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
+github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
+github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
+github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
+github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
+github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
+github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
+github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
+github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
+github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
+github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
+github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
+github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
+github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
+github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
+github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
+github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
+go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
+go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
+go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
+go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
+go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
+golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
+golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
+golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
+golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/_examples/basic/6-cqrs-ordered-events/main.go b/_examples/basic/6-cqrs-ordered-events/main.go
new file mode 100644
index 000000000..99d25f5ac
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/main.go
@@ -0,0 +1,264 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/ThreeDotsLabs/watermill"
+ "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
+ "github.com/ThreeDotsLabs/watermill/components/cqrs"
+ "github.com/ThreeDotsLabs/watermill/message"
+ "github.com/ThreeDotsLabs/watermill/message/router/middleware"
+)
+
+var amqpAddress = "amqp://guest:guest@rabbitmq:5672/"
+
+func main() {
+ slog.SetLogLoggerLevel(slog.LevelDebug)
+
+ logger := watermill.NewSlogLoggerWithLevelMapping(nil, map[slog.Level]slog.Level{
+ slog.LevelInfo: slog.LevelDebug,
+ })
+
+ // We are decorating ProtobufMarshaler to add extra metadata to the message.
+ cqrsMarshaler := CqrsMarshalerDecorator{
+ cqrs.ProtobufMarshaler{
+ // It will generate topic names based on the event/command type.
+ // So for example, for "RoomBooked" name will be "RoomBooked".
+ GenerateName: cqrs.StructName,
+ },
+ }
+
+ watermillLogger := watermill.NewSlogLoggerWithLevelMapping(
+ slog.With("watermill", true),
+ map[slog.Level]slog.Level{
+ slog.LevelInfo: slog.LevelDebug,
+ },
+ )
+
+ // This marshaler converts Watermill messages to Kafka messages.
+ // We are using it to add partition key to the Kafka message.
+ kafkaMarshaler := kafka.NewWithPartitioningMarshaler(GenerateKafkaPartitionKey)
+
+ // You can use any Pub/Sub implementation from here: https://watermill.io/pubsubs/
+ publisher, err := kafka.NewPublisher(
+ kafka.PublisherConfig{
+ Brokers: []string{"kafka:9092"},
+ Marshaler: kafkaMarshaler,
+ },
+ watermillLogger,
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ // CQRS is built on messages router. Detailed documentation: https://watermill.io/docs/messages-router/
+ router, err := message.NewRouter(message.RouterConfig{}, logger)
+ if err != nil {
+ panic(err)
+ }
+
+ // Simple middleware which will recover panics from event or command handlers.
+ // More about router middlewares you can find in the documentation:
+ // https://watermill.io/docs/messages-router/#middleware
+ //
+ // List of available middlewares you can find in message/router/middleware.
+ router.AddMiddleware(middleware.Recoverer)
+ router.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {
+ return func(msg *message.Message) ([]*message.Message, error) {
+ slog.Debug("Received message", "metadata", msg.Metadata)
+ return h(msg)
+ }
+ })
+
+ commandBus, err := cqrs.NewCommandBusWithConfig(publisher, cqrs.CommandBusConfig{
+ GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) {
+ // We are using one topic for all commands to maintain the order of commands.
+ return "commands", nil
+ },
+ Marshaler: cqrsMarshaler,
+ Logger: logger,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ eventBus, err := cqrs.NewEventBusWithConfig(publisher, cqrs.EventBusConfig{
+ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
+ // We are using one topic for all events to maintain the order of events.
+ return "events", nil
+ },
+ Marshaler: cqrsMarshaler,
+ Logger: logger,
+ })
+ if err != nil {
+ panic(err)
+ }
+
+ commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
+ router,
+ cqrs.CommandProcessorConfig{
+ GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
+ return "commands", nil
+ },
+ SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
+ return kafka.NewSubscriber(
+ kafka.SubscriberConfig{
+ Brokers: []string{"kafka:9092"},
+ ConsumerGroup: params.HandlerName,
+ Unmarshaler: kafkaMarshaler,
+ },
+ watermillLogger,
+ )
+ },
+ Marshaler: cqrsMarshaler,
+ Logger: logger,
+ },
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ eventProcessor, err := cqrs.NewEventGroupProcessorWithConfig(
+ router,
+ cqrs.EventGroupProcessorConfig{
+ GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
+ return "events", nil
+ },
+ SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
+ return kafka.NewSubscriber(
+ kafka.SubscriberConfig{
+ Brokers: []string{"kafka:9092"},
+ ConsumerGroup: params.EventGroupName,
+ Unmarshaler: kafkaMarshaler,
+ },
+ watermillLogger,
+ )
+ },
+ Marshaler: cqrsMarshaler,
+ Logger: logger,
+ },
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ err = commandProcessor.AddHandlers(
+ cqrs.NewCommandHandler("SubscribeHandler", SubscribeHandler{eventBus}.Handle),
+ cqrs.NewCommandHandler("UnsubscribeHandler", UnsubscribeHandler{eventBus}.Handle),
+ cqrs.NewCommandHandler("UpdateEmailHandler", UpdateEmailHandler{eventBus}.Handle),
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ subscribersReadModel := NewSubscriberReadModel()
+
+ // All messages from this group will have one subscription.
+ // When message arrives, Watermill will match it with the correct handler.
+ err = eventProcessor.AddHandlersGroup(
+ "SubscriberReadModel",
+ cqrs.NewGroupEventHandler(subscribersReadModel.OnSubscribed),
+ cqrs.NewGroupEventHandler(subscribersReadModel.OnUnsubscribed),
+ cqrs.NewGroupEventHandler(subscribersReadModel.OnEmailUpdated),
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ activityReadModel := NewActivityTimelineModel()
+
+ // All messages from this group will have one subscription.
+ // When message arrives, Watermill will match it with the correct handler.
+ err = eventProcessor.AddHandlersGroup(
+ "ActivityTimelineReadModel",
+ cqrs.NewGroupEventHandler(activityReadModel.OnSubscribed),
+ cqrs.NewGroupEventHandler(activityReadModel.OnUnsubscribed),
+ cqrs.NewGroupEventHandler(activityReadModel.OnEmailUpdated),
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ slog.Info("Starting service")
+
+ go simulateTraffic(commandBus)
+
+ if err := router.Run(context.Background()); err != nil {
+ panic(err)
+ }
+}
+
+func simulateTraffic(commandBus *cqrs.CommandBus) {
+ for i := 0; ; i++ {
+ subscriberID := watermill.NewUUID()
+
+ err := commandBus.Send(context.Background(), &Subscribe{
+ Metadata: GenerateMessageMetadata(subscriberID),
+ SubscriberId: subscriberID,
+ Email: fmt.Sprintf("user%d@example.com", i),
+ })
+ if err != nil {
+ slog.Error("Error sending Subscribe command", "err", err)
+ }
+ time.Sleep(time.Millisecond * 500)
+
+ err = commandBus.Send(context.Background(), &UpdateEmail{
+ Metadata: GenerateMessageMetadata(subscriberID),
+ SubscriberId: subscriberID,
+ NewEmail: fmt.Sprintf("updated%d@example.com", i),
+ })
+ if err != nil {
+ slog.Error("Error sending UpdateEmail command", "err", err)
+ }
+ time.Sleep(time.Millisecond * 500)
+
+ if i%3 == 0 {
+ err = commandBus.Send(context.Background(), &Unsubscribe{
+ Metadata: GenerateMessageMetadata(subscriberID),
+ SubscriberId: subscriberID,
+ })
+ if err != nil {
+ slog.Error("Error sending Unsubscribe command", "err", err)
+ }
+ }
+ time.Sleep(time.Millisecond * 500)
+ }
+}
+
+type SubscribeHandler struct {
+ eventBus *cqrs.EventBus
+}
+
+func (h SubscribeHandler) Handle(ctx context.Context, cmd *Subscribe) error {
+ return h.eventBus.Publish(ctx, &SubscriberSubscribed{
+ Metadata: GenerateMessageMetadata(cmd.SubscriberId),
+ SubscriberId: cmd.SubscriberId,
+ Email: cmd.Email,
+ })
+}
+
+type UnsubscribeHandler struct {
+ eventBus *cqrs.EventBus
+}
+
+func (h UnsubscribeHandler) Handle(ctx context.Context, cmd *Unsubscribe) error {
+ return h.eventBus.Publish(ctx, &SubscriberUnsubscribed{
+ Metadata: GenerateMessageMetadata(cmd.SubscriberId),
+ SubscriberId: cmd.SubscriberId,
+ })
+}
+
+type UpdateEmailHandler struct {
+ eventBus *cqrs.EventBus
+}
+
+func (h UpdateEmailHandler) Handle(ctx context.Context, cmd *UpdateEmail) error {
+ return h.eventBus.Publish(ctx, &SubscriberEmailUpdated{
+ Metadata: GenerateMessageMetadata(cmd.SubscriberId),
+ SubscriberId: cmd.SubscriberId,
+ NewEmail: cmd.NewEmail,
+ })
+}
diff --git a/_examples/basic/6-cqrs-ordered-events/makefile b/_examples/basic/6-cqrs-ordered-events/makefile
new file mode 100644
index 000000000..b071206af
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/makefile
@@ -0,0 +1,4 @@
+.PHONY: proto
+
+proto:
+ protoc --proto_path=proto --go_out=. --go_opt=paths=source_relative messages.proto
diff --git a/_examples/basic/6-cqrs-ordered-events/message.go b/_examples/basic/6-cqrs-ordered-events/message.go
new file mode 100644
index 000000000..3c644b632
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/message.go
@@ -0,0 +1,56 @@
+package main
+
+import (
+ "fmt"
+ "log/slog"
+
+ "github.com/ThreeDotsLabs/watermill/components/cqrs"
+ "github.com/ThreeDotsLabs/watermill/message"
+ "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+func GenerateMessageMetadata(partitionKey string) *MessageMetadata {
+ return &MessageMetadata{
+ PartitionKey: partitionKey,
+ CreatedAt: timestamppb.Now(),
+ }
+}
+
+type CqrsMarshalerDecorator struct {
+ cqrs.ProtobufMarshaler
+}
+
+const PartitionKeyMetadataField = "partition_key"
+
+func (c CqrsMarshalerDecorator) Marshal(v interface{}) (*message.Message, error) {
+ msg, err := c.ProtobufMarshaler.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+
+ pm, ok := v.(ProtoMessage)
+ if !ok {
+ return nil, fmt.Errorf("%T does not implement ProtoMessage and can't be marshaled", v)
+ }
+
+ metadata := pm.GetMetadata()
+ if metadata == nil {
+ return nil, fmt.Errorf("%T.GetMetadata returned nil", v)
+ }
+
+ msg.Metadata.Set(PartitionKeyMetadataField, metadata.PartitionKey)
+ msg.Metadata.Set("created_at", metadata.CreatedAt.AsTime().String())
+
+ return msg, nil
+}
+
+type ProtoMessage interface {
+ GetMetadata() *MessageMetadata
+}
+
+// GenerateKafkaPartitionKey is a function that generates a partition key for Kafka messages.
+func GenerateKafkaPartitionKey(topic string, msg *message.Message) (string, error) {
+ slog.Debug("Setting partition key", "topic", topic, "msg_metadata", msg.Metadata)
+
+ return msg.Metadata.Get(PartitionKeyMetadataField), nil
+}
diff --git a/_examples/basic/6-cqrs-ordered-events/messages.pb.go b/_examples/basic/6-cqrs-ordered-events/messages.pb.go
new file mode 100644
index 000000000..519ec4128
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/messages.pb.go
@@ -0,0 +1,654 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.31.0
+// protoc v4.24.4
+// source: messages.proto
+
+package main
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type MessageMetadata struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ PartitionKey string `protobuf:"bytes,1,opt,name=partition_key,json=partitionKey,proto3" json:"partition_key,omitempty"`
+ CreatedAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"`
+}
+
+func (x *MessageMetadata) Reset() {
+ *x = MessageMetadata{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *MessageMetadata) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MessageMetadata) ProtoMessage() {}
+
+func (x *MessageMetadata) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use MessageMetadata.ProtoReflect.Descriptor instead.
+func (*MessageMetadata) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *MessageMetadata) GetPartitionKey() string {
+ if x != nil {
+ return x.PartitionKey
+ }
+ return ""
+}
+
+func (x *MessageMetadata) GetCreatedAt() *timestamppb.Timestamp {
+ if x != nil {
+ return x.CreatedAt
+ }
+ return nil
+}
+
+// Commands
+type Subscribe struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+ Email string `protobuf:"bytes,3,opt,name=email,proto3" json:"email,omitempty"`
+}
+
+func (x *Subscribe) Reset() {
+ *x = Subscribe{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Subscribe) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Subscribe) ProtoMessage() {}
+
+func (x *Subscribe) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Subscribe.ProtoReflect.Descriptor instead.
+func (*Subscribe) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Subscribe) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *Subscribe) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+func (x *Subscribe) GetEmail() string {
+ if x != nil {
+ return x.Email
+ }
+ return ""
+}
+
+type Unsubscribe struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+}
+
+func (x *Unsubscribe) Reset() {
+ *x = Unsubscribe{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Unsubscribe) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Unsubscribe) ProtoMessage() {}
+
+func (x *Unsubscribe) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Unsubscribe.ProtoReflect.Descriptor instead.
+func (*Unsubscribe) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *Unsubscribe) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *Unsubscribe) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+type UpdateEmail struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+ NewEmail string `protobuf:"bytes,3,opt,name=new_email,json=newEmail,proto3" json:"new_email,omitempty"`
+}
+
+func (x *UpdateEmail) Reset() {
+ *x = UpdateEmail{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *UpdateEmail) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*UpdateEmail) ProtoMessage() {}
+
+func (x *UpdateEmail) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use UpdateEmail.ProtoReflect.Descriptor instead.
+func (*UpdateEmail) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *UpdateEmail) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *UpdateEmail) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+func (x *UpdateEmail) GetNewEmail() string {
+ if x != nil {
+ return x.NewEmail
+ }
+ return ""
+}
+
+// Events
+type SubscriberSubscribed struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+ Email string `protobuf:"bytes,3,opt,name=email,proto3" json:"email,omitempty"`
+}
+
+func (x *SubscriberSubscribed) Reset() {
+ *x = SubscriberSubscribed{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SubscriberSubscribed) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SubscriberSubscribed) ProtoMessage() {}
+
+func (x *SubscriberSubscribed) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[4]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SubscriberSubscribed.ProtoReflect.Descriptor instead.
+func (*SubscriberSubscribed) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *SubscriberSubscribed) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *SubscriberSubscribed) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+func (x *SubscriberSubscribed) GetEmail() string {
+ if x != nil {
+ return x.Email
+ }
+ return ""
+}
+
+type SubscriberUnsubscribed struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+}
+
+func (x *SubscriberUnsubscribed) Reset() {
+ *x = SubscriberUnsubscribed{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SubscriberUnsubscribed) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SubscriberUnsubscribed) ProtoMessage() {}
+
+func (x *SubscriberUnsubscribed) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SubscriberUnsubscribed.ProtoReflect.Descriptor instead.
+func (*SubscriberUnsubscribed) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *SubscriberUnsubscribed) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *SubscriberUnsubscribed) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+type SubscriberEmailUpdated struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Metadata *MessageMetadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
+ SubscriberId string `protobuf:"bytes,2,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"`
+ NewEmail string `protobuf:"bytes,3,opt,name=new_email,json=newEmail,proto3" json:"new_email,omitempty"`
+}
+
+func (x *SubscriberEmailUpdated) Reset() {
+ *x = SubscriberEmailUpdated{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_messages_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SubscriberEmailUpdated) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SubscriberEmailUpdated) ProtoMessage() {}
+
+func (x *SubscriberEmailUpdated) ProtoReflect() protoreflect.Message {
+ mi := &file_messages_proto_msgTypes[6]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SubscriberEmailUpdated.ProtoReflect.Descriptor instead.
+func (*SubscriberEmailUpdated) Descriptor() ([]byte, []int) {
+ return file_messages_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *SubscriberEmailUpdated) GetMetadata() *MessageMetadata {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *SubscriberEmailUpdated) GetSubscriberId() string {
+ if x != nil {
+ return x.SubscriberId
+ }
+ return ""
+}
+
+func (x *SubscriberEmailUpdated) GetNewEmail() string {
+ if x != nil {
+ return x.NewEmail
+ }
+ return ""
+}
+
+var File_messages_proto protoreflect.FileDescriptor
+
+var file_messages_proto_rawDesc = []byte{
+ 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x12, 0x04, 0x6d, 0x61, 0x69, 0x6e, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
+ 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, 0x0f, 0x4d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, 0x79, 0x12,
+ 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
+ 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x79, 0x0a, 0x09, 0x53, 0x75,
+ 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64,
+ 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e,
+ 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
+ 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75,
+ 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12,
+ 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
+ 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x22, 0x65, 0x0a, 0x0b, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63,
+ 0x72, 0x69, 0x62, 0x65, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d,
+ 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63,
+ 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c,
+ 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x22, 0x82, 0x01, 0x0a,
+ 0x0b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x12, 0x31, 0x0a, 0x08,
+ 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15,
+ 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74,
+ 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12,
+ 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
+ 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x65, 0x6d, 0x61, 0x69,
+ 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x45, 0x6d, 0x61, 0x69,
+ 0x6c, 0x22, 0x84, 0x01, 0x0a, 0x14, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72,
+ 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65,
+ 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d,
+ 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64,
+ 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a,
+ 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72,
+ 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x22, 0x70, 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73,
+ 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
+ 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74,
+ 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
+ 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x75,
+ 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x22, 0x8d, 0x01, 0x0a, 0x16, 0x53,
+ 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x55, 0x70,
+ 0x64, 0x61, 0x74, 0x65, 0x64, 0x12, 0x31, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x61, 0x69, 0x6e, 0x2e, 0x4d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08,
+ 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73,
+ 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x0c, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a,
+ 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x08, 0x6e, 0x65, 0x77, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f,
+ 0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_messages_proto_rawDescOnce sync.Once
+ file_messages_proto_rawDescData = file_messages_proto_rawDesc
+)
+
+func file_messages_proto_rawDescGZIP() []byte {
+ file_messages_proto_rawDescOnce.Do(func() {
+ file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData)
+ })
+ return file_messages_proto_rawDescData
+}
+
+var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
+var file_messages_proto_goTypes = []interface{}{
+ (*MessageMetadata)(nil), // 0: main.MessageMetadata
+ (*Subscribe)(nil), // 1: main.Subscribe
+ (*Unsubscribe)(nil), // 2: main.Unsubscribe
+ (*UpdateEmail)(nil), // 3: main.UpdateEmail
+ (*SubscriberSubscribed)(nil), // 4: main.SubscriberSubscribed
+ (*SubscriberUnsubscribed)(nil), // 5: main.SubscriberUnsubscribed
+ (*SubscriberEmailUpdated)(nil), // 6: main.SubscriberEmailUpdated
+ (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
+}
+var file_messages_proto_depIdxs = []int32{
+ 7, // 0: main.MessageMetadata.created_at:type_name -> google.protobuf.Timestamp
+ 0, // 1: main.Subscribe.metadata:type_name -> main.MessageMetadata
+ 0, // 2: main.Unsubscribe.metadata:type_name -> main.MessageMetadata
+ 0, // 3: main.UpdateEmail.metadata:type_name -> main.MessageMetadata
+ 0, // 4: main.SubscriberSubscribed.metadata:type_name -> main.MessageMetadata
+ 0, // 5: main.SubscriberUnsubscribed.metadata:type_name -> main.MessageMetadata
+ 0, // 6: main.SubscriberEmailUpdated.metadata:type_name -> main.MessageMetadata
+ 7, // [7:7] is the sub-list for method output_type
+ 7, // [7:7] is the sub-list for method input_type
+ 7, // [7:7] is the sub-list for extension type_name
+ 7, // [7:7] is the sub-list for extension extendee
+ 0, // [0:7] is the sub-list for field type_name
+}
+
+func init() { file_messages_proto_init() }
+func file_messages_proto_init() {
+ if File_messages_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*MessageMetadata); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Subscribe); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Unsubscribe); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*UpdateEmail); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SubscriberSubscribed); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SubscriberUnsubscribed); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_messages_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SubscriberEmailUpdated); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_messages_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 7,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_messages_proto_goTypes,
+ DependencyIndexes: file_messages_proto_depIdxs,
+ MessageInfos: file_messages_proto_msgTypes,
+ }.Build()
+ File_messages_proto = out.File
+ file_messages_proto_rawDesc = nil
+ file_messages_proto_goTypes = nil
+ file_messages_proto_depIdxs = nil
+}
diff --git a/_examples/basic/6-cqrs-ordered-events/proto/messages.proto b/_examples/basic/6-cqrs-ordered-events/proto/messages.proto
new file mode 100644
index 000000000..574315f4b
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/proto/messages.proto
@@ -0,0 +1,53 @@
+syntax = "proto3";
+package main;
+
+option go_package = "./main";
+
+import "google/protobuf/timestamp.proto";
+
+message MessageMetadata {
+ string partition_key = 1;
+ google.protobuf.Timestamp created_at = 2;
+}
+
+// Commands
+message Subscribe {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+ string email = 3;
+}
+
+message Unsubscribe {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+}
+
+message UpdateEmail {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+ string new_email = 3;
+}
+
+// Events
+message SubscriberSubscribed {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+ string email = 3;
+}
+
+message SubscriberUnsubscribed {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+}
+
+message SubscriberEmailUpdated {
+ MessageMetadata metadata = 1;
+
+ string subscriber_id = 2;
+ string new_email = 3;
+}
\ No newline at end of file
diff --git a/_examples/basic/6-cqrs-ordered-events/subscribers.go b/_examples/basic/6-cqrs-ordered-events/subscribers.go
new file mode 100644
index 000000000..48d4e0a45
--- /dev/null
+++ b/_examples/basic/6-cqrs-ordered-events/subscribers.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+)
+
+type SubscriberReadModel struct {
+ subscribers map[string]string // map[subscriberID]email
+ lock sync.RWMutex
+}
+
+func NewSubscriberReadModel() *SubscriberReadModel {
+ return &SubscriberReadModel{
+ subscribers: make(map[string]string),
+ }
+}
+
+func (m *SubscriberReadModel) OnSubscribed(ctx context.Context, event *SubscriberSubscribed) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ m.subscribers[event.SubscriberId] = event.Email
+
+ slog.Info(
+ "Subscriber added",
+ "subscriber_id", event.SubscriberId,
+ "email", event.Email,
+ )
+
+ return nil
+}
+
+func (m *SubscriberReadModel) OnUnsubscribed(ctx context.Context, event *SubscriberUnsubscribed) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ delete(m.subscribers, event.SubscriberId)
+
+ slog.Info(
+ "Subscriber removed",
+ "subscriber_id", event.SubscriberId,
+ )
+
+ return nil
+}
+
+func (m *SubscriberReadModel) OnEmailUpdated(ctx context.Context, event *SubscriberEmailUpdated) error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ m.subscribers[event.SubscriberId] = event.NewEmail
+
+ slog.Info(
+ "Subscriber updated",
+ "subscriber_id", event.SubscriberId,
+ "email", event.NewEmail,
+ )
+
+ return nil
+}
+
+func (m *SubscriberReadModel) GetSubscriberCount() int {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+ return len(m.subscribers)
+}
diff --git a/components/cqrs/name.go b/components/cqrs/name.go
index bdeb393ab..7bb5882a3 100644
--- a/components/cqrs/name.go
+++ b/components/cqrs/name.go
@@ -6,6 +6,13 @@ import (
)
// FullyQualifiedStructName name returns object name in format [package].[type name].
+// For example, for the struct:
+//
+// package events
+// type UserCreated struct {}
+//
+// it will return "events.UserCreated".
+//
// It ignores if the value is a pointer or not.
func FullyQualifiedStructName(v interface{}) string {
s := fmt.Sprintf("%T", v)
@@ -15,6 +22,13 @@ func FullyQualifiedStructName(v interface{}) string {
}
// StructName name returns struct name in format [type name].
+// For example, for the struct:
+//
+// package events
+// type UserCreated struct {}
+//
+// it will return "UserCreated".
+//
// It ignores if the value is a pointer or not.
func StructName(v interface{}) string {
segments := strings.Split(fmt.Sprintf("%T", v), ".")
diff --git a/dev/validate-examples/main.go b/dev/validate-examples/main.go
index 37c77ff83..d22b84995 100644
--- a/dev/validate-examples/main.go
+++ b/dev/validate-examples/main.go
@@ -16,10 +16,11 @@ import (
)
type Config struct {
- ValidationCmd string `yaml:"validation_cmd"`
- TeardownCmd string `yaml:"teardown_cmd"`
- Timeout int `yaml:"timeout"`
- ExpectedOutput string `yaml:"expected_output"`
+ ValidationCmd string `yaml:"validation_cmd"`
+ TeardownCmd string `yaml:"teardown_cmd"`
+ Timeout int `yaml:"timeout"`
+ ExpectedOutput string `yaml:"expected_output"`
+ ExpectedOutputs []string `yaml:"expected_outputs"`
}
func (c *Config) LoadFrom(path string) error {
@@ -35,8 +36,20 @@ func (c *Config) LoadFrom(path string) error {
}
func main() {
- walkErr := filepath.Walk("../../", func(exampleConfig string, f os.FileInfo, _ error) error {
- matches, _ := filepath.Match(".validate_example*.yml", f.Name())
+ path := "../../_examples/"
+
+ if len(os.Args) > 1 {
+ path = filepath.Join(path, os.Args[1])
+ }
+
+ walkErr := filepath.Walk(path, func(exampleConfig string, f os.FileInfo, _ error) error {
+ if f == nil {
+ return nil
+ }
+ matches, err := filepath.Match(".validate_example*.yml", f.Name())
+ if err != nil {
+ return fmt.Errorf("could not match file, err: %w", err)
+ }
if !matches {
return nil
}
@@ -45,7 +58,7 @@ func main() {
fmt.Printf("validating %s\n", exampleDirectory)
- err := validate(exampleConfig)
+ err = validate(exampleConfig)
if err != nil {
return fmt.Errorf("validation for %s failed, err: %v", exampleDirectory, err)
}
@@ -67,9 +80,14 @@ func validate(path string) error {
dirName := filepath.Base(filepath.Dir(path))
+ expectedOutputs := config.ExpectedOutputs
+ if config.ExpectedOutput != "" {
+ expectedOutputs = append(expectedOutputs, config.ExpectedOutput)
+ }
+
fmt.Print("\n\n")
fmt.Println("Validating example:", dirName)
- fmt.Println("Waiting for output: ", color.GreenString(config.ExpectedOutput))
+ fmt.Println("Waiting for output: ", color.GreenString(fmt.Sprintf("%+q", expectedOutputs)))
cmdAndArgs := strings.Fields(config.ValidationCmd)
validationCmd := exec.Command(cmdAndArgs[0], cmdAndArgs[1:]...)
@@ -114,12 +132,20 @@ func validate(path string) error {
go readLines(stdout, lines)
go readLines(stderr, lines)
+ outputsFound := map[int]struct{}{}
+
go func() {
for line := range lines {
fmt.Printf("[%s] > %s\n", color.CyanString(dirName), line)
- ok, _ := regexp.MatchString(config.ExpectedOutput, line)
- if ok {
+ for num, output := range expectedOutputs {
+ ok, _ := regexp.MatchString(output, line)
+ if ok {
+ outputsFound[num] = struct{}{}
+ }
+ }
+
+ if len(outputsFound) == len(expectedOutputs) {
success <- true
return
}
diff --git a/docs/DEVELOP.md b/docs/DEVELOP.md
new file mode 100644
index 000000000..6c7aaa8a0
--- /dev/null
+++ b/docs/DEVELOP.md
@@ -0,0 +1,14 @@
+## How to Develop watermill.io docs?
+
+### Building & running
+
+```bash
+./build.sh
+npm run dev
+```
+
+### Useful resources
+
+- [Available shortcodes](https://getdoks.org/docs/basics/shortcodes/)
+- [Diagrams](https://getdoks.org/docs/built-ins/diagrams/) (we recommend [Mermaid](https://getdoks.org/docs/built-ins/diagrams/#mermaid))
+- [Codeglocks](https://getdoks.org/docs/built-ins/code-blocks/)
diff --git a/docs/content/docs/cqrs.md b/docs/content/docs/cqrs.md
index 4729dc7e6..5c01e1517 100644
--- a/docs/content/docs/cqrs.md
+++ b/docs/content/docs/cqrs.md
@@ -132,19 +132,63 @@ In the scenario, when we have multiple event types on one topic, you have two op
1. You can set `EventConfig.AckOnUnknownEvent` to true - it will acknowledge all events that are not handled by handler,
2. You can use Event Handler groups mechanism.
+**Key differences between `EventProcessor` and `EventGroupProcessor`:**
+
+1. `EventProcessor`:
+- Each handler has its own subscriber instance
+- One handler per event type
+- Simple one-to-one matching of events to handlers
+
+2. `EventGroupProcessor`:
+- Group of handlers share a single subscriber instance (and one consumer group, if such mechanism is supported)
+- One handler group can support multiple event types,
+- When message arrives to the topic, Watermill will match it to the handler in the group based on event type
+
+```kroki {type=mermaid}
+graph TD
+ subgraph Individual Handlers
+ E[Event Bus] --> S1[Subscriber 1]
+ E --> S2[Subscriber 2]
+ E --> S3[Subscriber 3]
+ S1 --> H1[Handler 1]
+ S2 --> H2[Handler 2]
+ S3 --> H3[Handler 3]
+ end
+
+ subgraph Group Handlers
+ EB[Event Bus] --> SharedSub[Shared Subscriber]
+ SharedSub --> GH[Handler Group]
+ GH --> GH1[Handler 1]
+ GH --> GH2[Handler 2]
+ GH --> GH3[Handler 3]
+ end
+```
+
+**Event Handler groups are helpful when you have multiple event types on one topic and you want to maintain order of events.**
+Thanks to using one subscriber instance and consumer group, events will be processed in the order they were sent.
+
+{{< callout context="note" title="Note" icon="outline/info-circle" >}}
+It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that.
+
+Please keep in mind that those handlers will be processed within the same message.
+If first handler succeeds and the second fails, the message will be re-delivered and the first will be re-executed.
+{{< /callout >}}
+
To use event groups, you need to set `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` options in [`EventConfig`](#event-config).
After that, you can use `AddHandlersGroup` on [`EventProcessor`](#event-processor).
-{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="eventProcessor.AddHandlersGroup(" last_line_contains="if err != nil {" padding_after="0" %}}
+{{% load-snippet-partial file="src-link/_examples/basic/6-cqrs-ordered-events/main.go" first_line_contains="eventProcessor.AddHandlersGroup(" last_line_contains="if err != nil {" padding_after="0" %}}
Both `GenerateHandlerGroupSubscribeTopic` and `GroupSubscriberConstructor` receives information about group name in function arguments.
+You can see a fully working example with event groups in our [examples](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/6-cqrs-ordered-events/).
+
### Generic handlers
Since Watermill v1.3 it's possible to use generic handlers for commands and events. It's useful when you have a lot of commands/events and you don't want to create a handler for each of them.
-{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="cqrs.NewGroupEventHandler" last_line_contains="})," padding_after="0" %}}
+{{% load-snippet-partial file="src-link/_examples/basic/6-cqrs-ordered-events/main.go" first_line_contains="cqrs.NewGroupEventHandler" last_line_contains=")," padding_after="0" %}}
Under the hood, it creates EventHandler or CommandHandler implementation.
It's available for all kind of handlers.
@@ -154,7 +198,6 @@ It's available for all kind of handlers.
{{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewEventHandler" last_line_contains="func NewEventHandler" padding_after="0" %}}
{{% load-snippet-partial file="src-link/components/cqrs/event_handler.go" first_line_contains="// NewGroupEventHandler" last_line_contains="func NewGroupEventHandler" padding_after="0" %}}
-
### Building a read model with the event handler
{{% load-snippet-partial file="src-link/_examples/basic/5-cqrs-protobuf/main.go" first_line_contains="// BookingsFinancialReport is a read model" last_line_contains="func main() {" padding_after="0" %}}