From 0dd0b4145594d3d4c8115872d49f33a2225f0d84 Mon Sep 17 00:00:00 2001 From: lyramilk Date: Wed, 23 Oct 2024 09:41:10 +0800 Subject: [PATCH] feat: use sync instead of psync while Redis version is less than 2.8.0. (#874) --- internal/reader/sync_standalone_reader.go | 86 +++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 27b83293..e9452f29 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -78,6 +78,9 @@ type syncStandaloneReader struct { AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master AofReceivedHuman string `json:"aof_received_human"` } + + // version info + SupportPSYNC bool } func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { @@ -90,10 +93,36 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade r.stat.Status = kHandShake r.stat.Dir = utils.GetAbsPath(r.stat.Name) utils.CreateEmptyDir(r.stat.Dir) + + r.SupportPSYNC = r.supportPSYNC(); return r } + +func (r *syncStandaloneReader) supportPSYNC() bool { + reply := r.client.DoWithStringReply("info", "server") + for _, line := range strings.Split(reply, "\n") { + if strings.HasPrefix(line, "redis_version:") { + version := strings.Split(line, ":")[1] + parts := strings.Split(version,"."); + if len(parts) > 2{ + v1,_ := strconv.Atoi(parts[0]); + v2,_ := strconv.Atoi(parts[1]); + if v1 * 1000 + v2 < 2008{ + return false + } + } + + } + } + + return true; +} + func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { + if !r.SupportPSYNC{ + return r.StartReadWithSync(ctx); + } r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) go func() { @@ -116,6 +145,29 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr return []chan *entry.Entry{r.ch} } +func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry { + r.ctx = ctx + r.ch = make(chan *entry.Entry, 1024) + go func() { + //r.sendReplconfListenPort() + r.sendSync() + rdbFilePath := r.receiveRDB() + startOffset := r.stat.AofReceivedOffset + //go r.sendReplconfAck() // start sent replconf ack + go r.receiveAOF(r.rd) + if r.opts.SyncRdb { + r.sendRDB(rdbFilePath) + } + if r.opts.SyncAof { + r.stat.Status = kSyncAof + r.sendAOF(startOffset) + } + close(r.ch) + }() + + return []chan *entry.Entry{r.ch} +} + func (r *syncStandaloneReader) sendReplconfListenPort() { // use status_port as redis-shake port argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} @@ -166,6 +218,40 @@ func (r *syncStandaloneReader) sendPSync() { r.stat.AofReceivedOffset = int64(masterOffset) } +func (r *syncStandaloneReader) sendSync() { + if r.opts.TryDiskless { + argv := []interface{}{"REPLCONF", "CAPA", "EOF"} + reply := r.client.DoWithStringReply(argv...) + if reply != "OK" { + log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply) + } + } + // send SYNC + argv := []interface{}{"SYNC"} + if config.Opt.Advanced.AwsPSync != "" { + argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} + } + r.client.Send(argv...) + + // format: \n\n\n+\r\n + for { + select { + case <-r.ctx.Done(): + close(r.ch) + runtime.Goexit() // stop goroutine + default: + } + bytes, err := r.rd.Peek(1) + if err != nil { + log.Panicf(err.Error()) + } + if bytes[0] != '\n' { + break + } + r.rd.ReadByte() + } +} + func (r *syncStandaloneReader) receiveRDB() string { log.Debugf("[%s] source db is doing bgsave.", r.stat.Name) r.stat.Status = kWaitBgsave