Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add full sync info in job progress #389

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,15 @@ func (j *Job) partialSync() error {
if backupObject, ok := backupJobInfo.BackupObjects[table]; !ok {
return xerror.Errorf(xerror.Normal, "table %s not found in backup objects", table)
} else if backupObject.Id != tableId {
log.Warnf("partial sync table %s id not match, force full sync. table id %d, backup object id %d",
info := fmt.Sprintf("partial sync table %s id not match, force full sync. table id %d, backup object id %d",
table, tableId, backupObject.Id)
log.Warnf("%s", info)
if j.SyncType == TableSync {
log.Infof("reset src table id from %d to %d, table %s", j.Src.TableId, backupObject.Id, table)
info = fmt.Sprintf("partial sync table %s id not match, reset src table id from %d to %d, table %s, force full sync", table, j.Src.TableId, backupObject.Id, table)
log.Infof("%s", info)
j.Src.TableId = backupObject.Id
}
return j.newSnapshot(j.progress.CommitSeq)
return j.newSnapshot(j.progress.CommitSeq, info)
} else if _, ok := tableCommitSeqMap[backupObject.Id]; !ok {
return xerror.Errorf(xerror.Normal, "commit seq not found, table id %d, table name: %s", backupObject.Id, table)
}
Expand Down Expand Up @@ -762,7 +764,7 @@ func (j *Job) fullSync() error {
switch j.progress.SubSyncState {
case Done:
log.Infof("fullsync status: done")
if err := j.newSnapshot(j.progress.CommitSeq); err != nil {
if err := j.newSnapshot(j.progress.CommitSeq, ""); err != nil {
return err
}

Expand Down Expand Up @@ -849,10 +851,11 @@ func (j *Job) fullSync() error {

if snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_NOT_EXIST ||
snapshotResp.Status.GetStatusCode() == tstatus.TStatusCode_SNAPSHOT_EXPIRED {
log.Warnf("get snapshot %s: %s (%s), retry with new full sync", snapshotName,
info := fmt.Sprintf("get snapshot %s: %s (%s), retry with new full sync", snapshotName,
utils.FirstOr(snapshotResp.Status.GetErrorMsgs(), "unknown"),
snapshotResp.Status.GetStatusCode())
return j.newSnapshot(j.progress.CommitSeq)
log.Warnf("%s", info)
return j.newSnapshot(j.progress.CommitSeq, info)
} else if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
err = xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status)
return err
Expand Down Expand Up @@ -890,10 +893,11 @@ func (j *Job) fullSync() error {
return xerror.Errorf(xerror.Normal, "table %s not found in backup objects", j.Src.Table)
} else if backupObject.Id != j.Src.TableId {
// Might be the table has been replace.
log.Warnf("full sync table %s id not match, force full sync and reset table id from %d to %d",
info := fmt.Sprintf("full sync table %s id not match, force full sync. table id %d, backup object id %d",
j.Src.Table, j.Src.TableId, backupObject.Id)
log.Warnf("%s", info)
j.Src.TableId = backupObject.Id
return j.newSnapshot(j.progress.CommitSeq)
return j.newSnapshot(j.progress.CommitSeq, info)
} else if _, ok := tableCommitSeqMap[j.Src.TableId]; !ok {
return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId)
}
Expand Down Expand Up @@ -1064,11 +1068,12 @@ func (j *Job) fullSync() error {
snapshotResp := inMemoryData.SnapshotResp

if snapshotResp.GetExpiredAt() > 0 && time.Now().UnixMilli() > snapshotResp.GetExpiredAt() {
log.Infof("fullsync snapshot %s is expired, cancel and retry with new full sync", restoreSnapshotName)
info := fmt.Sprintf("snapshot %s is expired, cancel and retry with new full sync", restoreSnapshotName)
log.Infof("%s", info)
if err := j.IDest.CancelRestoreIfExists(restoreSnapshotName); err != nil {
return err
}
return j.newSnapshot(j.progress.CommitSeq)
return j.newSnapshot(j.progress.CommitSeq, info)
}

for {
Expand Down Expand Up @@ -2081,9 +2086,10 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
func (j *Job) handleDummy(binlog *festruct.TBinlog) error {
dummyCommitSeq := binlog.GetCommitSeq()

log.Infof("handle dummy binlog, need full sync. SyncType: %v, seq: %v", j.SyncType, dummyCommitSeq)
info := fmt.Sprintf("handle dummy binlog, need full sync. SyncType: %v, seq: %v", j.SyncType, dummyCommitSeq)
log.Infof("%s", info)

return j.newSnapshot(dummyCommitSeq)
return j.newSnapshot(dummyCommitSeq, info)
}

func (j *Job) handleModifyProperty(binlog *festruct.TBinlog) error {
Expand Down Expand Up @@ -2235,7 +2241,9 @@ func (j *Job) handleSchemaChange(alterJob *record.AlterJobV2) error {
}
}

return j.newSnapshot(j.progress.CommitSeq)
info := fmt.Sprintf("handle schema change job, need full sync, Table: %v", alterJob.TableName)

return j.newSnapshot(j.progress.CommitSeq, info)
}

// handleLightningSchemaChange
Expand Down Expand Up @@ -2369,13 +2377,15 @@ func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error {
}

if !replacePartition.StrictRange {
log.Warnf("replacing partitions with non strict range is not supported yet, replace partition record: %s", string(data))
return j.newSnapshot(j.progress.CommitSeq)
info := fmt.Sprintf("replace partitions with non strict range is not supported yet, replace partition record: %s", string(data))
log.Warnf("%s", info)
return j.newSnapshot(j.progress.CommitSeq, info)
}

if replacePartition.UseTempName {
log.Warnf("replacing partitions with use tmp name is not supported yet, replace partition record: %s", string(data))
return j.newSnapshot(j.progress.CommitSeq)
info := fmt.Sprintf("replace partitions with use tmp name is not supported yet, replace partition record: %s", string(data))
log.Warnf("%s", info)
return j.newSnapshot(j.progress.CommitSeq, info)
}

oldPartitions := strings.Join(replacePartition.Partitions, ",")
Expand Down Expand Up @@ -2472,10 +2482,11 @@ func (j *Job) handleReplaceTable(binlog *festruct.TBinlog) error {

func (j *Job) handleReplaceTableRecord(commitSeq int64, record *record.ReplaceTableRecord) error {
if j.SyncType == TableSync {
log.Infof("replace table %s with fullsync in table sync, reset src table id from %d to %d, swap: %t",
info := fmt.Sprintf("replace table %s with fullsync in table sync, reset src table id from %d to %d, swap: %t",
record.OriginTableName, record.OriginTableId, record.NewTableId, record.SwapTable)
log.Infof("%s", info)
j.Src.TableId = record.NewTableId
return j.newSnapshot(commitSeq)
return j.newSnapshot(commitSeq, info)
}

if isAsyncMv, err := j.isMaterializedViewTable(record.OriginTableId); err != nil {
Expand Down Expand Up @@ -3032,8 +3043,9 @@ func (j *Job) incrementalSync() error {

// Force fullsync unconditionally
if j.Extra.SkipBinlog && j.Extra.SkipBy == SkipByFullSync {
log.Warnf("skip binlog via fullsync by user, commit seq %d", j.progress.CommitSeq)
return j.newSnapshot(j.progress.CommitSeq)
info := fmt.Sprintf("skip binlog via fullsync by user, commit seq %d", j.progress.CommitSeq)
log.Warnf("%s", info)
return j.newSnapshot(j.progress.CommitSeq, info)
}

// Step 1: get binlog
Expand Down Expand Up @@ -3207,8 +3219,9 @@ func (j *Job) handleError(err error) error {
}

if xerr.Category() == xerror.Meta {
log.Warnf("receive meta category error, make new snapshot, job: %s, err: %v", j.Name, err)
_ = j.newSnapshot(j.progress.CommitSeq)
info := fmt.Sprintf("receive meta category error, make new snapshot, job: %s, err: %v", j.Name, err)
log.Warnf("%s", info)
_ = j.newSnapshot(j.progress.CommitSeq, info)
}
return nil
}
Expand Down Expand Up @@ -3253,9 +3266,13 @@ func (j *Job) run() {
}
}

func (j *Job) newSnapshot(commitSeq int64) error {
func (j *Job) newSnapshot(commitSeq int64, fullSyncInfo string) error {
log.Infof("new snapshot, commitSeq: %d", commitSeq)

if fullSyncInfo != "" {
j.progress.SetFullSyncInfo(fullSyncInfo)
}

j.progress.PartialSyncData = nil
j.progress.TableAliases = nil
j.progress.SyncId += 1
Expand Down Expand Up @@ -3352,7 +3369,8 @@ func (j *Job) Run() error {
}
} else {
j.progress = NewJobProgress(j.Name, j.SyncType, j.db)
if err := j.newSnapshot(0); err != nil {
info := fmt.Sprintf("new job, job: %s, sync type: %v", j.Name, j.SyncType)
if err := j.newSnapshot(0, info); err != nil {
return err
}
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/ccr/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,19 @@ type JobProgress struct {
ShadowIndexes map[int64]int64 `json:"shadow_index_map,omitempty"`

// Some fields to save the unix epoch time of the key timepoint.
CreatedAt int64 `json:"created_at,omitempty"`
FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"`
PartialSyncStartAt int64 `json:"partial_sync_start_at,omitempty"`
IncrementalSyncStartAt int64 `json:"incremental_sync_start_at,omitempty"`
IngestBinlogAt int64 `json:"ingest_binlog_at,omitempty"`
CreatedAt int64 `json:"created_at,omitempty"`
FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"`
PartialSyncStartAt int64 `json:"partial_sync_start_at,omitempty"`
IncrementalSyncStartAt int64 `json:"incremental_sync_start_at,omitempty"`
IngestBinlogAt int64 `json:"ingest_binlog_at,omitempty"`
FullSyncInfo FullSyncInfo `json:"full_sync_info,omitempty"`
}

type FullSyncInfo struct {
PrevCommitSeq int64 `json:"prev_commit_seq"`
CommitSeq int64 `json:"commit_seq"`
SubSyncState SubSyncState `json:"sub_sync_state"`
Info string `json:"info"`
}

func (j *JobProgress) String() string {
Expand Down Expand Up @@ -409,3 +417,10 @@ func (j *JobProgress) Persist() {
log.Tracef("update job progress done, state: %s, subState: %s, commitSeq: %d, prevCommitSeq: %d",
j.SyncState, j.SubSyncState, j.CommitSeq, j.PrevCommitSeq)
}

func (j *JobProgress) SetFullSyncInfo(info string) {
j.FullSyncInfo.Info = info
j.FullSyncInfo.CommitSeq = j.CommitSeq
j.FullSyncInfo.PrevCommitSeq = j.PrevCommitSeq
j.FullSyncInfo.SubSyncState = j.SubSyncState
}
Loading