Skip to content

Commit

Permalink
feat: gc for bucket migration & gc for ZombiePiece & meta
Browse files Browse the repository at this point in the history
feat: format code
  • Loading branch information
jingjunLi committed Nov 1, 2023
1 parent b4b8383 commit 1188bae
Show file tree
Hide file tree
Showing 52 changed files with 2,942 additions and 763 deletions.
30 changes: 16 additions & 14 deletions base/gfspapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,22 @@ type GfSpBaseApp struct {
replicateSpeed int64
receiveSpeed int64

sealObjectTimeout int64
gcObjectTimeout int64
gcZombieTimeout int64
gcMetaTimeout int64
migrateGVGTimeout int64

sealObjectRetry int64
replicateRetry int64
receiveConfirmRetry int64
gcObjectRetry int64
gcZombieRetry int64
gcMetaRetry int64
recoveryRetry int64
migrateGVGRetry int64
sealObjectTimeout int64
gcObjectTimeout int64
gcZombieTimeout int64
gcMetaTimeout int64
migrateGVGTimeout int64
gcBucketMigrationTimeout int64

sealObjectRetry int64
replicateRetry int64
receiveConfirmRetry int64
gcObjectRetry int64
gcZombieRetry int64
gcMetaRetry int64
recoveryRetry int64
migrateGVGRetry int64
gcBucketMigrationRetry int64
}

// AppID returns the GfSpBaseApp ID, the default value is prefix(gfsp) add
Expand Down
1 change: 1 addition & 0 deletions base/gfspapp/app_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func defaultGfSpDB(cfg *config.SQLDBConfig) {
if cfg.MaxOpenConns == 0 {
cfg.MaxOpenConns = sqldb.DefaultMaxOpenConns
}
cfg.EnableTracePutEvent = sqldb.DefaultEnableTracePutEvent
if cfg.User == "" {
cfg.User = "root"
}
Expand Down
13 changes: 12 additions & 1 deletion base/gfspapp/manage_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (g *GfSpBaseApp) GfSpAskTask(ctx context.Context, req *gfspserver.GfSpAskTa
resp.Response = &gfspserver.GfSpAskTaskResponse_MigrateGvgTask{
MigrateGvgTask: t,
}
case *gfsptask.GfSpGCBucketMigrationTask:
resp.Response = &gfspserver.GfSpAskTaskResponse_GcBucketMigrationTask{
GcBucketMigrationTask: t,
}
default:
log.CtxErrorw(ctx, "[BUG] Unsupported task type to dispatch")
return &gfspserver.GfSpAskTaskResponse{Err: ErrUnsupportedTaskType}, nil
Expand Down Expand Up @@ -300,6 +304,12 @@ func (g *GfSpBaseApp) GfSpReportTask(ctx context.Context, req *gfspserver.GfSpRe
task.SetAddress(util.GetRPCRemoteAddress(ctx))
log.CtxInfow(ctx, "begin to handle reported migrate gvg task", "task_info", task.Info())
err = g.manager.HandleMigrateGVGTask(ctx, t.MigrateGvgTask)
case *gfspserver.GfSpReportTaskRequest_GcBucketMigrationTask:
task := t.GcBucketMigrationTask
ctx = log.WithValue(ctx, log.CtxKeyTask, task.Key().String())
task.SetAddress(util.GetRPCRemoteAddress(ctx))
log.CtxInfow(ctx, "begin to handle reported gc bucket migration task", "task_info", task.Info())
err = g.manager.HandleGCBucketMigrationTask(ctx, t.GcBucketMigrationTask)
default:
log.CtxError(ctx, "receive unsupported task type")
return &gfspserver.GfSpReportTaskResponse{Err: ErrUnsupportedTaskType}, nil
Expand Down Expand Up @@ -358,9 +368,10 @@ func (g *GfSpBaseApp) GfSpNotifyPreMigrate(ctx context.Context, req *gfspserver.

return &gfspserver.GfSpNotifyPreMigrateBucketResponse{}, nil
}

func (g *GfSpBaseApp) GfSpNotifyPostMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (
*gfspserver.GfSpNotifyPostMigrateBucketResponse, error) {
if err := g.manager.NotifyPostMigrateBucket(ctx, req.GetBucketId()); err != nil {
if err := g.manager.NotifyPostMigrateBucket(ctx, req.GetBucketMigrationInfo()); err != nil {
return nil, err
}

Expand Down
30 changes: 28 additions & 2 deletions base/gfspapp/task_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ const (
MinMigrateGVGTime int64 = 1800 // 0.5 hour
// MaxMigrateGVGTime defines the max timeout to migrate gvg.
MaxMigrateGVGTime int64 = 3600 // 1 hour
// MinGCBucketMigrationTime defines the min timeout to gc bucket migration.
MinGCBucketMigrationTime int64 = 300 // 0.5 hour
// MaxGCBucketMigrationTime defines the max timeout to gc bucket migration.
MaxGCBucketMigrationTime int64 = 600 // 1 hour

// NotUseRetry defines the default task max retry.
NotUseRetry int64 = 0
Expand Down Expand Up @@ -78,6 +82,10 @@ const (
MinMigrateGVGRetry = 2
// MaxMigrateGVGRetry defines the max retry number to migrate gvg.
MaxMigrateGVGRetry = 3
// MinGCBucketMigrationRetry defines the min retry number to gc bucket migration.
MinGCBucketMigrationRetry = 3
// MaxBucketMigrationRetry defines the max retry number to gc bucket migration.
MaxBucketMigrationRetry = 5
)

// TaskTimeout returns the task timeout by task type and some task need payload size
Expand Down Expand Up @@ -184,6 +192,14 @@ func (g *GfSpBaseApp) TaskTimeout(task coretask.Task, size uint64) int64 {
return MaxMigrateGVGTime
}
return g.migrateGVGTimeout
case coretask.TypeTaskGCBucketMigration:
if g.gcBucketMigrationTimeout < MinGCBucketMigrationTime {
return MinGCBucketMigrationTime
}
if g.gcBucketMigrationTimeout > MaxGCBucketMigrationTime {
return MaxGCBucketMigrationTime
}
return g.gcBucketMigrationTimeout
}
return NotUseTimeout
}
Expand Down Expand Up @@ -267,6 +283,14 @@ func (g *GfSpBaseApp) TaskMaxRetry(task coretask.Task) int64 {
return MaxMigrateGVGRetry
}
return g.migrateGVGRetry
case coretask.TypeTaskGCBucketMigration:
if g.gcBucketMigrationRetry < MinGCBucketMigrationRetry {
return MinMigrateGVGRetry
}
if g.gcBucketMigrationRetry > MaxBucketMigrationRetry {
return MaxMigrateGVGRetry
}
return g.gcBucketMigrationRetry
default:
return NotUseRetry
}
Expand Down Expand Up @@ -299,13 +323,15 @@ func (g *GfSpBaseApp) TaskPriority(task coretask.Task) coretask.TPriority {
case coretask.TypeTaskGCObject:
return coretask.UnSchedulingPriority
case coretask.TypeTaskGCZombiePiece:
return coretask.UnSchedulingPriority
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskGCMeta:
return coretask.UnSchedulingPriority
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskRecoverPiece:
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskMigrateGVG:
return coretask.DefaultSmallerPriority
case coretask.TypeTaskGCBucketMigration:
return coretask.DefaultSmallerPriority
default:
return coretask.UnKnownTaskPriority
}
Expand Down
4 changes: 2 additions & 2 deletions base/gfspapp/task_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,12 +801,12 @@ func TestGfSpBaseApp_TaskPriority(t *testing.T) {
{
name: "gc zombie piece task",
task: &gfsptask.GfSpGCZombiePieceTask{},
wantedResult: coretask.UnSchedulingPriority,
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "gc meta task",
task: &gfsptask.GfSpGCMetaTask{},
wantedResult: coretask.UnSchedulingPriority,
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "recover piece task",
Expand Down
3 changes: 2 additions & 1 deletion base/gfspclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,14 @@ type ManagerAPI interface {
NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error
GetTasksStats(ctx context.Context) (*gfspserver.TasksStats, error)
NotifyPreMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPostMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error
}

// MetadataAPI for mock sue
type MetadataAPI interface {
GetUserBucketsCount(ctx context.Context, account string, includeRemoved bool, opts ...grpc.DialOption) (int64, error)
ListDeletedObjectsByBlockNumberRange(ctx context.Context, spOperatorAddress string, startBlockNumber uint64, endBlockNumber uint64, includePrivate bool, opts ...grpc.DialOption) ([]*types.Object, uint64, error)
ListObjectsByBlockNumberRange(ctx context.Context, spOperatorAddress string, startBlockNumber uint64, endBlockNumber uint64, includePrivate bool, opts ...grpc.DialOption) ([]*types.Object, uint64, error)
GetUserBuckets(ctx context.Context, account string, includeRemoved bool, opts ...grpc.DialOption) ([]*types.VGFInfoBucket, error)
ListObjectsByBucketName(ctx context.Context, bucketName string, accountID string, maxKeys uint64, startAfter string, continuationToken string, delimiter string, prefix string, includeRemoved bool,
opts ...grpc.DialOption) (objects []*types.Object, keyCount, maxKeysRe uint64, isTruncated bool, nextContinuationToken, name, prefixRe, delimiterRe string, commonPrefixes []string, continuationTokenRe string, err error)
Expand Down
58 changes: 50 additions & 8 deletions base/gfspclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions base/gfspclient/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (s *GfSpClient) AskTask(ctx context.Context, limit corercmgr.Limit) (coreta
return t.RecoverPieceTask, nil
case *gfspserver.GfSpAskTaskResponse_MigrateGvgTask:
return t.MigrateGvgTask, nil
case *gfspserver.GfSpAskTaskResponse_GcBucketMigrationTask:
return t.GcBucketMigrationTask, nil
default:
return nil, ErrTypeMismatch
}
Expand Down Expand Up @@ -127,6 +129,8 @@ func (s *GfSpClient) ReportTask(ctx context.Context, report coretask.Task) error
req.Request = &gfspserver.GfSpReportTaskRequest_RecoverPieceTask{RecoverPieceTask: t}
case *gfsptask.GfSpMigrateGVGTask:
req.Request = &gfspserver.GfSpReportTaskRequest_MigrateGvgTask{MigrateGvgTask: t}
case *gfsptask.GfSpGCBucketMigrationTask:
req.Request = &gfspserver.GfSpReportTaskRequest_GcBucketMigrationTask{GcBucketMigrationTask: t}
default:
log.CtxErrorw(ctx, "unsupported task type to report")
return ErrTypeMismatch
Expand Down Expand Up @@ -218,14 +222,14 @@ func (s *GfSpClient) NotifyPreMigrateBucket(ctx context.Context, bucketID uint64
return nil
}

func (s *GfSpClient) NotifyPostMigrateBucket(ctx context.Context, bucketID uint64) error {
func (s *GfSpClient) NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error {
conn, connErr := s.ManagerConn(ctx)
if connErr != nil {
log.CtxErrorw(ctx, "client failed to connect manager", "error", connErr)
return ErrRPCUnknownWithDetail("client failed to connect manager, error: " + connErr.Error())
}
req := &gfspserver.GfSpNotifyPostMigrateBucketRequest{
BucketId: bucketID,
BucketMigrationInfo: bmInfo,
}
resp, err := gfspserver.NewGfSpManageServiceClient(conn).GfSpNotifyPostMigrate(ctx, req)
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions base/gfspclient/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ func (s *GfSpClient) ListDeletedObjectsByBlockNumberRange(ctx context.Context, s
return resp.GetObjects(), uint64(resp.GetEndBlockNumber()), nil
}

func (s *GfSpClient) ListObjectsByBlockNumberRange(ctx context.Context, spOperatorAddress string, startBlockNumber uint64,
endBlockNumber uint64, includePrivate bool, opts ...grpc.DialOption) ([]*types.Object, uint64, error) {
conn, err := s.Connection(ctx, s.metadataEndpoint, opts...)
if err != nil {
return nil, uint64(0), ErrRPCUnknownWithDetail("client failed to connect metadata, error: " + err.Error())
}
defer conn.Close()
req := &types.GfSpListDeletedObjectsByBlockNumberRangeRequest{
StartBlockNumber: int64(startBlockNumber),
EndBlockNumber: int64(endBlockNumber),
IncludePrivate: includePrivate,
}
resp, err := types.NewGfSpMetadataServiceClient(conn).GfSpListDeletedObjectsByBlockNumberRange(ctx, req)
if err != nil {
log.CtxErrorw(ctx, "failed to list deleted objects by block number range", "error", err)
return nil, uint64(0), ErrRPCUnknownWithDetail("failed to list deleted objects by block number range, error: " + err.Error())
}
return resp.GetObjects(), uint64(resp.GetEndBlockNumber()), nil
}

func (s *GfSpClient) GetUserBuckets(ctx context.Context, account string, includeRemoved bool, opts ...grpc.DialOption) ([]*types.VGFInfoBucket, error) {
conn, err := s.Connection(ctx, s.metadataEndpoint, opts...)
if err != nil {
Expand Down Expand Up @@ -568,15 +588,15 @@ func (s *GfSpClient) ListObjectsInGVG(ctx context.Context, gvgID uint32, startAf
return resp.Objects, nil
}

func (s *GfSpClient) ListObjectsByGVGAndBucketForGC(ctx context.Context, gvgID uint32, bucketID uint64, startAfter uint64, limit uint32, opts ...grpc.DialOption) ([]*types.ObjectDetails, error) {
func (s *GfSpClient) ListObjectsByGVGAndBucketForGC(ctx context.Context, dstGvgID uint32, bucketID uint64, startAfter uint64, limit uint32, opts ...grpc.DialOption) ([]*types.ObjectDetails, error) {
conn, connErr := s.Connection(ctx, s.metadataEndpoint, opts...)
if connErr != nil {
log.CtxErrorw(ctx, "client failed to connect metadata", "error", connErr)
return nil, ErrRPCUnknownWithDetail("client failed to connect metadata, error: " + connErr.Error())
}
defer conn.Close()
req := &types.GfSpListObjectsByGVGAndBucketForGCRequest{
GvgId: gvgID,
DstGvgId: dstGvgID,
BucketId: bucketID,
StartAfter: startAfter,
Limit: limit,
Expand Down
Loading

0 comments on commit 1188bae

Please sign in to comment.