Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gc for zombie piece & metaTask & bucket migration #1190

Merged
merged 10 commits into from
Nov 24, 2023
30 changes: 16 additions & 14 deletions base/gfspapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,22 @@ type GfSpBaseApp struct {
replicateSpeed int64
receiveSpeed int64

sealObjectTimeout int64
gcObjectTimeout int64
gcZombieTimeout int64
gcMetaTimeout int64
migrateGVGTimeout int64

sealObjectRetry int64
replicateRetry int64
receiveConfirmRetry int64
gcObjectRetry int64
gcZombieRetry int64
gcMetaRetry int64
recoveryRetry int64
migrateGVGRetry int64
sealObjectTimeout int64
gcObjectTimeout int64
gcZombieTimeout int64
gcMetaTimeout int64
gcBucketMigrationTimeout int64
migrateGVGTimeout int64

sealObjectRetry int64
replicateRetry int64
receiveConfirmRetry int64
gcObjectRetry int64
gcZombieRetry int64
gcMetaRetry int64
gcBucketMigrationRetry int64
recoveryRetry int64
migrateGVGRetry int64
}

// AppID returns the GfSpBaseApp ID, the default value is prefix(gfsp) add
Expand Down
1 change: 1 addition & 0 deletions base/gfspapp/app_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func defaultGfSpDB(cfg *config.SQLDBConfig) {
if cfg.Database == "" {
cfg.Database = "storage_provider_db"
}
cfg.EnableTracePutEvent = sqldb.DefaultEnableTracePutEvent
}

var bsdbOnce = sync.Once{}
Expand Down
17 changes: 9 additions & 8 deletions base/gfspapp/app_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ func TestDefaultGfSpDBOptionFailure1(t *testing.T) {

func Test_defaultGfSpDB(t *testing.T) {
cfg := &config.SQLDBConfig{
User: "",
Passwd: "",
Address: "",
Database: "",
ConnMaxLifetime: 0,
ConnMaxIdleTime: 0,
MaxIdleConns: 0,
MaxOpenConns: 0,
User: "",
Passwd: "",
Address: "",
Database: "",
ConnMaxLifetime: 0,
ConnMaxIdleTime: 0,
MaxIdleConns: 0,
MaxOpenConns: 0,
EnableTracePutEvent: true,
}
defaultGfSpDB(cfg)
assert.Equal(t, "storage_provider_db", cfg.Database)
Expand Down
13 changes: 12 additions & 1 deletion base/gfspapp/manage_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (g *GfSpBaseApp) GfSpAskTask(ctx context.Context, req *gfspserver.GfSpAskTa
resp.Response = &gfspserver.GfSpAskTaskResponse_MigrateGvgTask{
MigrateGvgTask: t,
}
case *gfsptask.GfSpGCBucketMigrationTask:
resp.Response = &gfspserver.GfSpAskTaskResponse_GcBucketMigrationTask{
GcBucketMigrationTask: t,
}
default:
log.CtxErrorw(ctx, "[BUG] Unsupported task type to dispatch")
return &gfspserver.GfSpAskTaskResponse{Err: ErrUnsupportedTaskType}, nil
Expand Down Expand Up @@ -300,6 +304,12 @@ func (g *GfSpBaseApp) GfSpReportTask(ctx context.Context, req *gfspserver.GfSpRe
task.SetAddress(util.GetRPCRemoteAddress(ctx))
log.CtxInfow(ctx, "begin to handle reported migrate gvg task", "task_info", task.Info())
err = g.manager.HandleMigrateGVGTask(ctx, t.MigrateGvgTask)
case *gfspserver.GfSpReportTaskRequest_GcBucketMigrationTask:
task := t.GcBucketMigrationTask
ctx = log.WithValue(ctx, log.CtxKeyTask, task.Key().String())
task.SetAddress(util.GetRPCRemoteAddress(ctx))
log.CtxInfow(ctx, "begin to handle reported gc bucket migration task", "task_info", task.Info())
err = g.manager.HandleGCBucketMigrationTask(ctx, t.GcBucketMigrationTask)
default:
log.CtxError(ctx, "receive unsupported task type")
return &gfspserver.GfSpReportTaskResponse{Err: ErrUnsupportedTaskType}, nil
Expand Down Expand Up @@ -358,9 +368,10 @@ func (g *GfSpBaseApp) GfSpNotifyPreMigrate(ctx context.Context, req *gfspserver.

return &gfspserver.GfSpNotifyPreMigrateBucketResponse{}, nil
}

func (g *GfSpBaseApp) GfSpNotifyPostMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (
*gfspserver.GfSpNotifyPostMigrateBucketResponse, error) {
if err := g.manager.NotifyPostMigrateBucket(ctx, req.GetBucketId()); err != nil {
if err := g.manager.NotifyPostMigrateBucket(ctx, req.GetBucketMigrationInfo()); err != nil {
return nil, err
}

Expand Down
32 changes: 29 additions & 3 deletions base/gfspapp/task_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ const (
MinMigrateGVGTime int64 = 1800 // 0.5 hour
// MaxMigrateGVGTime defines the max timeout to migrate gvg.
MaxMigrateGVGTime int64 = 3600 // 1 hour
// MinGCBucketMigrationTime defines the min timeout to gc bucket migration.
MinGCBucketMigrationTime int64 = 1800 // 0.5 hour
// MaxGCBucketMigrationTime defines the max timeout to gc bucket migration.
MaxGCBucketMigrationTime int64 = 3600 // 1 hour

// NotUseRetry defines the default task max retry.
NotUseRetry int64 = 0
Expand Down Expand Up @@ -78,6 +82,10 @@ const (
MinMigrateGVGRetry = 2
// MaxMigrateGVGRetry defines the max retry number to migrate gvg.
MaxMigrateGVGRetry = 3
// MinGCBucketMigrationRetry defines the min retry number to gc bucket migration.
MinGCBucketMigrationRetry = 3
// MaxGCBucketMigrationRetry defines the max retry number to gc bucket migration.
MaxGCBucketMigrationRetry = 5
)

// TaskTimeout returns the task timeout by task type and some task need payload size
Expand Down Expand Up @@ -184,6 +192,14 @@ func (g *GfSpBaseApp) TaskTimeout(task coretask.Task, size uint64) int64 {
return MaxMigrateGVGTime
}
return g.migrateGVGTimeout
case coretask.TypeTaskGCBucketMigration:
if g.gcBucketMigrationTimeout < MinGCBucketMigrationTime {
return MinGCBucketMigrationTime
}
if g.gcBucketMigrationTimeout > MaxGCBucketMigrationTime {
return MaxGCBucketMigrationTime
}
return g.gcBucketMigrationTimeout
}
return NotUseTimeout
}
Expand Down Expand Up @@ -267,6 +283,14 @@ func (g *GfSpBaseApp) TaskMaxRetry(task coretask.Task) int64 {
return MaxMigrateGVGRetry
}
return g.migrateGVGRetry
case coretask.TypeTaskGCBucketMigration:
if g.gcBucketMigrationRetry < MinGCBucketMigrationRetry {
return MinGCBucketMigrationRetry
}
if g.gcBucketMigrationRetry > MaxGCBucketMigrationRetry {
return MaxGCBucketMigrationRetry
}
return g.gcBucketMigrationRetry
default:
return NotUseRetry
}
Expand Down Expand Up @@ -297,11 +321,13 @@ func (g *GfSpBaseApp) TaskPriority(task coretask.Task) coretask.TPriority {
case coretask.TypeTaskChallengePiece:
return coretask.UnSchedulingPriority
case coretask.TypeTaskGCObject:
return coretask.UnSchedulingPriority
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskGCZombiePiece:
return coretask.UnSchedulingPriority
return coretask.DefaultSmallerPriority / 4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? Please check coretask.TypeTaskGCObject:coretask.UnSchedulingPriority.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During testing, encountered a crash in PickUpTask when all tasks have a priority of 0.
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnSchedulingPriority indicates tasks that don't need background scheduling. Currently, the priority for all GC-related tasks is uniformly set to coretask.DefaultSmallerPriority / 4.

case coretask.TypeTaskGCMeta:
return coretask.UnSchedulingPriority
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskGCBucketMigration:
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskRecoverPiece:
return coretask.DefaultSmallerPriority / 4
case coretask.TypeTaskMigrateGVG:
Expand Down
11 changes: 8 additions & 3 deletions base/gfspapp/task_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,17 +796,22 @@ func TestGfSpBaseApp_TaskPriority(t *testing.T) {
{
name: "gc object task",
task: &gfsptask.GfSpGCObjectTask{},
wantedResult: coretask.UnSchedulingPriority,
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "gc zombie piece task",
task: &gfsptask.GfSpGCZombiePieceTask{},
wantedResult: coretask.UnSchedulingPriority,
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "gc meta task",
task: &gfsptask.GfSpGCMetaTask{},
wantedResult: coretask.UnSchedulingPriority,
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "gc bucket migration task",
task: &gfsptask.GfSpGCBucketMigrationTask{},
wantedResult: coretask.DefaultSmallerPriority / 4,
},
{
name: "recover piece task",
Expand Down
3 changes: 2 additions & 1 deletion base/gfspclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type ManagerAPI interface {
NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error
GetTasksStats(ctx context.Context) (*gfspserver.TasksStats, error)
NotifyPreMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPostMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error
}

// MetadataAPI for mock sue
Expand All @@ -103,6 +103,7 @@ type MetadataAPI interface {
GetBucketByBucketID(ctx context.Context, bucketID int64, includePrivate bool, opts ...grpc.DialOption) (*types.Bucket, error)
ListExpiredBucketsBySp(ctx context.Context, createAt int64, primarySpID uint32, limit int64, opts ...grpc.DialOption) ([]*types.Bucket, error)
GetObjectMeta(ctx context.Context, objectName string, bucketName string, includePrivate bool, opts ...grpc.DialOption) (*types.Object, error)
GetLatestObjectID(ctx context.Context, opts ...grpc.DialOption) (uint64, error)
GetPaymentByBucketName(ctx context.Context, bucketName string, includePrivate bool, opts ...grpc.DialOption) (*payment_types.StreamRecord, error)
GetPaymentByBucketID(ctx context.Context, bucketID int64, includePrivate bool, opts ...grpc.DialOption) (*payment_types.StreamRecord, error)
VerifyPermission(ctx context.Context, Operator string, bucketName string, objectName string, actionType permission_types.ActionType, opts ...grpc.DialOption) (*permission_types.Effect, error)
Expand Down
Loading