Skip to content

Commit

Permalink
on-update functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
rbroggi committed Jul 24, 2024
1 parent c1a1b9c commit 8ed4b76
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 10 deletions.
7 changes: 4 additions & 3 deletions example/server/configs.http
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ GET http://localhost:8080/configs/latest

### Modify latest config
PUT http://localhost:8080/configs/latest
user-id: mark
user-id: pippo

{
"name": "stark",
"age": 35
"name": "gloria",
"age": 38,
"friends": ["joe", "mark"]
}

### List config versions
Expand Down
10 changes: 10 additions & 0 deletions streamingconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func WithCollectionName[T Config](collectionName string) func(repo *WatchedRepo[
}
}

func WithOnUpdate[T Config](onUpdate func(conf T)) func(repo *WatchedRepo[T]) {
return func(repo *WatchedRepo[T]) {
repo.onUpdate = onUpdate
}
}

type WatchedRepo[T Config] struct {
lgr *slog.Logger
source *mongo.Database
Expand All @@ -82,6 +88,7 @@ type WatchedRepo[T Config] struct {
skipIndexOperation bool
configs *mongo.Collection
started bool
onUpdate func(conf T)
}

func NewWatchedRepo[T Config](
Expand Down Expand Up @@ -419,6 +426,9 @@ func (s *WatchedRepo[T]) iterateChangeStream(ctx context.Context, cs *mongo.Chan
if s.cfgWithDefaults, err = copyAndSetDefaults(s.cfg); err != nil {
s.lgr.With("error", err).ErrorContext(ctx, "could not set defaults")
}
if s.onUpdate != nil {
s.onUpdate(s.cfgWithDefaults.Config)
}
default:
s.lgr.With("operationType", dto.OperationType).ErrorContext(ctx, "invalid or unexpected operation")
continue
Expand Down
60 changes: 53 additions & 7 deletions streamingconfig_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func (a *appConfigV1) validate() error {

func NewTestStore[T config.Config](
t *testing.T,
nowProvider func() time.Time,
db *mongo.Database,
opts ...func(repo *config.WatchedRepo[T]),
) *config.WatchedRepo[T] {
t.Helper()
configStore, err := config.NewWatchedRepo(
config.Args{
Logger: slog.Default(),
DB: db,
}, config.WithNowFn[T](nowProvider))
}, opts...)
require.NoError(t, err)

return configStore
Expand Down Expand Up @@ -120,7 +120,7 @@ func newFixture(t *testing.T) *fixture {
assert.NoError(t, client.Database("admin").
RunCommand(ctx, bson.D{primitive.E{Key: "isMaster", Value: 1}}).Decode(&result), "checking mongoDB primary status")
assert.Equal(t, true, result["ismaster"])
}, 15*time.Second, 100*time.Millisecond)
}, 30*time.Second, 100*time.Millisecond)

return &fixture{db: db}
}
Expand All @@ -145,7 +145,7 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) {
}
ctx, cnl := context.WithTimeout(context.Background(), 30*time.Second)

configStoreOne := NewTestStore[*appConfigV0](t, nowProvider, f.db)
configStoreOne := NewTestStore[*appConfigV0](t, f.db, config.WithNowFn[*appConfigV0](nowProvider))
t.Run("cannot perform methods before starting", func(t *testing.T) {
cfg, err := configStoreOne.GetConfig()
require.ErrorIs(t, err, config.ErrNotStarted)
Expand All @@ -158,7 +158,7 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) {
done1, err := configStoreOne.Start(ctx)
require.NoError(t, err)

configStoreTwo := NewTestStore[*appConfigV0](t, nowProvider, f.db)
configStoreTwo := NewTestStore[*appConfigV0](t, f.db, config.WithNowFn[*appConfigV0](nowProvider))
done2, err := configStoreTwo.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
Expand Down Expand Up @@ -287,6 +287,44 @@ func Test_ConfigCreateFindAndUpdateConfiguration(t *testing.T) {
require.Len(t, configsByVersion, 0)
})
})

t.Run("on update", func(t *testing.T) {
var cfg *appConfigV0
configStore3 := NewTestStore[*appConfigV0](
t,
f.db,
config.WithNowFn[*appConfigV0](nowProvider),
config.WithOnUpdate[*appConfigV0](func(conf *appConfigV0) {
cfg = conf
}),
)
done2, err := configStore3.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
cnl()
doneOrTimeout(t, done1, 5*time.Second)
doneOrTimeout(t, done2, 5*time.Second)
})

now = &at
setV3 := &appConfigV0{
Name: "n3",
Duration: 3 * time.Second,
Time: at.Add(3 * time.Second),
Nested: nestedConfig{Counter: 5},
List: []string{"e", "f", "d"},
}
cV3, err := configStoreTwo.UpdateConfig(ctx, config.UpdateConfigCmd[*appConfigV0]{
By: "u2",
Config: setV3,
})
require.NoError(t, err)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, cV3.Config, cfg)
}, 5*time.Second, 100*time.Millisecond)

})
})
}

Expand All @@ -302,11 +340,19 @@ func Test_ConfigBackwardCompatibility(t *testing.T) {
}
ctx, cnl := context.WithTimeout(context.Background(), 30*time.Second)

configStoreOne := NewTestStore[*appConfigV0](t, nowProvider, f.db)
configStoreOne := NewTestStore[*appConfigV0](
t,
f.db,
config.WithNowFn[*appConfigV0](nowProvider),
)
done1, err := configStoreOne.Start(ctx)
require.NoError(t, err)

configStoreTwo := NewTestStore[*appConfigV1](t, nowProvider, f.db)
configStoreTwo := NewTestStore[*appConfigV1](
t,
f.db,
config.WithNowFn[*appConfigV1](nowProvider),
)
done2, err := configStoreTwo.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
Expand Down

0 comments on commit 8ed4b76

Please sign in to comment.