Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync instead of psync while Redis version is less than 2.8.0. #874

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type syncStandaloneReader struct {
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}

// version info
SupportPSYNC bool
}

func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
Expand All @@ -90,10 +93,36 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
r.stat.Status = kHandShake
r.stat.Dir = utils.GetAbsPath(r.stat.Name)
utils.CreateEmptyDir(r.stat.Dir)

r.SupportPSYNC = r.supportPSYNC();
return r
}


func (r *syncStandaloneReader) supportPSYNC() bool {
reply := r.client.DoWithStringReply("info", "server")
for _, line := range strings.Split(reply, "\n") {
if strings.HasPrefix(line, "redis_version:") {
version := strings.Split(line, ":")[1]
parts := strings.Split(version,".");
if len(parts) > 2{
v1,_ := strconv.Atoi(parts[0]);
v2,_ := strconv.Atoi(parts[1]);
if v1 * 1000 + v2 < 2008{
return false
}
}

}
}

return true;
}

func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
if !r.SupportPSYNC{
return r.StartReadWithSync(ctx);
}
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
Expand All @@ -116,6 +145,29 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr
return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
//r.sendReplconfListenPort()
r.sendSync()
rdbFilePath := r.receiveRDB()
startOffset := r.stat.AofReceivedOffset
//go r.sendReplconfAck() // start sent replconf ack
go r.receiveAOF(r.rd)
if r.opts.SyncRdb {
r.sendRDB(rdbFilePath)
}
if r.opts.SyncAof {
r.stat.Status = kSyncAof
r.sendAOF(startOffset)
}
close(r.ch)
}()

return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) sendReplconfListenPort() {
// use status_port as redis-shake port
argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)}
Expand Down Expand Up @@ -166,6 +218,40 @@ func (r *syncStandaloneReader) sendPSync() {
r.stat.AofReceivedOffset = int64(masterOffset)
}

func (r *syncStandaloneReader) sendSync() {
if r.opts.TryDiskless {
argv := []interface{}{"REPLCONF", "CAPA", "EOF"}
reply := r.client.DoWithStringReply(argv...)
if reply != "OK" {
log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply)
}
}
// send SYNC
argv := []interface{}{"SYNC"}
if config.Opt.Advanced.AwsPSync != "" {
argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"}
}
r.client.Send(argv...)

// format: \n\n\n+<reply>\r\n
for {
select {
case <-r.ctx.Done():
close(r.ch)
runtime.Goexit() // stop goroutine
default:
}
bytes, err := r.rd.Peek(1)
if err != nil {
log.Panicf(err.Error())
}
if bytes[0] != '\n' {
break
}
r.rd.ReadByte()
}
}

func (r *syncStandaloneReader) receiveRDB() string {
log.Debugf("[%s] source db is doing bgsave.", r.stat.Name)
r.stat.Status = kWaitBgsave
Expand Down
Loading