From fb700d9e957aeb3c52af1c453d7ec3b9255c3461 Mon Sep 17 00:00:00 2001 From: Wenbin1002 Date: Mon, 5 Aug 2024 16:36:22 +0800 Subject: [PATCH] refactor --- pkg/vm/engine/tae/rpc/tool.go | 84 +++++++++++++++++------------------ 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/pkg/vm/engine/tae/rpc/tool.go b/pkg/vm/engine/tae/rpc/tool.go index 556a9dad6206..0d965d3e3662 100644 --- a/pkg/vm/engine/tae/rpc/tool.go +++ b/pkg/vm/engine/tae/rpc/tool.go @@ -1001,8 +1001,8 @@ type ckpStatArg struct { cid string tid uint64 limit int + all bool - all, latest bool path, name, res string } @@ -1016,11 +1016,10 @@ func (c *ckpStatArg) PrepareCommand() *cobra.Command { ckpStatCmd.SetUsageTemplate(c.Usage()) - ckpStatCmd.Flags().StringP("cid", "c", "", "checkpoint lsn") + ckpStatCmd.Flags().StringP("cid", "c", "", "checkpoint end ts") ckpStatCmd.Flags().IntP("tid", "t", invalidId, "checkpoint tid") ckpStatCmd.Flags().IntP("limit", "l", invalidLimit, "checkpoint limit") ckpStatCmd.Flags().BoolP("all", "a", false, "checkpoint all tables") - ckpStatCmd.Flags().Bool("latest", false, "checkpoint latest tables") ckpStatCmd.Flags().StringP("name", "n", "", "checkpoint name") return ckpStatCmd @@ -1032,7 +1031,6 @@ func (c *ckpStatArg) FromCommand(cmd *cobra.Command) (err error) { c.tid = uint64(id) c.all, _ = cmd.Flags().GetBool("all") c.limit, _ = cmd.Flags().GetInt("limit") - c.latest, _ = cmd.Flags().GetBool("latest") dir, _ := cmd.Flags().GetString("name") c.path, c.name = filepath.Split(dir) if cmd.Flag("ictx") != nil { @@ -1069,16 +1067,9 @@ func (c *ckpStatArg) Usage() (res string) { func (c *ckpStatArg) Run() (err error) { ctx := context.Background() checkpointJson := logtail.ObjectInfoJson{} - var entries []*checkpoint.CheckpointEntry - if c.ctx == nil { - entries, err = readCkpFromDisk(ctx, c.path, c.name) - } else { - if c.latest { - entries = c.ctx.db.BGCheckpointRunner.GetAllCheckpoints() - } else { - entries = c.ctx.db.BGCheckpointRunner.GetAllGlobalCheckpoints() - entries = append(entries, c.ctx.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...) - } + entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, false) + if err != nil { + return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get ckp entries %s", err)) } tables := make(map[uint64]*logtail.TableInfoJson) tableins := make(map[uint64]uint64) @@ -1086,7 +1077,7 @@ func (c *ckpStatArg) Run() (err error) { locations := make([]objectio.Location, 0, len(entries)) versions := make([]uint32, 0, len(entries)) for _, entry := range entries { - if c.latest || entry.GetEnd().ToString() == c.cid { + if c.cid == "" || entry.GetEnd().ToString() == c.cid { var data *logtail.CheckpointData var Fs *objectio.ObjectFS if c.ctx == nil { @@ -1180,6 +1171,33 @@ func (c *ckpStatArg) Run() (err error) { return } +func readCkpFromDisk(ctx context.Context, path, name string) ([]*checkpoint.CheckpointEntry, error) { + fs, err := initFs(ctx, path, true) + if err != nil { + return nil, err + } + reader, err := InitReader(fs, name) + if err != nil { + return nil, err + } + return checkpoint.ReplayCheckpointEntry(ctx, reader) +} + +func getCkpEntries(ctx context.Context, context *inspectContext, path, name string, all bool) (entries []*checkpoint.CheckpointEntry, err error) { + if context == nil { + entries, err = readCkpFromDisk(ctx, path, name) + } else { + if all { + entries = context.db.BGCheckpointRunner.GetAllGlobalCheckpoints() + entries = append(entries, context.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...) + } else { + entries = context.db.BGCheckpointRunner.GetAllCheckpoints() + } + } + + return +} + func getCkpData( ctx context.Context, entry *checkpoint.CheckpointEntry, @@ -1289,6 +1307,7 @@ func (c *ckpListArg) Run() (err error) { return moerr.NewInfoNoCtx(fmt.Sprintf("failed to download entries, err %v", err)) } c.res = fmt.Sprintf("downloaded %d entries", cnt) + return nil } if c.cid == "" { @@ -1302,7 +1321,7 @@ func (c *ckpListArg) Run() (err error) { func (c *ckpListArg) getCkpList() (res string, err error) { ctx := context.Background() - entries, err := c.getCkpEntries(ctx) + entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, c.all) if err != nil { return "", err } @@ -1328,6 +1347,10 @@ func (c *ckpListArg) getCkpList() (res string, err error) { }) } + sort.Slice(ckpEntries, func(i, j int) bool { + return ckpEntries[i].End < ckpEntries[j].End + }) + var json = jsoniter.ConfigCompatibleWithStandardLibrary ckpEntriesJson := CkpEntries{ Count: len(ckpEntries), @@ -1342,33 +1365,6 @@ func (c *ckpListArg) getCkpList() (res string, err error) { return } -func readCkpFromDisk(ctx context.Context, path, name string) ([]*checkpoint.CheckpointEntry, error) { - fs, err := initFs(ctx, path, true) - if err != nil { - return nil, err - } - reader, err := InitReader(fs, name) - if err != nil { - return nil, err - } - return checkpoint.ReplayCheckpointEntry(ctx, reader) -} - -func (c *ckpListArg) getCkpEntries(ctx context.Context) (entries []*checkpoint.CheckpointEntry, err error) { - if c.ctx == nil { - entries, err = readCkpFromDisk(ctx, c.path, c.name) - } else { - if c.all { - entries = c.ctx.db.BGCheckpointRunner.GetAllGlobalCheckpoints() - entries = append(entries, c.ctx.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...) - } else { - entries = c.ctx.db.BGCheckpointRunner.GetAllCheckpoints() - } - } - - return -} - type TableIds struct { TableCnt int `json:"table_count"` Ids []uint64 `json:"tables"` @@ -1376,7 +1372,7 @@ type TableIds struct { func (c *ckpListArg) getTableList(ctx context.Context) (res string, err error) { c.all = true - entries, err := c.getCkpEntries(ctx) + entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, c.all) if err != nil { return "", err }