diff --git a/deployment/localup/localup.sh b/deployment/localup/localup.sh index 007218664..9218c8139 100644 --- a/deployment/localup/localup.sh +++ b/deployment/localup/localup.sh @@ -157,7 +157,7 @@ function make_config() { sed -i -e "s/SubscribeBucketMigrateEventIntervalMillisecond = .*/SubscribeBucketMigrateEventIntervalMillisecond = 20/g" config.toml sed -i -e "s/GVGPreferSPList = \[\]/GVGPreferSPList = \[1,2,3,4,5,6,7,8\]/g" config.toml sed -i -e "s/GCZombieEnabled = .*/GCZombieEnabled = true/g" config.toml - sed -i -e "s/gcMetaEnabled = .*/gcMetaEnabled = true/g" config.toml + sed -i -e "s/GCMetaEnabled = .*/GCMetaEnabled = true/g" config.toml echo "succeed to generate config.toml in "${sp_dir} cd - >/dev/null diff --git a/modular/executor/execute_task.go b/modular/executor/execute_task.go index 86856417f..f192830aa 100644 --- a/modular/executor/execute_task.go +++ b/modular/executor/execute_task.go @@ -18,6 +18,7 @@ import ( "github.com/bnb-chain/greenfield-common/go/redundancy" "github.com/bnb-chain/greenfield-storage-provider/base/types/gfsperrors" "github.com/bnb-chain/greenfield-storage-provider/core/module" + "github.com/bnb-chain/greenfield-storage-provider/core/piecestore" "github.com/bnb-chain/greenfield-storage-provider/core/spdb" corespdb "github.com/bnb-chain/greenfield-storage-provider/core/spdb" coretask "github.com/bnb-chain/greenfield-storage-provider/core/task" @@ -80,24 +81,18 @@ func NewGCChecker(e *ExecuteModular) *GCChecker { } } -// deleteObjectPiecesAndIntegrityMetaByObjectInfo used by gcZombiePiece -func (gc *GCChecker) deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo) error { - gc.deleteObjectPieces(ctx, objectInfo) - // TODO delete integrity meta - return nil -} - // deleteObjectPiecesAndIntegrityMetaByObjectInfo used by gcZombiePiece func (gc *GCChecker) deleteObjectPiecesAndIntegrityMeta(ctx context.Context, integrityMeta *corespdb.IntegrityMeta) error { objID := integrityMeta.ObjectID redundancyIdx := integrityMeta.RedundancyIndex maxSegment := len(integrityMeta.PieceChecksumList) - segmentIdx := uint32(0) - for segmentIndex := 0; segmentIndex <= maxSegment; segmentIndex++ { + // delete object pieces + for segmentIdx := uint32(0); segmentIdx <= uint32(maxSegment); segmentIdx++ { gc.deletePiece(ctx, objID, segmentIdx, redundancyIdx) } + // delete integrity meta _ = gc.e.baseApp.GfSpDB().DeleteObjectIntegrity(objID, redundancyIdx) log.CtxDebugw(ctx, "succeed to delete all object segment and integrity meta", "object_id", objID, "integrity_meta", integrityMeta) @@ -126,25 +121,11 @@ func (gc *GCChecker) deleteObjectPieces(ctx context.Context, objectInfo *storage return nil } -// deletePieceByObjectInfo delete single piece if meta data or chain has object info -func (gc *GCChecker) deletePieceByObjectInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo, segmentIdx uint32, redundancyIdx int32) { - var pieceKey string - objID := objectInfo.Id.Uint64() - if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_EC_TYPE { - pieceKey = gc.e.baseApp.PieceOp().ECPieceKey(objID, segmentIdx, uint32(redundancyIdx)) - } else { - pieceKey = gc.e.baseApp.PieceOp().SegmentPieceKey(objID, segmentIdx) - } - deleteErr := gc.e.baseApp.PieceStore().DeletePiece(ctx, pieceKey) - log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo, - "piece_key", pieceKey, "error", deleteErr) -} - // deletePiece delete single piece if meta data or chain has object info func (gc *GCChecker) deletePiece(ctx context.Context, objID uint64, segmentIdx uint32, redundancyIdx int32) { var pieceKey string - // TODO - if redundancyIdx != -1 { + // TODO -1 means REDUNDANCY_EC_TYPE + if redundancyIdx != piecestore.PrimarySPRedundancyIndex { pieceKey = gc.e.baseApp.PieceOp().ECPieceKey(objID, segmentIdx, uint32(redundancyIdx)) } else { pieceKey = gc.e.baseApp.PieceOp().SegmentPieceKey(objID, segmentIdx) @@ -159,6 +140,8 @@ func (gc *GCChecker) deletePieceAndPieceChecksum(ctx context.Context, piece *spd objID := piece.ObjectID segmentIdx := piece.SegmentIndex redundancyIdx := piece.RedundancyIndex + log.CtxInfow(ctx, "delete piece and piece hash", "object_id", objID, "segmentIdx", segmentIdx, "redundancyIdx", redundancyIdx) + gc.deletePiece(ctx, piece.ObjectID, piece.SegmentIndex, piece.RedundancyIndex) err := gc.e.baseApp.GfSpDB().DeleteReplicatePieceChecksum(objID, segmentIdx, redundancyIdx) if err != nil { @@ -170,18 +153,6 @@ func (gc *GCChecker) deletePieceAndPieceChecksum(ctx context.Context, piece *spd return nil } -func (gc *GCChecker) deletePieceAndPieceHash(ctx context.Context, objectInfo *storagetypes.ObjectInfo, segmentIdx uint32, redundancyIdx int32) error { - log.CtxInfow(ctx, "delete piece and piece hash", "object_info", objectInfo, "segmentIdx", segmentIdx, "redundancyIdx", redundancyIdx) - gc.deletePieceByObjectInfo(ctx, objectInfo, segmentIdx, redundancyIdx) - - err := gc.e.baseApp.GfSpDB().DeleteReplicatePieceChecksum(objectInfo.Id.Uint64(), segmentIdx, redundancyIdx) - if err != nil { - log.Debugf("failed to delete replicate piece checksum", "object", objectInfo) - return err - } - return nil -} - // isAllowGCCheck func (gc *GCChecker) isAllowGCCheck(objectInfo *storagetypes.ObjectInfo, bucketInfo *metadatatypes.Bucket) bool { // object not in sealed status @@ -189,15 +160,16 @@ func (gc *GCChecker) isAllowGCCheck(objectInfo *storagetypes.ObjectInfo, bucketI log.Infow("the object isn't sealed, do not need to gc check") return false } + // bucket migrating if bucketInfo.GetBucketInfo().GetBucketStatus() == storagetypes.BUCKET_STATUS_MIGRATING { log.Infow("bucket is migrating, do not need to gc check") return false } + log.Infow("the object is sealed and the bucket is not migrating, the object can gc", "object", objectInfo, "bucket", bucketInfo) return true } func (gc *GCChecker) getGvgAndSpId(ctx context.Context, objectInfo *storagetypes.ObjectInfo) (*metadatatypes.Bucket, *virtualgrouptypes.GlobalVirtualGroup, uint32, error) { - // bucket migrating bucketInfo, err := gc.e.baseApp.GfSpClient().GetBucketByBucketName(ctx, objectInfo.BucketName, true) if err != nil || bucketInfo == nil { log.Errorw("failed to get bucket by bucket name", "error", err) @@ -221,26 +193,8 @@ func (gc *GCChecker) getGvgAndSpId(ctx context.Context, objectInfo *storagetypes return bucketInfo, gvg, spID, nil } -// checkObjectLocationInfo only return ErrInvalidIntegrity means the object was dislocation -func (gc *GCChecker) checkObjectLocationInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo) error { - bucketInfo, gvg, spID, err := gc.getGvgAndSpId(ctx, objectInfo) - if err != nil { - return err - } - if !gc.isAllowGCCheck(objectInfo, bucketInfo) { - return nil - } - // gvg primary sp should have integrity meta TODO fix it? - if gvg.GetPrimarySpId() != spID { - // delete - return ErrInvalidIntegrity - } - - return nil -} - -// only return ErrSecondaryMismatch means the piece was dislocation -func (gc *GCChecker) checkPieceLocationInfo(ctx context.Context, objectInfo *storagetypes.ObjectInfo, piece *spdb.GCPieceMeta) error { +// checkGVGMatchSP only return ErrInvalidRedundancyIndex means the piece was dislocation +func (gc *GCChecker) checkGVGMatchSP(ctx context.Context, objectInfo *storagetypes.ObjectInfo, redundancyIndex int32) error { bucketInfo, gvg, spID, err := gc.getGvgAndSpId(ctx, objectInfo) if err != nil { return err @@ -250,11 +204,16 @@ func (gc *GCChecker) checkPieceLocationInfo(ctx context.Context, objectInfo *sto return nil } - if gvg.GetSecondarySpIds()[piece.SegmentIndex] != spID { - // delete TODO error code - log.CtxErrorw(ctx, "failed to confirm receive task, secondary sp mismatch", "expect", - gvg.GetSecondarySpIds()[int(piece.RedundancyIndex)], "current", gc.e.baseApp.OperatorAddress()) - return ErrSecondaryMismatch + if redundancyIndex == piecestore.PrimarySPRedundancyIndex { + if gvg.GetPrimarySpId() != spID { + log.CtxInfow(ctx, "the piece isn't in correct location, will be delete", "redundancy_index", redundancyIndex, "gvg", gvg, "sp_id", spID) + return ErrInvalidRedundancyIndex + } + } else { + if gvg.GetSecondarySpIds()[redundancyIndex] != spID { + log.CtxInfow(ctx, "the piece isn't in correct location, will be delete", "redundancy_index", redundancyIndex, "gvg", gvg, "sp_id", spID) + return ErrInvalidRedundancyIndex + } } return nil } @@ -616,14 +575,15 @@ func (e *ExecuteModular) gcMetaBucketTraffic(ctx context.Context, task coretask. // TODO list buckets when large dataset now := time.Now() daysAgo := now.Add(-time.Duration(e.bucketTrafficKeepLatestDay) * time.Hour * 24) + yearMonth := sqldb.TimeToYearMonth(daysAgo) - err := e.baseApp.GfSpDB().DeleteAllBucketTrafficExpired(sqldb.TimestampYearMonth(daysAgo.Unix())) + err := e.baseApp.GfSpDB().DeleteAllBucketTrafficExpired(yearMonth) if err != nil { log.CtxErrorw(ctx, "failed to delete expired bucket traffic", "error", err) return err } - log.CtxInfow(ctx, "succeed to delete bucket traffic", "task", task, "daysAgo", daysAgo) + log.CtxInfow(ctx, "succeed to delete bucket traffic", "task", task, "days_ago", daysAgo, "year_month", yearMonth) return nil } @@ -696,7 +656,7 @@ func (e *ExecuteModular) HandleGCBucketMigrationBucket(ctx context.Context, task // can delete, verify for _, obj := range objects { objectInfo := obj.GetObject().GetObjectInfo() - if e.gcChecker.checkObjectLocationInfo(ctx, objectInfo) == ErrInvalidIntegrity { + if e.gcChecker.checkGVGMatchSP(ctx, objectInfo, piecestore.PrimarySPRedundancyIndex) == ErrInvalidRedundancyIndex { e.gcChecker.deleteObjectPieces(ctx, objectInfo) log.CtxInfow(ctx, "succeed to delete objects by gvg and bucket for gc", "object", objectInfo, "error", err) } @@ -1113,17 +1073,16 @@ func (e *ExecuteModular) gcZombiePieceFromIntegrityMeta(ctx context.Context, tas } } else { // 2) query metadata error, but chain has the object info, gvg primary sp should has integrity meta - // TODO: refine - if e.gcChecker.checkObjectLocationInfo(ctx, objInfoFromChain) == ErrInvalidIntegrity { - e.gcChecker.deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx, objInfoFromChain) + if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromChain, integrityObject.RedundancyIndex) == ErrInvalidRedundancyIndex { + e.gcChecker.deleteObjectPiecesAndIntegrityMeta(ctx, integrityObject) } continue } } } else { // check integrity meta & object info - if e.gcChecker.checkObjectLocationInfo(ctx, objInfoFromMetaData) == ErrInvalidIntegrity { - e.gcChecker.deleteObjectPiecesAndIntegrityMetaByObjectInfo(ctx, objInfoFromMetaData) + if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromMetaData, integrityObject.RedundancyIndex) == ErrInvalidRedundancyIndex { + e.gcChecker.deleteObjectPiecesAndIntegrityMeta(ctx, integrityObject) } } } @@ -1170,22 +1129,16 @@ func (e *ExecuteModular) gcZombiePieceFromPieceHash(ctx context.Context, task co } else { // 2) query metadata error, but chain has the object info, gvg primary sp should has integrity meta // TODO: refine - if e.gcChecker.checkPieceLocationInfo(ctx, objInfoFromChain, piece) == ErrSecondaryMismatch { + if e.gcChecker.checkGVGMatchSP(ctx, objInfoFromChain, piece.RedundancyIndex) == ErrInvalidRedundancyIndex { e.gcChecker.deletePieceAndPieceChecksum(ctx, piece) } } } } else { - if e.gcChecker.checkPieceLocationInfo(ctx, objectInfoFromMetadata, piece) == ErrSecondaryMismatch { - // delete piece - // delete piece hash checksum + if e.gcChecker.checkGVGMatchSP(ctx, objectInfoFromMetadata, piece.RedundancyIndex) == ErrInvalidRedundancyIndex { //task.SetError(ErrSecondaryMismatch) // ignore error - err = e.baseApp.GfSpDB().DeleteObjectIntegrity(objID, piece.RedundancyIndex) - if err != nil { - log.CtxError(ctx, "failed to delete integrity") - } - e.gcChecker.deletePieceAndPieceHash(ctx, objectInfoFromMetadata, piece.SegmentIndex, piece.RedundancyIndex) + e.gcChecker.deletePieceAndPieceChecksum(ctx, piece) } } } diff --git a/modular/executor/migrate_task.go b/modular/executor/migrate_task.go index d49ae6c9e..cf61356bc 100644 --- a/modular/executor/migrate_task.go +++ b/modular/executor/migrate_task.go @@ -76,7 +76,6 @@ func (e *ExecuteModular) HandleMigrateGVGTask(ctx context.Context, gvgTask coret } for index, object := range objectList { - time.Sleep(1 * time.Second) if err = e.checkAndTryRenewSig(gvgTask.(*gfsptask.GfSpMigrateGVGTask)); err != nil { log.CtxErrorw(ctx, "failed to check and renew gvg task signature", "gvg_task", gvgTask, "error", err) return diff --git a/modular/manager/bucket_migrate_scheduler.go b/modular/manager/bucket_migrate_scheduler.go index f17cd634a..1d8e778ba 100644 --- a/modular/manager/bucket_migrate_scheduler.go +++ b/modular/manager/bucket_migrate_scheduler.go @@ -689,9 +689,9 @@ func (s *BucketMigrateScheduler) PostMigrateBucket(postMsg *gfsptask.GfSpBucketM return err } log.Debugw("succeed to post migrate bucket quota", "src_sp", srcSPInfo, "postMsg", postMsg, "error", err) - // TODO gc for dest sp + // gc for dest sp if !postMsg.GetFinished() { - // list objects and delete generate a task ? + // generate a gc bucket migration task(list objects and delete) go func() { // src sp should wait meta data <-time.After(10 * time.Second) @@ -718,7 +718,6 @@ func (s *BucketMigrateScheduler) PreMigrateBucket(bucketID uint64, srcSPInfo *sp ) log.Debugw("start to pre migrate bucket", "bucket_id", bucketID) - preMsg := &gfsptask.GfSpBucketMigrationInfo{BucketId: bucketID, Finished: false} preMsg.ExpireTime = time.Now().Unix() + SigExpireTimeSecond signature, err = s.manager.baseApp.GfSpClient().SignBucketMigrationInfo(context.Background(), preMsg)