Skip to content

Commit

Permalink
Merge pull request apache#470 from mlycore/pretty-print
Browse files Browse the repository at this point in the history
refactor: add PrettyPrinter to print backup progress
  • Loading branch information
tristaZero authored Nov 20, 2023
2 parents fcd8dcc + f64ab8e commit bb1f000
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 65 deletions.
107 changes: 54 additions & 53 deletions pitr/cli/internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,20 +345,35 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
dataNodeMap[dn.IP] = dn
}

pw := prettyoutput.NewPW(totalNum)
pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{
NumTrackersExpected: totalNum,
})

go pw.Render()
for idx := 0; idx < totalNum; idx++ {
sn := lsBackup.SsBackup.StorageNodes[idx]

for i := 0; i < totalNum; i++ {
sn := lsBackup.SsBackup.StorageNodes[i]
as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
dn := dataNodeMap[sn.IP]
go checkStatus(as, sn, dn, dnCh, pw)
backupInfo := &model.BackupInfo{}
task := &backuptask{
As: as,
Sn: sn,
Dn: dn,
DnCh: dnCh,
Backup: backupInfo,
retries: defaultShowDetailRetryTimes,
}
tracker := &progress.Tracker{
Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port),
Total: 0,
Units: progress.UnitsDefault,
}
pw.AppendTracker(tracker)
go pw.UpdateProgress(tracker, task.checkProgress)
}

// wait for all data node backup finished
time.Sleep(time.Millisecond * 100)
for pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
pw.BlockedRendered()

close(dnCh)

Expand Down Expand Up @@ -388,62 +403,48 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
return backupFinalStatus
}

func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
var (
// mark check status is done, time ticker should break.
done = make(chan struct{})
// time ticker, try to doCheck request every 2 seconds.
ticker = time.Tick(time.Second * 2)
// progress bar.
tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault}
)

pw.AppendTracker(&tracker)
type backuptask struct {
As pkg.IAgentServer
Sn *model.StorageNode
Dn *model.DataNode
DnCh chan *model.DataNode

for !tracker.IsDone() {
select {
case <-done:
return
case <-ticker:
status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes)
if err != nil {
tracker.MarkAsErrored()
dn.Status = status
dn.EndTime = timeutil.Now().String()
dnCh <- dn
done <- struct{}{}
}
if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
tracker.MarkAsDone()
dn.Status = status
dn.EndTime = timeutil.Now().String()
dnCh <- dn
done <- struct{}{}
}
}
}
Backup *model.BackupInfo
retries int
}

func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) {
func (t *backuptask) checkProgress() (bool, error) {
var err error
in := &model.ShowDetailIn{
DBPort: sn.Port,
DBName: sn.Database,
Username: sn.Username,
Password: sn.Password,
DnBackupID: backupID,
DBPort: t.Sn.Port,
DBName: t.Sn.Database,
Username: t.Sn.Username,
Password: t.Sn.Password,
DnBackupID: t.Dn.BackupID,
DnBackupPath: BackupPath,
Instance: defaultInstance,
}
backupInfo, err := as.ShowDetail(in)

t.Backup, err = t.As.ShowDetail(in)
if err != nil {
if retries == 0 {
return model.SsBackupStatusCheckError, err
if t.retries == 0 {
t.Dn.Status = model.SsBackupStatusCheckError
t.DnCh <- t.Dn
return false, err
}
time.Sleep(time.Second * 1)
return doCheck(as, sn, backupID, retries-1)
t.retries--
return t.checkProgress()
}

return backupInfo.Status, nil
t.Dn.Status = t.Backup.Status
t.Dn.EndTime = timeutil.Now().String()

if t.Backup.Status == model.SsBackupStatusCompleted || t.Backup.Status == model.SsBackupStatusFailed {
t.DnCh <- t.Dn
return true, nil
}
return false, nil
}

type deleteMode int
Expand Down
43 changes: 32 additions & 11 deletions pitr/cli/internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,63 @@ var _ = Describe("Backup", func() {
sn = &model.StorageNode{
IP: "127.0.0.1",
}

task = &backuptask{}
)
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
as = mock_pkg.NewMockIAgentServer(ctrl)
task = &backuptask{
As: as,
Sn: sn,
Dn: &model.DataNode{},
DnCh: make(chan *model.DataNode, 2),

Backup: &model.BackupInfo{},
}
})
AfterEach(func() {
ctrl.Finish()
})

It("agent server return err", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout"))
status, err := doCheck(as, sn, "", 0)
It("mock agent server return err", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout"))

finished, err := task.checkProgress()
Expect(err).To(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCheckError))
Expect(finished).To(BeFalse())
Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError))
})

It("mock agent server and return failed status", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil)
status, err := doCheck(as, sn, "", 0)
finished, err := task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusFailed))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusFailed))
})

It("mock agent server and return completed status", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
status, err := doCheck(as, sn, "", 0)

finished, err := task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCompleted))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted))
})

It("mock agent server and return check err first time and then success", func() {
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout"))
as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout"))
finished, err := task.checkProgress()
Expect(err).To(HaveOccurred())
Expect(finished).To(BeFalse())
Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError))

as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
status, err := doCheck(as, sn, "", 1)
finished, err = task.checkProgress()
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal(model.SsBackupStatusCompleted))
Expect(finished).To(BeTrue())
Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted))
})
})

Expand Down
67 changes: 66 additions & 1 deletion pitr/cli/pkg/prettyoutput/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,83 @@
package prettyoutput

import (
"time"

"github.com/jedib0t/go-pretty/v6/progress"
)

func NewPW(totalNum int) progress.Writer {
pw := progress.NewWriter()

pw.SetTrackerLength(25)
pw.SetAutoStop(true)
pw.SetNumTrackersExpected(totalNum)
pw.SetSortBy(progress.SortByPercentDsc)
pw.SetTrackerPosition(progress.PositionRight)

style := progress.StyleDefault
style.Options.PercentIndeterminate = "running"
pw.SetStyle(style)
pw.SetTrackerPosition(progress.PositionRight)

return pw
}

type ProgressPrintOption struct {
NumTrackersExpected int
}

type ProgressPrinter struct {
progress.Writer
}

func NewProgressPrinter(opt ProgressPrintOption) *ProgressPrinter {
p := &ProgressPrinter{
Writer: progress.NewWriter(),
}

// passed printer options
p.SetNumTrackersExpected(opt.NumTrackersExpected)

// default printer options
p.SetTrackerLength(25)
p.SetAutoStop(true)
p.SetSortBy(progress.SortByPercentDsc)
p.SetTrackerPosition(progress.PositionRight)
style := progress.StyleDefault
style.Options.PercentIndeterminate = "running"
p.SetStyle(style)

return p
}

func (p *ProgressPrinter) BlockedRendered() {
time.Sleep(time.Millisecond * 100)
for p.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
}

func (p *ProgressPrinter) UpdateProgress(tracker *progress.Tracker, updateF func() (bool, error)) {
var (
done = make(chan struct{})
ticker = time.NewTicker(time.Second * 2)
)

for !tracker.IsDone() {
select {
case <-done:
return
case <-ticker.C:
finished, err := updateF()
if err != nil {
tracker.MarkAsErrored()
done <- struct{}{}
}

if finished {
tracker.MarkAsDone()
done <- struct{}{}
}
}
}
}

0 comments on commit bb1f000

Please sign in to comment.