From 2c07ee8b3a87272c4335910742e9553db69eaff5 Mon Sep 17 00:00:00 2001 From: Joe Williams <7463219+bubbajoe@users.noreply.github.com> Date: Sun, 30 Jun 2024 09:34:28 +0900 Subject: [PATCH] fix: change log out of order bug (#14) * fix: change log out of order bug * nobug: fix readme and add newline to cli --- .gitignore | 13 +----------- README.md | 9 -------- cmd/dgate-cli/main.go | 2 ++ internal/proxy/change_log.go | 40 ++++++++++------------------------- internal/proxy/proxy_state.go | 2 +- pkg/spec/change_log.go | 8 +++++++ 6 files changed, 23 insertions(+), 51 deletions(-) diff --git a/.gitignore b/.gitignore index 75e1fcd..f64bac2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,16 +1,5 @@ *__debug_bin* -.vscode -.idea -.fleet - -infra/ -.aws/ -dist/ -./dgate -./k6 - -# Local files # -.dgate*/ +.dgate cov.out go.work.sum .env \ No newline at end of file diff --git a/README.md b/README.md index 2617067..0ca298f 100644 --- a/README.md +++ b/README.md @@ -45,12 +45,3 @@ DGate CLI is a command-line interface that can be used to interact with the DGat - Error Handler Module (`errorHandler`) - executed when an error occurs when sending a request to the upstream server. This module is used to modify the response before it is sent to the client. - Request Handler Module (`requestHandler`) - executed when a request is received from the client. This module is used to handle arbitrary requests, instead of using an upstream service. - - - -- Examples - - [x] ip hash load balancer - - [x] short url service - - [x] modify json request/response - - [x] send multiple upstream requests and combine them - diff --git a/cmd/dgate-cli/main.go b/cmd/dgate-cli/main.go index 871f763..8129d69 100644 --- a/cmd/dgate-cli/main.go +++ b/cmd/dgate-cli/main.go @@ -16,5 +16,7 @@ func main() { os.Stderr.WriteString(err.Error()) os.Stderr.WriteString("\n") os.Exit(1) + return } + os.Stdout.WriteString("\n") } diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 69ef075..c8a5697 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -31,8 +31,8 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( defer func() { if err == nil { if !ps.raftEnabled { - // dont store change logs - if err = ps.store.StoreChangeLog(cl); err != nil { + // renew the change log ID to avoid out-of-order processing + if err = ps.store.StoreChangeLog(cl.RenewID()); err != nil { ps.logger.Error("Error storing change log, restarting state", zap.Error(err)) return } @@ -346,12 +346,20 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { } ps.logger.Info("restoring state change logs from storage", zap.Int("count", len(logs))) // we might need to sort the change logs by timestamp - for _, cl := range logs { + for i, cl := range logs { // skip documents as they are persisted in the store if cl.Cmd.Resource() == spec.Documents { continue } if err = ps.processChangeLog(cl, false, false); err != nil { + ps.logger.Error("error processing change log", + zap.Bool("skip", ps.debugMode), + zap.Error(err), + zap.Int("index", i), + ) + if ps.debugMode { + continue + } return err } else { ps.changeLogs = append(ps.changeLogs, cl) @@ -445,29 +453,3 @@ START: removeList = sliceutil.SliceUnique(removeList, func(cl *spec.ChangeLog) string { return cl.ID }) return removeList } - -// Function to check if there is a delete command between two logs with matching keys -// func hasDeleteBetween(logs []*spec.ChangeLog, start, end *spec.ChangeLog) bool { -// startIndex := -1 -// endIndex := -1 - -// for i, log := range logs { -// if log.ID == start.ID { -// startIndex = i -// } -// if log.ID == end.ID { -// endIndex = i -// } -// } - -// if startIndex == -1 || endIndex == -1 { -// return false -// } - -// for i := startIndex + 1; i < endIndex; i++ { -// if logs[i].Cmd.IsDeleteCommand() { -// return true -// } -// } -// return false -// } diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 834c6a4..447cbf2 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -299,7 +299,7 @@ func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { if err != nil { return err } - raftLog := raft.Log{Data: encodedCL} + raftLog := raft.Log{ Data: encodedCL } now := time.Now() future := r.ApplyLog(raftLog, time.Second*15) err = future.Error() diff --git a/pkg/spec/change_log.go b/pkg/spec/change_log.go index d16899b..2e49d80 100644 --- a/pkg/spec/change_log.go +++ b/pkg/spec/change_log.go @@ -43,6 +43,14 @@ func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { } } +func (cl *ChangeLog) RenewID() *ChangeLog { + changeLog := *cl + changeLog.ID = strconv.FormatInt( + time.Now().UnixNano(), 36, + ) + return &changeLog +} + type Command string type Action string