Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support gracefully shutdown #728

Merged
merged 1 commit into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import (
"os"
"os/signal"
"syscall"
"context"
_ "net/http/pprof"

"RedisShake/internal/config"
Expand Down Expand Up @@ -107,7 +111,10 @@ func main() {

log.Infof("start syncing...")

ch := theReader.StartRead()
ctx, cancel := context.WithCancel(context.Background())
ch := theReader.StartRead(ctx)
go waitShutdown(cancel)

for e := range ch {
// calc arguments
e.Parse()
Expand All @@ -129,3 +136,11 @@ func main() {
utils.ReleaseFileLock() // Release file lock
log.Infof("all done")
}

func waitShutdown(cancel context.CancelFunc) {
quitCh := make(chan os.Signal, 1)
signal.Notify(quitCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-quitCh
log.Infof("Got signal: %s to exit.", sig)
cancel()
}
16 changes: 8 additions & 8 deletions internal/aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aof

import (
"bufio"
"context"
"io"
"os"
"strconv"
Expand Down Expand Up @@ -51,7 +52,7 @@ func ReadCompleteLine(reader *bufio.Reader) ([]byte, error) {
return line, err
}

func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
func (ld *Loader) LoadSingleAppendOnlyFile(ctx context.Context, timestamp int64) int {
ret := OK
filePath := ld.filePath
fp, err := os.Open(filePath)
Expand Down Expand Up @@ -80,12 +81,14 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
}
reader := bufio.NewReader(fp)
for {

line, err := ReadCompleteLine(reader)
{
select {
case <-ctx.Done():
return ret
default:
line, err := ReadCompleteLine(reader)
if err != nil {
if err == io.EOF {
break
return ret
} else {
log.Infof("Unrecoverable error reading the append only File %v: %v", filePath, err)
ret = Failed
Expand Down Expand Up @@ -152,9 +155,6 @@ func (ld *Loader) LoadSingleAppendOnlyFile(timestamp int64) int {
e.Argv = append(e.Argv, value)
}
ld.ch <- e

}

}
return ret
}
8 changes: 8 additions & 0 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type Redis struct {
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
protoReader *proto.Reader
Expand All @@ -33,6 +34,7 @@ func NewRedisClient(address string, username string, password string, Tls bool)
log.Panicf("dial failed. address=[%s], tls=[%v], err=[%v]", address, Tls, err)
}

r.conn = conn
r.reader = bufio.NewReader(conn)
r.writer = bufio.NewWriter(conn)
r.protoReader = proto.NewReader(r.reader)
Expand Down Expand Up @@ -129,6 +131,12 @@ func (r *Redis) SetBufioReader(rd *bufio.Reader) {
r.protoReader = proto.NewReader(r.reader)
}

func (r *Redis) Close() {
if err := r.conn.Close(); err != nil {
log.Infof("close redis conn err: %s\n", err.Error())
}
}

/* Commands */

func (r *Redis) Scan(cursor uint64) (newCursor uint64, keys []string) {
Expand Down
9 changes: 6 additions & 3 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rdb
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"io"
"os"
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewLoader(name string, updateFunc func(int64), filPath string, ch chan *ent

// ParseRDB parse rdb file
// return repl stream db id
func (ld *Loader) ParseRDB() int {
func (ld *Loader) ParseRDB(ctx context.Context) int {
var err error
ld.fp, err = os.OpenFile(ld.filPath, os.O_RDONLY, 0666)
if err != nil {
Expand Down Expand Up @@ -89,12 +90,12 @@ func (ld *Loader) ParseRDB() int {
log.Debugf("[%s] RDB version: %d", ld.name, version)

// read entries
ld.parseRDBEntry(rd)
ld.parseRDBEntry(ctx, rd)

return ld.replStreamDbId
}

func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) {
// for stat
updateProcessSize := func() {
if ld.updateFunc == nil {
Expand Down Expand Up @@ -198,6 +199,8 @@ func (ld *Loader) parseRDBEntry(rd *bufio.Reader) {
select {
case <-tick:
updateProcessSize()
case <- ctx.Done():
return
default:
}
}
Expand Down
8 changes: 4 additions & 4 deletions internal/reader/aof_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package reader

import (
"context"
"path/filepath"

"RedisShake/internal/aof"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/utils"
Expand Down Expand Up @@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
return r
}

func (r *aofReader) StartRead() chan *entry.Entry {
func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
//init entry
r.ch = make(chan *entry.Entry, 1024)

Expand All @@ -79,7 +79,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
if manifestInfo == nil { // load single aof file
log.Infof("start send single AOF path=[%s]", r.path)
aofLoader := aof.NewLoader(r.path, r.ch)
ret := aofLoader.LoadSingleAppendOnlyFile(r.stat.AOFTimestamp)
ret := aofLoader.LoadSingleAppendOnlyFile(ctx, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
Expand All @@ -89,7 +89,7 @@ func (r *aofReader) StartRead() chan *entry.Entry {
close(r.ch)
} else {
aofLoader := NewAOFFileInfo(r.path, r.ch)
ret := aofLoader.LoadAppendOnlyFile(manifestInfo, r.stat.AOFTimestamp)
ret := aofLoader.LoadAppendOnlyFile(ctx, manifestInfo, r.stat.AOFTimestamp)
if ret == AOFOk || ret == AOFTruncated {
log.Infof("The AOF File was successfully loaded")
} else {
Expand Down
5 changes: 3 additions & 2 deletions internal/reader/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package reader
import (
"RedisShake/internal/entry"
"RedisShake/internal/status"
"context"
)

type Reader interface {
status.Statusable
StartRead() chan *entry.Entry
}
StartRead(ctx context.Context) chan *entry.Entry
}
15 changes: 8 additions & 7 deletions internal/reader/parsing_aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"container/list"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -555,7 +556,7 @@ func GetHistoryAndIncrAppendOnlyFilesNum(am *AOFManifest) int {
return num
}

func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int {
func (aofInfo *INFO) LoadAppendOnlyFile(ctx context.Context, am *AOFManifest, AOFTimeStamp int64) int {
if am == nil {
log.Panicf("AOFManifest is null")
}
Expand Down Expand Up @@ -593,7 +594,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
BaseSize = aofInfo.GetAppendOnlyFileSize(AOFName, nil)
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, 0) //Currently, RDB files cannot be restored at a point in time.
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, 0) //Currently, RDB files cannot be restored at a point in time.
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from Base File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
}
Expand Down Expand Up @@ -627,7 +628,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
AOFNum++
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp)
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp)
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from History File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
return ret
Expand Down Expand Up @@ -659,7 +660,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int
aofInfo.UpdateLoadingFileName(AOFName)
AOFNum++
start = Ustime()
ret = aofInfo.ParsingSingleAppendOnlyFile(AOFName, AOFTimeStamp)
ret = aofInfo.ParsingSingleAppendOnlyFile(ctx, AOFName, AOFTimeStamp)
if ret == AOFOk || (ret == AOFTruncated) {
log.Infof("DB loaded from incr File %v: %.3f seconds", AOFName, float64(Ustime()-start)/1000000)
return ret
Expand Down Expand Up @@ -691,7 +692,7 @@ func (aofInfo *INFO) LoadAppendOnlyFile(am *AOFManifest, AOFTimeStamp int64) int

}

func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp int64) int {
func (aofInfo *INFO) ParsingSingleAppendOnlyFile(ctx context.Context, FileName string, AOFTimeStamp int64) int {
ret := AOFOk
AOFFilepath := path.Join(aofInfo.AOFDirName, FileName)
println(AOFFilepath)
Expand Down Expand Up @@ -725,12 +726,12 @@ func (aofInfo *INFO) ParsingSingleAppendOnlyFile(FileName string, AOFTimeStamp i
log.Infof("Reading RDB Base File on AOF loading...")
rdbOpt := RdbReaderOptions{Filepath: AOFFilepath}
ldRDB := NewRDBReader(&rdbOpt)
ldRDB.StartRead()
ldRDB.StartRead(ctx)
return AOFOk
}
// load single aof file
aofSingleReader := aof.NewLoader(MakePath(aofInfo.AOFDirName, FileName), aofInfo.ch)
ret = aofSingleReader.LoadSingleAppendOnlyFile(AOFTimeStamp)
ret = aofSingleReader.LoadSingleAppendOnlyFile(ctx, AOFTimeStamp)
return ret

}
6 changes: 3 additions & 3 deletions internal/reader/rdb_reader.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package reader

import (
"context"
"fmt"

"RedisShake/internal/entry"
"RedisShake/internal/log"
"RedisShake/internal/rdb"
"RedisShake/internal/utils"

"github.com/dustin/go-humanize"
)

Expand Down Expand Up @@ -41,7 +41,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
return r
}

func (r *rdbReader) StartRead() chan *entry.Entry {
func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
log.Infof("[%s] start read", r.stat.Name)
r.ch = make(chan *entry.Entry, 1024)
updateFunc := func(offset int64) {
Expand All @@ -53,7 +53,7 @@ func (r *rdbReader) StartRead() chan *entry.Entry {
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, r.stat.Filepath, r.ch)

go func() {
_ = rdbLoader.ParseRDB()
_ = rdbLoader.ParseRDB(ctx)
log.Infof("[%s] rdb file parse done", r.stat.Name)
close(r.ch)
}()
Expand Down
5 changes: 3 additions & 2 deletions internal/reader/scan_cluster_reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reader

import (
"context"
"fmt"
"sync"

Expand All @@ -25,13 +26,13 @@ func NewScanClusterReader(opts *ScanReaderOptions) Reader {
return rd
}

func (rd *scanClusterReader) StartRead() chan *entry.Entry {
func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
ch := make(chan *entry.Entry, 1024)
var wg sync.WaitGroup
for _, r := range rd.readers {
wg.Add(1)
go func(r Reader) {
for e := range r.StartRead() {
for e := range r.StartRead(ctx) {
ch <- e
}
wg.Done()
Expand Down
Loading
Loading