Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenbin1002 committed Aug 5, 2024
1 parent 717c1c6 commit fb700d9
Showing 1 changed file with 40 additions and 44 deletions.
84 changes: 40 additions & 44 deletions pkg/vm/engine/tae/rpc/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,8 @@ type ckpStatArg struct {
cid string
tid uint64
limit int
all bool

all, latest bool
path, name, res string
}

Expand All @@ -1016,11 +1016,10 @@ func (c *ckpStatArg) PrepareCommand() *cobra.Command {

ckpStatCmd.SetUsageTemplate(c.Usage())

ckpStatCmd.Flags().StringP("cid", "c", "", "checkpoint lsn")
ckpStatCmd.Flags().StringP("cid", "c", "", "checkpoint end ts")
ckpStatCmd.Flags().IntP("tid", "t", invalidId, "checkpoint tid")
ckpStatCmd.Flags().IntP("limit", "l", invalidLimit, "checkpoint limit")
ckpStatCmd.Flags().BoolP("all", "a", false, "checkpoint all tables")
ckpStatCmd.Flags().Bool("latest", false, "checkpoint latest tables")
ckpStatCmd.Flags().StringP("name", "n", "", "checkpoint name")

return ckpStatCmd
Expand All @@ -1032,7 +1031,6 @@ func (c *ckpStatArg) FromCommand(cmd *cobra.Command) (err error) {
c.tid = uint64(id)
c.all, _ = cmd.Flags().GetBool("all")
c.limit, _ = cmd.Flags().GetInt("limit")
c.latest, _ = cmd.Flags().GetBool("latest")
dir, _ := cmd.Flags().GetString("name")
c.path, c.name = filepath.Split(dir)
if cmd.Flag("ictx") != nil {
Expand Down Expand Up @@ -1069,24 +1067,17 @@ func (c *ckpStatArg) Usage() (res string) {
func (c *ckpStatArg) Run() (err error) {
ctx := context.Background()
checkpointJson := logtail.ObjectInfoJson{}
var entries []*checkpoint.CheckpointEntry
if c.ctx == nil {
entries, err = readCkpFromDisk(ctx, c.path, c.name)
} else {
if c.latest {
entries = c.ctx.db.BGCheckpointRunner.GetAllCheckpoints()
} else {
entries = c.ctx.db.BGCheckpointRunner.GetAllGlobalCheckpoints()
entries = append(entries, c.ctx.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...)
}
entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, false)
if err != nil {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to get ckp entries %s", err))
}
tables := make(map[uint64]*logtail.TableInfoJson)
tableins := make(map[uint64]uint64)
tabledel := make(map[uint64]uint64)
locations := make([]objectio.Location, 0, len(entries))
versions := make([]uint32, 0, len(entries))
for _, entry := range entries {
if c.latest || entry.GetEnd().ToString() == c.cid {
if c.cid == "" || entry.GetEnd().ToString() == c.cid {
var data *logtail.CheckpointData
var Fs *objectio.ObjectFS
if c.ctx == nil {
Expand Down Expand Up @@ -1180,6 +1171,33 @@ func (c *ckpStatArg) Run() (err error) {
return
}

func readCkpFromDisk(ctx context.Context, path, name string) ([]*checkpoint.CheckpointEntry, error) {
fs, err := initFs(ctx, path, true)
if err != nil {
return nil, err
}
reader, err := InitReader(fs, name)
if err != nil {
return nil, err
}
return checkpoint.ReplayCheckpointEntry(ctx, reader)
}

func getCkpEntries(ctx context.Context, context *inspectContext, path, name string, all bool) (entries []*checkpoint.CheckpointEntry, err error) {
if context == nil {
entries, err = readCkpFromDisk(ctx, path, name)
} else {
if all {
entries = context.db.BGCheckpointRunner.GetAllGlobalCheckpoints()
entries = append(entries, context.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...)
} else {
entries = context.db.BGCheckpointRunner.GetAllCheckpoints()
}
}

return
}

func getCkpData(
ctx context.Context,
entry *checkpoint.CheckpointEntry,
Expand Down Expand Up @@ -1289,6 +1307,7 @@ func (c *ckpListArg) Run() (err error) {
return moerr.NewInfoNoCtx(fmt.Sprintf("failed to download entries, err %v", err))
}
c.res = fmt.Sprintf("downloaded %d entries", cnt)
return nil
}

if c.cid == "" {
Expand All @@ -1302,7 +1321,7 @@ func (c *ckpListArg) Run() (err error) {

func (c *ckpListArg) getCkpList() (res string, err error) {
ctx := context.Background()
entries, err := c.getCkpEntries(ctx)
entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, c.all)
if err != nil {
return "", err
}
Expand All @@ -1328,6 +1347,10 @@ func (c *ckpListArg) getCkpList() (res string, err error) {
})
}

sort.Slice(ckpEntries, func(i, j int) bool {
return ckpEntries[i].End < ckpEntries[j].End
})

var json = jsoniter.ConfigCompatibleWithStandardLibrary
ckpEntriesJson := CkpEntries{
Count: len(ckpEntries),
Expand All @@ -1342,41 +1365,14 @@ func (c *ckpListArg) getCkpList() (res string, err error) {
return
}

func readCkpFromDisk(ctx context.Context, path, name string) ([]*checkpoint.CheckpointEntry, error) {
fs, err := initFs(ctx, path, true)
if err != nil {
return nil, err
}
reader, err := InitReader(fs, name)
if err != nil {
return nil, err
}
return checkpoint.ReplayCheckpointEntry(ctx, reader)
}

func (c *ckpListArg) getCkpEntries(ctx context.Context) (entries []*checkpoint.CheckpointEntry, err error) {
if c.ctx == nil {
entries, err = readCkpFromDisk(ctx, c.path, c.name)
} else {
if c.all {
entries = c.ctx.db.BGCheckpointRunner.GetAllGlobalCheckpoints()
entries = append(entries, c.ctx.db.BGCheckpointRunner.GetAllIncrementalCheckpoints()...)
} else {
entries = c.ctx.db.BGCheckpointRunner.GetAllCheckpoints()
}
}

return
}

type TableIds struct {
TableCnt int `json:"table_count"`
Ids []uint64 `json:"tables"`
}

func (c *ckpListArg) getTableList(ctx context.Context) (res string, err error) {
c.all = true
entries, err := c.getCkpEntries(ctx)
entries, err := getCkpEntries(ctx, c.ctx, c.path, c.name, c.all)
if err != nil {
return "", err
}
Expand Down

0 comments on commit fb700d9

Please sign in to comment.