Skip to content

Commit

Permalink
feat: add GC GVG.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Dec 4, 2023
1 parent 07da1fb commit 85f140e
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 33 deletions.
60 changes: 60 additions & 0 deletions base/gfspclient/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 74 additions & 16 deletions base/gfspvgmgr/virtual_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ import (
var _ vgmgr.VirtualGroupManager = &virtualGroupManager{}

const (
VirtualGroupManagerSpace = "VirtualGroupManager"
RefreshMetaInterval = 5 * time.Second
MaxStorageUsageRatio = 0.95
DefaultInitialGVGStakingStorageSize = uint64(2) * 1024 * 1024 * 1024 * 1024 // 2TB per GVG, chain side DefaultMaxStoreSizePerFamily is 64 TB
AdditioanlAddUpGVGStakingStorageSize = uint64(1) * 1024 * 1024 * 1024 * 1024 // 1TB
VirtualGroupManagerSpace = "VirtualGroupManager"
RefreshMetaInterval = 5 * time.Second
MaxStorageUsageRatio = 0.95
DefaultInitialGVGStakingStorageSize = uint64(2) * 1024 * 1024 * 1024 * 1024 // 2TB per GVG, chain side DefaultMaxStoreSizePerFamily is 64 TB
additionalGVGStakingStorageSize = uint64(1) * 1024 * 1024 * 1024 * 1024 // 1TB

defaultSPCheckTimeout = 3 * time.Second
defaultSPHealthCheckerInterval = 5 * time.Second
httpStatusPath = "/status"

emptyGVGSafeDeletePeriod = int64(60) * 60 * 24
)

var (
Expand Down Expand Up @@ -252,6 +254,7 @@ type virtualGroupManager struct {
vgfManager *virtualGroupFamilyManager
freezeSPPool *FreezeSPPool
healthChecker *HealthChecker
gvgGCMap sync.Map // Keep track of empty GVG and the time for GC. Once a GVG is detected empty, it will be put into gvgGCMap, and delete it if it is still empty after 1 day
}

// NewVirtualGroupManager returns a virtual group manager interface.
Expand All @@ -268,6 +271,7 @@ func NewVirtualGroupManager(selfOperatorAddress string, chainClient consensus.Co
gfspClient: gfspClient,
freezeSPPool: NewFreezeSPPool(),
healthChecker: healthChecker,
gvgGCMap: sync.Map{},
}
vgm.refreshGVGMeta(true)
go func() {
Expand Down Expand Up @@ -374,11 +378,14 @@ func (vgm *virtualGroupManager) refreshGVGMeta(byChain bool) {
return
}
}
deposited, err := vgm.monitorGVGUsage(byChain, gvg, vgParams)
deposited, deleted, err := vgm.monitorGVGUsage(gvg, vgParams, byChain)
if err != nil {
log.Errorw("failed to monitor global virtual group usage", "gvgID", gvg.Id, "error", err)
return
}
if deleted {
continue
}
if deposited {
time.Sleep(RefreshMetaInterval)
gvg, err = vgm.chainClient.QueryGlobalVirtualGroup(context.Background(), gvg.Id)
Expand Down Expand Up @@ -538,36 +545,87 @@ func (vgm *virtualGroupManager) releaseSPAndGVGLoop() {
}
}

func (vgm *virtualGroupManager) monitorGVGUsage(byChain bool, gvg *virtualgrouptypes.GlobalVirtualGroup, vgParams *virtualgrouptypes.Params) (deposited bool, err error) {
func (vgm *virtualGroupManager) monitorGVGUsage(gvg *virtualgrouptypes.GlobalVirtualGroup, vgParams *virtualgrouptypes.Params, byChain bool) (deposited, deleted bool, err error) {
needDeposit := func(gvg *virtualgrouptypes.GlobalVirtualGroup, vgParams *virtualgrouptypes.Params) bool {
return float64(gvg.GetStoredSize()) >= MaxStorageUsageRatio*float64(util.TotalStakingStoreSizeOfGVG(gvg, vgParams.GvgStakingPerBytes))
}
isEmpty := func(gvg *virtualgrouptypes.GlobalVirtualGroup) bool {
return gvg.StoredSize == 0
}

curTime := time.Now().Unix()
if isEmpty(gvg) {
// if the GVG is refilled, it can be removed from the gcMap.
vgm.gvgGCMap.Range(func(k, v interface{}) bool {
safeDeleteTime := v.(int64)
if curTime > safeDeleteTime+int64(time.Minute.Seconds()) {
vgm.gvgGCMap.Delete(k)
}
return true
})
if !byChain {
gvg, err = vgm.chainClient.QueryGlobalVirtualGroup(context.Background(), gvg.Id)
if err != nil {
log.Errorw("failed to query global virtual group", "error", err)
return
}
if !isEmpty(gvg) {
return
}
}
var safeDeleteTime int64
val, found := vgm.gvgGCMap.Load(gvg.Id)
if !found {
safeDeleteTime = curTime + emptyGVGSafeDeletePeriod
vgm.gvgGCMap.Store(gvg.Id, safeDeleteTime)
return
}
safeDeleteTime = val.(int64)
if curTime < safeDeleteTime {
return
}
var deleteGVGHash string
deleteGVGHash, err = vgm.gfspClient.DeleteGlobalVirtualGroup(context.Background(), &virtualgrouptypes.MsgDeleteGlobalVirtualGroup{
GlobalVirtualGroupId: gvg.Id,
})
if err != nil {
log.Errorw("failed to delete global virtual group", "gvgID", gvg.Id, "txHash", deleteGVGHash, "error", err)
return
}
vgm.gvgGCMap.Delete(gvg.Id)
deleted = true
return
}

if !needDeposit(gvg, vgParams) {
return false, nil
return
}

msgDeposit := &virtualgrouptypes.MsgDeposit{
GlobalVirtualGroupId: gvg.Id,
Deposit: sdk.Coin{
Denom: vgParams.GetDepositDenom(),
Amount: vgParams.GvgStakingPerBytes.Mul(math.NewIntFromUint64(AdditioanlAddUpGVGStakingStorageSize)),
Amount: vgParams.GvgStakingPerBytes.Mul(math.NewIntFromUint64(additionalGVGStakingStorageSize)),
},
}
if !byChain {
queryGVG, err := vgm.chainClient.QueryGlobalVirtualGroup(context.Background(), gvg.Id)
gvg, err = vgm.chainClient.QueryGlobalVirtualGroup(context.Background(), gvg.Id)
if err != nil {
log.Errorw("failed to query global virtual group", "error", err)
return false, err
return
}
if !needDeposit(queryGVG, vgParams) {
return false, nil
if !needDeposit(gvg, vgParams) {
return
}
}
depositTxHash, err := vgm.gfspClient.Deposit(context.Background(), msgDeposit)
var depositTxHash string
depositTxHash, err = vgm.gfspClient.Deposit(context.Background(), msgDeposit)
if err != nil {
log.Errorw("failed to deposit global virtual group", "gvgID", gvg.Id, "txHash", depositTxHash, "error", err)
return false, err
return
}
return true, nil
deposited = true
return
}

type HealthChecker struct {
Expand Down
10 changes: 5 additions & 5 deletions modular/signer/signer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (client *GreenfieldChainSignClient) RejectMigrateBucket(ctx context.Context

func (client *GreenfieldChainSignClient) Deposit(ctx context.Context, scope SignType,
msg *virtualgrouptypes.MsgDeposit) (string, error) {
log.Infow("signer starts to deposit into GVG", "scope", scope)
log.Infow("signer starts to make deposit into GVG", "scope", scope)
if msg == nil {
log.CtxError(ctx, "deposit msg pointer dangling")
return "", ErrDanglingPointer
Expand Down Expand Up @@ -863,8 +863,8 @@ func (client *GreenfieldChainSignClient) Deposit(ctx context.Context, scope Sign
nonce, nonceErr = client.getNonceOnChain(ctx, client.greenfieldClients[scope])
if nonceErr != nil {
log.CtxErrorw(ctx, "failed to get operator account nonce", "error", err)
ErrRejectMigrateBucketOnChain.SetError(fmt.Errorf("failed to get operator account nonce, error: %v", err))
return "", ErrRejectMigrateBucketOnChain
ErrDepositOnChain.SetError(fmt.Errorf("failed to get operator account nonce, error: %v", err))
return "", ErrDepositOnChain
}
client.operatorAccNonce = nonce
}
Expand Down Expand Up @@ -916,8 +916,8 @@ func (client *GreenfieldChainSignClient) DeleteGlobalVirtualGroup(ctx context.Co
nonce, nonceErr = client.getNonceOnChain(ctx, client.greenfieldClients[scope])
if nonceErr != nil {
log.CtxErrorw(ctx, "failed to get operator account nonce", "error", err)
ErrRejectMigrateBucketOnChain.SetError(fmt.Errorf("failed to get operator account nonce, error: %v", err))
return "", ErrRejectMigrateBucketOnChain
ErrDeleteGVGOnChain.SetError(fmt.Errorf("failed to get operator account nonce, error: %v", err))
return "", ErrDeleteGVGOnChain
}
client.operatorAccNonce = nonce
}
Expand Down
24 changes: 12 additions & 12 deletions modular/uploader/upload_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
)

var (
RtyAttNum = uint(3)
RtyAttem = retry.Attempts(RtyAttNum)
RtyDelay = retry.Delay(time.Millisecond * 500)
RtyErr = retry.LastErrorOnly(true)
rtyAttNum = uint(3)
rtyAttem = retry.Attempts(rtyAttNum)
rtyDelay = retry.Delay(time.Millisecond * 500)
rtyErr = retry.LastErrorOnly(true)
)

var (
Expand Down Expand Up @@ -105,11 +105,11 @@ func (u *UploadModular) HandleUploadObjectTask(ctx context.Context, uploadObject
metrics.PerfPutObjectTime.WithLabelValues("uploader_put_object_before_report_manager_end").Observe(time.Since(time.Unix(uploadObjectTask.GetCreateTime(), 0)).Seconds())
if err = retry.Do(func() error {
return u.baseApp.GfSpClient().ReportTask(context.Background(), uploadObjectTask)
}, RtyAttem,
RtyDelay,
RtyErr,
}, rtyAttem,
rtyDelay,
rtyErr,
retry.OnRetry(func(n uint, err error) {
log.CtxErrorw(ctx, "failed to report upload object task", "error", err, "attempt", n, "max_attempts", RtyAttNum)
log.CtxErrorw(ctx, "failed to report upload object task", "error", err, "attempt", n, "max_attempts", rtyAttNum)
})); err != nil {
log.CtxErrorw(ctx, "failed to report upload object task", "error", err)
}
Expand Down Expand Up @@ -243,11 +243,11 @@ func (u *UploadModular) HandleResumableUploadObjectTask(ctx context.Context, tas
metrics.PerfPutObjectTime.WithLabelValues("uploader_put_object_before_report_manager_end").Observe(time.Since(time.Unix(task.GetCreateTime(), 0)).Seconds())
if err = retry.Do(func() error {
return u.baseApp.GfSpClient().ReportTask(context.Background(), task)
}, RtyAttem,
RtyDelay,
RtyErr,
}, rtyAttem,
rtyDelay,
rtyErr,
retry.OnRetry(func(n uint, err error) {
log.CtxErrorw(ctx, "failed to report upload object task", "error", err, "attempt", n, "max_attempts", RtyAttNum)
log.CtxErrorw(ctx, "failed to report upload object task", "error", err, "attempt", n, "max_attempts", rtyAttNum)
})); err != nil {
log.CtxErrorw(ctx, "failed to report upload object task", "error", err)
}
Expand Down

0 comments on commit 85f140e

Please sign in to comment.