From 1fee755e816717a44d53850b04428c2222c79d33 Mon Sep 17 00:00:00 2001 From: bubbajoe Date: Fri, 21 Jun 2024 09:37:40 +0900 Subject: [PATCH] fix: change log out of order bug --- internal/proxy/change_log.go | 14 +++++++++++--- internal/proxy/proxy_state.go | 2 +- pkg/spec/change_log.go | 8 ++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 69ef075..64649c8 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -31,8 +31,8 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( defer func() { if err == nil { if !ps.raftEnabled { - // dont store change logs - if err = ps.store.StoreChangeLog(cl); err != nil { + // renew the change log ID to avoid out-of-order processing + if err = ps.store.StoreChangeLog(cl.RenewID()); err != nil { ps.logger.Error("Error storing change log, restarting state", zap.Error(err)) return } @@ -346,12 +346,20 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { } ps.logger.Info("restoring state change logs from storage", zap.Int("count", len(logs))) // we might need to sort the change logs by timestamp - for _, cl := range logs { + for i, cl := range logs { // skip documents as they are persisted in the store if cl.Cmd.Resource() == spec.Documents { continue } if err = ps.processChangeLog(cl, false, false); err != nil { + ps.logger.Error("error processing change log", + zap.Bool("skip", ps.debugMode), + zap.Error(err), + zap.Int("index", i), + ) + if ps.debugMode { + continue + } return err } else { ps.changeLogs = append(ps.changeLogs, cl) diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 834c6a4..447cbf2 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -299,7 +299,7 @@ func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { if err != nil { return err } - raftLog := raft.Log{Data: encodedCL} + raftLog := raft.Log{ Data: encodedCL } now := time.Now() future := r.ApplyLog(raftLog, time.Second*15) err = future.Error() diff --git a/pkg/spec/change_log.go b/pkg/spec/change_log.go index d16899b..2e49d80 100644 --- a/pkg/spec/change_log.go +++ b/pkg/spec/change_log.go @@ -43,6 +43,14 @@ func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { } } +func (cl *ChangeLog) RenewID() *ChangeLog { + changeLog := *cl + changeLog.ID = strconv.FormatInt( + time.Now().UnixNano(), 36, + ) + return &changeLog +} + type Command string type Action string