feat: gc meta fix bug

jingjunLi committed Nov 8, 2023
1 parent 2226495 commit 3d180ca
Showing 4 changed files with 36 additions and 85 deletions.
2 changes: 1 addition & 1 deletion deployment/localup/localup.sh
@@ -157,7 +157,7 @@ function make_config() {
sed -i -e "s/SubscribeBucketMigrateEventIntervalMillisecond = .*/SubscribeBucketMigrateEventIntervalMillisecond = 20/g" config.toml
sed -i -e "s/GVGPreferSPList = \[\]/GVGPreferSPList = \[1,2,3,4,5,6,7,8\]/g" config.toml
sed -i -e "s/GCZombieEnabled = .*/GCZombieEnabled = true/g" config.toml
sed -i -e "s/gcMetaEnabled = .*/gcMetaEnabled = true/g" config.toml
sed -i -e "s/GCMetaEnabled = .*/GCMetaEnabled = true/g" config.toml

echo "succeed to generate config.toml in "${sp_dir}
cd - >/dev/null
113 changes: 33 additions & 80 deletions modular/executor/execute_task.go
@@ -18,6 +18,7 @@ import (
"github.com/bnb-chain/greenfield-common/go/redundancy"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsperrors"
"github.com/bnb-chain/greenfield-storage-provider/core/module"
"github.com/bnb-chain/greenfield-storage-provider/core/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/core/spdb"
corespdb "github.com/bnb-chain/greenfield-storage-provider/core/spdb"
coretask "github.com/bnb-chain/greenfield-storage-provider/core/task"
@@ -80,24 +81,18 @@ func NewGCChecker(e *ExecuteModular) *GCChecker {
}
}

// deleteObjectPiecesAndIntegrityMetaByObjectInfo used by gcZombiePiece
func (gc *GCChecker) deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo) error {
gc.deleteObjectPieces(ctx, objectInfo)
// TODO delete integrity meta
return nil
}

// deleteObjectPiecesAndIntegrityMeta is used by gcZombiePiece
func (gc *GCChecker) deleteObjectPiecesAndIntegrityMeta(ctx context.Context, integrityMeta *corespdb.IntegrityMeta) error {
objID := integrityMeta.ObjectID
redundancyIdx := integrityMeta.RedundancyIndex
maxSegment := len(integrityMeta.PieceChecksumList)
segmentIdx := uint32(0)

for segmentIndex := 0; segmentIndex <= maxSegment; segmentIndex++ {
// delete object pieces
for segmentIdx := uint32(0); segmentIdx <= uint32(maxSegment); segmentIdx++ {
gc.deletePiece(ctx, objID, segmentIdx, redundancyIdx)
}

// delete integrity meta
_ = gc.e.baseApp.GfSpDB().DeleteObjectIntegrity(objID, redundancyIdx)
log.CtxDebugw(ctx, "succeed to delete all object segment and integrity meta", "object_id", objID, "integrity_meta", integrityMeta)

@@ -126,25 +121,11 @@ func (gc *GCChecker) deleteObjectPieces(ctx context.Context, objectInfo *storage
return nil
}

// deletePieceByObjectInfo delete single piece if meta data or chain has object info
func (gc *GCChecker) deletePieceByObjectInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo, segmentIdx uint32, redundancyIdx int32) {
var pieceKey string
objID := objectInfo.Id.Uint64()
if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_EC_TYPE {
pieceKey = gc.e.baseApp.PieceOp().ECPieceKey(objID, segmentIdx, uint32(redundancyIdx))
} else {
pieceKey = gc.e.baseApp.PieceOp().SegmentPieceKey(objID, segmentIdx)
}
deleteErr := gc.e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key", pieceKey, "error", deleteErr)
}

// deletePiece delete single piece if meta data or chain has object info
func (gc *GCChecker) deletePiece(ctx context.Context, objID uint64, segmentIdx uint32, redundancyIdx int32) {
var pieceKey string
// TODO
if redundancyIdx != -1 {
// PrimarySPRedundancyIndex (-1) marks the primary sp's segment piece; any other index is an EC piece
if redundancyIdx != piecestore.PrimarySPRedundancyIndex {
pieceKey = gc.e.baseApp.PieceOp().ECPieceKey(objID, segmentIdx, uint32(redundancyIdx))
} else {
pieceKey = gc.e.baseApp.PieceOp().SegmentPieceKey(objID, segmentIdx)
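// Illustrative mapping (the piece-key strings here are made up; the real
// formats come from PieceOp):
//   deletePiece(ctx, 10, 2, -1) // PrimarySPRedundancyIndex -> SegmentPieceKey(10, 2), e.g. "10_s2"
//   deletePiece(ctx, 10, 2, 3)  // secondary index 3        -> ECPieceKey(10, 2, 3),  e.g. "10_s2_p3"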
@@ -159,6 +140,8 @@ func (gc *GCChecker) deletePieceAndPieceChecksum(ctx context.Context, piece *spd
objID := piece.ObjectID
segmentIdx := piece.SegmentIndex
redundancyIdx := piece.RedundancyIndex
log.CtxInfow(ctx, "delete piece and piece hash", "object_id", objID, "segmentIdx", segmentIdx, "redundancyIdx", redundancyIdx)

gc.deletePiece(ctx, piece.ObjectID, piece.SegmentIndex, piece.RedundancyIndex)
err := gc.e.baseApp.GfSpDB().DeleteReplicatePieceChecksum(objID, segmentIdx, redundancyIdx)
if err != nil {
@@ -170,34 +153,23 @@ func (gc *GCChecker) deletePieceAndPieceChecksum(ctx context.Context, piece *spd
return nil
}

func (gc *GCChecker) deletePieceAndPieceHash(ctx context.Context, objectInfo *storagetypes.ObjectInfo, segmentIdx uint32, redundancyIdx int32) error {
log.CtxInfow(ctx, "delete piece and piece hash", "object_info", objectInfo, "segmentIdx", segmentIdx, "redundancyIdx", redundancyIdx)
gc.deletePieceByObjectInfo(ctx, objectInfo, segmentIdx, redundancyIdx)

err := gc.e.baseApp.GfSpDB().DeleteReplicatePieceChecksum(objectInfo.Id.Uint64(), segmentIdx, redundancyIdx)
if err != nil {
log.Debugf("failed to delete replicate piece checksum", "object", objectInfo)
return err
}
return nil
}

// isAllowGCCheck reports whether the object is allowed to be gc checked
func (gc *GCChecker) isAllowGCCheck(objectInfo *storagetypes.ObjectInfo, bucketInfo *metadatatypes.Bucket) bool {
// object not in sealed status
if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED {
log.Infow("the object isn't sealed, do not need to gc check")
return false
}
// bucket migrating
if bucketInfo.GetBucketInfo().GetBucketStatus() == storagetypes.BUCKET_STATUS_MIGRATING {
log.Infow("bucket is migrating, do not need to gc check")
return false
}
log.Infow("the object is sealed and the bucket is not migrating, the object can gc", "object", objectInfo, "bucket", bucketInfo)
return true
}

func (gc *GCChecker) getGvgAndSpId(ctx context.Context, objectInfo *storagetypes.ObjectInfo) (*metadatatypes.Bucket, *virtualgrouptypes.GlobalVirtualGroup, uint32, error) {
// query the bucket info by bucket name
bucketInfo, err := gc.e.baseApp.GfSpClient().GetBucketByBucketName(ctx, objectInfo.BucketName, true)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "error", err)
Expand All @@ -221,26 +193,8 @@ func (gc *GCChecker) getGvgAndSpId(ctx context.Context, objectInfo *storagetypes
return bucketInfo, gvg, spID, nil
}

// checkObjectLocationInfo only return ErrInvalidIntegrity means the object was dislocation
func (gc *GCChecker) checkObjectLocationInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo) error {
bucketInfo, gvg, spID, err := gc.getGvgAndSpId(ctx, objectInfo)
if err != nil {
return err
}
if !gc.isAllowGCCheck(objectInfo, bucketInfo) {
return nil
}
// gvg primary sp should have integrity meta TODO fix it?
if gvg.GetPrimarySpId() != spID {
// delete
return ErrInvalidIntegrity
}

return nil
}

// only return ErrSecondaryMismatch means the piece was dislocation
func (gc *GCChecker) checkPieceLocationInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo, piece *spdb.GCPieceMeta) error {
// checkGVGMatchSP returns ErrInvalidRedundancyIndex only when the piece is dislocated (stored on an sp that should not hold it)
func (gc *GCChecker) checkGVGMatchSP(ctx context.Context, objectInfo *storagetypes.ObjectInfo, redundancyIndex int32) error {
bucketInfo, gvg, spID, err := gc.getGvgAndSpId(ctx, objectInfo)
if err != nil {
return err
Expand All @@ -250,11 +204,16 @@ func (gc *GCChecker) checkPieceLocationInfo(ctx context.Context, objectInfo *sto
return nil
}

if gvg.GetSecondarySpIds()[piece.SegmentIndex] != spID {
// delete TODO error code
log.CtxErrorw(ctx, "failed to confirm receive task, secondary sp mismatch", "expect",
gvg.GetSecondarySpIds()[int(piece.RedundancyIndex)], "current", gc.e.baseApp.OperatorAddress())
return ErrSecondaryMismatch
if redundancyIndex == piecestore.PrimarySPRedundancyIndex {
if gvg.GetPrimarySpId() != spID {
log.CtxInfow(ctx, "the piece isn't in correct location, will be delete", "redundancy_index", redundancyIndex, "gvg", gvg, "sp_id", spID)
return ErrInvalidRedundancyIndex
}
} else {
if gvg.GetSecondarySpIds()[redundancyIndex] != spID {
log.CtxInfow(ctx, "the piece isn't in correct location, will be delete", "redundancy_index", redundancyIndex, "gvg", gvg, "sp_id", spID)
return ErrInvalidRedundancyIndex
}
}
return nil
}
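// Decision sketch for checkGVGMatchSP (the IDs are made up): with
// gvg.PrimarySpId = 1 and gvg.SecondarySpIds = [2, 3, 4]:
//   redundancyIndex -1, spID 1 -> nil (primary piece correctly placed)
//   redundancyIndex -1, spID 5 -> ErrInvalidRedundancyIndex
//   redundancyIndex  2, spID 4 -> nil (SecondarySpIds[2] == 4)
//   redundancyIndex  2, spID 9 -> ErrInvalidRedundancyIndex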
@@ -616,14 +575,15 @@ func (e *ExecuteModular) gcMetaBucketTraffic(ctx context.Context, task coretask.
// TODO list buckets when large dataset
now := time.Now()
daysAgo := now.Add(-time.Duration(e.bucketTrafficKeepLatestDay) * time.Hour * 24)
yearMonth := sqldb.TimeToYearMonth(daysAgo)

err := e.baseApp.GfSpDB().DeleteAllBucketTrafficExpired(sqldb.TimestampYearMonth(daysAgo.Unix()))
err := e.baseApp.GfSpDB().DeleteAllBucketTrafficExpired(yearMonth)
if err != nil {
log.CtxErrorw(ctx, "failed to delete expired bucket traffic", "error", err)
return err
}

log.CtxInfow(ctx, "succeed to delete bucket traffic", "task", task, "daysAgo", daysAgo)
log.CtxInfow(ctx, "succeed to delete bucket traffic", "task", task, "days_ago", daysAgo, "year_month", yearMonth)
return nil
}
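// Worked example of the cutoff (TimeToYearMonth's output format is assumed to
// be a year-month bucket such as "2023-08"): with bucketTrafficKeepLatestDay =
// 90 and now = 2023-11-08, daysAgo = 2023-08-10, so bucket-traffic records
// older than that year-month are treated as expired and deleted (the exact
// boundary is whatever DeleteAllBucketTrafficExpired implements).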

@@ -696,7 +656,7 @@ func (e *ExecuteModular) HandleGCBucketMigrationBucket(ctx context.Context, task
// verify that each object no longer belongs to this sp before deleting
for _, obj := range objects {
objectInfo := obj.GetObject().GetObjectInfo()
if e.gcChecker.checkObjectLocationInfo(ctx, objectInfo) == ErrInvalidIntegrity {
if e.gcChecker.checkGVGMatchSP(ctx, objectInfo, piecestore.PrimarySPRedundancyIndex) == ErrInvalidRedundancyIndex {
e.gcChecker.deleteObjectPieces(ctx, objectInfo)
log.CtxInfow(ctx, "succeed to delete objects by gvg and bucket for gc", "object", objectInfo, "error", err)
}
@@ -1113,17 +1073,16 @@ func (e *ExecuteModular) gcZombiePieceFromIntegrityMeta(ctx context.Context, tas
}
} else {
// 2) metadata query failed, but the chain has the object info; the gvg primary sp should have the integrity meta
// TODO: refine
if e.gcChecker.checkObjectLocationInfo(ctx, objInfoFromChain) == ErrInvalidIntegrity {
e.gcChecker.deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx, objInfoFromChain)
if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromChain, integrityObject.RedundancyIndex) == ErrInvalidRedundancyIndex {
e.gcChecker.deleteObjectPiecesAndIntegrityMeta(ctx, integrityObject)
}
continue
}
}
} else {
// check integrity meta & object info
if e.gcChecker.checkObjectLocationInfo(ctx, objInfoFromMetaData) == ErrInvalidIntegrity {
e.gcChecker.deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx, objInfoFromMetaData)
if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromMetaData, integrityObject.RedundancyIndex) == ErrInvalidRedundancyIndex {
e.gcChecker.deleteObjectPiecesAndIntegrityMeta(ctx, integrityObject)
}
}
}
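// Flow of the checks above: the object is resolved from metadata first and
// from the chain as a fallback; in either case the integrity record survives
// only if checkGVGMatchSP confirms this sp should hold that redundancy index,
// otherwise deleteObjectPiecesAndIntegrityMeta removes both the pieces and
// the integrity meta in one pass.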
@@ -1170,22 +1129,16 @@ func (e *ExecuteModular) gcZombiePieceFromPieceHash(ctx context.Context, task co
} else {
// 2) metadata query failed, but the chain has the object info; the gvg primary sp should have the integrity meta
// TODO: refine
if e.gcChecker.checkPieceLocationInfo(ctx, objInfoFromChain, piece) == ErrSecondaryMismatch {
if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromChain, piece.RedundancyIndex) == ErrInvalidRedundancyIndex {
e.gcChecker.deletePieceAndPieceChecksum(ctx, piece)
}
}
}
} else {
if e.gcChecker.checkPieceLocationInfo(ctx, objectInfoFromMetadata, piece) == ErrSecondaryMismatch {
// delete piece
// delete piece hash checksum
if e.gcChecker.checkGVGMatchSP(ctx, objectInfoFromMetadata, piece.RedundancyIndex) == ErrInvalidRedundancyIndex {
//task.SetError(ErrSecondaryMismatch)
// ignore error
err = e.baseApp.GfSpDB().DeleteObjectIntegrity(objID, piece.RedundancyIndex)
if err != nil {
log.CtxError(ctx, "failed to delete integrity")
}
e.gcChecker.deletePieceAndPieceHash(ctx, objectInfoFromMetadata, piece.SegmentIndex, piece.RedundancyIndex)
e.gcChecker.deletePieceAndPieceChecksum(ctx, piece)
}
}
}
1 change: 0 additions & 1 deletion modular/executor/migrate_task.go
@@ -76,7 +76,6 @@ func (e *ExecuteModular) HandleMigrateGVGTask(ctx context.Context, gvgTask coret
}

for index, object := range objectList {
time.Sleep(1 * time.Second)
if err = e.checkAndTryRenewSig(gvgTask.(*gfsptask.GfSpMigrateGVGTask)); err != nil {
log.CtxErrorw(ctx, "failed to check and renew gvg task signature", "gvg_task", gvgTask, "error", err)
return
5 changes: 2 additions & 3 deletions modular/manager/bucket_migrate_scheduler.go
@@ -689,9 +689,9 @@ func (s *BucketMigrateScheduler) PostMigrateBucket(postMsg *gfsptask.GfSpBucketM
return err
}
log.Debugw("succeed to post migrate bucket quota", "src_sp", srcSPInfo, "postMsg", postMsg, "error", err)
// TODO gc for dest sp
// gc for dest sp
if !postMsg.GetFinished() {
// list objects and delete generate a task ?
// generate a gc bucket migration task (list objects and delete)
go func() {
// src sp should wait for the metadata to catch up
<-time.After(10 * time.Second)
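// The 10s pause before kicking off the gc task is presumably to give the
// metadata service time to index the finished migration before the bucket's
// objects are listed (an inference; the code only notes waiting for metadata).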
Expand All @@ -718,7 +718,6 @@ func (s *BucketMigrateScheduler) PreMigrateBucket(bucketID uint64, srcSPInfo *sp
)

log.Debugw("start to pre migrate bucket", "bucket_id", bucketID)

preMsg := &gfsptask.GfSpBucketMigrationInfo{BucketId: bucketID, Finished: false}
preMsg.ExpireTime = time.Now().Unix() + SigExpireTimeSecond
signature, err = s.manager.baseApp.GfSpClient().SignBucketMigrationInfo(context.Background(), preMsg)
