From cb4361dceb26f3b40b02be4f998e05a81dcc1920 Mon Sep 17 00:00:00 2001 From: skyjiang <470623352@qq.com> Date: Thu, 14 Dec 2023 17:03:47 +0800 Subject: [PATCH] support gracefully shutdown --- cmd/redis-shake/main.go | 17 +++- internal/aof/aof.go | 16 ++-- internal/client/redis.go | 8 ++ internal/rdb/rdb.go | 9 +- internal/reader/aof_reader.go | 8 +- internal/reader/interface.go | 5 +- internal/reader/parsing_aof.go | 15 ++-- internal/reader/rdb_reader.go | 6 +- internal/reader/scan_cluster_reader.go | 5 +- internal/reader/scan_standalone_reader.go | 33 ++++--- internal/reader/sync_cluster_reader.go | 5 +- internal/reader/sync_standalone_reader.go | 103 +++++++++++++--------- 12 files changed, 145 insertions(+), 85 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 705fdb27..28d741d1 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -1,6 +1,10 @@ package main import ( + "os" + "os/signal" + "syscall" + "context" _ "net/http/pprof" "RedisShake/internal/config" @@ -107,7 +111,10 @@ func main() { log.Infof("start syncing...") - ch := theReader.StartRead() + ctx, cancel := context.WithCancel(context.Background()) + ch := theReader.StartRead(ctx) + go waitShutdown(cancel) + for e := range ch { // calc arguments e.Parse() @@ -129,3 +136,11 @@ func main() { utils.ReleaseFileLock() // Release file lock log.Infof("all done") } + +func waitShutdown(cancel context.CancelFunc) { + quitCh := make(chan os.Signal, 1) + signal.Notify(quitCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + sig := <-quitCh + log.Infof("Got signal: %s to exit.", sig) + cancel() +} \ No newline at end of file diff --git a/internal/aof/aof.go b/internal/aof/aof.go index 8a3943da..2f86add4 100644 --- a/internal/aof/aof.go +++ b/internal/aof/aof.go @@ -2,6 +2,7 @@ package aof import ( "bufio" + "context" "io" "os" "strconv" @@ -51,7 +52,7 @@ func ReadCompleteLine(reader *bufio.Reader) ([]byte, error) { return line, err } -func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int { +func (ld *Loader) LoadSingleAppendOnlyFile(ctx context.Context, timestamp int64) int { ret := OK filePath := ld.filePath fp, err := os.Open(filePath) @@ -80,12 +81,14 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int { } reader := bufio.NewReader(fp) for { - - line, err := ReadCompleteLine(reader) - { + select { + case <-ctx.Done(): + return ret + default: + line, err := ReadCompleteLine(reader) if err != nil { if err == io.EOF { - break + return ret } else { log.Infof("Unrecoverable error reading the append only File %v: %v", filePath, err) ret = Failed @@ -152,9 +155,6 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int { e.Argv = append(e.Argv, value) } ld.ch <- e - } - } - return ret } diff --git a/internal/client/redis.go b/internal/client/redis.go index 848ce90d..5672bf5c 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -12,6 +12,7 @@ import ( ) type Redis struct { + conn net.Conn reader *bufio.Reader writer *bufio.Writer protoReader *proto.Reader @@ -33,6 +34,7 @@ func NewRedisClient(address string, username string, password string, Tls bool) log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err) } + r.conn = conn r.reader = bufio.NewReader(conn) r.writer = bufio.NewWriter(conn) r.protoReader = proto.NewReader(r.reader) @@ -129,6 +131,12 @@ func (r *Redis) SetBufioReader(rd *bufio.Reader) { r.protoReader = proto.NewReader(r.reader) } +func (r *Redis) Close() { + if err := r.conn.Close(); err != nil { + log.Infof("close redis conn err: %s\n", err.Error()) + } +} + /* Commands */ func (r *Redis) Scan(cursor uint64) (newCursor uint64, keys []string) { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 357ba670..748d3e14 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -3,6 +3,7 @@ package rdb import ( "bufio" "bytes" + "context" "encoding/binary" "io" "os" @@ -60,7 +61,7 @@ func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *ent // ParseRDB parse rdb file // return repl stream db id -func (ld *Loader) ParseRDB() int { +func (ld *Loader) ParseRDB(ctx context.Context) int { var err error ld.fp, err = os.OpenFile(ld.filPath, os.O_RDONLY, 0666) if err != nil { @@ -89,12 +90,12 @@ func (ld *Loader) ParseRDB() int { log.Debugf("[%s] RDB version: %d", ld.name, version) // read entries - ld.parseRDBEntry(rd) + ld.parseRDBEntry(ctx, rd) return ld.replStreamDbId } -func (ld *Loader) parseRDBEntry(rd *bufio.Reader) { +func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) { // for stat updateProcessSize := func() { if ld.updateFunc == nil { @@ -198,6 +199,8 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) { select { case <-tick: updateProcessSize() + case <- ctx.Done(): + return default: } } diff --git a/internal/reader/aof_reader.go b/internal/reader/aof_reader.go index f18b9248..d21d1443 100644 --- a/internal/reader/aof_reader.go +++ b/internal/reader/aof_reader.go @@ -1,10 +1,10 @@ package reader import ( + "context" "path/filepath" "RedisShake/internal/aof" - "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/utils" @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader { return r } -func (r *aofReader) StartRead() chan *entry.Entry { +func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry { //init entry r.ch = make(chan *entry.Entry, 1024) @@ -79,7 +79,7 @@ func (r *aofReader) StartRead() chan *entry.Entry { if manifestInfo == nil { // load single aof file log.Infof("start send single AOF path=[%s]", r.path) aofLoader := aof.NewLoader(r.path, r.ch) - ret := aofLoader.LoadSingleAppendOnlyFile(r.stat.AOFTimestamp) + ret := aofLoader.LoadSingleAppendOnlyFile(ctx, r.stat.AOFTimestamp) if ret == AOFOk || ret == AOFTruncated { log.Infof("The AOF File was successfully loaded") } else { @@ -89,7 +89,7 @@ func (r *aofReader) StartRead() chan *entry.Entry { close(r.ch) } else { aofLoader := NewAOFFileInfo(r.path, r.ch) - ret := aofLoader.LoadAppendOnlyFile(manifestInfo, r.stat.AOFTimestamp) + ret := aofLoader.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp) if ret == AOFOk || ret == AOFTruncated { log.Infof("The AOF File was successfully loaded") } else { diff --git a/internal/reader/interface.go b/internal/reader/interface.go index 7ddc0d0a..9a84104d 100644 --- a/internal/reader/interface.go +++ b/internal/reader/interface.go @@ -3,9 +3,10 @@ package reader import ( "RedisShake/internal/entry" "RedisShake/internal/status" + "context" ) type Reader interface { status.Statusable - StartRead() chan *entry.Entry -} + StartRead(ctx context.Context) chan *entry.Entry +} \ No newline at end of file diff --git a/internal/reader/parsing_aof.go b/internal/reader/parsing_aof.go index 06865cfd..a71c3d55 100644 --- a/internal/reader/parsing_aof.go +++ b/internal/reader/parsing_aof.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "container/list" + "context" "fmt" "io" "os" @@ -555,7 +556,7 @@ func GetHistoryAndIncrAppendOnlyFilesNum(am *AOFManifest) int { return num } -func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int { +func (aofInfo *INFO) LoadAppendOnlyFile(ctx context.Context, am *AOFManifest, AOFTimeStamp int64) int { if am == nil { log.Panicf("AOFManifest is null") } @@ -593,7 +594,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int aofInfo.UpdateLoadingFileName(AOFName) BaseSize = aofInfo.GetAppendOnlyFileSize(AOFName, nil) start = Ustime() - ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, 0) //Currently, RDB files cannot be restored at a point in time. + ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, 0) //Currently, RDB files cannot be restored at a point in time. if ret == AOFOk || (ret == AOFTruncated) { log.Infof("DB loaded from Base File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000) } @@ -627,7 +628,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int aofInfo.UpdateLoadingFileName(AOFName) AOFNum++ start = Ustime() - ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp) + ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp) if ret == AOFOk || (ret == AOFTruncated) { log.Infof("DB loaded from History File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000) return ret @@ -659,7 +660,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int aofInfo.UpdateLoadingFileName(AOFName) AOFNum++ start = Ustime() - ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp) + ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp) if ret == AOFOk || (ret == AOFTruncated) { log.Infof("DB loaded from incr File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000) return ret @@ -691,7 +692,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int } -func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp int64) int { +func (aofInfo *INFO) ParsingSingleAppendOnlyFile(ctx context.Context, FileName string, AOFTimeStamp int64) int { ret := AOFOk AOFFilepath := path.Join(aofInfo.AOFDirName, FileName) println(AOFFilepath) @@ -725,12 +726,12 @@ func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp i log.Infof("Reading RDB Base File on AOF loading...") rdbOpt := RdbReaderOptions{Filepath: AOFFilepath} ldRDB := NewRDBReader(&rdbOpt) - ldRDB.StartRead() + ldRDB.StartRead(ctx) return AOFOk } // load single aof file aofSingleReader := aof.NewLoader(MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch) - ret = aofSingleReader.LoadSingleAppendOnlyFile(AOFTimeStamp) + ret = aofSingleReader.LoadSingleAppendOnlyFile(ctx, AOFTimeStamp) return ret } diff --git a/internal/reader/rdb_reader.go b/internal/reader/rdb_reader.go index 0088880c..f99b49ce 100644 --- a/internal/reader/rdb_reader.go +++ b/internal/reader/rdb_reader.go @@ -1,13 +1,13 @@ package reader import ( + "context" "fmt" "RedisShake/internal/entry" "RedisShake/internal/log" "RedisShake/internal/rdb" "RedisShake/internal/utils" - "github.com/dustin/go-humanize" ) @@ -41,7 +41,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader { return r } -func (r *rdbReader) StartRead() chan *entry.Entry { +func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry { log.Infof("[%s] start read", r.stat.Name) r.ch = make(chan *entry.Entry, 1024) updateFunc := func(offset int64) { @@ -53,7 +53,7 @@ func (r *rdbReader) StartRead() chan *entry.Entry { rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, r.stat.Filepath, r.ch) go func() { - _ = rdbLoader.ParseRDB() + _ = rdbLoader.ParseRDB(ctx) log.Infof("[%s] rdb file parse done", r.stat.Name) close(r.ch) }() diff --git a/internal/reader/scan_cluster_reader.go b/internal/reader/scan_cluster_reader.go index 133942e6..f3a39dc3 100644 --- a/internal/reader/scan_cluster_reader.go +++ b/internal/reader/scan_cluster_reader.go @@ -1,6 +1,7 @@ package reader import ( + "context" "fmt" "sync" @@ -25,13 +26,13 @@ func NewScanClusterReader(opts *ScanReaderOptions) Reader { return rd } -func (rd *scanClusterReader) StartRead() chan *entry.Entry { +func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry { ch := make(chan *entry.Entry, 1024) var wg sync.WaitGroup for _, r := range rd.readers { wg.Add(1) go func(r Reader) { - for e := range r.StartRead() { + for e := range r.StartRead(ctx) { ch <- e } wg.Done() diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 81d62741..9ebf5567 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -1,6 +1,7 @@ package reader import ( + "context" "fmt" "math/bits" "regexp" @@ -32,6 +33,7 @@ type dbKey struct { } type scanStandaloneReader struct { + ctx context.Context dbs []int opts *ScanReaderOptions ch chan *entry.Entry @@ -67,7 +69,8 @@ func NewScanStandaloneReader(opts *ScanReaderOptions) Reader { return r } -func (r *scanStandaloneReader) StartRead() chan *entry.Entry { +func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry { + r.ctx = ctx r.subscript() go r.scan() go r.fetch() @@ -88,23 +91,30 @@ func (r *scanStandaloneReader) subscript() { } regex := regexp.MustCompile(`\d+`) for { - resp, err := c.Receive() - if err != nil { - log.Panicf(err.Error()) + select { + case <-r.ctx.Done(): + close(r.keyQueue.Ch) + return + default: + resp, err := c.Receive() + if err != nil { + log.Panicf(err.Error()) + } + key := resp.([]interface{})[3].(string) + dbId := regex.FindString(resp.([]interface{})[2].(string)) + dbIdInt, err := strconv.Atoi(dbId) + if err != nil { + log.Panicf(err.Error()) + } + r.keyQueue.Put(dbKey{db: dbIdInt, key: key}) } - key := resp.([]interface{})[3].(string) - dbId := regex.FindString(resp.([]interface{})[2].(string)) - dbIdInt, err := strconv.Atoi(dbId) - if err != nil { - log.Panicf(err.Error()) - } - r.keyQueue.Put(dbKey{db: dbIdInt, key: key}) } }() } func (r *scanStandaloneReader) scan() { c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + defer c.Close() for _, dbId := range r.dbs { if dbId != 0 { reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId)) @@ -140,6 +150,7 @@ func (r *scanStandaloneReader) scan() { func (r *scanStandaloneReader) fetch() { nowDbId := 0 c := client.NewRedisClient(r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + defer c.Close() for item := range r.keyQueue.Ch { r.stat.NeedUpdateCount = int64(r.keyQueue.Len()) dbId := item.(dbKey).db diff --git a/internal/reader/sync_cluster_reader.go b/internal/reader/sync_cluster_reader.go index b8a44de0..9dafaccc 100644 --- a/internal/reader/sync_cluster_reader.go +++ b/internal/reader/sync_cluster_reader.go @@ -1,6 +1,7 @@ package reader import ( + "context" "fmt" "sync" @@ -29,14 +30,14 @@ func NewSyncClusterReader(opts *SyncReaderOptions) Reader { return rd } -func (rd *syncClusterReader) StartRead() chan *entry.Entry { +func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry { ch := make(chan *entry.Entry, 1024) var wg sync.WaitGroup for _, r := range rd.readers { wg.Add(1) go func(r Reader) { defer wg.Done() - for e := range r.StartRead() { + for e := range r.StartRead(ctx) { ch <- e } }(r) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 002e2387..4cdef2ef 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -1,6 +1,7 @@ package reader import ( + "context" "bufio" "fmt" "io" @@ -16,7 +17,7 @@ import ( "RedisShake/internal/log" "RedisShake/internal/rdb" "RedisShake/internal/utils" - "RedisShake/internal/utils/file_rotate" + rotate "RedisShake/internal/utils/file_rotate" "github.com/dustin/go-humanize" ) @@ -42,6 +43,7 @@ const ( ) type syncStandaloneReader struct { + ctx context.Context opts *SyncReaderOptions client *client.Redis @@ -87,7 +89,8 @@ func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader { return r } -func (r *syncStandaloneReader) StartRead() chan *entry.Entry { +func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry { + r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) go func() { r.sendReplconfListenPort() @@ -104,6 +107,7 @@ func (r *syncStandaloneReader) StartRead() chan *entry.Entry { r.sendAOF(startOffset) } close(r.ch) + r.client.Close() }() return r.ch @@ -226,14 +230,19 @@ func (r *syncStandaloneReader) receiveAOF(rd io.Reader) { defer aofWriter.Close() buf := make([]byte, 16*1024) // 16KB is enough for writing file for { - n, err := rd.Read(buf) - if err != nil { - log.Panicf(err.Error()) + select { + case <-r.ctx.Done(): + return + default: + n, err := rd.Read(buf) + if err != nil { + log.Panicf(err.Error()) + } + r.stat.AofReceivedBytes += int64(n) + r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes)) + aofWriter.Write(buf[:n]) + r.stat.AofReceivedOffset += int64(n) } - r.stat.AofReceivedBytes += int64(n) - r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes)) - aofWriter.Write(buf[:n]) - r.stat.AofReceivedOffset += int64(n) } } @@ -246,7 +255,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) { r.stat.RdbSentHuman = humanize.IBytes(uint64(offset)) } rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch) - r.DbId = rdbLoader.ParseRDB() + r.DbId = rdbLoader.ParseRDB(r.ctx) log.Debugf("[%s] send RDB finished", r.stat.Name) // delete file _ = os.Remove(rdbFilePath) @@ -259,46 +268,56 @@ func (r *syncStandaloneReader) sendAOF(offset int64) { defer aofReader.Close() r.client.SetBufioReader(bufio.NewReader(aofReader)) for { - argv := client.ArrayString(r.client.Receive()) - r.stat.AofSentOffset = aofReader.Offset() - // select - if strings.EqualFold(argv[0], "select") { - DbId, err := strconv.Atoi(argv[1]) - if err != nil { - log.Panicf(err.Error()) + select { + case <-r.ctx.Done(): + return + default: + argv := client.ArrayString(r.client.Receive()) + r.stat.AofSentOffset = aofReader.Offset() + // select + if strings.EqualFold(argv[0], "select") { + DbId, err := strconv.Atoi(argv[1]) + if err != nil { + log.Panicf(err.Error()) + } + r.DbId = DbId + continue + } + // ping + if strings.EqualFold(argv[0], "ping") { + continue + } + // replconf @AWS + if strings.EqualFold(argv[0], "replconf") { + continue + } + // opinfo @Aliyun + if strings.EqualFold(argv[0], "opinfo") { + continue + } + // sentinel + if strings.EqualFold(argv[0], "publish") && strings.EqualFold(argv[1], "__sentinel__:hello") { + continue } - r.DbId = DbId - continue - } - // ping - if strings.EqualFold(argv[0], "ping") { - continue - } - // replconf @AWS - if strings.EqualFold(argv[0], "replconf") { - continue - } - // opinfo @Aliyun - if strings.EqualFold(argv[0], "opinfo") { - continue - } - // sentinel - if strings.EqualFold(argv[0], "publish") && strings.EqualFold(argv[1], "__sentinel__:hello") { - continue - } - e := entry.NewEntry() - e.Argv = argv - e.DbId = r.DbId - r.ch <- e + e := entry.NewEntry() + e.Argv = argv + e.DbId = r.DbId + r.ch <- e + } } } // sendReplconfAck send replconf ack to master to keep heartbeat between redis-shake and source redis. func (r *syncStandaloneReader) sendReplconfAck() { for range time.Tick(time.Millisecond * 100) { - if r.stat.AofReceivedOffset != 0 { - r.client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10)) + select { + case <-r.ctx.Done(): + return + default: + if r.stat.AofReceivedOffset != 0 { + r.client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10)) + } } } }