Skip to content

Commit

Permalink
Merge pull request #407 from AltScore/feature/let-set-collection-names
Browse files Browse the repository at this point in the history
Add custom collection names
  • Loading branch information
maxekman authored Jun 17, 2023
2 parents e5ca550 + 337e2a3 commit ac3a972
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 58 deletions.
32 changes: 23 additions & 9 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"time"

"github.com/looplab/eventhorizon/mongoutils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -41,11 +42,10 @@ import (

// EventStore is an eventhorizon.EventStore for MongoDB, using one collection
// for all events and another to keep track of all aggregates/streams. It also
// keep tracks of the global position of events, stored as metadata.
// keeps track of the global position of events, stored as metadata.
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
db *mongo.Database
events *mongo.Collection
streams *mongo.Collection
snapshots *mongo.Collection
Expand Down Expand Up @@ -90,7 +90,6 @@ func newEventStoreWithClient(client *mongo.Client, clientOwnership clientOwnersh
s := &EventStore{
client: client,
clientOwnership: clientOwnership,
db: db,
events: db.Collection("events"),
streams: db.Collection("streams"),
snapshots: db.Collection("snapshots"),
Expand Down Expand Up @@ -194,16 +193,31 @@ func WithEventHandlerInTX(h eh.EventHandler) Option {
// Will return an error if provided parameters are equal.
func WithCollectionNames(eventsColl, streamsColl string) Option {
return func(s *EventStore) error {
if eventsColl == streamsColl {
if err := mongoutils.CheckCollectionName(eventsColl); err != nil {
return fmt.Errorf("events collection: %w", err)
} else if err := mongoutils.CheckCollectionName(streamsColl); err != nil {
return fmt.Errorf("streams collection: %w", err)
} else if eventsColl == streamsColl {
return fmt.Errorf("custom collection names are equal")
}

if eventsColl == "" || streamsColl == "" {
return fmt.Errorf("missing collection name")
db := s.events.Database()
s.events = db.Collection(eventsColl)
s.streams = db.Collection(streamsColl)

return nil
}
}

// WithSnapshotCollectionName uses different collections from the default "snapshots" collections.
func WithSnapshotCollectionName(snapshotColl string) Option {
return func(s *EventStore) error {
if err := mongoutils.CheckCollectionName(snapshotColl); err != nil {
return fmt.Errorf("snapshot collection: %w", err)
}

s.events = s.db.Collection(eventsColl)
s.streams = s.db.Collection(streamsColl)
db := s.events.Database()
s.snapshots = db.Collection(snapshotColl)

return nil
}
Expand Down Expand Up @@ -688,7 +702,7 @@ type evt struct {
}

// newEvt returns a new evt for an event.
func newEvt(ctx context.Context, event eh.Event) (*evt, error) {
func newEvt(_ context.Context, event eh.Event) (*evt, error) {
e := &evt{
EventType: event.EventType(),
Timestamp: event.Timestamp(),
Expand Down
99 changes: 81 additions & 18 deletions eventstore/mongodb_v2/eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,11 @@ func TestWithCollectionNamesIntegration(t *testing.T) {
t.Skip("skipping integration test")
}

// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}
url, db := makeDB(t)

url = "mongodb://" + url

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}

db := "test-" + hex.EncodeToString(b)
eventsColl := "foo_events"
streamsColl := "bar_streams"

t.Log("using DB:", db)

store, err := NewEventStore(url, db,
WithCollectionNames(eventsColl, streamsColl),
)
Expand Down Expand Up @@ -127,17 +112,95 @@ func TestWithCollectionNamesIntegration(t *testing.T) {
_, err = NewEventStore(url, db,
WithCollectionNames("", "my-collection"),
)
if err == nil || err.Error() != "error while applying option: missing collection name" {
if err == nil || err.Error() != "error while applying option: events collection: missing collection name" {
t.Fatal("there should be an error")
}

// providing empty collection names should result in an error
_, err = NewEventStore(url, db,
WithCollectionNames("my-collection", ""),
)
if err == nil || err.Error() != "error while applying option: missing collection name" {
if err == nil || err.Error() != "error while applying option: streams collection: missing collection name" {
t.Fatal("there should be an error")
}
// providing invalid streams collection names should result in an error
_, err = NewEventStore(url, db,
WithCollectionNames("my-collection", "name with spaces"),
)
if err == nil || err.Error() != "error while applying option: streams collection: invalid char in collection name (space)" {
t.Fatal("there should be an error")
}
// providing invalid events collection names should result in an error
_, err = NewEventStore(url, db,
WithCollectionNames("my collection", "a-good-name"),
)
if err == nil || err.Error() != "error while applying option: events collection: invalid char in collection name (space)" {
t.Fatal("there should be an error")
}
}

func TestWithSnapshotCollectionNamesIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

url, db := makeDB(t)

snapshotColl := "foo_snapshots"

store, err := NewEventStore(url, db,
WithSnapshotCollectionName(snapshotColl),
)
if err != nil {
t.Fatal("there should be no error:", err)
}

if store == nil {
t.Fatal("there should be a store")
}

defer store.Close()

if store.snapshots.Name() != snapshotColl {
t.Fatal("snapshots collection should use custom collection name")
}

// providing empty snapshot collection names should result in an error
_, err = NewEventStore(url, db,
WithSnapshotCollectionName(""),
)
if err == nil || err.Error() != "error while applying option: snapshot collection: missing collection name" {
t.Fatal("there should be an error")
}

// providing invalid snapshot collection names should result in an error
_, err = NewEventStore(url, db,
WithSnapshotCollectionName("no space-allowed"),
)
if err == nil || err.Error() != "error while applying option: snapshot collection: invalid char in collection name (space)" {
t.Fatal("there should be an error")
}
}

func makeDB(t *testing.T) (string, string) {
// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}

url = "mongodb://" + url

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}

db := "test-" + hex.EncodeToString(b)

t.Log("using DB:", db)
return url, db
}

func TestWithEventHandlerIntegration(t *testing.T) {
Expand Down
22 changes: 22 additions & 0 deletions mongoutils/checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package mongoutils

import (
"errors"
"strings"
)

var (
ErrMissingCollectionName = errors.New("missing collection name")
ErrInvalidCharInCollectionName = errors.New("invalid char in collection name (space)")
)

// CheckCollectionName checks if a collection name is valid for mongodb.
// We only check on spaces because they are hard to see by humans.
func CheckCollectionName(name string) error {
if name == "" {
return ErrMissingCollectionName
} else if strings.ContainsAny(name, " ") {
return ErrInvalidCharInCollectionName
}
return nil
}
45 changes: 45 additions & 0 deletions mongoutils/checks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mongoutils

import (
"testing"
)

func TestCheckValidCollectionName(t *testing.T) {
type args struct {
name string
}
tests := []struct {
name string
args args
wantErr bool
}{
{
"empty name",
args{
name: "",
},
true,
},
{
"valid name",
args{
name: "valid",
},
false,
},
{
"with spaces",
args{
name: "invalid name",
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := CheckCollectionName(tt.args.name); (err != nil) != tt.wantErr {
t.Errorf("CheckCollectionName() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
16 changes: 15 additions & 1 deletion outbox/mongodb/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

bsonCodec "github.com/looplab/eventhorizon/codec/bson"
"github.com/looplab/eventhorizon/mongoutils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
Expand Down Expand Up @@ -127,6 +128,19 @@ func WithWatchToken(token string) Option {
}
}

// WithCollectionName uses different collections from the default "outbox" collection.
func WithCollectionName(outboxColl string) Option {
return func(s *Outbox) error {
if err := mongoutils.CheckCollectionName(outboxColl); err != nil {
return fmt.Errorf("outbox collection: %w", err)
}

s.outbox = s.outbox.Database().Collection(outboxColl)

return nil
}
}

// Client returns the MongoDB client used by the outbox. To use the outbox with
// the EventStore it needs to be created with the same client.
func (o *Outbox) Client() *mongo.Client {
Expand All @@ -139,7 +153,7 @@ func (o *Outbox) HandlerType() eh.EventHandlerType {
}

// AddHandler implements the AddHandler method of the eventhorizon.Outbox interface.
func (o *Outbox) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
func (o *Outbox) AddHandler(_ context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if m == nil {
return eh.ErrMissingMatcher
}
Expand Down
Loading

0 comments on commit ac3a972

Please sign in to comment.