Skip to content

Commit

Permalink
fix(store): retry with new timestamp value
Browse files Browse the repository at this point in the history
Retry inserting a list value will produce duplicated list elements
in Scylla when done with the same timestamp.

Fixes scylladb/scylladb#7937
  • Loading branch information
nuivall committed Jun 23, 2023
1 parent b63bacf commit fd28158
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
6 changes: 4 additions & 2 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) (err error) {
func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
err = cs.doMutate(ctx, builder, ts, values...)
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, builder, time.Now(), values...)
if err == nil {
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
return nil
Expand Down
18 changes: 8 additions & 10 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type loader interface {
}

type storer interface {
mutate(context.Context, qb.Builder, time.Time, ...interface{}) error
mutate(context.Context, qb.Builder, ...interface{}) error
}

type storeLoader interface {
Expand Down Expand Up @@ -123,7 +123,7 @@ type noOpStore struct {
system string
}

func (n *noOpStore) mutate(context.Context, qb.Builder, time.Time, ...interface{}) error {
func (n *noOpStore) mutate(context.Context, qb.Builder, ...interface{}) error {
return nil
}

Expand Down Expand Up @@ -151,28 +151,26 @@ type delegatingStore struct {
}

func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder qb.Builder) error {
ts := time.Now()
if err := mutate(ctx, ds.oracleStore, ts, oracleBuilder, []interface{}{}); err != nil {
if err := mutate(ctx, ds.oracleStore, oracleBuilder, []interface{}{}); err != nil {
return errors.Wrap(err, "oracle failed store creation")
}
if err := mutate(ctx, ds.testStore, ts, testBuilder, []interface{}{}); err != nil {
if err := mutate(ctx, ds.testStore, testBuilder, []interface{}{}); err != nil {
return errors.Wrap(err, "test failed store creation")
}
return nil
}

func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error {
ts := time.Now()
if err := mutate(ctx, ds.oracleStore, ts, builder, values...); err != nil {
if err := mutate(ctx, ds.oracleStore, builder, values...); err != nil {
// Oracle failed, transition cannot take place
ds.logger.Info("oracle failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(err))
return nil
}
return mutate(ctx, ds.testStore, ts, builder, values...)
return mutate(ctx, ds.testStore, builder, values...)
}

func mutate(ctx context.Context, s storeLoader, ts time.Time, builder qb.Builder, values ...interface{}) error {
if err := s.mutate(ctx, builder, ts, values...); err != nil {
func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values ...interface{}) error {
if err := s.mutate(ctx, builder, values...); err != nil {
return errors.Wrapf(err, "unable to apply mutations to the %s store", s.name())
}
return nil
Expand Down

0 comments on commit fd28158

Please sign in to comment.