Skip to content

Commit

Permalink
Merge pull request #1430 from bnb-chain/develop
Browse files Browse the repository at this point in the history
feat: sync develop to master
  • Loading branch information
BarryTong65 authored Jul 22, 2024
2 parents e9144c8 + b1060f1 commit f9f0f47
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 25 deletions.
4 changes: 4 additions & 0 deletions modular/gater/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ func ErrNotifySwapOutWithDetail(detail string) *gfsperrors.GfSpError {
func ErrConsensusWithDetail(detail string) *gfsperrors.GfSpError {
return gfsperrors.Register(module.GateModularName, http.StatusInternalServerError, 55001, detail)
}

func ErrConsensusNotFoundWithDetail(detail string) *gfsperrors.GfSpError {
return gfsperrors.Register(module.GateModularName, http.StatusNotFound, 55002, detail)
}
18 changes: 17 additions & 1 deletion modular/gater/object_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,11 @@ func (g *GateModular) downloadObject(w http.ResponseWriter, reqCtx *RequestConte
metrics.PerfGetObjectTimeHistogram.WithLabelValues("get_object_get_object_info_time").Observe(time.Since(getObjectTime).Seconds())
if err != nil {
log.CtxErrorw(reqCtx.Context(), "failed to get object info from consensus", "error", err)
err = ErrConsensusWithDetail("failed to get object info from consensus, object_name: " + reqCtx.objectName + ", bucket_name: " + reqCtx.bucketName + ", error:" + err.Error())
if strings.Contains(err.Error(), "No such object") {
err = ErrConsensusNotFoundWithDetail("failed to get object info from consensus, the object may be deleted. object_name: " + reqCtx.objectName + ", bucket_name: " + reqCtx.bucketName + ", error:" + err.Error())
} else {
err = ErrConsensusWithDetail("failed to get object info from consensus, object_name: " + reqCtx.objectName + ", bucket_name: " + reqCtx.bucketName + ", error:" + err.Error())
}
return err
}

Expand Down Expand Up @@ -1166,6 +1170,10 @@ func (g *GateModular) delegatePutObjectHandler(w http.ResponseWriter, r *http.Re
if err != nil {
return
}
if visibilityInt < 0 || visibilityInt >= int64(len(storagetypes.VisibilityType_value)) {
err = ErrInvalidQuery
return
}
visibility = storagetypes.VisibilityType(visibilityInt)
if visibility == storagetypes.VISIBILITY_TYPE_UNSPECIFIED {
visibility = storagetypes.VISIBILITY_TYPE_INHERIT // set default visibility type
Expand Down Expand Up @@ -1391,6 +1399,10 @@ func (g *GateModular) delegateResumablePutObjectHandler(w http.ResponseWriter, r
if err != nil {
return
}
if visibilityInt < 0 || visibilityInt >= int64(len(storagetypes.VisibilityType_value)) {
err = ErrInvalidQuery
return
}
visibility = storagetypes.VisibilityType(visibilityInt)
if visibility == storagetypes.VISIBILITY_TYPE_UNSPECIFIED {
visibility = storagetypes.VISIBILITY_TYPE_INHERIT // set default visibility type
Expand Down Expand Up @@ -1617,6 +1629,10 @@ func (g *GateModular) delegateCreateFolderHandler(w http.ResponseWriter, r *http
if err != nil {
return
}
if visibilityInt < 0 || visibilityInt >= int64(len(storagetypes.VisibilityType_value)) {
err = ErrInvalidQuery
return
}
visibility = storagetypes.VisibilityType(visibilityInt)
if visibility == storagetypes.VISIBILITY_TYPE_UNSPECIFIED {
visibility = storagetypes.VISIBILITY_TYPE_INHERIT // set default visibility type
Expand Down
54 changes: 33 additions & 21 deletions modular/receiver/receive_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,20 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
log.CtxErrorw(ctx, "failed to get checksum from db", "task", task, "error", err)
return nil, ErrGfSpDBWithDetail("failed to get checksum from db, error: " + err.Error())
}
// // If it already have integrity data,Avoid repetitive writing db
skipInsertIntegrityMeta := false
if len(pieceChecksums) != int(segmentCount) {
log.CtxErrorw(ctx, "replicate piece unfinished", "task", task)
err = ErrUnfinishedTask
return nil, ErrUnfinishedTask
// Interface idempotent processing. If it already have integrity data, can skip this check
integrityMeta, integrityErr := r.baseApp.GfSpDB().GetObjectIntegrity(task.GetObjectInfo().Id.Uint64(), task.GetRedundancyIdx())
if integrityMeta != nil && integrityErr == nil {
// The checksum is obtained from integrityMeta
pieceChecksums = integrityMeta.PieceChecksumList
skipInsertIntegrityMeta = true
} else {
log.CtxErrorw(ctx, "replicate piece unfinished", "task", task)
err = ErrUnfinishedTask
return nil, ErrUnfinishedTask
}
}

expectedIntegrityHash := task.GetObjectInfo().GetChecksums()[task.GetRedundancyIdx()+1]
Expand All @@ -137,25 +147,27 @@ func (r *ReceiveModular) HandleDoneReceivePieceTask(ctx context.Context, task ta
}

setIntegrityTime := time.Now()
if task.GetObjectInfo().GetIsUpdating() {
integrityMeta := &corespdb.ShadowIntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
Version: task.GetObjectInfo().GetVersion(),
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetShadowObjectIntegrity(integrityMeta)
} else {
integrityMeta := &corespdb.IntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
if !skipInsertIntegrityMeta {
if task.GetObjectInfo().GetIsUpdating() {
integrityMeta := &corespdb.ShadowIntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
Version: task.GetObjectInfo().GetVersion(),
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetShadowObjectIntegrity(integrityMeta)
} else {
integrityMeta := &corespdb.IntegrityMeta{
ObjectID: task.GetObjectInfo().Id.Uint64(),
RedundancyIndex: task.GetRedundancyIdx(),
IntegrityChecksum: integrityChecksum,
PieceChecksumList: pieceChecksums,
ObjectSize: task.GetObjectInfo().GetPayloadSize(),
}
err = r.baseApp.GfSpDB().SetObjectIntegrity(integrityMeta)
}
err = r.baseApp.GfSpDB().SetObjectIntegrity(integrityMeta)
}
metrics.PerfReceivePieceTimeHistogram.WithLabelValues("receive_piece_server_done_set_integrity_time").Observe(time.Since(setIntegrityTime).Seconds())
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions modular/receiver/receive_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (

sdkmath "cosmossdk.io/math"
"github.com/bnb-chain/greenfield-common/go/hash"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"gorm.io/gorm"

"github.com/bnb-chain/greenfield-storage-provider/base/gfspclient"
"github.com/bnb-chain/greenfield-storage-provider/base/gfsppieceop"
"github.com/bnb-chain/greenfield-storage-provider/base/types/gfsptask"
"github.com/bnb-chain/greenfield-storage-provider/core/piecestore"
"github.com/bnb-chain/greenfield-storage-provider/core/spdb"
"github.com/bnb-chain/greenfield-storage-provider/core/taskqueue"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

func TestErrPieceStoreWithDetail(t *testing.T) {
Expand Down Expand Up @@ -236,6 +238,7 @@ func TestHandleDoneReceivePieceTask_PieceCountMismatch(t *testing.T) {
mockSPDB := spdb.NewMockSPDB(ctrl)
r.baseApp.SetGfSpDB(mockSPDB)
mockSPDB.EXPECT().GetAllReplicatePieceChecksumOptimized(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
mockSPDB.EXPECT().GetObjectIntegrity(gomock.Any(), gomock.Any()).Return(nil, gorm.ErrRecordNotFound).AnyTimes()
_, err := r.HandleDoneReceivePieceTask(context.TODO(), mockTask)
assert.NotNil(t, err)
}
Expand Down

0 comments on commit f9f0f47

Please sign in to comment.