Skip to content

Commit

Permalink
K8SPXC-645: improve pitr (percona#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
qJkee authored Mar 8, 2021
1 parent 6fe1bf5 commit 6766b32
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 57 deletions.
176 changes: 122 additions & 54 deletions cmd/pitr/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type Config struct {
}

const (
lastSetFileName string = "last-binlog-set" // name for object where the last binlog set will stored
gtidPostfix string = "-gtid-set" // filename postfix for files with GTID set
lastSetFilePrefix string = "last-binlog-set-" // filename prefix for object where the last binlog set will stored
gtidPostfix string = "-gtid-set" // filename postfix for files with GTID set
)

func New(c Config) (*Collector, error) {
Expand All @@ -59,19 +59,8 @@ func New(c Config) (*Collector, error) {
return nil, errors.Wrap(err, "new storage manager")
}

// get last binlog set stored on S3
lastSetObject, err := s3.GetObject(lastSetFileName)
if err != nil {
return nil, errors.Wrap(err, "get last set content")
}
lastSet, err := ioutil.ReadAll(lastSetObject)
if err != nil && minio.ToErrorResponse(errors.Cause(err)).Code != "NoSuchKey" {
return nil, errors.Wrap(err, "read last gtid set")
}

return &Collector{
storage: s3,
lastSet: string(lastSet),
pxcUser: c.PXCUser,
pxcServiceName: c.PXCServiceName,
}, nil
Expand All @@ -84,6 +73,10 @@ func (c *Collector) Run() error {
}
defer c.close()

// remove last set because we always
// read it from aws file
c.lastSet = ""

err = c.CollectBinLogs()
if err != nil {
return errors.Wrap(err, "collect binlog files")
Expand All @@ -92,6 +85,19 @@ func (c *Collector) Run() error {
return nil
}

// lastGTIDSet reads the object named lastSetFilePrefix+sourceID from the
// collector's storage and returns its content as a string.
//
// A read error whose response code is "NoSuchKey" is not treated as a
// failure: whatever was read (normally nothing) is returned with a nil error,
// so a source that has never been uploaded yields an empty set.
func (c *Collector) lastGTIDSet(sourceID string) (string, error) {
	objectName := lastSetFilePrefix + sourceID

	// fetch the last binlog set stored on S3
	reader, err := c.storage.GetObject(objectName)
	if err != nil {
		return "", errors.Wrap(err, "get last set content")
	}

	content, readErr := ioutil.ReadAll(reader)
	if readErr != nil {
		// only a missing object is tolerated; any other read error is fatal
		if code := minio.ToErrorResponse(errors.Cause(readErr)).Code; code != "NoSuchKey" {
			return "", errors.Wrap(readErr, "read last gtid set")
		}
	}

	return string(content), nil
}

func (c *Collector) newDB() error {
file, err := os.Open("/etc/mysql/mysql-users-secret/xtrabackup")
if err != nil {
Expand Down Expand Up @@ -122,48 +128,119 @@ func (c *Collector) close() error {
return c.db.Close()
}

// CurrentSourceID walks logs in order and asks the database for the GTID set
// of each binlog until it finds a non-empty one. It returns the part of that
// set that precedes the first ":" (the source ID).
//
// If every binlog has an empty GTID set, or logs is empty, the result is an
// empty string with a nil error. An error from GetGTIDSet is returned as-is,
// together with whatever value that call produced.
func (c *Collector) CurrentSourceID(logs []pxc.Binlog) (string, error) {
	firstSet := ""
	for _, binlog := range logs {
		found, err := c.db.GetGTIDSet(binlog.Name)
		if err != nil {
			return found, err
		}
		if found != "" {
			// the first binlog with a GTID set decides the source ID
			firstSet = found
			break
		}
	}

	parts := strings.Split(firstSet, ":")
	return parts[0], nil
}

// removeEmptyBinlogs returns a new slice containing only the binlogs from
// logs whose GTID set (as reported by the database) is non-empty. Each kept
// entry is a copy of the input element with its GTIDSet field filled in; the
// input slice itself is not modified.
//
// The first GetGTIDSet failure aborts the scan and is returned wrapped.
func (c *Collector) removeEmptyBinlogs(logs []pxc.Binlog) ([]pxc.Binlog, error) {
	kept := []pxc.Binlog{}
	for i := range logs {
		gtidSet, err := c.db.GetGTIDSet(logs[i].Name)
		if err != nil {
			return nil, errors.Wrap(err, "get GTID set")
		}
		// a binlog without a GTID set is empty and carries no information,
		// so it is never uploaded
		if gtidSet == "" {
			continue
		}
		entry := logs[i]
		entry.GTIDSet = gtidSet
		kept = append(kept, entry)
	}
	return kept, nil
}

// filterBinLogs selects the binlogs that still have to be uploaded.
//
// When lastBinlogName is empty, every binlog in logs is a candidate. Otherwise
// the candidates start at the binlog named lastBinlogName; that binlog itself
// is skipped when its current GTID set equals c.lastSet, because it has not
// changed since it was uploaded. In both cases binlogs with an empty GTID set
// are dropped via removeEmptyBinlogs, which also fills in GTIDSet on the
// returned entries.
//
// If lastBinlogName is not present in logs (including when logs is empty),
// the result is (nil, nil).
func (c *Collector) filterBinLogs(logs []pxc.Binlog, lastBinlogName string) ([]pxc.Binlog, error) {
	if lastBinlogName == "" {
		return c.removeEmptyBinlogs(logs)
	}

	logsLen := len(logs)

	// The bound must be checked BEFORE indexing: with the operands the other
	// way round, logs[startIndex] is evaluated at startIndex == logsLen and
	// panics whenever the name is missing or logs is empty.
	startIndex := 0
	for startIndex < logsLen && logs[startIndex].Name != lastBinlogName {
		startIndex++
	}

	// last uploaded binlog is not in the list
	if startIndex == logsLen {
		return nil, nil
	}

	set, err := c.db.GetGTIDSet(logs[startIndex].Name)
	if err != nil {
		return nil, errors.Wrap(err, "get gtid set of last uploaded binlog")
	}
	// we don't need to reupload the last file
	// if its gtid set has not changed
	if set == c.lastSet {
		startIndex++
	}

	return c.removeEmptyBinlogs(logs[startIndex:])
}

func (c *Collector) CollectBinLogs() error {
list, err := c.db.GetBinLogList()
if err != nil {
return errors.Wrap(err, "get binlog list")
}

// get last uploaded binlog file name
lastUploadedBinlogName, err := c.db.GetBinLogName(c.lastSet)
sourceID, err := c.CurrentSourceID(list)
if err != nil {
return errors.Wrap(err, "get last uploaded binlog name by gtid set")
return errors.Wrap(err, "get current source id")
}

upload := false
// if there are no uploaded files we going to upload every binlog file
if len(lastUploadedBinlogName) == 0 {
upload = true
if sourceID == "" {
log.Println("No binlogs to upload")
return nil
}

for _, binlog := range list {
binlogSet := ""
// this check is for uploading starting from needed file
if binlog.Name == lastUploadedBinlogName {
binlogSet, err = c.db.GetGTIDSet(binlog.Name)
if err != nil {
return errors.Wrap(err, "get binlog gtid set")
}
if c.lastSet != binlogSet {
upload = true
}
}
if upload {
err = c.manageBinlog(binlog)
if err != nil {
return errors.Wrap(err, "manage binlog")
}
c.lastSet, err = c.lastGTIDSet(sourceID)
if err != nil {
return errors.Wrap(err, "get last uploaded gtid set")
}

lastUploadedBinlogName := ""

if c.lastSet != "" {
// get last uploaded binlog file name
lastUploadedBinlogName, err = c.db.GetBinLogName(c.lastSet)
if err != nil {
return errors.Wrap(err, "get last uploaded binlog name by gtid set")
}
// need this for start uploading files that goes after current
if c.lastSet == binlogSet {
upload = true

if lastUploadedBinlogName == "" {
log.Println("Gap detected in the binary logs. Binary logs will be uploaded anyway, but full backup needed for consistent recovery.")
}
}

list, err = c.filterBinLogs(list, lastUploadedBinlogName)
if err != nil {
return errors.Wrap(err, "filter empty binlogs")
}

if len(list) == 0 {
log.Println("No binlogs to upload")
return nil
}

for _, binlog := range list {
err = c.manageBinlog(binlog)
if err != nil {
return errors.Wrap(err, "manage binlog")
}
}
return nil
}

Expand All @@ -179,27 +256,18 @@ func mergeErrors(a, b error) error {
}

func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {
set, err := c.db.GetGTIDSet(binlog.Name)
if err != nil {
return errors.Wrap(err, "get GTID set")
}
// we don't upload binlog without gtid
// because it is empty and doesn't have any information
if len(set) == 0 {
return nil
}

binlogTmstmp, err := c.db.GetBinLogFirstTimestamp(binlog.Name)
if err != nil {
return errors.Wrapf(err, "get first timestamp for %s", binlog.Name)
}

binlogName := fmt.Sprintf("binlog_%s_%x", binlogTmstmp, md5.Sum([]byte(set)))
binlogName := fmt.Sprintf("binlog_%s_%x", binlogTmstmp, md5.Sum([]byte(binlog.GTIDSet)))

var setBuffer bytes.Buffer
// no error handling because WriteString() always return nil error
// nolint:errcheck
setBuffer.WriteString(set)
setBuffer.WriteString(binlog.GTIDSet)

tmpDir := os.TempDir() + "/"

Expand Down Expand Up @@ -267,13 +335,13 @@ func (c *Collector) manageBinlog(binlog pxc.Binlog) (err error) {
}
// no error handling because WriteString() always return nil error
// nolint:errcheck
setBuffer.WriteString(set)
setBuffer.WriteString(binlog.GTIDSet)

err = c.storage.PutObject(lastSetFileName, &setBuffer, int64(setBuffer.Len()))
err = c.storage.PutObject(lastSetFilePrefix+strings.Split(binlog.GTIDSet, ":")[0], &setBuffer, int64(setBuffer.Len()))
if err != nil {
return errors.Wrap(err, "put last-set object")
}
c.lastSet = set
c.lastSet = binlog.GTIDSet

return nil
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/pitr/pxc/pxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Binlog struct {
Name string
Size int64
Encrypted string
GTIDSet string
}

// GetBinLogList return binary log files list
Expand Down Expand Up @@ -144,15 +145,15 @@ func (p *PXC) GetBinLogName(gtidSet string) (string, error) {
return "", errors.Wrap(err, "create function")
}
}
var binlog string
var binlog sql.NullString
row := p.db.QueryRow("SELECT get_binlog_by_gtid_set(?)", gtidSet)

err = row.Scan(&binlog)
if err != nil {
return "", errors.Wrap(err, "scan binlog")
}

return strings.TrimPrefix(binlog, "./"), nil
return strings.TrimPrefix(binlog.String, "./"), nil
}

// GetBinLogFirstTimestamp return binary log file first timestamp
Expand Down
6 changes: 5 additions & 1 deletion cmd/pitr/recoverer/recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func (r *Recoverer) setBinlogs() error {
}
reverse(list)
binlogs := []string{}
sourceID := strings.Split(r.startGTID, ":")[0]
for _, binlog := range list {
if strings.Contains(binlog, "-gtid-set") {
continue
Expand All @@ -329,6 +330,9 @@ func (r *Recoverer) setBinlogs() error {
return errors.Wrapf(err, "read %s gtid-set object", binlog)
}
binlogGTIDSet := string(content)
if sourceID != strings.Split(binlogGTIDSet, ":")[0] {
continue
}
binlogs = append(binlogs, binlog)
subResult, err := r.db.SubtractGTIDSet(r.startGTID, binlogGTIDSet)
if err != nil {
Expand All @@ -339,7 +343,7 @@ func (r *Recoverer) setBinlogs() error {
}
}
if len(binlogs) == 0 {
return errors.Errorf("no objects for prefix %s", "binlog_")
return errors.Errorf("no objects for prefix binlog_ or with source_id=%s", sourceID)
}
reverse(binlogs)
r.binlogs = binlogs
Expand Down

0 comments on commit 6766b32

Please sign in to comment.