From ab87a1a089e7d6cacbecf55c8fd917662a53a180 Mon Sep 17 00:00:00 2001 From: "will@2012" <xibaow2020@qq.com> Date: Sun, 3 Dec 2023 09:46:18 +0800 Subject: [PATCH] chore: refine some code --- .github/workflows/e2e-test.yml | 4 +- Makefile | 1 + base/gfspconfig/config.go | 9 +- base/types/gfsptask/approval.go | 8 +- base/types/gfsptask/download.go | 6 +- base/types/gfsptask/gc.go | 6 +- base/types/gfsptask/migrate_gvg.go | 4 +- base/types/gfsptask/recovery.go | 2 +- base/types/gfsptask/task.go | 4 +- base/types/gfsptask/upload.go | 10 +- core/spdb/entity.go | 14 +- core/spdb/spdb.go | 4 +- core/spdb/spdb_mock.go | 112 ++++++++++-- core/task/null_task.go | 2 +- core/task/task.go | 2 +- core/task/task_mock.go | 183 +++----------------- deployment/localup/localup.sh | 2 + modular/authenticator/authenticator.go | 18 +- modular/authenticator/authenticator_test.go | 45 +++-- modular/downloader/download_task.go | 4 +- modular/downloader/download_task_test.go | 26 +-- modular/gater/object_handler.go | 12 ++ modular/manager/manage_task.go | 107 ++++++------ modular/manager/manager.go | 5 +- modular/manager/task_retry_scheduler.go | 127 ++++++++------ modular/uploader/upload_task.go | 17 ++ modular/uploader/uploader_task_test.go | 1 + store/bsdb/database_mock.go | 6 +- store/sqldb/upload_object.go | 52 +++--- store/sqldb/upload_object_schema.go | 2 +- 30 files changed, 411 insertions(+), 384 deletions(-) diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 44f89396b..c94029879 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -50,7 +50,7 @@ jobs: - name: Build and Start Greenfield Blockchain run: | bash ./test/e2e/spworkflow/e2e_test.sh --startChain -# # Build and Start Greenfield SP + # Build and Start Greenfield SP - name: Build and Start Greenfield SP run: | bash ./test/e2e/spworkflow/e2e_test.sh --startSP @@ -58,7 +58,7 @@ jobs: - name: Build Greenfield Cmd run: | bash ./test/e2e/spworkflow/e2e_test.sh --buildCmd -# # Use Greenfield Cmd Running SP E2E Test + # Use Greenfield Cmd Running SP E2E Test - name: Run Greenfield SP E2E Test run: | bash ./test/e2e/spworkflow/e2e_test.sh --runTest diff --git a/Makefile b/Makefile index c58bdcbc6..b610a6dce 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,7 @@ lint: mock-gen: mockgen -source=core/spdb/spdb.go -destination=core/spdb/spdb_mock.go -package=spdb mockgen -source=store/bsdb/database.go -destination=store/bsdb/database_mock.go -package=bsdb + mockgen -source=core/task/task.go -destination=core/task/task_mock.go -package=task # only run unit tests, exclude e2e tests test: diff --git a/base/gfspconfig/config.go b/base/gfspconfig/config.go index ef9fde892..e9adeba10 100644 --- a/base/gfspconfig/config.go +++ b/base/gfspconfig/config.go @@ -276,6 +276,11 @@ type ManagerConfig struct { SubscribeBucketMigrateEventIntervalMillisecond uint `comment:"optional"` GVGPreferSPList []uint32 `comment:"optional"` SPBlackList []uint32 `comment:"optional"` - EnableTaskRetryScheduler bool `comment:"optional"` - RejectUnsealThresholdSecond uint64 `comment:"optional"` + + // EnableTaskRetryScheduler is used to enable task retry scheduler. + EnableTaskRetryScheduler bool `comment:"optional"` + // RejectUnsealThresholdSecond is as the following meanings: + // retry replicate and seal task if the task's create timestamp + RejectUnsealThresholdSecond > now.time() + // retry reject unseal if the task's create timestamp + RejectUnsealThresholdSecond < now.time() && timestamp + 2*RejectUnsealThresholdSecond > now.time() + RejectUnsealThresholdSecond uint64 `comment:"optional"` } diff --git a/base/types/gfsptask/approval.go b/base/types/gfsptask/approval.go index 431796a07..f1987ef35 100644 --- a/base/types/gfsptask/approval.go +++ b/base/types/gfsptask/approval.go @@ -88,7 +88,7 @@ func (m *GfSpCreateBucketApprovalTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpCreateBucketApprovalTask) SetRetry(retry int) { +func (m *GfSpCreateBucketApprovalTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -230,7 +230,7 @@ func (m *GfSpMigrateBucketApprovalTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpMigrateBucketApprovalTask) SetRetry(retry int) { +func (m *GfSpMigrateBucketApprovalTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -378,7 +378,7 @@ func (m *GfSpCreateObjectApprovalTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpCreateObjectApprovalTask) SetRetry(retry int) { +func (m *GfSpCreateObjectApprovalTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -536,7 +536,7 @@ func (m *GfSpReplicatePieceApprovalTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpReplicatePieceApprovalTask) SetRetry(retry int) { +func (m *GfSpReplicatePieceApprovalTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/base/types/gfsptask/download.go b/base/types/gfsptask/download.go index 190795945..add1dc955 100644 --- a/base/types/gfsptask/download.go +++ b/base/types/gfsptask/download.go @@ -94,7 +94,7 @@ func (m *GfSpDownloadObjectTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpDownloadObjectTask) SetRetry(retry int) { +func (m *GfSpDownloadObjectTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -259,7 +259,7 @@ func (m *GfSpDownloadPieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpDownloadPieceTask) SetRetry(retry int) { +func (m *GfSpDownloadPieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -419,7 +419,7 @@ func (m *GfSpChallengePieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpChallengePieceTask) SetRetry(retry int) { +func (m *GfSpChallengePieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/base/types/gfsptask/gc.go b/base/types/gfsptask/gc.go index 9bebf05e1..165a23a05 100644 --- a/base/types/gfsptask/gc.go +++ b/base/types/gfsptask/gc.go @@ -86,7 +86,7 @@ func (m *GfSpGCObjectTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpGCObjectTask) SetRetry(retry int) { +func (m *GfSpGCObjectTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -240,7 +240,7 @@ func (m *GfSpGCZombiePieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpGCZombiePieceTask) SetRetry(retry int) { +func (m *GfSpGCZombiePieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -375,7 +375,7 @@ func (m *GfSpGCMetaTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpGCMetaTask) SetRetry(retry int) { +func (m *GfSpGCMetaTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/base/types/gfsptask/migrate_gvg.go b/base/types/gfsptask/migrate_gvg.go index fe7820568..c4c5bd2ee 100644 --- a/base/types/gfsptask/migrate_gvg.go +++ b/base/types/gfsptask/migrate_gvg.go @@ -89,7 +89,7 @@ func (m *GfSpMigrateGVGTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpMigrateGVGTask) SetRetry(retry int) { +func (m *GfSpMigrateGVGTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -306,7 +306,7 @@ func (m *GfSpGCBucketMigrationTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpGCBucketMigrationTask) SetRetry(retry int) { +func (m *GfSpGCBucketMigrationTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/base/types/gfsptask/recovery.go b/base/types/gfsptask/recovery.go index e73252e94..2107a1582 100644 --- a/base/types/gfsptask/recovery.go +++ b/base/types/gfsptask/recovery.go @@ -93,7 +93,7 @@ func (m *GfSpRecoverPieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpRecoverPieceTask) SetRetry(retry int) { +func (m *GfSpRecoverPieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/base/types/gfsptask/task.go b/base/types/gfsptask/task.go index b51ee7228..7215f5bfd 100644 --- a/base/types/gfsptask/task.go +++ b/base/types/gfsptask/task.go @@ -51,8 +51,8 @@ func (m *GfSpTask) IncRetry() { m.Retry++ } -func (m *GfSpTask) SetRetry(retry int) { - m.Retry = int64(retry) +func (m *GfSpTask) SetRetry(retry int64) { + m.Retry = retry } func (m *GfSpTask) ExceedRetry() bool { diff --git a/base/types/gfsptask/upload.go b/base/types/gfsptask/upload.go index c5fbde520..7177c3c17 100644 --- a/base/types/gfsptask/upload.go +++ b/base/types/gfsptask/upload.go @@ -93,7 +93,7 @@ func (m *GfSpUploadObjectTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpUploadObjectTask) SetRetry(retry int) { +func (m *GfSpUploadObjectTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -255,7 +255,7 @@ func (m *GfSpResumableUploadObjectTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpResumableUploadObjectTask) SetRetry(retry int) { +func (m *GfSpResumableUploadObjectTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -405,7 +405,7 @@ func (m *GfSpReplicatePieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpReplicatePieceTask) SetRetry(retry int) { +func (m *GfSpReplicatePieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -590,7 +590,7 @@ func (m *GfSpSealObjectTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpSealObjectTask) SetRetry(retry int) { +func (m *GfSpSealObjectTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } @@ -760,7 +760,7 @@ func (m *GfSpReceivePieceTask) IncRetry() { m.GetTask().IncRetry() } -func (m *GfSpReceivePieceTask) SetRetry(retry int) { +func (m *GfSpReceivePieceTask) SetRetry(retry int64) { m.GetTask().SetRetry(retry) } diff --git a/core/spdb/entity.go b/core/spdb/entity.go index 31e4397e5..ebee68cbe 100644 --- a/core/spdb/entity.go +++ b/core/spdb/entity.go @@ -19,13 +19,13 @@ const ( // UploadObjectMeta defines the upload object state and related seal info, etc. type UploadObjectMeta struct { - ObjectID uint64 - TaskState storetypes.TaskState - GlobalVirtualGroupID uint32 - SecondaryEndpoints []string - SecondarySignatures [][]byte - ErrorDescription string - UpdateTimeStamp int64 + ObjectID uint64 + TaskState storetypes.TaskState + GlobalVirtualGroupID uint32 + SecondaryEndpoints []string + SecondarySignatures [][]byte + ErrorDescription string + CreateTimeStampSecond int64 } // GCObjectMeta defines the gc object range progress info. diff --git a/core/spdb/spdb.go b/core/spdb/spdb.go index b42adbe8f..e6d7bfe1f 100644 --- a/core/spdb/spdb.go +++ b/core/spdb/spdb.go @@ -43,9 +43,9 @@ type UploadObjectProgressDB interface { // GetUploadMetasToSealByStartTS queries the replicate_done/seal_doing object to continue seal. // It is used in task retry scheduler. GetUploadMetasToSealByStartTS(limit int, startTimeStamp int64) ([]*UploadObjectMeta, error) - // GetUploadMetasToRejectByRangeTS queries the upload_done/replicate_doing object to reject. + // GetUploadMetasToRejectUnsealByRangeTS queries the upload_done/replicate_doing object to reject. // It is used in task retry scheduler. - GetUploadMetasToRejectByRangeTS(limit int, startTimeStamp int64, endTimeStamp int64) ([]*UploadObjectMeta, error) + GetUploadMetasToRejectUnsealByRangeTS(limit int, startTimeStamp int64, endTimeStamp int64) ([]*UploadObjectMeta, error) // InsertPutEvent inserts a new upload event progress. InsertPutEvent(task coretask.Task) error } diff --git a/core/spdb/spdb_mock.go b/core/spdb/spdb_mock.go index c13e1d60a..57aa737d3 100644 --- a/core/spdb/spdb_mock.go +++ b/core/spdb/spdb_mock.go @@ -1,10 +1,6 @@ // Code generated by MockGen. DO NOT EDIT. // Source: core/spdb/spdb.go -// -// Generated by this command: -// -// mockgen -source=core/spdb/spdb.go -destination=core/spdb/spdb_mock.go -package=spdb -// + // Package spdb is a generated GoMock package. package spdb @@ -443,6 +439,21 @@ func (mr *MockSPDBMockRecorder) GetSpByID(id interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSpByID", reflect.TypeOf((*MockSPDB)(nil).GetSpByID), id) } +// GetUploadMetasToRejectUnsealByRangeTS mocks base method. +func (m *MockSPDB) GetUploadMetasToRejectUnsealByRangeTS(limit int, startTimeStamp, endTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToRejectUnsealByRangeTS", limit, startTimeStamp, endTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToRejectUnsealByRangeTS indicates an expected call of GetUploadMetasToRejectUnsealByRangeTS. +func (mr *MockSPDBMockRecorder) GetUploadMetasToRejectUnsealByRangeTS(limit, startTimeStamp, endTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToRejectUnsealByRangeTS", reflect.TypeOf((*MockSPDB)(nil).GetUploadMetasToRejectUnsealByRangeTS), limit, startTimeStamp, endTimeStamp) +} + // GetUploadMetasToReplicate mocks base method. func (m *MockSPDB) GetUploadMetasToReplicate(limit int, timeout int64) ([]*UploadObjectMeta, error) { m.ctrl.T.Helper() @@ -458,6 +469,21 @@ func (mr *MockSPDBMockRecorder) GetUploadMetasToReplicate(limit, timeout interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToReplicate", reflect.TypeOf((*MockSPDB)(nil).GetUploadMetasToReplicate), limit, timeout) } +// GetUploadMetasToReplicateByStartTS mocks base method. +func (m *MockSPDB) GetUploadMetasToReplicateByStartTS(limit int, startTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToReplicateByStartTS", limit, startTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToReplicateByStartTS indicates an expected call of GetUploadMetasToReplicateByStartTS. +func (mr *MockSPDBMockRecorder) GetUploadMetasToReplicateByStartTS(limit, startTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToReplicateByStartTS", reflect.TypeOf((*MockSPDB)(nil).GetUploadMetasToReplicateByStartTS), limit, startTimeStamp) +} + // GetUploadMetasToSeal mocks base method. func (m *MockSPDB) GetUploadMetasToSeal(limit int, timeout int64) ([]*UploadObjectMeta, error) { m.ctrl.T.Helper() @@ -473,6 +499,21 @@ func (mr *MockSPDBMockRecorder) GetUploadMetasToSeal(limit, timeout interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToSeal", reflect.TypeOf((*MockSPDB)(nil).GetUploadMetasToSeal), limit, timeout) } +// GetUploadMetasToSealByStartTS mocks base method. +func (m *MockSPDB) GetUploadMetasToSealByStartTS(limit int, startTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToSealByStartTS", limit, startTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToSealByStartTS indicates an expected call of GetUploadMetasToSealByStartTS. +func (mr *MockSPDBMockRecorder) GetUploadMetasToSealByStartTS(limit, startTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToSealByStartTS", reflect.TypeOf((*MockSPDB)(nil).GetUploadMetasToSealByStartTS), limit, startTimeStamp) +} + // GetUploadState mocks base method. func (m *MockSPDB) GetUploadState(objectID uint64) (types.TaskState, string, error) { m.ctrl.T.Helper() @@ -618,18 +659,18 @@ func (mr *MockSPDBMockRecorder) ListDestSPSwapOutUnits() *gomock.Call { } // ListIntegrityMetaByObjectIDRange mocks base method. -func (m *MockSPDB) ListIntegrityMetaByObjectIDRange(startBlockNumber, endBlockNumber int64, includePrivate bool) ([]*IntegrityMeta, error) { +func (m *MockSPDB) ListIntegrityMetaByObjectIDRange(startObjectID, endObjectID int64, includePrivate bool) ([]*IntegrityMeta, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListIntegrityMetaByObjectIDRange", startBlockNumber, endBlockNumber, includePrivate) + ret := m.ctrl.Call(m, "ListIntegrityMetaByObjectIDRange", startObjectID, endObjectID, includePrivate) ret0, _ := ret[0].([]*IntegrityMeta) ret1, _ := ret[1].(error) return ret0, ret1 } // ListIntegrityMetaByObjectIDRange indicates an expected call of ListIntegrityMetaByObjectIDRange. -func (mr *MockSPDBMockRecorder) ListIntegrityMetaByObjectIDRange(startBlockNumber, endBlockNumber, includePrivate interface{}) *gomock.Call { +func (mr *MockSPDBMockRecorder) ListIntegrityMetaByObjectIDRange(startObjectID, endObjectID, includePrivate interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIntegrityMetaByObjectIDRange", reflect.TypeOf((*MockSPDB)(nil).ListIntegrityMetaByObjectIDRange), startBlockNumber, endBlockNumber, includePrivate) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIntegrityMetaByObjectIDRange", reflect.TypeOf((*MockSPDB)(nil).ListIntegrityMetaByObjectIDRange), startObjectID, endObjectID, includePrivate) } // ListMigrateGVGUnitsByBucketID mocks base method. @@ -1026,6 +1067,21 @@ func (mr *MockUploadObjectProgressDBMockRecorder) DeleteUploadProgress(objectID return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUploadProgress", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).DeleteUploadProgress), objectID) } +// GetUploadMetasToRejectUnsealByRangeTS mocks base method. +func (m *MockUploadObjectProgressDB) GetUploadMetasToRejectUnsealByRangeTS(limit int, startTimeStamp, endTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToRejectUnsealByRangeTS", limit, startTimeStamp, endTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToRejectUnsealByRangeTS indicates an expected call of GetUploadMetasToRejectUnsealByRangeTS. +func (mr *MockUploadObjectProgressDBMockRecorder) GetUploadMetasToRejectUnsealByRangeTS(limit, startTimeStamp, endTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToRejectUnsealByRangeTS", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).GetUploadMetasToRejectUnsealByRangeTS), limit, startTimeStamp, endTimeStamp) +} + // GetUploadMetasToReplicate mocks base method. func (m *MockUploadObjectProgressDB) GetUploadMetasToReplicate(limit int, timeout int64) ([]*UploadObjectMeta, error) { m.ctrl.T.Helper() @@ -1041,6 +1097,21 @@ func (mr *MockUploadObjectProgressDBMockRecorder) GetUploadMetasToReplicate(limi return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToReplicate", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).GetUploadMetasToReplicate), limit, timeout) } +// GetUploadMetasToReplicateByStartTS mocks base method. +func (m *MockUploadObjectProgressDB) GetUploadMetasToReplicateByStartTS(limit int, startTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToReplicateByStartTS", limit, startTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToReplicateByStartTS indicates an expected call of GetUploadMetasToReplicateByStartTS. +func (mr *MockUploadObjectProgressDBMockRecorder) GetUploadMetasToReplicateByStartTS(limit, startTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToReplicateByStartTS", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).GetUploadMetasToReplicateByStartTS), limit, startTimeStamp) +} + // GetUploadMetasToSeal mocks base method. func (m *MockUploadObjectProgressDB) GetUploadMetasToSeal(limit int, timeout int64) ([]*UploadObjectMeta, error) { m.ctrl.T.Helper() @@ -1056,6 +1127,21 @@ func (mr *MockUploadObjectProgressDBMockRecorder) GetUploadMetasToSeal(limit, ti return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToSeal", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).GetUploadMetasToSeal), limit, timeout) } +// GetUploadMetasToSealByStartTS mocks base method. +func (m *MockUploadObjectProgressDB) GetUploadMetasToSealByStartTS(limit int, startTimeStamp int64) ([]*UploadObjectMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadMetasToSealByStartTS", limit, startTimeStamp) + ret0, _ := ret[0].([]*UploadObjectMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUploadMetasToSealByStartTS indicates an expected call of GetUploadMetasToSealByStartTS. +func (mr *MockUploadObjectProgressDBMockRecorder) GetUploadMetasToSealByStartTS(limit, startTimeStamp interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadMetasToSealByStartTS", reflect.TypeOf((*MockUploadObjectProgressDB)(nil).GetUploadMetasToSealByStartTS), limit, startTimeStamp) +} + // GetUploadState mocks base method. func (m *MockUploadObjectProgressDB) GetUploadState(objectID uint64) (types.TaskState, string, error) { m.ctrl.T.Helper() @@ -1319,18 +1405,18 @@ func (mr *MockSignatureDBMockRecorder) GetObjectIntegrity(objectID, redundancyIn } // ListIntegrityMetaByObjectIDRange mocks base method. -func (m *MockSignatureDB) ListIntegrityMetaByObjectIDRange(startBlockNumber, endBlockNumber int64, includePrivate bool) ([]*IntegrityMeta, error) { +func (m *MockSignatureDB) ListIntegrityMetaByObjectIDRange(startObjectID, endObjectID int64, includePrivate bool) ([]*IntegrityMeta, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListIntegrityMetaByObjectIDRange", startBlockNumber, endBlockNumber, includePrivate) + ret := m.ctrl.Call(m, "ListIntegrityMetaByObjectIDRange", startObjectID, endObjectID, includePrivate) ret0, _ := ret[0].([]*IntegrityMeta) ret1, _ := ret[1].(error) return ret0, ret1 } // ListIntegrityMetaByObjectIDRange indicates an expected call of ListIntegrityMetaByObjectIDRange. -func (mr *MockSignatureDBMockRecorder) ListIntegrityMetaByObjectIDRange(startBlockNumber, endBlockNumber, includePrivate interface{}) *gomock.Call { +func (mr *MockSignatureDBMockRecorder) ListIntegrityMetaByObjectIDRange(startObjectID, endObjectID, includePrivate interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIntegrityMetaByObjectIDRange", reflect.TypeOf((*MockSignatureDB)(nil).ListIntegrityMetaByObjectIDRange), startBlockNumber, endBlockNumber, includePrivate) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListIntegrityMetaByObjectIDRange", reflect.TypeOf((*MockSignatureDB)(nil).ListIntegrityMetaByObjectIDRange), startObjectID, endObjectID, includePrivate) } // ListReplicatePieceChecksumByObjectIDRange mocks base method. diff --git a/core/task/null_task.go b/core/task/null_task.go index d53c28c39..20e250a61 100644 --- a/core/task/null_task.go +++ b/core/task/null_task.go @@ -43,7 +43,7 @@ func (*NullTask) Expired() bool { return false } func (*NullTask) ExceedTimeout() bool { return false } func (*NullTask) GetPriority() TPriority { return 0 } func (*NullTask) SetPriority(TPriority) {} -func (*NullTask) SetRetry(int) {} +func (*NullTask) SetRetry(int64) {} func (*NullTask) IncRetry() {} func (*NullTask) ExceedRetry() bool { return false } func (*NullTask) GetRetry() int64 { return 0 } diff --git a/core/task/task.go b/core/task/task.go index 0964e8bc2..930c1500d 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -55,7 +55,7 @@ type Task interface { // GetRetry returns the retry counter of the task. GetRetry() int64 // SetRetry sets the retry counter of the task. - SetRetry(int) + SetRetry(int64) // IncRetry increases the retry counter of the task. Each task has the max retry // times, if retry counter exceed the max retry, the task should be canceled. IncRetry() diff --git a/core/task/task_mock.go b/core/task/task_mock.go index 5fd2e2dd2..a27e6f0d2 100644 --- a/core/task/task_mock.go +++ b/core/task/task_mock.go @@ -1,10 +1,6 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./task.go -// -// Generated by this command: -// -// mockgen -source=./task.go -destination=./task_mock.go -package=task -// +// Source: core/task/task.go + // Package task is a generated GoMock package. package task @@ -362,7 +358,7 @@ func (mr *MockTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Call { } // SetRetry mocks base method. -func (m *MockTask) SetRetry(arg0 int) { +func (m *MockTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -793,7 +789,7 @@ func (mr *MockApprovalTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Ca } // SetRetry mocks base method. -func (m *MockApprovalTask) SetRetry(arg0 int) { +func (m *MockApprovalTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -1262,7 +1258,7 @@ func (mr *MockApprovalCreateBucketTaskMockRecorder) SetPriority(arg0 interface{} } // SetRetry mocks base method. -func (m *MockApprovalCreateBucketTask) SetRetry(arg0 int) { +func (m *MockApprovalCreateBucketTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -1731,7 +1727,7 @@ func (mr *MockApprovalMigrateBucketTaskMockRecorder) SetPriority(arg0 interface{ } // SetRetry mocks base method. -func (m *MockApprovalMigrateBucketTask) SetRetry(arg0 int) { +func (m *MockApprovalMigrateBucketTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -2200,7 +2196,7 @@ func (mr *MockApprovalCreateObjectTaskMockRecorder) SetPriority(arg0 interface{} } // SetRetry mocks base method. -func (m *MockApprovalCreateObjectTask) SetRetry(arg0 int) { +func (m *MockApprovalCreateObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -2853,7 +2849,7 @@ func (mr *MockApprovalReplicatePieceTaskMockRecorder) SetPriority(arg0 interface } // SetRetry mocks base method. -func (m *MockApprovalReplicatePieceTask) SetRetry(arg0 int) { +func (m *MockApprovalReplicatePieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -3310,7 +3306,7 @@ func (mr *MockObjectTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Call } // SetRetry mocks base method. -func (m *MockObjectTask) SetRetry(arg0 int) { +func (m *MockObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -3793,7 +3789,7 @@ func (mr *MockUploadObjectTaskMockRecorder) SetPriority(arg0 interface{}) *gomoc } // SetRetry mocks base method. -func (m *MockUploadObjectTask) SetRetry(arg0 int) { +func (m *MockUploadObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -4328,7 +4324,7 @@ func (mr *MockResumableUploadObjectTaskMockRecorder) SetResumeOffset(offset inte } // SetRetry mocks base method. -func (m *MockResumableUploadObjectTask) SetRetry(arg0 int) { +func (m *MockResumableUploadObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -4893,7 +4889,7 @@ func (mr *MockReplicatePieceTaskMockRecorder) SetPriority(arg0 interface{}) *gom } // SetRetry mocks base method. -func (m *MockReplicatePieceTask) SetRetry(arg0 int) { +func (m *MockReplicatePieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -5610,7 +5606,7 @@ func (mr *MockReceivePieceTaskMockRecorder) SetRedundancyIdx(arg0 interface{}) * } // SetRetry mocks base method. -func (m *MockReceivePieceTask) SetRetry(arg0 int) { +func (m *MockReceivePieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -6157,7 +6153,7 @@ func (mr *MockSealObjectTaskMockRecorder) SetPriority(arg0 interface{}) *gomock. } // SetRetry mocks base method. -func (m *MockSealObjectTask) SetRetry(arg0 int) { +func (m *MockSealObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -6694,7 +6690,7 @@ func (mr *MockDownloadObjectTaskMockRecorder) SetPriority(arg0 interface{}) *gom } // SetRetry mocks base method. -func (m *MockDownloadObjectTask) SetRetry(arg0 int) { +func (m *MockDownloadObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -7273,7 +7269,7 @@ func (mr *MockDownloadPieceTaskMockRecorder) SetPriority(arg0 interface{}) *gomo } // SetRetry mocks base method. -func (m *MockDownloadPieceTask) SetRetry(arg0 int) { +func (m *MockDownloadPieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -7886,7 +7882,7 @@ func (mr *MockChallengePieceTaskMockRecorder) SetRedundancyIdx(idx interface{}) } // SetRetry mocks base method. -func (m *MockChallengePieceTask) SetRetry(arg0 int) { +func (m *MockChallengePieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -8315,7 +8311,7 @@ func (mr *MockGCTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Call { } // SetRetry mocks base method. -func (m *MockGCTask) SetRetry(arg0 int) { +func (m *MockGCTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -8851,7 +8847,7 @@ func (mr *MockGCObjectTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Ca } // SetRetry mocks base method. -func (m *MockGCObjectTask) SetRetry(arg0 int) { +func (m *MockGCObjectTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -9057,34 +9053,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) GetCreateTime() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCreateTime", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetCreateTime)) } -// GetCurrentBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) GetCurrentBlockNumber() uint64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetCurrentBlockNumber") - ret0, _ := ret[0].(uint64) - return ret0 -} - -// GetCurrentBlockNumber indicates an expected call of GetCurrentBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) GetCurrentBlockNumber() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetCurrentBlockNumber)) -} - -// GetEndBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) GetEndBlockNumber() uint64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetEndBlockNumber") - ret0, _ := ret[0].(uint64) - return ret0 -} - -// GetEndBlockNumber indicates an expected call of GetEndBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) GetEndBlockNumber() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEndBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetEndBlockNumber)) -} - // GetEndObjectId mocks base method. func (m *MockGCZombiePieceTask) GetEndObjectId() uint64 { m.ctrl.T.Helper() @@ -9099,35 +9067,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) GetEndObjectId() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEndObjectId", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetEndObjectId)) } -// GetGCObjectProgress mocks base method. -func (m *MockGCZombiePieceTask) GetGCObjectProgress() (uint64, uint64) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetGCObjectProgress") - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(uint64) - return ret0, ret1 -} - -// GetGCObjectProgress indicates an expected call of GetGCObjectProgress. -func (mr *MockGCZombiePieceTaskMockRecorder) GetGCObjectProgress() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGCObjectProgress", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetGCObjectProgress)) -} - -// GetLastDeletedObjectId mocks base method. -func (m *MockGCZombiePieceTask) GetLastDeletedObjectId() uint64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetLastDeletedObjectId") - ret0, _ := ret[0].(uint64) - return ret0 -} - -// GetLastDeletedObjectId indicates an expected call of GetLastDeletedObjectId. -func (mr *MockGCZombiePieceTaskMockRecorder) GetLastDeletedObjectId() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastDeletedObjectId", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetLastDeletedObjectId)) -} - // GetLogs mocks base method. func (m *MockGCZombiePieceTask) GetLogs() string { m.ctrl.T.Helper() @@ -9184,20 +9123,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) GetRetry() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRetry", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetRetry)) } -// GetStartBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) GetStartBlockNumber() uint64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStartBlockNumber") - ret0, _ := ret[0].(uint64) - return ret0 -} - -// GetStartBlockNumber indicates an expected call of GetStartBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) GetStartBlockNumber() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStartBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).GetStartBlockNumber)) -} - // GetStartObjectId mocks base method. func (m *MockGCZombiePieceTask) GetStartObjectId() uint64 { m.ctrl.T.Helper() @@ -9330,30 +9255,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) SetCreateTime(arg0 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCreateTime", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetCreateTime), arg0) } -// SetCurrentBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) SetCurrentBlockNumber(arg0 uint64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetCurrentBlockNumber", arg0) -} - -// SetCurrentBlockNumber indicates an expected call of SetCurrentBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) SetCurrentBlockNumber(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCurrentBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetCurrentBlockNumber), arg0) -} - -// SetEndBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) SetEndBlockNumber(arg0 uint64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetEndBlockNumber", arg0) -} - -// SetEndBlockNumber indicates an expected call of SetEndBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) SetEndBlockNumber(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEndBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetEndBlockNumber), arg0) -} - // SetEndObjectID mocks base method. func (m *MockGCZombiePieceTask) SetEndObjectID(arg0 uint64) { m.ctrl.T.Helper() @@ -9378,30 +9279,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) SetError(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetError", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetError), arg0) } -// SetGCObjectProgress mocks base method. -func (m *MockGCZombiePieceTask) SetGCObjectProgress(arg0, arg1 uint64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetGCObjectProgress", arg0, arg1) -} - -// SetGCObjectProgress indicates an expected call of SetGCObjectProgress. -func (mr *MockGCZombiePieceTaskMockRecorder) SetGCObjectProgress(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetGCObjectProgress", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetGCObjectProgress), arg0, arg1) -} - -// SetLastDeletedObjectId mocks base method. -func (m *MockGCZombiePieceTask) SetLastDeletedObjectId(arg0 uint64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetLastDeletedObjectId", arg0) -} - -// SetLastDeletedObjectId indicates an expected call of SetLastDeletedObjectId. -func (mr *MockGCZombiePieceTaskMockRecorder) SetLastDeletedObjectId(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastDeletedObjectId", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetLastDeletedObjectId), arg0) -} - // SetLogs mocks base method. func (m *MockGCZombiePieceTask) SetLogs(logs string) { m.ctrl.T.Helper() @@ -9439,7 +9316,7 @@ func (mr *MockGCZombiePieceTaskMockRecorder) SetPriority(arg0 interface{}) *gomo } // SetRetry mocks base method. -func (m *MockGCZombiePieceTask) SetRetry(arg0 int) { +func (m *MockGCZombiePieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -9450,18 +9327,6 @@ func (mr *MockGCZombiePieceTaskMockRecorder) SetRetry(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetRetry", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetRetry), arg0) } -// SetStartBlockNumber mocks base method. -func (m *MockGCZombiePieceTask) SetStartBlockNumber(arg0 uint64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetStartBlockNumber", arg0) -} - -// SetStartBlockNumber indicates an expected call of SetStartBlockNumber. -func (mr *MockGCZombiePieceTaskMockRecorder) SetStartBlockNumber(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetStartBlockNumber", reflect.TypeOf((*MockGCZombiePieceTask)(nil).SetStartBlockNumber), arg0) -} - // SetStartObjectID mocks base method. func (m *MockGCZombiePieceTask) SetStartObjectID(arg0 uint64) { m.ctrl.T.Helper() @@ -9895,7 +9760,7 @@ func (mr *MockGCMetaTaskMockRecorder) SetPriority(arg0 interface{}) *gomock.Call } // SetRetry mocks base method. -func (m *MockGCMetaTask) SetRetry(arg0 int) { +func (m *MockGCMetaTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -10434,7 +10299,7 @@ func (mr *MockRecoveryPieceTaskMockRecorder) SetRecoverDone() *gomock.Call { } // SetRetry mocks base method. -func (m *MockRecoveryPieceTask) SetRetry(arg0 int) { +func (m *MockRecoveryPieceTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -11073,7 +10938,7 @@ func (mr *MockMigrateGVGTaskMockRecorder) SetRedundancyIdx(arg0 interface{}) *go } // SetRetry mocks base method. -func (m *MockMigrateGVGTask) SetRetry(arg0 int) { +func (m *MockMigrateGVGTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } @@ -11552,7 +11417,7 @@ func (mr *MockGCBucketMigrationTaskMockRecorder) SetPriority(arg0 interface{}) * } // SetRetry mocks base method. -func (m *MockGCBucketMigrationTask) SetRetry(arg0 int) { +func (m *MockGCBucketMigrationTask) SetRetry(arg0 int64) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetRetry", arg0) } diff --git a/deployment/localup/localup.sh b/deployment/localup/localup.sh index b1e2cfbbe..26bbe3ea6 100644 --- a/deployment/localup/localup.sh +++ b/deployment/localup/localup.sh @@ -162,6 +162,8 @@ function make_config() { sed -i -e "s/GCZombiePieceTimeInterval = .*/GCZombiePieceTimeInterval = 3/g" config.toml sed -i -e "s/GCZombieSafeObjectIDDistance = .*/GCZombieSafeObjectIDDistance = 1/g" config.toml sed -i -e "s/GCZombiePieceObjectIDInterval = .*/GCZombiePieceObjectIDInterval = 5/g" config.toml + sed -i -e "s/EnableTaskRetryScheduler = .*/EnableTaskRetryScheduler = true/g" config.toml + sed -i -e "s/RejectUnsealThresholdSecond = .*/RejectUnsealThresholdSecond = 600/g" config.toml echo "succeed to generate config.toml in "${sp_dir} cd - >/dev/null diff --git a/modular/authenticator/authenticator.go b/modular/authenticator/authenticator.go index 21bdfb8ae..a2fc78dac 100644 --- a/modular/authenticator/authenticator.go +++ b/modular/authenticator/authenticator.go @@ -256,7 +256,7 @@ func (a *AuthenticationModular) VerifyAuthentication( return allow, nil case coremodule.AuthOpTypeGetUploadingState: queryTime := time.Now() - bucketInfo, objectInfo, err := a.baseApp.Consensus().QueryBucketInfoAndObjectInfo(ctx, bucket, object) + bucketInfo, _, err := a.baseApp.Consensus().QueryBucketInfoAndObjectInfo(ctx, bucket, object) metrics.PerfAuthTimeHistogram.WithLabelValues("auth_server_get_object_process_query_bucket_object_time").Observe(time.Since(queryTime).Seconds()) if err != nil { log.CtxErrorw(ctx, "failed to get bucket and object info from consensus", "error", err) @@ -282,18 +282,7 @@ func (a *AuthenticationModular) VerifyAuthentication( "expected_sp_id", bucketSPID) return false, ErrMismatchSp } - if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { - log.CtxErrorw(ctx, "object state should be OBJECT_STATUS_CREATED", "state", objectInfo.GetObjectStatus()) - return false, ErrUnexpectedObjectStatusWithDetail(objectInfo.ObjectName, storagetypes.OBJECT_STATUS_CREATED, objectInfo.GetObjectStatus()) - } - permissionTime := time.Now() - allow, err := a.baseApp.Consensus().VerifyPutObjectPermission(ctx, account, bucket, object) - metrics.PerfAuthTimeHistogram.WithLabelValues("auth_server_get_object_process_verify_permission_time").Observe(time.Since(permissionTime).Seconds()) - if err != nil { - log.CtxErrorw(ctx, "failed to verify put object permission from consensus", "error", err) - return false, err - } - return allow, nil + return true, nil case coremodule.AuthOpTypeGetObject: queryTime := time.Now() bucketInfo, objectInfo, err := a.baseApp.Consensus().QueryBucketInfoAndObjectInfo(ctx, bucket, object) @@ -322,7 +311,8 @@ func (a *AuthenticationModular) VerifyAuthentication( "expected_sp_id", bucketSPID) return false, ErrMismatchSp } - if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED || objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { + if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED && + objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.CtxErrorw(ctx, "object state is not sealed or created", "state", objectInfo.GetObjectStatus()) return false, ErrNoPermission } diff --git a/modular/authenticator/authenticator_test.go b/modular/authenticator/authenticator_test.go index b39204e51..906ba4b40 100644 --- a/modular/authenticator/authenticator_test.go +++ b/modular/authenticator/authenticator_test.go @@ -4,6 +4,9 @@ import ( "context" "encoding/hex" "errors" + "strings" + "testing" + "time" sdkmath "cosmossdk.io/math" "github.com/bnb-chain/greenfield-storage-provider/base/gfspapp" @@ -21,10 +24,6 @@ import ( "google.golang.org/grpc" - "strings" - "testing" - "time" - "github.com/consensys/gnark-crypto/ecc/bn254/fr/mimc" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" @@ -462,7 +461,9 @@ func VerifyObjectAndBucketAndSPID(t *testing.T, authType coremodule.AuthOpType) _, err = a.VerifyAuthentication(context.Background(), authType, userAddress, "test_bucket", "test_object") assert.Equal(t, ErrMismatchSp, err) } -func VerifyAuthPutObjectAndGetUploadingState(t *testing.T, authType coremodule.AuthOpType) { + +func Test_VerifyAuth_PutObject(t *testing.T) { + authType := coremodule.AuthOpTypePutObject VerifyObjectAndBucketAndSPID(t, authType) a := setup(t) ctrl := gomock.NewController(t) @@ -518,14 +519,33 @@ func VerifyAuthPutObjectAndGetUploadingState(t *testing.T, authType coremodule.A a.baseApp.SetConsensus(mockedConsensus) verifyResult, _ = a.VerifyAuthentication(context.Background(), authType, userAddress, "test_bucket", "test_object") assert.Equal(t, true, verifyResult) -} - -func Test_VerifyAuth_PutObject(t *testing.T) { - VerifyAuthPutObjectAndGetUploadingState(t, coremodule.AuthOpTypePutObject) } func Test_VerifyAuth_GetUploadingState(t *testing.T) { - VerifyAuthPutObjectAndGetUploadingState(t, coremodule.AuthOpTypeGetUploadingState) + authType := coremodule.AuthOpTypeGetUploadingState + VerifyObjectAndBucketAndSPID(t, authType) + a := setup(t) + ctrl := gomock.NewController(t) + m := spdb.NewMockSPDB(ctrl) + a.baseApp.SetGfSpDB(m) + + privateKey, _ := crypto.GenerateKey() + userAddress := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() + + // VerifyPutObjectPermission doesn't have an error + mockedConsensus := consensus.NewMockConsensus(ctrl) + mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{ + ObjectStatus: storagetypes.OBJECT_STATUS_CREATED, + }, nil).Times(1) + mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{ + Id: 1, + }, nil).Times(1) + mockedConsensus.EXPECT().QueryVirtualGroupFamily(gomock.Any(), gomock.Any()).Return(&virtualgrouptypes.GlobalVirtualGroupFamily{ + PrimarySpId: 1, + }, nil).Times(1) + a.baseApp.SetConsensus(mockedConsensus) + verifyResult, _ := a.VerifyAuthentication(context.Background(), authType, userAddress, "test_bucket", "test_object") + assert.Equal(t, true, verifyResult) } func Test_VerifyAuth_GetObject(t *testing.T) { @@ -539,7 +559,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) { privateKey, _ := crypto.GenerateKey() userAddress := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() - // ObjectStatus is not OBJECT_STATUS_CREATED + // QueryPaymentStreamRecord get error mockedConsensus := consensus.NewMockConsensus(ctrl) mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{ ObjectStatus: storagetypes.OBJECT_STATUS_CREATED, @@ -550,9 +570,10 @@ func Test_VerifyAuth_GetObject(t *testing.T) { mockedConsensus.EXPECT().QueryVirtualGroupFamily(gomock.Any(), gomock.Any()).Return(&virtualgrouptypes.GlobalVirtualGroupFamily{ PrimarySpId: 1, }, nil).Times(1) + mockedConsensus.EXPECT().QueryPaymentStreamRecord(gomock.Any(), gomock.Any()).Return(nil, errors.New("error")).Times(1) a.baseApp.SetConsensus(mockedConsensus) _, err := a.VerifyAuthentication(context.Background(), coremodule.AuthOpTypeGetObject, userAddress, "test_bucket", "test_object") - assert.Equal(t, ErrNotSealedState, err) + assert.Equal(t, errors.New("error"), err) // QueryPaymentStreamRecord get error mockedConsensus = consensus.NewMockConsensus(ctrl) diff --git a/modular/downloader/download_task.go b/modular/downloader/download_task.go index 9833a0e40..0c5ccb488 100644 --- a/modular/downloader/download_task.go +++ b/modular/downloader/download_task.go @@ -58,7 +58,7 @@ func (d *DownloadModular) PreDownloadObject(ctx context.Context, downloadObjectT log.CtxErrorw(ctx, "failed pre download object due to pointer dangling") return ErrDanglingPointer } - if downloadObjectTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED || + if downloadObjectTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED && downloadObjectTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.CtxErrorw(ctx, "failed to pre download object due to object is not in sealed or created") return ErrDownloadStatus @@ -265,7 +265,7 @@ func (d *DownloadModular) PreDownloadPiece(ctx context.Context, downloadPieceTas return ErrDanglingPointer } - if downloadPieceTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED || + if downloadPieceTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_SEALED && downloadPieceTask.GetObjectInfo().GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.CtxErrorw(ctx, "failed to pre download piece due to object is not in sealed or created") return ErrDownloadStatus diff --git a/modular/downloader/download_task_test.go b/modular/downloader/download_task_test.go index f54669b47..efdb78f5c 100644 --- a/modular/downloader/download_task_test.go +++ b/modular/downloader/download_task_test.go @@ -163,18 +163,20 @@ func TestPreDownloadObject(t *testing.T) { // failed due to object unsealed mockTask1 := &gfsptask.GfSpDownloadObjectTask{ - ObjectInfo: &storagetypes.ObjectInfo{}, + ObjectInfo: &storagetypes.ObjectInfo{ + Id: sdkmath.NewUint(100), + }, StorageParams: &storagetypes.Params{}, } - err = d.PreDownloadObject(context.TODO(), mockTask1) - - assert.NotNil(t, err) - - // failed due to query spdb traffic failed ctrl := gomock.NewController(t) defer ctrl.Finish() mockSPDB := spdb.NewMockSPDB(ctrl) d.baseApp.SetGfSpDB(mockSPDB) + mockSPDB.EXPECT().GetObjectIntegrity(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("fail to get hash")).Times(1) + err = d.PreDownloadObject(context.TODO(), mockTask1) + assert.NotNil(t, err) + + // failed due to query spdb traffic failed mockGRPCAPI := gfspclient.NewMockGfSpClientAPI(ctrl) d.baseApp.SetGfSpClient(mockGRPCAPI) mockGRPCAPI.EXPECT().ReportTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -183,7 +185,6 @@ func TestPreDownloadObject(t *testing.T) { func(ctx context.Context, bucketName string, includePrivate bool, opts ...grpc.DialOption) (*payment_types.StreamRecord, error) { return &payment_types.StreamRecord{}, nil }).AnyTimes() - mockSPDB.EXPECT().GetBucketTraffic(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("failed to get bucket traffic")).Times(1) mockTask2 := &gfsptask.GfSpDownloadObjectTask{ @@ -285,18 +286,19 @@ func TestPreDownloadPiece(t *testing.T) { err := d.PreDownloadPiece(context.TODO(), nil) assert.NotNil(t, err) - // failed due to object unsealed + mockSPDB := spdb.NewMockSPDB(ctrl) + d.baseApp.SetGfSpDB(mockSPDB) + + // failed due to get hash failed + mockSPDB.EXPECT().GetObjectIntegrity(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("fail to get hash")).Times(1) mockTask1 := &gfsptask.GfSpDownloadPieceTask{ - ObjectInfo: &storagetypes.ObjectInfo{}, + ObjectInfo: &storagetypes.ObjectInfo{Id: sdkmath.NewUint(100)}, StorageParams: &storagetypes.Params{}, } err = d.PreDownloadPiece(context.TODO(), mockTask1) assert.NotNil(t, err) // failed due to query spdb traffic failed - mockSPDB := spdb.NewMockSPDB(ctrl) - d.baseApp.SetGfSpDB(mockSPDB) - mockGRPCAPI.EXPECT().GetPaymentByBucketName(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, bucketName string, includePrivate bool, opts ...grpc.DialOption) (*payment_types.StreamRecord, error) { return &payment_types.StreamRecord{}, nil diff --git a/modular/gater/object_handler.go b/modular/gater/object_handler.go index 130f881e0..707c0e4c1 100644 --- a/modular/gater/object_handler.go +++ b/modular/gater/object_handler.go @@ -641,6 +641,7 @@ func (g *GateModular) queryUploadProgressHandler(w http.ResponseWriter, r *http. var ( err error reqCtx *RequestContext + authenticated bool objectInfo *storagetypes.ObjectInfo errDescription string taskStateDescription string @@ -667,6 +668,17 @@ func (g *GateModular) queryUploadProgressHandler(w http.ResponseWriter, r *http. if err != nil { return } + authenticated, err = g.baseApp.GfSpClient().VerifyAuthentication(reqCtx.Context(), + coremodule.AuthOpTypeGetUploadingState, reqCtx.Account(), reqCtx.bucketName, reqCtx.objectName) + if err != nil { + log.CtxErrorw(reqCtx.Context(), "failed to verify authentication", "error", err) + return + } + if !authenticated { + log.CtxErrorw(reqCtx.Context(), "no permission to operate") + err = ErrNoPermission + return + } objectInfo, err = g.baseApp.Consensus().QueryObjectInfo(reqCtx.Context(), reqCtx.bucketName, reqCtx.objectName) if err != nil { diff --git a/modular/manager/manage_task.go b/modular/manager/manage_task.go index 1d2795aa6..18e1acbb5 100644 --- a/modular/manager/manage_task.go +++ b/modular/manager/manage_task.go @@ -10,6 +10,7 @@ import ( "time" "cosmossdk.io/math" + virtualgrouptypes "github.com/bnb-chain/greenfield/x/virtualgroup/types" sdk "github.com/cosmos/cosmos-sdk/types" "golang.org/x/exp/slices" @@ -28,7 +29,6 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/store/types" "github.com/bnb-chain/greenfield-storage-provider/util" storagetypes "github.com/bnb-chain/greenfield/x/storage/types" - virtualgrouptypes "github.com/bnb-chain/greenfield/x/virtualgroup/types" ) var ( @@ -156,6 +156,7 @@ func (m *ManageModular) pickGVGAndReplicate(ctx context.Context, vgfID uint32, t log.Debugw("replicate task info", "task", replicateTask, "gvg_meta", gvgMeta) replicateTask.SetCreateTime(task.GetCreateTime()) replicateTask.SetLogs(task.GetLogs()) + replicateTask.SetRetry(task.GetRetry()) replicateTask.AppendLog("manager-create-replicate-task") err = m.replicateQueue.Push(replicateTask) if err != nil { @@ -165,8 +166,10 @@ func (m *ManageModular) pickGVGAndReplicate(ctx context.Context, vgfID uint32, t go m.backUpTask() go func() { err = m.baseApp.GfSpDB().UpdateUploadProgress(&spdb.UploadObjectMeta{ - ObjectID: task.GetObjectInfo().Id.Uint64(), - TaskState: types.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING, + ObjectID: task.GetObjectInfo().Id.Uint64(), + TaskState: types.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING, + GlobalVirtualGroupID: gvgMeta.ID, + SecondaryEndpoints: gvgMeta.SecondarySPEndpoints, }) if err != nil { log.Errorw("failed to update object task state", "task_info", task.Info(), "error", err) @@ -270,8 +273,10 @@ func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, go m.backUpTask() go func() error { err = m.baseApp.GfSpDB().UpdateUploadProgress(&spdb.UploadObjectMeta{ - ObjectID: task.GetObjectInfo().Id.Uint64(), - TaskState: types.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING, + ObjectID: task.GetObjectInfo().Id.Uint64(), + TaskState: types.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING, + GlobalVirtualGroupID: gvgMeta.ID, + SecondaryEndpoints: gvgMeta.SecondarySPEndpoints, }) if err != nil { log.CtxErrorw(ctx, "failed to update object task state", "error", err) @@ -289,7 +294,7 @@ func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task. return ErrDanglingTask } if task.Error() != nil { - log.CtxErrorw(ctx, "handler error replicate piece task", "task_info", task.Info(), "error", task.Error()) + log.CtxErrorw(ctx, "failed to replicate piece task", "task_info", task.Info(), "error", task.Error()) _ = m.handleFailedReplicatePieceTask(ctx, task) metrics.ManagerCounter.WithLabelValues(ManagerFailureReplicate).Inc() metrics.ManagerTime.WithLabelValues(ManagerFailureReplicate).Observe( @@ -362,48 +367,6 @@ func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task. } func (m *ManageModular) handleFailedReplicatePieceTask(ctx context.Context, handleTask task.ReplicatePieceTask) error { - if handleTask.GetNotAvailableSpIdx() != -1 { - objectInfo, queryErr := m.baseApp.Consensus().QueryObjectInfoByID(ctx, util.Uint64ToString(handleTask.GetObjectInfo().Id.Uint64())) - if queryErr != nil { - log.Errorw("failed to query object info", "object", handleTask.GetObjectInfo(), "error", queryErr) - return queryErr - } - if objectInfo.GetObjectStatus() == storagetypes.OBJECT_STATUS_SEALED { - log.CtxInfow(ctx, "object already sealed, abort replicate task", "object_info", objectInfo) - m.replicateQueue.PopByKey(handleTask.Key()) - return nil - } - - gvgID := handleTask.GetGlobalVirtualGroupId() - gvg, err := m.baseApp.Consensus().QueryGlobalVirtualGroup(context.Background(), gvgID) - if err != nil { - log.Errorw("failed to query global virtual group from chain, ", "gvgID", gvgID, "error", err) - return err - } - sspID := gvg.GetSecondarySpIds()[handleTask.GetNotAvailableSpIdx()] - sspJoinGVGs, err := m.baseApp.GfSpClient().ListGlobalVirtualGroupsBySecondarySP(ctx, sspID) - if err != nil { - log.Errorw("failed to list GVGs by secondary sp", "spID", sspID, "error", err) - return err - } - shouldFreezeGVGs := make([]*virtualgrouptypes.GlobalVirtualGroup, 0) - selfSPID, err := m.getSPID() - if err != nil { - log.CtxErrorw(ctx, "failed to get self sp id", "error", err) - return err - } - for _, g := range sspJoinGVGs { - if g.GetPrimarySpId() == selfSPID { - shouldFreezeGVGs = append(shouldFreezeGVGs, g) - } - } - m.virtualGroupManager.FreezeSPAndGVGs(sspID, shouldFreezeGVGs) - log.CtxDebugw(ctx, "add sp to freeze pool", "spID", sspID, "excludedGVGs", shouldFreezeGVGs) - m.replicateQueue.PopByKey(handleTask.Key()) - - return m.pickGVGAndReplicate(ctx, gvg.FamilyId, handleTask) - } - shadowTask := handleTask oldTask := m.replicateQueue.PopByKey(handleTask.Key()) if m.TaskUploading(ctx, handleTask) { @@ -419,8 +382,52 @@ func (m *ManageModular) handleFailedReplicatePieceTask(ctx context.Context, hand handleTask.AppendLog(fmt.Sprintf("manager-handle-failed-replicate-task-repush:%d", shadowTask.GetRetry())) handleTask.AppendLog(shadowTask.GetLogs()) handleTask.SetUpdateTime(time.Now().Unix()) - err := m.replicateQueue.Push(handleTask) - log.CtxDebugw(ctx, "push task again to retry", "task_info", handleTask.Info(), "error", err) + if handleTask.GetNotAvailableSpIdx() != -1 { + objectInfo, queryErr := m.baseApp.Consensus().QueryObjectInfoByID(ctx, util.Uint64ToString(handleTask.GetObjectInfo().Id.Uint64())) + if queryErr != nil { + log.Errorw("failed to query object info", "object", handleTask.GetObjectInfo(), "error", queryErr) + return queryErr + } + if objectInfo.GetObjectStatus() == storagetypes.OBJECT_STATUS_SEALED { + log.CtxInfow(ctx, "object already sealed, abort replicate task", "object_info", objectInfo) + m.replicateQueue.PopByKey(handleTask.Key()) + return nil + } + + gvgID := handleTask.GetGlobalVirtualGroupId() + gvg, queryErr := m.baseApp.Consensus().QueryGlobalVirtualGroup(context.Background(), gvgID) + if queryErr != nil { + log.Errorw("failed to query global virtual group from chain", "gvgID", gvgID, "error", queryErr) + return queryErr + } + sspID := gvg.GetSecondarySpIds()[handleTask.GetNotAvailableSpIdx()] + sspJoinGVGs, queryErr := m.baseApp.GfSpClient().ListGlobalVirtualGroupsBySecondarySP(ctx, sspID) + if queryErr != nil { + log.Errorw("failed to list GVGs by secondary sp", "spID", sspID, "error", queryErr) + return queryErr + } + shouldFreezeGVGs := make([]*virtualgrouptypes.GlobalVirtualGroup, 0) + selfSPID, queryErr := m.getSPID() + if queryErr != nil { + log.CtxErrorw(ctx, "failed to get self sp id", "error", queryErr) + return queryErr + } + for _, g := range sspJoinGVGs { + if g.GetPrimarySpId() == selfSPID { + shouldFreezeGVGs = append(shouldFreezeGVGs, g) + } + } + m.virtualGroupManager.FreezeSPAndGVGs(sspID, shouldFreezeGVGs) + rePickAndReplicateErr := m.pickGVGAndReplicate(ctx, gvg.FamilyId, handleTask) + log.CtxDebugw(ctx, "add failed sp to freeze pool, re-pick and push task again", + "failed_sp_id", sspID, "task_info", handleTask.Info(), + "excludedGVGs", shouldFreezeGVGs, "error", rePickAndReplicateErr) + return rePickAndReplicateErr + } else { + pushErr := m.replicateQueue.Push(handleTask) + log.CtxDebugw(ctx, "push task again to retry", "task_info", handleTask.Info(), "error", pushErr) + return pushErr + } } else { shadowTask.AppendLog(fmt.Sprintf("manager-handle-failed-replicate-task-error:%s-retry:%d", shadowTask.Error().Error(), shadowTask.GetRetry())) metrics.ManagerCounter.WithLabelValues(ManagerCancelReplicate).Inc() diff --git a/modular/manager/manager.go b/modular/manager/manager.go index bc244f422..9c2a43064 100644 --- a/modular/manager/manager.go +++ b/modular/manager/manager.go @@ -417,9 +417,8 @@ func (m *ManageModular) LoadTaskFromDB() error { replicateTask := &gfsptask.GfSpReplicatePieceTask{} replicateTask.InitReplicatePieceTask(objectInfo, storageParams, m.baseApp.TaskPriority(replicateTask), m.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), m.baseApp.TaskMaxRetry(replicateTask)) - replicateTask.SetSecondaryAddresses(meta.SecondaryEndpoints) - replicateTask.SetSecondarySignatures(meta.SecondarySignatures) replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID + replicateTask.SetSecondaryAddresses(meta.SecondaryEndpoints) pushErr := m.replicateQueue.Push(replicateTask) if pushErr != nil { log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr) @@ -773,7 +772,7 @@ func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storaget func (m *ManageModular) Statistics() string { return fmt.Sprintf( - "upload[%d], resumableUpload[%d], replicate[%d], seal[%d], receive[%d], recovery[%d] gcObject[%d], gcZombie[%d], gcMeta[%d], download[%d], challenge[%d], migrateGVG[%d], gcBlockHeight[%d], gcSafeDistance[%d], backupTaskNum[%d]", + "current inner status, upload[%d], resumableUpload[%d], replicate[%d], seal[%d], receive[%d], recovery[%d] gcObject[%d], gcZombie[%d], gcMeta[%d], download[%d], challenge[%d], migrateGVG[%d], gcBlockHeight[%d], gcSafeDistance[%d], backupTaskNum[%d]", m.uploadQueue.Len(), m.resumableUploadQueue.Len(), m.replicateQueue.Len(), m.sealQueue.Len(), m.receiveQueue.Len(), m.recoveryQueue.Len(), m.gcObjectQueue.Len(), m.gcZombieQueue.Len(), m.gcMetaQueue.Len(), m.downloadQueue.Len(), m.challengeQueue.Len(), m.migrateGVGQueue.Len(), diff --git a/modular/manager/task_retry_scheduler.go b/modular/manager/task_retry_scheduler.go index ce648fa5f..13e9506ee 100644 --- a/modular/manager/task_retry_scheduler.go +++ b/modular/manager/task_retry_scheduler.go @@ -44,12 +44,12 @@ const ( // TaskRetryScheduler is used to schedule background task retry. type TaskRetryScheduler struct { manager *ManageModular - rejectUnsealThresholdSecond uint64 + rejectUnsealThresholdSecond int64 } // NewTaskRetryScheduler returns a task retry scheduler instance. func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler { - rejectUnsealThresholdSecond := m.rejectUnsealThresholdSecond + rejectUnsealThresholdSecond := int64(m.rejectUnsealThresholdSecond) if rejectUnsealThresholdSecond == 0 { rejectUnsealThresholdSecond = defaultRejectUnsealThresholdSecond } @@ -77,8 +77,8 @@ func (s *TaskRetryScheduler) startReplicateTaskRetry() { ) for { - time.Sleep(retryIntervalSecond * 10) - iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryReplicate, int64(s.rejectUnsealThresholdSecond)) + time.Sleep(retryIntervalSecond * 100) + iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryReplicate, s.rejectUnsealThresholdSecond) log.Infow("start a new loop to retry replicate", "iterator", iter, "loop_number", loopNumber, "total_retry_number", totalRetryNumber) @@ -110,8 +110,8 @@ func (s *TaskRetryScheduler) startSealTaskRetry() { ) for { - time.Sleep(retryIntervalSecond * 10) - iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retrySeal, int64(s.rejectUnsealThresholdSecond)) + time.Sleep(retryIntervalSecond * 100) + iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retrySeal, s.rejectUnsealThresholdSecond) log.Infow("start a new loop to retry seal", "iterator", iter, "loop_number", loopNumber, "total_retry_number", totalRetryNumber) @@ -143,8 +143,8 @@ func (s *TaskRetryScheduler) startRejectUnsealTaskRetry() { ) for { - time.Sleep(retryIntervalSecond * 10) - iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryRejectUnseal, int64(s.rejectUnsealThresholdSecond)) + time.Sleep(retryIntervalSecond * 100) + iter = NewTaskIterator(s.manager.baseApp.GfSpDB(), retryRejectUnseal, s.rejectUnsealThresholdSecond) log.Infow("start a new loop to retry reject unseal task", "iterator", iter, "loop_number", loopNumber, "total_retry_number", totalRetryNumber) @@ -166,102 +166,125 @@ func (s *TaskRetryScheduler) startRejectUnsealTaskRetry() { } } -func isAlreadyNotFound(err error) bool { +func isNotFound(err error) bool { return strings.Contains(err.Error(), "No such object") } // retryReplicateTask is used to push the failed replicate task to task dispatcher, // and the task will be executed by executor. -func (s *TaskRetryScheduler) retryReplicateTask(t *spdb.UploadObjectMeta) error { - objectInfo, queryErr := s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(t.ObjectID)) - if queryErr != nil { - log.Errorw("failed to query object info", "object_id", t.ObjectID, "error", queryErr) - if !isAlreadyNotFound(queryErr) { +func (s *TaskRetryScheduler) retryReplicateTask(meta *spdb.UploadObjectMeta) error { + var ( + err error + objectInfo *storagetypes.ObjectInfo + storageParams *storagetypes.Params + replicateTask *gfsptask.GfSpReplicatePieceTask + ) + + objectInfo, err = s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(meta.ObjectID)) + if err != nil { + log.Errorw("failed to query object info", "object_id", meta.ObjectID, "error", err) + if !isNotFound(err) { // the object maybe deleted. time.Sleep(backoffIntervalSecond) } - return queryErr + return err } if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.Infow("object is not in create status", "object_info", objectInfo) return fmt.Errorf("object is not in create status") } - storageParams, queryErr := s.manager.baseApp.Consensus().QueryStorageParamsByTimestamp(context.Background(), objectInfo.GetCreateAt()) - if queryErr != nil { - log.Errorw("failed to query storage param", "object_id", t.ObjectID, "error", queryErr) + storageParams, err = s.manager.baseApp.Consensus().QueryStorageParamsByTimestamp(context.Background(), objectInfo.GetCreateAt()) + if err != nil { + log.Errorw("failed to query storage param", "object_id", meta.ObjectID, "error", err) time.Sleep(backoffIntervalSecond) - return queryErr + return err } - replicateTask := &gfsptask.GfSpReplicatePieceTask{} + replicateTask = &gfsptask.GfSpReplicatePieceTask{} replicateTask.InitReplicatePieceTask(objectInfo, storageParams, s.manager.baseApp.TaskPriority(replicateTask), s.manager.baseApp.TaskTimeout(replicateTask, objectInfo.GetPayloadSize()), s.manager.baseApp.TaskMaxRetry(replicateTask)) - replicateTask.SetSecondaryAddresses(t.SecondaryEndpoints) - replicateTask.SetSecondarySignatures(t.SecondarySignatures) - replicateTask.GlobalVirtualGroupId = t.GlobalVirtualGroupID - pushErr := s.manager.replicateQueue.Push(replicateTask) - if pushErr != nil { - if errors.Is(pushErr, gfsptqueue.ErrTaskQueueExceed) { + replicateTask.GlobalVirtualGroupId = meta.GlobalVirtualGroupID + replicateTask.SecondaryEndpoints = meta.SecondaryEndpoints + err = s.manager.replicateQueue.Push(replicateTask) + if err != nil { + if errors.Is(err, gfsptqueue.ErrTaskQueueExceed) { time.Sleep(backoffIntervalSecond) } - log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", pushErr) - return pushErr + log.Errorw("failed to push replicate piece task to queue", "object_info", objectInfo, "error", err) + return err } return nil } // retrySealTask is used to send seal tx to chain. // This task is very lightweight and therefore executed directly inside the scheduler. -func (s *TaskRetryScheduler) retrySealTask(t *spdb.UploadObjectMeta) error { - objectInfo, queryErr := s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(t.ObjectID)) - if queryErr != nil { - log.Errorw("failed to query object info", "object_id", t.ObjectID, "error", queryErr) - if !isAlreadyNotFound(queryErr) { +func (s *TaskRetryScheduler) retrySealTask(meta *spdb.UploadObjectMeta) error { + var ( + err error + objectInfo *storagetypes.ObjectInfo + blsSig []bls.Signature + sealMsg *storagetypes.MsgSealObject + ) + + objectInfo, err = s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(meta.ObjectID)) + if err != nil { + log.Errorw("failed to query object info", "object_id", meta.ObjectID, "error", err) + if !isNotFound(err) { // the object maybe deleted. time.Sleep(backoffIntervalSecond) } - return queryErr + return err } if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.Infow("object is not in create status", "object_info", objectInfo) return fmt.Errorf("object is not in create status") } - blsSig, err := bls.MultipleSignaturesFromBytes(t.SecondarySignatures) + blsSig, err = bls.MultipleSignaturesFromBytes(meta.SecondarySignatures) if err != nil { - log.Errorw("failed to get multiple signature", "object_id", t.ObjectID, "error", err) + log.Errorw("failed to get multiple signature", "object_id", meta.ObjectID, "error", err) return err } - sealMsg := &storagetypes.MsgSealObject{ + sealMsg = &storagetypes.MsgSealObject{ Operator: s.manager.baseApp.OperatorAddress(), BucketName: objectInfo.GetBucketName(), ObjectName: objectInfo.GetObjectName(), - GlobalVirtualGroupId: t.GlobalVirtualGroupID, + GlobalVirtualGroupId: meta.GlobalVirtualGroupID, SecondarySpBlsAggSignatures: bls.AggregateSignatures(blsSig).Marshal(), } - return sendAndConfirmSealObjectTx(s.manager.baseApp, sealMsg) + err = sendAndConfirmSealObjectTx(s.manager.baseApp, sealMsg) + if err == nil { + _ = s.manager.baseApp.GfSpDB().DeleteUploadProgress(objectInfo.Id.Uint64()) + } + return err } // retryRejectTask is used to send reject unseal tx to chain. // This task is very lightweight and therefore executed directly inside the scheduler. -func (s *TaskRetryScheduler) retryRejectUnsealTask(t *spdb.UploadObjectMeta) error { - objectInfo, queryErr := s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(t.ObjectID)) - if queryErr != nil { - log.Errorw("failed to query object info", "object_id", t.ObjectID, "error", queryErr) - if !isAlreadyNotFound(queryErr) { +func (s *TaskRetryScheduler) retryRejectUnsealTask(meta *spdb.UploadObjectMeta) error { + var ( + err error + objectInfo *storagetypes.ObjectInfo + rejectUnsealMsg *storagetypes.MsgRejectSealObject + ) + + objectInfo, err = s.manager.baseApp.Consensus().QueryObjectInfoByID(context.Background(), util.Uint64ToString(meta.ObjectID)) + if err != nil { + log.Errorw("failed to query object info", "object_id", meta.ObjectID, "error", err) + if !isNotFound(err) { // the object maybe deleted. time.Sleep(backoffIntervalSecond) } - return queryErr + return err } if objectInfo.GetObjectStatus() != storagetypes.OBJECT_STATUS_CREATED { log.Infow("object is not in create status", "object_info", objectInfo) return fmt.Errorf("object is not in create status") } - rejectUnsealMsg := &storagetypes.MsgRejectSealObject{ + rejectUnsealMsg = &storagetypes.MsgRejectSealObject{ Operator: s.manager.baseApp.OperatorAddress(), BucketName: objectInfo.GetBucketName(), ObjectName: objectInfo.GetObjectName(), } - err := sendAndConfirmRejectUnsealObjectTx(s.manager.baseApp, rejectUnsealMsg) + err = sendAndConfirmRejectUnsealObjectTx(s.manager.baseApp, rejectUnsealMsg) if err == nil { _ = s.manager.baseApp.GfSpDB().DeleteUploadProgress(objectInfo.Id.Uint64()) } @@ -317,6 +340,7 @@ func NewTaskIterator(db spdb.SPDB, taskType RetryTaskType, rejectUnsealThreshold endTS int64 prefetchFunc PrefetchFunc ) + switch taskType { case retryReplicate: startTS = sqldb.GetCurrentUnixTime() - rejectUnsealThresholdSecond @@ -333,7 +357,7 @@ func NewTaskIterator(db spdb.SPDB, taskType RetryTaskType, rejectUnsealThreshold case retrySeal: return iter.dbReader.GetUploadMetasToSealByStartTS(prefetchLimit, iter.startTimeStampSecond) case retryRejectUnseal: - return iter.dbReader.GetUploadMetasToRejectByRangeTS(prefetchLimit, iter.startTimeStampSecond, iter.endTimeStampSecond) + return iter.dbReader.GetUploadMetasToRejectUnsealByRangeTS(prefetchLimit, iter.startTimeStampSecond, iter.endTimeStampSecond) } return nil, nil } @@ -348,18 +372,19 @@ func NewTaskIterator(db spdb.SPDB, taskType RetryTaskType, rejectUnsealThreshold } func (iter *TaskIterator) Valid() bool { + var err error if iter.currentIndex >= len(iter.cachedValueList) { - var err error iter.cachedValueList, err = iter.prefetchFunc(iter) if err != nil { - log.Errorw("failed to get upload metas to replicate by start timestamp", "error", err) + log.Errorw("failed to prefetch retry task meta", "iter_type", iter.taskType, "error", err) return false } if len(iter.cachedValueList) == 0 { + log.Debugw("Skip to iterate due to empty result", "iter_type", iter.taskType) return false } iter.currentIndex = 0 - iter.startTimeStampSecond = iter.cachedValueList[len(iter.cachedValueList)-1].UpdateTimeStamp + iter.startTimeStampSecond = iter.cachedValueList[len(iter.cachedValueList)-1].CreateTimeStampSecond } return true } diff --git a/modular/uploader/upload_task.go b/modular/uploader/upload_task.go index b92c46e3f..1d9ac2531 100644 --- a/modular/uploader/upload_task.go +++ b/modular/uploader/upload_task.go @@ -18,6 +18,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/core/taskqueue" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + "github.com/bnb-chain/greenfield-storage-provider/store/types" storagetypes "github.com/bnb-chain/greenfield/x/storage/types" ) @@ -148,6 +149,14 @@ func (u *UploadModular) HandleUploadObjectTask(ctx context.Context, uploadObject log.CtxErrorw(ctx, "failed to write integrity hash to db", "error", err) return ErrGfSpDBWithDetail("failed to write integrity hash to db, error: " + err.Error()) } + err = u.baseApp.GfSpDB().UpdateUploadProgress(&corespdb.UploadObjectMeta{ + ObjectID: uploadObjectTask.GetObjectInfo().Id.Uint64(), + TaskState: types.TaskState_TASK_STATE_UPLOAD_OBJECT_DONE, + }) + if err != nil { + log.CtxErrorw(ctx, "failed to update upload progress", "error", err) + return ErrGfSpDBWithDetail("failed to update upload progress, error: " + err.Error()) + } log.CtxDebugw(ctx, "succeed to upload payload to piece store") return nil } @@ -273,6 +282,14 @@ func (u *UploadModular) HandleResumableUploadObjectTask(ctx context.Context, tas log.CtxErrorw(ctx, "failed to write integrity hash to db", "error", err) return ErrGfSpDBWithDetail("failed to write integrity hash to db, error: " + err.Error()) } + err = u.baseApp.GfSpDB().UpdateUploadProgress(&corespdb.UploadObjectMeta{ + ObjectID: task.GetObjectInfo().Id.Uint64(), + TaskState: types.TaskState_TASK_STATE_UPLOAD_OBJECT_DONE, + }) + if err != nil { + log.CtxErrorw(ctx, "failed to update upload progress", "error", err) + return ErrGfSpDBWithDetail("failed to update upload progress, error: " + err.Error()) + } } log.CtxDebug(ctx, "succeed to upload payload to piece store") diff --git a/modular/uploader/uploader_task_test.go b/modular/uploader/uploader_task_test.go index 7ab360811..31467f1da 100644 --- a/modular/uploader/uploader_task_test.go +++ b/modular/uploader/uploader_task_test.go @@ -127,6 +127,7 @@ func TestUploadModular_HandleUploadObjectTaskSuccess1(t *testing.T) { m3 := corespdb.NewMockSPDB(ctrl) u.baseApp.SetGfSpDB(m3) m3.EXPECT().SetObjectIntegrity(gomock.Any()).Return(nil).AnyTimes() + m3.EXPECT().UpdateUploadProgress(gomock.Any()).Return(nil).AnyTimes() m4 := gfspclient.NewMockGfSpClientAPI(ctrl) u.baseApp.SetGfSpClient(m4) diff --git a/store/bsdb/database_mock.go b/store/bsdb/database_mock.go index 81bcde7e6..09430dd9f 100644 --- a/store/bsdb/database_mock.go +++ b/store/bsdb/database_mock.go @@ -1,10 +1,6 @@ // Code generated by MockGen. DO NOT EDIT. // Source: store/bsdb/database.go -// -// Generated by this command: -// -// mockgen -source=store/bsdb/database.go -destination=store/bsdb/database_mock.go -package=bsdb -// + // Package bsdb is a generated GoMock package. package bsdb diff --git a/store/sqldb/upload_object.go b/store/sqldb/upload_object.go index 7fdc10d5c..1e0875ae5 100644 --- a/store/sqldb/upload_object.go +++ b/store/sqldb/upload_object.go @@ -131,10 +131,11 @@ func (s *SpDBImpl) GetUploadMetasToReplicateByStartTS(limit int, startTS int64) uploadObjectProgresses []UploadObjectProgressTable returnUploadObjectMetas []*corespdb.UploadObjectMeta ) - result = s.db.Where("task_state IN ? and update_timestamp_second > ?", []string{ - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DONE)), - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_SEAL_OBJECT_DOING)), - }, startTS).Order("update_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) + result = s.db.Where("task_state IN ? and create_timestamp_second > ?", []string{ + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_UPLOAD_OBJECT_DONE)), + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING)), + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_ERROR)), + }, startTS).Order("create_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) if result.Error != nil { return nil, fmt.Errorf("failed to query upload table: %s", result.Error) } @@ -144,11 +145,11 @@ func (s *SpDBImpl) GetUploadMetasToReplicateByStartTS(limit int, startTS int64) return nil, err } returnUploadObjectMetas = append(returnUploadObjectMetas, &corespdb.UploadObjectMeta{ - ObjectID: u.ObjectID, - GlobalVirtualGroupID: u.GlobalVirtualGroupID, - SecondaryEndpoints: util.SplitByComma(u.SecondaryEndpoints), - SecondarySignatures: secondarySignatures, - UpdateTimeStamp: u.UpdateTimestampSecond, + ObjectID: u.ObjectID, + GlobalVirtualGroupID: u.GlobalVirtualGroupID, + SecondaryEndpoints: util.SplitByComma(u.SecondaryEndpoints), + SecondarySignatures: secondarySignatures, + CreateTimeStampSecond: u.CreateTimestampSecond, }) } return returnUploadObjectMetas, nil @@ -160,10 +161,11 @@ func (s *SpDBImpl) GetUploadMetasToSealByStartTS(limit int, startTS int64) ([]*c uploadObjectProgresses []UploadObjectProgressTable returnUploadObjectMetas []*corespdb.UploadObjectMeta ) - result = s.db.Where("task_state IN ? and update_timestamp_second > ?", []string{ - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_UPLOAD_OBJECT_DONE)), - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING)), - }, startTS).Order("update_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) + result = s.db.Where("task_state IN ? and create_timestamp_second > ?", []string{ + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DONE)), + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_SEAL_OBJECT_DOING)), + util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_SEAL_OBJECT_ERROR)), + }, startTS).Order("create_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) if result.Error != nil { return nil, fmt.Errorf("failed to query upload table: %s", result.Error) } @@ -173,35 +175,31 @@ func (s *SpDBImpl) GetUploadMetasToSealByStartTS(limit int, startTS int64) ([]*c return nil, err } returnUploadObjectMetas = append(returnUploadObjectMetas, &corespdb.UploadObjectMeta{ - ObjectID: u.ObjectID, - GlobalVirtualGroupID: u.GlobalVirtualGroupID, - SecondaryEndpoints: util.SplitByComma(u.SecondaryEndpoints), - SecondarySignatures: secondarySignatures, - UpdateTimeStamp: u.UpdateTimestampSecond, + ObjectID: u.ObjectID, + GlobalVirtualGroupID: u.GlobalVirtualGroupID, + SecondaryEndpoints: util.SplitByComma(u.SecondaryEndpoints), + SecondarySignatures: secondarySignatures, + CreateTimeStampSecond: u.CreateTimestampSecond, }) } return returnUploadObjectMetas, nil } -func (s *SpDBImpl) GetUploadMetasToRejectByRangeTS(limit int, startTS int64, endTS int64) ([]*corespdb.UploadObjectMeta, error) { +func (s *SpDBImpl) GetUploadMetasToRejectUnsealByRangeTS(limit int, startTS int64, endTS int64) ([]*corespdb.UploadObjectMeta, error) { var ( result *gorm.DB uploadObjectProgresses []UploadObjectProgressTable returnUploadObjectMetas []*corespdb.UploadObjectMeta ) - result = s.db.Where("task_state IN ? and update_timestamp_second > ? and update_timestamp_second <= ?", []string{ - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_UPLOAD_OBJECT_DONE)), - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DOING)), - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_REPLICATE_OBJECT_DONE)), - util.Uint32ToString(uint32(storetypes.TaskState_TASK_STATE_SEAL_OBJECT_DOING)), - }, startTS, endTS).Order("update_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) + result = s.db.Where("create_timestamp_second > ? and create_timestamp_second <= ?", + startTS, endTS).Order("create_timestamp_second ASC").Limit(limit).Find(&uploadObjectProgresses) if result.Error != nil { return nil, fmt.Errorf("failed to query upload table: %s", result.Error) } for _, u := range uploadObjectProgresses { returnUploadObjectMetas = append(returnUploadObjectMetas, &corespdb.UploadObjectMeta{ - ObjectID: u.ObjectID, - UpdateTimeStamp: u.UpdateTimestampSecond, + ObjectID: u.ObjectID, + CreateTimeStampSecond: u.CreateTimestampSecond, }) } return returnUploadObjectMetas, nil diff --git a/store/sqldb/upload_object_schema.go b/store/sqldb/upload_object_schema.go index 3a929d96f..4144f3d24 100644 --- a/store/sqldb/upload_object_schema.go +++ b/store/sqldb/upload_object_schema.go @@ -9,7 +9,7 @@ type UploadObjectProgressTable struct { ErrorDescription string SecondaryEndpoints string SecondarySignatures string - CreateTimestampSecond int64 + CreateTimestampSecond int64 `gorm:"index:create_timestamp_index"` UpdateTimestampSecond int64 `gorm:"index:update_timestamp_index"` }