Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bucket migration state improvement #1250

Merged
merged 24 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ca7a1ac
feat: add bucket migrate status persist & confirm complete event
jingjunLi Nov 15, 2023
7b66ebf
feat: bucket migration gc report
jingjunLi Dec 5, 2023
4d5c8d8
feat: add bucket migration gc progress & migration state persist
jingjunLi Dec 5, 2023
50dabe5
feat: sp gc state persist finished
jingjunLi Dec 9, 2023
a05ebb4
feat: support bucket migration progress
jingjunLi Dec 11, 2023
00b91ee
feat: add more comment
jingjunLi Dec 11, 2023
f4b398d
feat: fix some comment
jingjunLi Dec 13, 2023
cc2df52
feat: fix some variable name as comment
jingjunLi Dec 14, 2023
88f6f6d
feat: fix bucket migration progress
jingjunLi Dec 14, 2023
b144508
feat: rename some variable name
jingjunLi Dec 14, 2023
1d34ec7
feat: rename some variable name
jingjunLi Dec 14, 2023
532578a
feat: empty bucket quota sync
jingjunLi Dec 14, 2023
55d71ae
feat: fix some bugs for migrate
jingjunLi Dec 14, 2023
a30c03d
feat: fix some variable name
jingjunLi Dec 15, 2023
c0ab6e4
feat: add pre & post ut
jingjunLi Dec 15, 2023
9c19f50
feat: add more ut
jingjunLi Dec 15, 2023
ba2951e
feat: format code
jingjunLi Dec 15, 2023
1b48287
feat: format code
jingjunLi Dec 15, 2023
25e5926
feat: reject migrate bucket when current migrate not finished
jingjunLi Dec 15, 2023
e4593f2
feat: add load from db for bucket migration gc task
jingjunLi Dec 18, 2023
1199274
feat: proto buf lint
jingjunLi Dec 18, 2023
aa08db2
feat: fix query bucket info from chain
jingjunLi Dec 18, 2023
51207f8
feat: rename some variable name
jingjunLi Dec 18, 2023
d5d24a2
feat: add config for enable bucket migrate cache
jingjunLi Dec 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions base/gfspapp/manage_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,22 +360,48 @@ func (g *GfSpBaseApp) GfSpQueryTasksStats(ctx context.Context, _ *gfspserver.GfS
}, nil
}

func (g *GfSpBaseApp) GfSpNotifyPreMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPreMigrateBucketRequest) (
func (g *GfSpBaseApp) GfSpQueryBucketMigrationProgress(ctx context.Context, req *gfspserver.GfSpQueryBucketMigrationProgressRequest) (
jingjunLi marked this conversation as resolved.
Show resolved Hide resolved
*gfspserver.GfSpQueryBucketMigrationProgressResponse, error) {
var (
progress *gfspserver.MigrateBucketProgressMeta
err error
)
if progress, err = g.manager.QueryBucketMigrationProgress(ctx, req.GetBucketId()); err != nil {
log.CtxErrorw(ctx, "failed to query bucket migration progress", "bucket_id", req.GetBucketId(), "error", err)
return nil, err
}

return &gfspserver.GfSpQueryBucketMigrationProgressResponse{
Progress: progress,
}, nil
}

func (g *GfSpBaseApp) GfSpNotifyPreMigrateBucketAndDeductQuota(ctx context.Context, req *gfspserver.GfSpNotifyPreMigrateBucketRequest) (
*gfspserver.GfSpNotifyPreMigrateBucketResponse, error) {
if err := g.manager.NotifyPreMigrateBucket(ctx, req.GetBucketId()); err != nil {
var (
quota *gfsptask.GfSpBucketQuotaInfo
err error
)
if quota, err = g.manager.NotifyPreMigrateBucketAndDeductQuota(ctx, req.GetBucketId()); err != nil {
jingjunLi marked this conversation as resolved.
Show resolved Hide resolved
log.CtxErrorw(ctx, "failed to notify pre migrate bucket and deduct quota", "bucket_id", req.GetBucketId(), "error", err)
return nil, err
}

return &gfspserver.GfSpNotifyPreMigrateBucketResponse{}, nil
return &gfspserver.GfSpNotifyPreMigrateBucketResponse{Quota: quota}, nil
}

func (g *GfSpBaseApp) GfSpNotifyPostMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (
func (g *GfSpBaseApp) GfSpNotifyPostMigrateAndRecoupQuota(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (
*gfspserver.GfSpNotifyPostMigrateBucketResponse, error) {
if err := g.manager.NotifyPostMigrateBucket(ctx, req.GetBucketMigrationInfo()); err != nil {
var (
quota *gfsptask.GfSpBucketQuotaInfo
err error
)
if quota, err = g.manager.NotifyPostMigrateBucketAndRecoupQuota(ctx, req.GetBucketMigrationInfo()); err != nil {
jingjunLi marked this conversation as resolved.
Show resolved Hide resolved
log.CtxErrorw(ctx, "failed to notify post migrate bucket and recoup quota", "bucket_migration_info", req.GetBucketMigrationInfo(), "error", err)
return nil, err
}

return &gfspserver.GfSpNotifyPostMigrateBucketResponse{}, nil
return &gfspserver.GfSpNotifyPostMigrateBucketResponse{Quota: quota}, nil
}

func (g *GfSpBaseApp) GfSpResetRecoveryFailedList(ctx context.Context, _ *gfspserver.GfSpResetRecoveryFailedListRequest) (
Expand Down
50 changes: 50 additions & 0 deletions base/gfspapp/manage_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,53 @@ func TestGfSpBaseApp_GfSpNotifyMigrateSwapOutFailure(t *testing.T) {
assert.Equal(t, mockErr, err)
assert.Nil(t, result)
}

// TestGfSpBaseApp_GfSpNotifyPreMigrateBucketAndDeductQuotaSuccess checks that,
// when the manager call succeeds, the handler returns no error and a response
// carrying the quota the manager handed back.
func TestGfSpBaseApp_GfSpNotifyPreMigrateBucketAndDeductQuotaSuccess(t *testing.T) {
	app := setup(t)
	mockManager := module.NewMockManager(gomock.NewController(t))
	app.manager = mockManager

	// The manager must be consulted exactly once and yields this quota.
	wantQuota := &gfsptask.GfSpBucketQuotaInfo{BucketId: 2}
	mockManager.EXPECT().
		NotifyPreMigrateBucketAndDeductQuota(gomock.Any(), gomock.Any()).
		Return(wantQuota, nil).
		Times(1)

	resp, err := app.GfSpNotifyPreMigrateBucketAndDeductQuota(
		context.TODO(),
		&gfspserver.GfSpNotifyPreMigrateBucketRequest{BucketId: 2},
	)

	assert.Nil(t, err)
	wantResp := &gfspserver.GfSpNotifyPreMigrateBucketResponse{Err: nil, Quota: wantQuota}
	assert.Equal(t, wantResp, resp)
}

// TestGfSpBaseApp_GfSpNotifyPreMigrateBucketAndDeductQuotaFailure checks that a
// manager error is returned unchanged by the handler together with a nil
// response.
func TestGfSpBaseApp_GfSpNotifyPreMigrateBucketAndDeductQuotaFailure(t *testing.T) {
	app := setup(t)
	mockManager := module.NewMockManager(gomock.NewController(t))
	app.manager = mockManager

	// The manager must be consulted exactly once and fails with mockErr.
	mockManager.EXPECT().
		NotifyPreMigrateBucketAndDeductQuota(gomock.Any(), gomock.Any()).
		Return(nil, mockErr).
		Times(1)

	resp, err := app.GfSpNotifyPreMigrateBucketAndDeductQuota(
		context.TODO(),
		&gfspserver.GfSpNotifyPreMigrateBucketRequest{BucketId: 2},
	)

	assert.Equal(t, mockErr, err)
	assert.Nil(t, resp)
}

// TestGfSpBaseApp_GfSpNotifyPostMigrateBucketAndRecoupQuotaSuccess checks that,
// when the manager call succeeds, the handler returns no error and a response
// carrying the quota the manager handed back.
func TestGfSpBaseApp_GfSpNotifyPostMigrateBucketAndRecoupQuotaSuccess(t *testing.T) {
	app := setup(t)
	mockManager := module.NewMockManager(gomock.NewController(t))
	app.manager = mockManager

	// The manager must be consulted exactly once and yields this quota.
	wantQuota := &gfsptask.GfSpBucketQuotaInfo{BucketId: 2}
	mockManager.EXPECT().
		NotifyPostMigrateBucketAndRecoupQuota(gomock.Any(), gomock.Any()).
		Return(wantQuota, nil).
		Times(1)

	resp, err := app.GfSpNotifyPostMigrateAndRecoupQuota(
		context.TODO(),
		&gfspserver.GfSpNotifyPostMigrateBucketRequest{BucketId: 2},
	)

	assert.Nil(t, err)
	wantResp := &gfspserver.GfSpNotifyPostMigrateBucketResponse{Quota: wantQuota}
	assert.Equal(t, wantResp, resp)
}

// TestGfSpBaseApp_GfSpPostMigrateBucketAndRecoupQuotaFailure checks that a
// manager error is returned unchanged by the handler together with a nil
// response.
func TestGfSpBaseApp_GfSpPostMigrateBucketAndRecoupQuotaFailure(t *testing.T) {
	app := setup(t)
	mockManager := module.NewMockManager(gomock.NewController(t))
	app.manager = mockManager

	// The manager must be consulted exactly once and fails with mockErr.
	mockManager.EXPECT().
		NotifyPostMigrateBucketAndRecoupQuota(gomock.Any(), gomock.Any()).
		Return(nil, mockErr).
		Times(1)

	resp, err := app.GfSpNotifyPostMigrateAndRecoupQuota(
		context.TODO(),
		&gfspserver.GfSpNotifyPostMigrateBucketRequest{BucketId: 2},
	)

	assert.Equal(t, mockErr, err)
	assert.Nil(t, resp)
}
18 changes: 10 additions & 8 deletions base/gfspclient/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
GnfdUnsignedApprovalMsgHeader = "X-Gnfd-Unsigned-Msg"
// GnfdSignedApprovalMsgHeader defines signed msg, which is used by get-approval
GnfdSignedApprovalMsgHeader = "X-Gnfd-Signed-Msg"
// GnfdQuotaInfoHeader defines quota info, which is used by sp
GnfdQuotaInfoHeader = "X-Gnfd-Quota-Info"
)

func (s *GfSpClient) ReplicatePieceToSecondary(ctx context.Context, endpoint string, receive coretask.ReceivePieceTask,
Expand Down Expand Up @@ -243,13 +245,13 @@ func (s *GfSpClient) QueryLatestBucketQuota(ctx context.Context, endpoint string
return gfsptask.GfSpBucketQuotaInfo{}, fmt.Errorf("failed to query latest bucket quota, bucket(%s), status_code(%d), endpoint(%s)", queryMsg, resp.StatusCode, endpoint)
}

signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdSignedApprovalMsgHeader))
quotaInfoMsg, err := hex.DecodeString(resp.Header.Get(GnfdQuotaInfoHeader))
if err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

quotaResult := gfsptask.GfSpBucketQuotaInfo{}
if err = proto.Unmarshal(signedMsg, &quotaResult); err != nil {
if err = proto.Unmarshal(quotaInfoMsg, &quotaResult); err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

Expand Down Expand Up @@ -280,13 +282,13 @@ func (s *GfSpClient) PreMigrateBucket(ctx context.Context, srcSPEndpoint string,
return gfsptask.GfSpBucketQuotaInfo{}, fmt.Errorf("failed to pre migrate bucket, bucket_migration_info(%s), status_code(%d), endpoint(%s)", preMsg, resp.StatusCode, srcSPEndpoint)
}

signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdSignedApprovalMsgHeader))
quotaInfoMsg, err := hex.DecodeString(resp.Header.Get(GnfdQuotaInfoHeader))
if err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

quotaResult := gfsptask.GfSpBucketQuotaInfo{}
if err = proto.Unmarshal(signedMsg, &quotaResult); err != nil {
if err = proto.Unmarshal(quotaInfoMsg, &quotaResult); err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

Expand Down Expand Up @@ -319,13 +321,13 @@ func (s *GfSpClient) PostMigrateBucket(ctx context.Context, srcSPEndpoint string
return gfsptask.GfSpBucketQuotaInfo{}, fmt.Errorf("failed to post migrate bucket to src sp, bucket(%d), status_code(%d), endpoint(%s)", bucketID, resp.StatusCode, srcSPEndpoint)
}

signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdSignedApprovalMsgHeader))
quotaInfoMsg, err := hex.DecodeString(resp.Header.Get(GnfdQuotaInfoHeader))
if err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

quotaResult := gfsptask.GfSpBucketQuotaInfo{}
if err = proto.Unmarshal(signedMsg, &quotaResult); err != nil {
if err = proto.Unmarshal(quotaInfoMsg, &quotaResult); err != nil {
return gfsptask.GfSpBucketQuotaInfo{}, err
}

Expand Down Expand Up @@ -358,13 +360,13 @@ func (s *GfSpClient) QuerySPHasEnoughQuotaForMigrateBucket(ctx context.Context,
bucketID, resp.StatusCode, srcSPEndpoint)
}

signedMsg, err := hex.DecodeString(resp.Header.Get(GnfdSignedApprovalMsgHeader))
quotaInfoMsg, err := hex.DecodeString(resp.Header.Get(GnfdQuotaInfoHeader))
if err != nil {
return err
}

quotaResult := gfsptask.GfSpBucketQuotaInfo{}
if err = proto.Unmarshal(signedMsg, &quotaResult); err != nil {
if err = proto.Unmarshal(quotaInfoMsg, &quotaResult); err != nil {
return err
}

Expand Down
13 changes: 11 additions & 2 deletions base/gfspclient/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,23 @@ func (s mockManagerServer) GfSpQueryTasksStats(ctx context.Context, req *gfspser
}, nil
}

func (s mockManagerServer) GfSpNotifyPreMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPreMigrateBucketRequest) (*gfspserver.GfSpNotifyPreMigrateBucketResponse, error) {
// GfSpQueryBucketMigrationProgress is the mock implementation of the progress
// query: a nil request yields mockRPCErr, any other request yields a response
// holding an empty progress meta.
func (s mockManagerServer) GfSpQueryBucketMigrationProgress(ctx context.Context, req *gfspserver.GfSpQueryBucketMigrationProgressRequest) (*gfspserver.GfSpQueryBucketMigrationProgressResponse, error) {
	if req == nil {
		return nil, mockRPCErr
	}

	emptyProgress := new(gfspserver.MigrateBucketProgressMeta)
	resp := &gfspserver.GfSpQueryBucketMigrationProgressResponse{Progress: emptyProgress}
	return resp, nil
}

// GfSpNotifyPreMigrateBucketAndDeductQuota is the mock implementation of the
// pre-migrate notification: a nil request yields mockRPCErr, any other request
// yields an empty response.
func (s mockManagerServer) GfSpNotifyPreMigrateBucketAndDeductQuota(ctx context.Context, req *gfspserver.GfSpNotifyPreMigrateBucketRequest) (*gfspserver.GfSpNotifyPreMigrateBucketResponse, error) {
	if req == nil {
		return nil, mockRPCErr
	}

	emptyResp := new(gfspserver.GfSpNotifyPreMigrateBucketResponse)
	return emptyResp, nil
}

func (s mockManagerServer) GfSpNotifyPostMigrate(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (*gfspserver.GfSpNotifyPostMigrateBucketResponse, error) {
func (s mockManagerServer) GfSpNotifyPostMigrateAndRecoupQuota(ctx context.Context, req *gfspserver.GfSpNotifyPostMigrateBucketRequest) (*gfspserver.GfSpNotifyPostMigrateBucketResponse, error) {
if req == nil {
return nil, mockRPCErr
}
Expand Down
8 changes: 5 additions & 3 deletions base/gfspclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ type ManagerAPI interface {
PickVirtualGroupFamilyID(ctx context.Context, task coretask.ApprovalCreateBucketTask) (uint32, error)
NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error
GetTasksStats(ctx context.Context) (*gfspserver.TasksStats, error)
NotifyPreMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error
GetMigrateBucketProgress(ctx context.Context, bucketID uint64) (*gfspserver.MigrateBucketProgressMeta, error)
NotifyPostMigrateBucketAndRecoupQuota(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) (*gfsptask.GfSpBucketQuotaInfo, error)
NotifyPreMigrateBucketAndDeductQuota(ctx context.Context, bucketID uint64) (*gfsptask.GfSpBucketQuotaInfo, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which word is better: Migrate or Migration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing code used "MigrateBucket", while the newly added code uses "BucketMigration".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to keep a consistent style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// MetadataAPI for mock use
Expand All @@ -110,7 +111,7 @@ type MetadataAPI interface {
GetBucketMeta(ctx context.Context, bucketName string, includePrivate bool, opts ...grpc.DialOption) (*types.VGFInfoBucket, *payment_types.StreamRecord, error)
GetEndpointBySpID(ctx context.Context, spID uint32, opts ...grpc.DialOption) (string, error)
GetBucketReadQuota(ctx context.Context, bucket *storagetypes.BucketInfo, yearMonth string, opts ...grpc.DialOption) (uint64, uint64, uint64, uint64, error)
GetLatestBucketReadQuota(ctx context.Context, bucketID uint64, opts ...grpc.DialOption) (gfsptask.GfSpBucketQuotaInfo, error)
GetLatestBucketReadQuota(ctx context.Context, bucketID uint64, opts ...grpc.DialOption) (*gfsptask.GfSpBucketQuotaInfo, error)
ListBucketReadRecord(ctx context.Context, bucket *storagetypes.BucketInfo, startTimestampUs, endTimestampUs, maxRecordNum int64, opts ...grpc.DialOption) ([]*types.ReadRecord, int64, error)
GetUploadObjectState(ctx context.Context, objectID uint64, opts ...grpc.DialOption) (int32, string, error)
GetUploadObjectSegment(ctx context.Context, objectID uint64, opts ...grpc.DialOption) (uint32, error)
Expand All @@ -127,6 +128,7 @@ type MetadataAPI interface {
ListGlobalVirtualGroupsByBucket(ctx context.Context, bucketID uint64, opts ...grpc.DialOption) ([]*virtualgrouptypes.GlobalVirtualGroup, error)
ListGlobalVirtualGroupsBySecondarySP(ctx context.Context, spID uint32, opts ...grpc.DialOption) ([]*virtualgrouptypes.GlobalVirtualGroup, error)
ListMigrateBucketEvents(ctx context.Context, blockID uint64, spID uint32, opts ...grpc.DialOption) ([]*types.ListMigrateBucketEvents, error)
ListCompleteMigrationBucketEvents(ctx context.Context, blockID uint64, srcSpID uint32, opts ...grpc.DialOption) ([]*storagetypes.EventCompleteMigrationBucket, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to distinguish the difference between ListMigrateBucketEvents and ListCompleteMigrationBucketEvents through naming?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of the interfaces are currently a bit different:

  • ListMigrateBucketEvents returns all events in the block range from 0 up to the given height, while ListCompleteMigrationBucketEvents returns only the events at the current block height.
  • ListMigrateBucketEvents does not return CompleteMigrationBucketEvent. ListCompleteMigrationBucketEvents is mainly used for the src SP to listen for CompleteMigrationBucketEvent.
  • ListMigrateBucketEvents has an internal table, EventMigrationBucket, using (dst_primary_sp_id). There is no src_sp_id here, making the implementation a bit more complex.

Considering the above reasons, ListCompleteMigrationBucketEvents has been implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. src sp , dest sp comment

ListSwapOutEvents(ctx context.Context, blockID uint64, spID uint32, opts ...grpc.DialOption) ([]*types.ListSwapOutEvents, error)
ListSpExitEvents(ctx context.Context, blockID uint64, spID uint32, opts ...grpc.DialOption) (*types.ListSpExitEvents, error)
GetObjectByID(ctx context.Context, objectID uint64, opts ...grpc.DialOption) (*storagetypes.ObjectInfo, error)
Expand Down
Loading
Loading