diff --git a/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.down.sql b/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.down.sql new file mode 100644 index 000000000..788f82dda --- /dev/null +++ b/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.down.sql @@ -0,0 +1,9 @@ +BEGIN; + +DROP INDEX blobs_namespace_data_id; +DROP INDEX blobs_payload_ref; +ALTER TABLE blobs DROP COLUMN namespace; +ALTER TABLE blobs DROP COLUMN data_id; +CREATE INDEX blobs_hash ON blobs(hash); + +COMMIT; \ No newline at end of file diff --git a/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.up.sql b/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.up.sql new file mode 100644 index 000000000..8c2583a27 --- /dev/null +++ b/db/migrations/postgres/000105_add_namespace_and_data_id_to_blobs.up.sql @@ -0,0 +1,25 @@ +BEGIN; + +CREATE TABLE temp_blobs ( + seq SERIAL PRIMARY KEY, + namespace VARCHAR(64) NOT NULL, + hash CHAR(64) NOT NULL, + payload_ref VARCHAR(1024) NOT NULL, + created BIGINT NOT NULL, + peer VARCHAR(256) NOT NULL, + size BIGINT, + data_id UUID NOT NULL +); +INSERT INTO temp_blobs (namespace, data_id, hash, payload_ref, created, peer, size) + SELECT DISTINCT data.namespace, data.id, data.blob_hash, blobs.payload_ref, blobs.created, blobs.peer, blobs.size + FROM data + LEFT JOIN blobs ON blobs.hash = data.blob_hash + WHERE data.blob_hash IS NOT NULL +; +DROP INDEX blobs_hash; +DROP TABLE blobs; +ALTER TABLE temp_blobs RENAME TO blobs; +CREATE INDEX blobs_namespace_data_id ON blobs(namespace, data_id); +CREATE INDEX blobs_payload_ref ON blobs(payload_ref); + +COMMIT; \ No newline at end of file diff --git a/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.down.sql b/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.down.sql new file mode 100644 index 000000000..711b8c412 --- /dev/null +++ b/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.down.sql @@ -0,0 +1,5 @@ 
+DROP INDEX blobs_namespace_data_id; +DROP INDEX blobs_payload_ref; +ALTER TABLE blobs DROP COLUMN namespace; +ALTER TABLE blobs DROP COLUMN data_id; +CREATE INDEX blobs_hash ON blobs(hash); \ No newline at end of file diff --git a/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.up.sql b/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.up.sql new file mode 100644 index 000000000..a70770b98 --- /dev/null +++ b/db/migrations/sqlite/000105_add_namespace_and_data_id_to_blobs.up.sql @@ -0,0 +1,21 @@ +CREATE TABLE temp_blobs ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + namespace VARCHAR(64) NOT NULL, + hash CHAR(64) NOT NULL, + payload_ref VARCHAR(1024) NOT NULL, + created BIGINT NOT NULL, + peer VARCHAR(256) NOT NULL, + size BIGINT, + data_id UUID NOT NULL +); +INSERT INTO temp_blobs (namespace, data_id, hash, payload_ref, created, peer, size) + SELECT DISTINCT data.namespace, data.id, data.blob_hash, blobs.payload_ref, blobs.created, blobs.peer, blobs.size + FROM data + LEFT JOIN blobs ON blobs.hash = data.blob_hash + WHERE data.blob_hash IS NOT NULL +; +DROP INDEX blobs_hash; +DROP TABLE blobs; +ALTER TABLE temp_blobs RENAME TO blobs; +CREATE INDEX blobs_namespace_data_id ON blobs(namespace, data_id); +CREATE INDEX blobs_payload_ref ON blobs(payload_ref); \ No newline at end of file diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 11d18379e..44fbd5f34 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -5055,6 +5055,32 @@ paths: tags: - Default Namespace /data/{dataid}: + delete: + description: Deletes a data item by its ID, including metadata about this item + operationId: deleteData + parameters: + - description: The data item ID + in: path + name: dataid + required: true + schema: + type: string + - description: Server-side request timeout (milliseconds, or set a custom suffix + like 10s) + in: header + name: Request-Timeout + schema: + default: 2m0s + type: string + responses: + "204": + 
content: + application/json: {} + description: Success + default: + description: "" + tags: + - Default Namespace get: description: Gets a data item by its ID, including metadata about this item operationId: getDataByID @@ -5151,7 +5177,7 @@ paths: description: Downloads the original file that was previously uploaded or received operationId: getDataBlob parameters: - - description: The blob ID + - description: The data item ID in: path name: dataid required: true @@ -15549,6 +15575,39 @@ paths: tags: - Non-Default Namespace /namespaces/{ns}/data/{dataid}: + delete: + description: Deletes a data item by its ID, including metadata about this item + operationId: deleteDataNamespace + parameters: + - description: The data item ID + in: path + name: dataid + required: true + schema: + type: string + - description: The namespace which scopes this request + in: path + name: ns + required: true + schema: + example: default + type: string + - description: Server-side request timeout (milliseconds, or set a custom suffix + like 10s) + in: header + name: Request-Timeout + schema: + default: 2m0s + type: string + responses: + "204": + content: + application/json: {} + description: Success + default: + description: "" + tags: + - Non-Default Namespace get: description: Gets a data item by its ID, including metadata about this item operationId: getDataByIDNamespace @@ -15652,7 +15711,7 @@ paths: description: Downloads the original file that was previously uploaded or received operationId: getDataBlobNamespace parameters: - - description: The blob ID + - description: The data item ID in: path name: dataid required: true diff --git a/internal/apiserver/route_date_data_test.go b/internal/apiserver/route_date_data_test.go new file mode 100644 index 000000000..971909f50 --- /dev/null +++ b/internal/apiserver/route_date_data_test.go @@ -0,0 +1,44 @@ +// Copyright © 2022 Kaleido, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiserver + +import ( + "net/http/httptest" + "testing" + + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly/mocks/datamocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestDeleteDataByID(t *testing.T) { + o, r := newTestAPIServer() + o.On("Authorize", mock.Anything, mock.Anything).Return(nil) + dmm := &datamocks.Manager{} + o.On("Data").Return(dmm) + id := fftypes.NewUUID() + req := httptest.NewRequest("DELETE", "/api/v1/namespaces/mynamespace/data/"+id.String(), nil) + req.Header.Set("Content-Type", "application/json; charset=utf-8") + res := httptest.NewRecorder() + + dmm.On("DeleteData", mock.Anything, id.String()). + Return(nil) + r.ServeHTTP(res, req) + + assert.Equal(t, 204, res.Result().StatusCode) +} diff --git a/internal/apiserver/route_delete_data.go b/internal/apiserver/route_delete_data.go new file mode 100644 index 000000000..92409dd54 --- /dev/null +++ b/internal/apiserver/route_delete_data.go @@ -0,0 +1,44 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiserver + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly/internal/coremsgs" +) + +var deleteData = &ffapi.Route{ + Name: "deleteData", + Path: "data/{dataid}", + Method: http.MethodDelete, + PathParams: []*ffapi.PathParam{ + {Name: "dataid", Description: coremsgs.APIParamsDataID}, + }, + QueryParams: nil, + Description: coremsgs.APIEndpointsDeleteData, + JSONInputValue: nil, + JSONOutputValue: nil, + JSONOutputCodes: []int{http.StatusNoContent}, + Extensions: &coreExtensions{ + CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) { + err = cr.or.Data().DeleteData(cr.ctx, r.PP["dataid"]) + return nil, err + }, + }, +} diff --git a/internal/apiserver/route_get_data_blob.go b/internal/apiserver/route_get_data_blob.go index a4d0048df..fce1ca6be 100644 --- a/internal/apiserver/route_get_data_blob.go +++ b/internal/apiserver/route_get_data_blob.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -32,7 +32,7 @@ var getDataBlob = &ffapi.Route{ Path: "data/{dataid}/blob", Method: http.MethodGet, PathParams: []*ffapi.PathParam{ - {Name: "dataid", Description: coremsgs.APIParamsBlobID}, + {Name: "dataid", Description: coremsgs.APIParamsDataID}, }, QueryParams: nil, FilterFactory: database.MessageQueryFactory, diff --git a/internal/apiserver/routes.go b/internal/apiserver/routes.go index e724ed56b..5f231a29a 100644 --- a/internal/apiserver/routes.go +++ b/internal/apiserver/routes.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -53,6 +53,7 @@ var routes = append( }), namespacedRoutes([]*ffapi.Route{ deleteContractListener, + deleteData, deleteSubscription, getBatchByID, getBatches, diff --git a/internal/broadcast/manager.go b/internal/broadcast/manager.go index 702157513..f47234a8c 100644 --- a/internal/broadcast/manager.go +++ b/internal/broadcast/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -203,14 +203,15 @@ func (bm *broadcastManager) uploadDataBlob(ctx context.Context, tx *fftypes.UUID return err } - blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := bm.database.GetBlobs(ctx, bm.namespace.Name, fb.And(fb.Eq("data_id", d.ID), fb.Eq("hash", d.Blob.Hash))) if err != nil { return err - } else if blob == nil { + } else if len(blobs) == 0 || blobs[0] == nil { return i18n.NewError(ctx, coremsgs.MsgBlobNotFound, d.Blob.Hash) } - _, err = bm.operations.RunOperation(ctx, opUploadBlob(op, d, blob)) + _, err = bm.operations.RunOperation(ctx, opUploadBlob(op, d, blobs[0])) return err } diff --git a/internal/broadcast/manager_test.go b/internal/broadcast/manager_test.go index b13f11924..4b4080707 100644 --- a/internal/broadcast/manager_test.go +++ b/internal/broadcast/manager_test.go @@ -189,7 +189,7 @@ func TestDispatchBatchBlobsFail(t *testing.T) { mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) mdi := bm.database.(*databasemocks.Plugin) - mdi.On("GetBlobMatchingHash", bm.ctx, blobHash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", mock.Anything, bm.namespace.Name, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) err := bm.dispatchBatch(bm.ctx, state) assert.EqualError(t, err, "pop") @@ -327,7 +327,7 @@ func TestUploadBlobPublishFail(t *testing.T) { ctx := context.Background() mtx.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeDataPublish, core.IdempotencyKey("idem1")).Return(fftypes.NewUUID(), nil) mdi.On("GetDataByID", ctx, "ns1", d.ID, true).Return(d, nil) - mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil) + mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{blob}, nil, nil) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { data := 
op.Data.(uploadBlobData) @@ -355,7 +355,7 @@ func TestUploadBlobsGetBlobFail(t *testing.T) { dataID := fftypes.NewUUID() ctx := context.Background() - mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) mom := bm.operations.(*operationmocks.Manager) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) @@ -387,7 +387,7 @@ func TestUploadBlobsGetBlobNotFound(t *testing.T) { dataID := fftypes.NewUUID() ctx := context.Background() - mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, nil) + mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, nil) mom := bm.operations.(*operationmocks.Manager) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) @@ -596,7 +596,7 @@ func TestUploadBlobOK(t *testing.T) { ctx := context.Background() mdi.On("GetDataByID", ctx, "ns1", d.ID, true).Return(d, nil) - mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil) + mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{blob}, nil, nil) mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil) mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool { data := op.Data.(uploadBlobData) diff --git a/internal/broadcast/operations.go b/internal/broadcast/operations.go index fce59adb1..68f74f412 100644 --- a/internal/broadcast/operations.go +++ b/internal/broadcast/operations.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -114,13 +114,14 @@ func (bm *broadcastManager) PrepareOperation(ctx context.Context, op *core.Opera } else if d == nil || d.Blob == nil { return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound) } - blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := bm.database.GetBlobs(ctx, bm.namespace.Name, fb.And(fb.Eq("data_id", dataID), fb.Eq("hash", d.Blob.Hash))) if err != nil { return nil, err - } else if blob == nil { + } else if len(blobs) == 0 || blobs[0] == nil { return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound) } - return opUploadBlob(op, d, blob), nil + return opUploadBlob(op, d, blobs[0]), nil case core.OpTypeSharedStorageUploadValue: dataID, err := retrieveUploadValueInputs(ctx, op) diff --git a/internal/broadcast/operations_test.go b/internal/broadcast/operations_test.go index f94af83a2..43210812c 100644 --- a/internal/broadcast/operations_test.go +++ b/internal/broadcast/operations_test.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -242,7 +242,8 @@ func TestPrepareAndRunUploadBlob(t *testing.T) { Hash: fftypes.NewRandB32(), } data := &core.Data{ - ID: fftypes.NewUUID(), + Namespace: "ns1", + ID: fftypes.NewUUID(), Blob: &core.BlobRef{ Hash: blob.Hash, }, @@ -255,7 +256,7 @@ func TestPrepareAndRunUploadBlob(t *testing.T) { reader := ioutil.NopCloser(strings.NewReader("some data")) mdi.On("GetDataByID", mock.Anything, "ns1", data.ID, false).Return(data, nil) - mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(blob, nil) + mdi.On("GetBlobs", mock.Anything, bm.namespace.Name, mock.Anything).Return([]*core.Blob{blob}, nil, nil) mps.On("UploadData", context.Background(), mock.Anything).Return("123", nil) mdx.On("DownloadBlob", context.Background(), mock.Anything).Return(reader, nil) mdi.On("UpdateData", context.Background(), "ns1", data.ID, mock.MatchedBy(func(update ffapi.Update) bool { @@ -337,7 +338,8 @@ func TestPrepareUploadBlobGetBlobMissing(t *testing.T) { Hash: fftypes.NewRandB32(), } data := &core.Data{ - ID: fftypes.NewUUID(), + Namespace: "ns1", + ID: fftypes.NewUUID(), Blob: &core.BlobRef{ Hash: blob.Hash, }, @@ -349,7 +351,7 @@ func TestPrepareUploadBlobGetBlobMissing(t *testing.T) { mdi := bm.database.(*databasemocks.Plugin) mdi.On("GetDataByID", mock.Anything, "ns1", data.ID, false).Return(data, nil) - mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(nil, nil) + mdi.On("GetBlobs", mock.Anything, bm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, nil) _, err := bm.PrepareOperation(context.Background(), op) assert.Regexp(t, "FF10109", err) @@ -371,7 +373,8 @@ func TestPrepareUploadBlobGetBlobFailing(t *testing.T) { Hash: fftypes.NewRandB32(), } data := &core.Data{ - ID: fftypes.NewUUID(), + Namespace: "ns1", + ID: fftypes.NewUUID(), Blob: &core.BlobRef{ Hash: blob.Hash, }, @@ -381,7 +384,7 @@ func TestPrepareUploadBlobGetBlobFailing(t *testing.T) { mdi := bm.database.(*databasemocks.Plugin) 
mdi.On("GetDataByID", mock.Anything, "ns1", data.ID, false).Return(data, nil) - mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", mock.Anything, bm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, fmt.Errorf("pop")) _, err := bm.PrepareOperation(context.Background(), op) assert.Regexp(t, "pop", err) diff --git a/internal/coremsgs/en_api_translations.go b/internal/coremsgs/en_api_translations.go index dea99d3da..b62c69259 100644 --- a/internal/coremsgs/en_api_translations.go +++ b/internal/coremsgs/en_api_translations.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -91,6 +91,7 @@ var ( APIEndpointsGetDataBlob = ffm("api.endpoints.getDataBlob", "Downloads the original file that was previously uploaded or received") APIEndpointsGetDataValue = ffm("api.endpoints.getDataValue", "Downloads the JSON value of the data resource, without the associated metadata") APIEndpointsGetDataByID = ffm("api.endpoints.getDataByID", "Gets a data item by its ID, including metadata about this item") + APIEndpointsDeleteData = ffm("api.endpoints.deleteData", "Deletes a data item by its ID, including metadata about this item") APIEndpointsGetDataMsgs = ffm("api.endpoints.getDataMsgs", "Gets a list of the messages associated with a data item") APIEndpointsGetData = ffm("api.endpoints.getData", "Gets a list of data items") APIEndpointsGetDatatypeByName = ffm("api.endpoints.getDatatypeByName", "Gets a datatype by its name and version") diff --git a/internal/data/blobstore.go b/internal/data/blobstore.go index ab585ceb9..279ef4acd 100644 --- a/internal/data/blobstore.go +++ b/internal/data/blobstore.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -111,6 +111,8 @@ func (bs *blobStore) UploadBlob(ctx context.Context, inData *core.DataRefOrValue } blob := &core.Blob{ + Namespace: bs.dm.namespace.Name, + DataID: data.ID, Hash: hash, Size: blobSize, PayloadRef: payloadRef, @@ -161,15 +163,44 @@ func (bs *blobStore) DownloadBlob(ctx context.Context, dataID string) (*core.Blo if data.Blob == nil || data.Blob.Hash == nil { return nil, nil, i18n.NewError(ctx, coremsgs.MsgDataDoesNotHaveBlob) } - - blob, err := bs.database.GetBlobMatchingHash(ctx, data.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := bs.database.GetBlobs(ctx, bs.dm.namespace.Name, fb.And(fb.Eq("data_id", data.ID), fb.Eq("hash", data.Blob.Hash))) if err != nil { return nil, nil, err } - if blob == nil { + if len(blobs) == 0 || blobs[0] == nil { return nil, nil, i18n.NewError(ctx, coremsgs.MsgBlobNotFound, data.Blob.Hash) } + blob := blobs[0] reader, err := bs.exchange.DownloadBlob(ctx, blob.PayloadRef) return blob, reader, err } + +func (bs *blobStore) DeleteBlob(ctx context.Context, blob *core.Blob) error { + if bs.exchange == nil { + return i18n.NewError(ctx, coremsgs.MsgActionNotSupported) + } + + // Compatibility check: Previous versions of FireFly could have had multiple + // data items pointing at the same blob. We should NOT delete the blob if other + // data items still reference this blob! Look at the payloadRef to determine + // uniqueness, as of FireFly 1.2.x this will be unique per data item. 
+ fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := bs.database.GetBlobs(ctx, bs.dm.namespace.Name, fb.Eq("payloadref", blob.PayloadRef)) + if err != nil { + return err + } + if len(blobs) <= 1 { + + err := bs.exchange.DeleteBlob(ctx, blob.PayloadRef) + if err != nil { + return err + } + } + err = bs.database.DeleteBlob(ctx, blob.Sequence) + if err != nil { + return err + } + return nil +} diff --git a/internal/data/blobstore_test.go b/internal/data/blobstore_test.go index fff10348e..33efd0700 100644 --- a/internal/data/blobstore_test.go +++ b/internal/data/blobstore_test.go @@ -276,10 +276,10 @@ func TestDownloadBlobOk(t *testing.T) { Hash: blobHash, }, }, nil) - mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(&core.Blob{ + mdi.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{{ Hash: blobHash, PayloadRef: "ns1/blob1", - }, nil) + }}, nil, nil) mdx := dm.exchange.(*dataexchangemocks.Plugin) mdx.On("DownloadBlob", ctx, "ns1/blob1").Return( @@ -322,7 +322,7 @@ func TestDownloadBlobNotFound(t *testing.T) { Hash: blobHash, }, }, nil) - mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(nil, nil) + mdi.On("GetBlobs", ctx, "ns1", mock.Anything).Return(nil, nil, nil) _, _, err := dm.DownloadBlob(ctx, dataID.String()) assert.Regexp(t, "FF10239", err) @@ -345,7 +345,7 @@ func TestDownloadBlobLookupErr(t *testing.T) { Hash: blobHash, }, }, nil) - mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", ctx, "ns1", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) _, _, err := dm.DownloadBlob(ctx, dataID.String()) assert.Regexp(t, "pop", err) @@ -410,3 +410,156 @@ func TestDownloadBlobBadID(t *testing.T) { assert.Regexp(t, "FF00138", err) } + +func TestDeleteBlob(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + defer cancel() + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: 
fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + mdb.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, "payloadref").Return(nil) + mdb.On("DeleteBlob", ctx, int64(1)).Return(nil) + + err := dm.DeleteBlob(ctx, blob) + assert.NoError(t, err) + mdb.AssertExpectations(t) + mdx.AssertExpectations(t) +} + +func TestDeleteBlobFailDX(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + defer cancel() + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + mdb.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, "payloadref").Return(fmt.Errorf("pop")) + + err := dm.DeleteBlob(ctx, blob) + assert.Equal(t, "pop", err.Error()) + mdb.AssertExpectations(t) + mdx.AssertExpectations(t) +} + +func TestDeleteBlobFailDB(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + defer cancel() + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + mdb.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, "payloadref").Return(nil) + mdb.On("DeleteBlob", ctx, int64(1)).Return(fmt.Errorf("pop")) + + err := dm.DeleteBlob(ctx, blob) + assert.Equal(t, "pop", err.Error()) + mdb.AssertExpectations(t) + mdx.AssertExpectations(t) +} + +func TestDeleteBlobDisabled(t *testing.T) { + + dm, ctx, 
cancel := newTestDataManager(t) + defer cancel() + dm.exchange = nil + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + err := dm.DeleteBlob(ctx, blob) + assert.Regexp(t, "FF10414", err) +} + +func TestDeleteBlobBackwardCompatibility(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + mdb := dm.database.(*databasemocks.Plugin) + + defer cancel() + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + mdb.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{{}, {}}, &ffapi.FilterResult{}, nil) + mdb.On("DeleteBlob", ctx, int64(1)).Return(nil) + + err := dm.DeleteBlob(ctx, blob) + assert.NoError(t, err) + mdb.AssertExpectations(t) +} + +func TestDeleteBlobBackwardCompatibilityFail(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + mdb := dm.database.(*databasemocks.Plugin) + + defer cancel() + + blob := &core.Blob{ + Sequence: 1, + Namespace: "ns1", + Hash: fftypes.NewRandB32(), + PayloadRef: "payloadref", + Created: fftypes.Now(), + Peer: "peer", + Size: 123456, + DataID: fftypes.NewUUID(), + } + + mdb.On("GetBlobs", ctx, "ns1", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := dm.DeleteBlob(ctx, blob) + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} diff --git a/internal/data/data_manager.go b/internal/data/data_manager.go index 0ebade7a2..f60d07133 100644 --- a/internal/data/data_manager.go +++ b/internal/data/data_manager.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -50,6 +50,7 @@ type Manager interface { UploadJSON(ctx context.Context, inData *core.DataRefOrValue) (*core.Data, error) UploadBlob(ctx context.Context, inData *core.DataRefOrValue, blob *ffapi.Multipart, autoMeta bool) (*core.Data, error) DownloadBlob(ctx context.Context, dataID string) (*core.Blob, io.ReadCloser, error) + DeleteData(ctx context.Context, dataID string) error HydrateBatch(ctx context.Context, persistedBatch *core.BatchPersisted) (*core.Batch, error) Start() WaitStop() @@ -278,7 +279,7 @@ func (dm *dataManager) UpdateMessageCache(msg *core.Message, data core.DataArray // UpdateMessageIfCached is used in order to notify the fields of a message that are not initially filled in, have been filled in. // It does not guarantee the cache is up to date, and the CacheReadOptions should be used to check you have the updated data. -// But calling this should reduce the possiblity of the CROs missing +// But calling this should reduce the possibility of the CROs missing func (dm *dataManager) UpdateMessageIfCached(ctx context.Context, msg *core.Message) { mce := dm.queryMessageCache(ctx, msg.Header.ID) if mce != nil { @@ -354,16 +355,17 @@ func (dm *dataManager) resolveRef(ctx context.Context, dataRef *core.DataRef) (* } } -func (dm *dataManager) resolveBlob(ctx context.Context, blobRef *core.BlobRef) (*core.Blob, error) { +func (dm *dataManager) resolveBlob(ctx context.Context, namespace string, blobRef *core.BlobRef, dataID *fftypes.UUID) (*core.Blob, error) { if blobRef != nil && blobRef.Hash != nil { - blob, err := dm.database.GetBlobMatchingHash(ctx, blobRef.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := dm.database.GetBlobs(ctx, dm.dm.namespace.Name, fb.And(fb.Eq("data_id", dataID), fb.Eq("hash", blobRef.Hash))) if err != nil { return nil, err } - if blob == nil { + if len(blobs) == 0 || blobs[0] == nil { return nil, i18n.NewError(ctx, coremsgs.MsgBlobNotFound, blobRef.Hash) } - 
return blob, nil + return blobs[0], nil } return nil, nil } @@ -408,7 +410,7 @@ func (dm *dataManager) validateInputData(ctx context.Context, inData *core.DataR return nil, err } - blob, err := dm.resolveBlob(ctx, blobRef) + blob, err := dm.resolveBlob(ctx, dm.namespace.Name, blobRef, inData.ID) if err != nil { return nil, err } @@ -463,7 +465,7 @@ func (dm *dataManager) ResolveInlineData(ctx context.Context, newMessage *NewMes if d == nil { return i18n.NewError(ctx, coremsgs.MsgDataReferenceUnresolvable, i) } - if _, err = dm.resolveBlob(ctx, d.Blob); err != nil { + if _, err = dm.resolveBlob(ctx, dm.namespace.Name, d.Blob, d.ID); err != nil { return err } case dataOrValue.Value != nil || dataOrValue.Blob != nil: @@ -539,3 +541,44 @@ func (dm *dataManager) WriteNewMessage(ctx context.Context, newMsg *NewMessage) func (dm *dataManager) WaitStop() { dm.messageWriter.close() } + +func (dm *dataManager) DeleteData(ctx context.Context, dataID string) error { + id, err := fftypes.ParseUUID(ctx, dataID) + if err != nil { + return err + } + + data, err := dm.database.GetDataByID(ctx, dm.namespace.Name, id, false) + if err != nil { + return err + } + if data == nil { + return i18n.NewError(ctx, coremsgs.Msg404NoResult) + } + if data.Blob != nil && data.Blob.Hash != nil { + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := dm.database.GetBlobs(ctx, dm.namespace.Name, fb.And(fb.Eq("data_id", data.ID), fb.Eq("hash", data.Blob.Hash))) + if err != nil { + return err + } + for _, blob := range blobs { + if blob != nil { + err = dm.DeleteBlob(ctx, blob) + if err != nil { + return err + } + } + } + } + + // Invalidate cache entries for any messages that had these data refs + msgs, _, err := dm.database.GetMessagesForData(ctx, dm.namespace.Name, data.ID, database.MessageQueryFactory.NewFilter(ctx).And()) + if err != nil { + return err + } + for _, msg := range msgs { + dm.messageCache.Set(msg.Header.ID.String(), nil) + } + + return dm.database.DeleteData(ctx, 
data.Namespace, data.ID) +} diff --git a/internal/data/data_manager_test.go b/internal/data/data_manager_test.go index b01804fc6..c8bebc9d9 100644 --- a/internal/data/data_manager_test.go +++ b/internal/data/data_manager_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,6 +24,7 @@ import ( "time" "github.com/hyperledger/firefly-common/pkg/config" + "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/internal/cache" "github.com/hyperledger/firefly/internal/coreconfig" @@ -505,10 +506,10 @@ func TestResolveInlineDataDataToPublish(t *testing.T) { Hash: blobHash, }, }, nil) - mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(&core.Blob{ + mdi.On("GetBlobs", ctx, "ns1", mock.Anything).Return([]*core.Blob{{ Hash: blobHash, PayloadRef: "blob/1", - }, nil) + }}, nil, nil) err := dm.ResolveInlineData(ctx, newMsg) assert.NoError(t, err) @@ -535,7 +536,7 @@ func TestResolveInlineDataResolveBlobFail(t *testing.T) { Hash: blobHash, }, }, nil) - mdi.On("GetBlobMatchingHash", ctx, blobHash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", ctx, "ns1", mock.Anything).Return(nil, nil, fmt.Errorf("pop")) err := dm.ResolveInlineData(ctx, newMsg) assert.EqualError(t, err, "pop") @@ -754,7 +755,7 @@ func TestValidateAndStoreBlobError(t *testing.T) { defer cancel() mdi := dm.database.(*databasemocks.Plugin) blobHash := fftypes.NewRandB32() - mdi.On("GetBlobMatchingHash", mock.Anything, blobHash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", ctx, dm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, fmt.Errorf("pop")) _, err := dm.validateInputData(ctx, &core.DataRefOrValue{ Blob: &core.BlobRef{ Hash: blobHash, @@ -769,7 +770,7 @@ func TestValidateAndStoreBlobNotFound(t *testing.T) { defer cancel() mdi := dm.database.(*databasemocks.Plugin) blobHash := fftypes.NewRandB32() - 
mdi.On("GetBlobMatchingHash", mock.Anything, blobHash).Return(nil, nil) + mdi.On("GetBlobs", ctx, dm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, nil) _, err := dm.validateInputData(ctx, &core.DataRefOrValue{ Blob: &core.BlobRef{ Hash: blobHash, @@ -1171,3 +1172,210 @@ func TestWriteNewMessageFailClosed(t *testing.T) { }) assert.Regexp(t, "FF00154", err) } + +func TestDeleteData(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + msgID := fftypes.NewUUID() + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + payloadRef := "payloadRef" + + data := &core.Data{ + ID: dataID, + Namespace: dm.namespace.Name, + Blob: &core.BlobRef{ + Hash: hash, + }, + } + + blob := &core.Blob{ + Sequence: 0, + Namespace: dm.namespace.Name, + PayloadRef: payloadRef, + Hash: hash, + DataID: dataID, + } + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(data, nil) + mdb.On("GetBlobs", ctx, mock.Anything, mock.Anything).Return([]*core.Blob{blob}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, payloadRef).Return(nil) + mdb.On("DeleteBlob", ctx, int64(0)).Return(nil) + mdb.On("GetMessagesForData", ctx, dm.namespace.Name, dataID, mock.Anything).Return([]*core.Message{ + { + Header: core.MessageHeader{ + ID: msgID, + }, + }, + }, &ffapi.FilterResult{}, nil) + mdb.On("DeleteData", ctx, dm.namespace.Name, dataID).Return(nil) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.NoError(t, err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataFailParseUUID(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + err := dm.DeleteData(ctx, "NOT_A_UUID") + assert.Regexp(t, "FF00138", err) +} + +func TestDeleteDataFailGetData(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + + dataID := fftypes.NewUUID() + + 
mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(nil, fmt.Errorf("pop")) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataNotFound(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + dataID := fftypes.NewUUID() + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(nil, nil) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "FF10143", err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataFailGetBlob(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + + data := &core.Data{ + ID: dataID, + Namespace: dm.namespace.Name, + Blob: &core.BlobRef{ + Hash: hash, + }, + } + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(data, nil) + mdb.On("GetBlobs", ctx, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataFailDeleteBlob(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + payloadRef := "payloadRef" + + data := &core.Data{ + ID: dataID, + Namespace: dm.namespace.Name, + Blob: &core.BlobRef{ + Hash: hash, + }, + } + + blob := &core.Blob{ + Sequence: 0, + Namespace: dm.namespace.Name, + PayloadRef: payloadRef, + Hash: hash, + DataID: dataID, + } + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(data, nil) + mdb.On("GetBlobs", ctx, mock.Anything, mock.Anything).Return([]*core.Blob{blob}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, payloadRef).Return(nil) + 
mdb.On("DeleteBlob", ctx, int64(0)).Return(fmt.Errorf("pop")) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataFailGetBlobs(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + + data := &core.Data{ + ID: dataID, + Namespace: dm.namespace.Name, + Blob: &core.BlobRef{ + Hash: hash, + }, + } + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(data, nil) + mdb.On("GetBlobs", ctx, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} + +func TestDeleteDataFailGetMessages(t *testing.T) { + dm, ctx, cancel := newTestDataManager(t) + defer cancel() + mdb := dm.database.(*databasemocks.Plugin) + mdx := dm.exchange.(*dataexchangemocks.Plugin) + + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + payloadRef := "payloadRef" + + data := &core.Data{ + ID: dataID, + Namespace: dm.namespace.Name, + Blob: &core.BlobRef{ + Hash: hash, + }, + } + + blob := &core.Blob{ + Sequence: 0, + Namespace: dm.namespace.Name, + PayloadRef: payloadRef, + Hash: hash, + DataID: dataID, + } + + mdb.On("GetDataByID", ctx, dm.namespace.Name, dataID, false).Return(data, nil) + mdb.On("GetBlobs", ctx, mock.Anything, mock.Anything).Return([]*core.Blob{blob}, &ffapi.FilterResult{}, nil) + mdx.On("DeleteBlob", ctx, payloadRef).Return(nil) + mdb.On("DeleteBlob", ctx, int64(0)).Return(nil) + mdb.On("GetMessagesForData", ctx, dm.namespace.Name, dataID, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + + err := dm.DeleteData(ctx, dataID.String()) + + assert.Regexp(t, "pop", err) + mdb.AssertExpectations(t) +} diff --git a/internal/database/sqlcommon/blob_sql.go b/internal/database/sqlcommon/blob_sql.go index 7cfc75997..98a8fb8cb 100644 --- 
a/internal/database/sqlcommon/blob_sql.go +++ b/internal/database/sqlcommon/blob_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,20 +23,20 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/hyperledger/firefly-common/pkg/dbsql" "github.com/hyperledger/firefly-common/pkg/ffapi" - "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" - "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/pkg/core" ) var ( blobColumns = []string{ + "namespace", "hash", "payload_ref", - "peer", "created", + "peer", "size", + "data_id", } blobFilterFieldMap = map[string]string{ "payloadref": "payload_ref", @@ -62,11 +62,13 @@ func (s *SQLCommon) InsertBlob(ctx context.Context, blob *core.Blob) (err error) func (s *SQLCommon) setBlobInsertValues(query sq.InsertBuilder, blob *core.Blob) sq.InsertBuilder { return query.Values( + blob.Namespace, blob.Hash, blob.PayloadRef, - blob.Peer, blob.Created, + blob.Peer, blob.Size, + blob.DataID, ) } @@ -116,11 +118,13 @@ func (s *SQLCommon) InsertBlobs(ctx context.Context, blobs []*core.Blob) (err er func (s *SQLCommon) blobResult(ctx context.Context, row *sql.Rows) (*core.Blob, error) { blob := core.Blob{} err := row.Scan( + &blob.Namespace, &blob.Hash, &blob.PayloadRef, - &blob.Peer, &blob.Created, + &blob.Peer, &blob.Size, + &blob.DataID, &blob.Sequence, ) if err != nil { @@ -129,44 +133,16 @@ func (s *SQLCommon) blobResult(ctx context.Context, row *sql.Rows) (*core.Blob, return &blob, nil } -func (s *SQLCommon) getBlobPred(ctx context.Context, desc string, pred interface{}) (message *core.Blob, err error) { - cols := append([]string{}, blobColumns...) - cols = append(cols, s.SequenceColumn()) - rows, _, err := s.Query(ctx, blobsTable, - sq.Select(cols...). - From(blobsTable). - Where(pred). 
- Limit(1), - ) - if err != nil { - return nil, err - } - defer rows.Close() - - if !rows.Next() { - log.L(ctx).Debugf("Blob '%s' not found", desc) - return nil, nil - } - - blob, err := s.blobResult(ctx, rows) - if err != nil { - return nil, err - } - - return blob, nil -} - -func (s *SQLCommon) GetBlobMatchingHash(ctx context.Context, hash *fftypes.Bytes32) (message *core.Blob, err error) { - return s.getBlobPred(ctx, hash.String(), sq.Eq{ - "hash": hash, - }) -} - -func (s *SQLCommon) GetBlobs(ctx context.Context, filter ffapi.Filter) (message []*core.Blob, res *ffapi.FilterResult, err error) { +func (s *SQLCommon) GetBlobs(ctx context.Context, namespace string, filter ffapi.Filter) (message []*core.Blob, res *ffapi.FilterResult, err error) { cols := append([]string{}, blobColumns...) cols = append(cols, s.SequenceColumn()) - query, fop, fi, err := s.FilterSelect(ctx, "", sq.Select(cols...).From(blobsTable), filter, blobFilterFieldMap, []interface{}{"sequence"}) + query, fop, fi, err := s.FilterSelect( + ctx, + "", + sq.Select(cols...).From(blobsTable), filter, blobFilterFieldMap, + []interface{}{"sequence"}, + sq.Eq{"namespace": namespace}) if err != nil { return nil, nil, err } diff --git a/internal/database/sqlcommon/blob_sql_test.go b/internal/database/sqlcommon/blob_sql_test.go index 8e588330b..3c8df1fb2 100644 --- a/internal/database/sqlcommon/blob_sql_test.go +++ b/internal/database/sqlcommon/blob_sql_test.go @@ -31,6 +31,8 @@ import ( ) func TestBlobsE2EWithDB(t *testing.T) { + dataID := fftypes.NewUUID() + namespace := "e2e" log.SetLevel("debug") s, cleanup := newSQLiteTestProvider(t) @@ -39,17 +41,21 @@ func TestBlobsE2EWithDB(t *testing.T) { // Create a new blob entry blob := &core.Blob{ + Namespace: namespace, Hash: fftypes.NewRandB32(), Size: 12345, PayloadRef: fftypes.NewRandB32().String(), Peer: "peer1", Created: fftypes.Now(), + DataID: dataID, } err := s.InsertBlob(ctx, blob) assert.NoError(t, err) // Check we get the exact same blob back - 
blobRead, err := s.GetBlobMatchingHash(ctx, blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := s.GetBlobs(ctx, namespace, fb.Eq("payloadref", blob.PayloadRef)) + blobRead := blobs[0] assert.NoError(t, err) assert.NotNil(t, blobRead) blobJson, _ := json.Marshal(&blob) @@ -57,13 +63,13 @@ func TestBlobsE2EWithDB(t *testing.T) { assert.Equal(t, string(blobJson), string(blobReadJson)) // Query back the blob - fb := database.BlobQueryFactory.NewFilter(ctx) + fb = database.BlobQueryFactory.NewFilter(ctx) filter := fb.And( fb.Eq("hash", blob.Hash), fb.Eq("payloadref", blob.PayloadRef), fb.Eq("created", blob.Created), ) - blobRes, res, err := s.GetBlobs(ctx, filter.Count(true)) + blobRes, res, err := s.GetBlobs(ctx, namespace, filter.Count(true)) assert.NoError(t, err) assert.Equal(t, 1, len(blobRes)) assert.Equal(t, int64(1), *res.TotalCount) @@ -74,7 +80,7 @@ func TestBlobsE2EWithDB(t *testing.T) { // Test delete err = s.DeleteBlob(ctx, blob.Sequence) assert.NoError(t, err) - blobs, _, err := s.GetBlobs(ctx, filter) + blobs, _, err = s.GetBlobs(ctx, namespace, filter) assert.NoError(t, err) assert.Equal(t, 0, len(blobs)) @@ -161,36 +167,36 @@ func TestInsertBlobsSingleRowFail(t *testing.T) { s.callbacks.AssertExpectations(t) } -func TestGetBlobByIDSelectFail(t *testing.T) { - s, mock := newMockProvider().init() - mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) - _, err := s.GetBlobMatchingHash(context.Background(), fftypes.NewRandB32()) - assert.Regexp(t, "FF00176", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} - -func TestGetBlobByIDNotFound(t *testing.T) { - s, mock := newMockProvider().init() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) - msg, err := s.GetBlobMatchingHash(context.Background(), fftypes.NewRandB32()) - assert.NoError(t, err) - assert.Nil(t, msg) - assert.NoError(t, mock.ExpectationsWereMet()) -} - -func TestGetBlobByIDScanFail(t *testing.T) { - s, mock := 
newMockProvider().init() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow("only one")) - _, err := s.GetBlobMatchingHash(context.Background(), fftypes.NewRandB32()) - assert.Regexp(t, "FF10121", err) - assert.NoError(t, mock.ExpectationsWereMet()) -} +// func TestGetBlobByIDSelectFail(t *testing.T) { +// s, mock := newMockProvider().init() +// mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) +// _, err := s.GetBlob(context.Background(), "ns1", fftypes.NewUUID(), fftypes.NewRandB32()) +// assert.Regexp(t, "FF00176", err) +// assert.NoError(t, mock.ExpectationsWereMet()) +// } + +// func TestGetBlobByIDNotFound(t *testing.T) { +// s, mock := newMockProvider().init() +// mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) +// msg, err := s.GetBlob(context.Background(), "ns1", fftypes.NewUUID(), fftypes.NewRandB32()) +// assert.NoError(t, err) +// assert.Nil(t, msg) +// assert.NoError(t, mock.ExpectationsWereMet()) +// } + +// func TestGetBlobByIDScanFail(t *testing.T) { +// s, mock := newMockProvider().init() +// mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow("only one")) +// _, err := s.GetBlob(context.Background(), "ns1", fftypes.NewUUID(), fftypes.NewRandB32()) +// assert.Regexp(t, "FF10121", err) +// assert.NoError(t, mock.ExpectationsWereMet()) +// } func TestGetBlobQueryFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop")) f := database.BlobQueryFactory.NewFilter(context.Background()).Eq("hash", "") - _, _, err := s.GetBlobs(context.Background(), f) + _, _, err := s.GetBlobs(context.Background(), "ns1", f) assert.Regexp(t, "FF00176", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -198,7 +204,7 @@ func TestGetBlobQueryFail(t *testing.T) { func TestGetBlobBuildQueryFail(t *testing.T) { s, _ := newMockProvider().init() f := 
database.BlobQueryFactory.NewFilter(context.Background()).Eq("hash", map[bool]bool{true: false}) - _, _, err := s.GetBlobs(context.Background(), f) + _, _, err := s.GetBlobs(context.Background(), "ns1", f) assert.Regexp(t, "FF00143.*type", err) } @@ -206,7 +212,7 @@ func TestGetBlobReadMessageFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow("only one")) f := database.BlobQueryFactory.NewFilter(context.Background()).Eq("hash", "") - _, _, err := s.GetBlobs(context.Background(), f) + _, _, err := s.GetBlobs(context.Background(), "ns1", f) assert.Regexp(t, "FF10121", err) assert.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/database/sqlcommon/data_sql.go b/internal/database/sqlcommon/data_sql.go index 312fd8210..29e4ba46e 100644 --- a/internal/database/sqlcommon/data_sql.go +++ b/internal/database/sqlcommon/data_sql.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -371,3 +371,19 @@ func (s *SQLCommon) UpdateData(ctx context.Context, namespace string, id *fftype return s.CommitTx(ctx, tx, autoCommit) } + +func (s *SQLCommon) DeleteData(ctx context.Context, namespace string, id *fftypes.UUID) (err error) { + ctx, tx, autoCommit, err := s.BeginOrUseTx(ctx) + if err != nil { + return err + } + defer s.RollbackTx(ctx, tx, autoCommit) + + err = s.DeleteTx(ctx, blobsTable, tx, sq.Delete(dataTable).Where(sq.Eq{"id": id, "namespace": namespace}), + nil /* no change events for blobs */) + if err != nil { + return err + } + + return s.CommitTx(ctx, tx, autoCommit) +} diff --git a/internal/database/sqlcommon/data_sql_test.go b/internal/database/sqlcommon/data_sql_test.go index 873d1ccc9..6162a09ef 100644 --- a/internal/database/sqlcommon/data_sql_test.go +++ b/internal/database/sqlcommon/data_sql_test.go @@ -165,6 +165,13 @@ func TestDataE2EWithDB(t *testing.T) { assert.Equal(t, int64(1), *res.TotalCount) s.callbacks.AssertExpectations(t) + + // Delete + err = s.DeleteData(ctx, "ns1", dataID) + assert.NoError(t, err) + dataRes, res, err = s.GetData(ctx, "ns1", filter.Count(true)) + assert.NoError(t, err) + assert.Len(t, dataRes, 0) } func TestUpsertDataFailBegin(t *testing.T) { @@ -381,3 +388,20 @@ func TestDataUpdateFail(t *testing.T) { err := s.UpdateData(context.Background(), "ns1", fftypes.NewUUID(), u) assert.Regexp(t, "FF00178", err) } + +func TestDeleteDataFailBegin(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) + err := s.DeleteData(context.Background(), "ns1", fftypes.NewUUID()) + assert.Regexp(t, "FF00175", err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDataDeleteFail(t *testing.T) { + s, mock := newMockProvider().init() + mock.ExpectBegin() + mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectRollback() + err := s.DeleteData(context.Background(), "ns1", 
fftypes.NewUUID()) + assert.Regexp(t, "FF00179", err) +} diff --git a/internal/dataexchange/ffdx/dxevent.go b/internal/dataexchange/ffdx/dxevent.go index 4b26547e7..b80b1f7d5 100644 --- a/internal/dataexchange/ffdx/dxevent.go +++ b/internal/dataexchange/ffdx/dxevent.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -83,6 +83,7 @@ func (e *dxEvent) PrivateBlobReceived() *dataexchange.PrivateBlobReceived { } func (h *FFDX) dispatchEvent(msg *wsEvent) { + var dataID string var namespace string var err error e := &dxEvent{ffdx: h, id: msg.EventID} @@ -179,7 +180,7 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { var hash *fftypes.Bytes32 hash, err = fftypes.ParseBytes32(h.ctx, msg.Hash) if err == nil { - _, namespace, _ = splitBlobPath(msg.Path) + _, namespace, dataID = splitBlobPath(msg.Path) e.dxType = dataexchange.DXEventTypePrivateBlobReceived e.privateBlobReceived = &dataexchange.PrivateBlobReceived{ Namespace: namespace, @@ -187,6 +188,7 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { Hash: *hash, Size: msg.Size, PayloadRef: msg.Path, + DataID: dataID, } } diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index da5b49c6d..109cc1d6c 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -332,6 +332,19 @@ func (h *FFDX) DownloadBlob(ctx context.Context, payloadRef string) (content io. return res.RawBody(), nil } +func (h *FFDX) DeleteBlob(ctx context.Context, payloadRef string) (err error) { + res, err := h.client.R().SetContext(ctx). + SetDoNotParseResponse(true). 
+ Delete(fmt.Sprintf("/api/v1/blobs/%s", payloadRef)) + if err != nil || !res.IsSuccess() { + if err == nil { + _ = res.RawBody().Close() + } + return ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgDXRESTErr) + } + return nil +} + func (h *FFDX) SendMessage(ctx context.Context, nsOpID string, peer, sender fftypes.JSONObject, data []byte) (err error) { if err := h.checkInitialized(ctx); err != nil { return err diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 02060a109..4693876d6 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -745,3 +745,29 @@ func TestDXUninitialized(t *testing.T) { err = h.SendMessage(context.Background(), "ns1:"+fftypes.NewUUID().String(), peer, sender, []byte(`some data`)) assert.Regexp(t, "FF10342", err) } + +func TestDeleteBlob(t *testing.T) { + + h, _, _, httpURL, done := newTestFFDX(t, false) + defer done() + + u := fftypes.NewUUID() + httpmock.RegisterResponder("DELETE", fmt.Sprintf("%s/api/v1/blobs/ns1/%s", httpURL, u), + httpmock.NewBytesResponder(204, []byte(``))) + + err := h.DeleteBlob(context.Background(), fmt.Sprintf("ns1/%s", u)) + assert.NoError(t, err) +} + +func TestDeleteBlobFail(t *testing.T) { + + h, _, _, httpURL, done := newTestFFDX(t, false) + defer done() + + u := fftypes.NewUUID() + httpmock.RegisterResponder("DELETE", fmt.Sprintf("%s/api/v1/blobs/ns1/%s", httpURL, u), + httpmock.NewBytesResponder(500, []byte(`ERROR`))) + + err := h.DeleteBlob(context.Background(), fmt.Sprintf("ns1/%s", u)) + assert.Regexp(t, "FF10229", err) +} diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index 2a46c1078..c7b1b4cd2 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -649,12 +649,13 @@ func (ag *aggregator) resolveBlobs(ctx context.Context, data core.DataArray) (re } // See if we already have the data - blob, err := ag.database.GetBlobMatchingHash(ctx, d.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := ag.database.GetBlobs(ctx, ag.namespace, fb.And(fb.Eq("data_id", d.ID), fb.Eq("hash", d.Blob.Hash))) if err != nil { return false, err } - if blob != nil { - l.Debugf("Blob '%s' found in local DX with ref '%s'", blob.Hash, blob.PayloadRef) + if len(blobs) > 0 && blobs[0] != nil { + l.Debugf("Blob '%s' found in local DX with ref '%s'", blobs[0].Hash, blobs[0].PayloadRef) continue } diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index 48c2aa588..f4da43bf6 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -1489,7 +1489,7 @@ func TestAttemptMessageDispatchMissingBlobs(t *testing.T) { org1 := newTestOrg("org1") ag.mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything).Return(org1, nil) - ag.mdi.On("GetBlobMatchingHash", ag.ctx, blobHash).Return(nil, nil) + ag.mdi.On("GetBlobs", ag.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) _, dispatched, err := ag.attemptMessageDispatch(ag.ctx, &core.Message{ Header: core.MessageHeader{ID: fftypes.NewUUID(), SignerRef: core.SignerRef{Key: "0x12345", Author: org1.DID}}, @@ -1743,7 +1743,7 @@ func TestDispatchPrivateQueuesLaterDispatch(t *testing.T) { ag.mim.On("FindIdentityForVerifier", ag.ctx, mock.Anything, mock.Anything).Return(org1, nil) data1 := core.DataArray{} - data2 := core.DataArray{{Blob: &core.BlobRef{Hash: fftypes.NewRandB32()}}} + data2 := core.DataArray{{Namespace: "ns1", Blob: &core.BlobRef{Hash: fftypes.NewRandB32()}}} ag.mdm.On("GetMessageWithDataCached", ag.ctx, msg1.Header.ID, data.CRORequirePins).Return(msg1, data1, true, nil).Once() ag.mdm.On("ValidateAll", ag.ctx, data1).Return(true, 
nil) ag.mdm.On("GetMessageWithDataCached", ag.ctx, msg2.Header.ID, data.CRORequirePins).Return(msg2, data2, true, nil).Once() @@ -1756,7 +1756,7 @@ func TestDispatchPrivateQueuesLaterDispatch(t *testing.T) { ag.mdi.On("GetNextPinsForContext", ag.ctx, "ns1", mock.Anything).Return([]*core.NextPin{ {Context: context, Nonce: 1 /* match member1NonceOne */, Identity: org1.DID, Hash: member1NonceOne}, }, nil).Once() - ag.mdi.On("GetBlobMatchingHash", ag.ctx, data2[0].Blob.Hash).Return(nil, nil) + ag.mdi.On("GetBlobs", ag.ctx, "ns1", mock.Anything).Return([]*core.Blob{}, nil, nil) msg1.Pins = fftypes.FFStringArray{member1NonceOne.String()} msg2.Pins = fftypes.FFStringArray{member1NonceTwo.String()} @@ -2040,7 +2040,7 @@ func TestResolveBlobsErrorGettingHash(t *testing.T) { ag := newTestAggregator() defer ag.cleanup(t) - ag.mdi.On("GetBlobMatchingHash", ag.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) + ag.mdi.On("GetBlobs", ag.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, fmt.Errorf("pop")) resolved, err := ag.resolveBlobs(ag.ctx, core.DataArray{ {ID: fftypes.NewUUID(), Blob: &core.BlobRef{ @@ -2056,7 +2056,7 @@ func TestResolveBlobsNotFoundPrivate(t *testing.T) { ag := newTestAggregator() defer ag.cleanup(t) - ag.mdi.On("GetBlobMatchingHash", ag.ctx, mock.Anything).Return(nil, nil) + ag.mdi.On("GetBlobs", ag.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) resolved, err := ag.resolveBlobs(ag.ctx, core.DataArray{ {ID: fftypes.NewUUID(), Blob: &core.BlobRef{ @@ -2072,7 +2072,7 @@ func TestResolveBlobsFoundPrivate(t *testing.T) { ag := newTestAggregator() defer ag.cleanup(t) - ag.mdi.On("GetBlobMatchingHash", ag.ctx, mock.Anything).Return(&core.Blob{}, nil) + ag.mdi.On("GetBlobs", ag.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{{}}, nil, nil) resolved, err := ag.resolveBlobs(ag.ctx, core.DataArray{ {ID: fftypes.NewUUID(), Blob: &core.BlobRef{ diff --git a/internal/events/batch_pin_complete_test.go 
b/internal/events/batch_pin_complete_test.go index 873af20da..755792ab6 100644 --- a/internal/events/batch_pin_complete_test.go +++ b/internal/events/batch_pin_complete_test.go @@ -677,7 +677,7 @@ func TestPersistBatchDataWithPublicAlreaydDownloadedOk(t *testing.T) { }} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}, blob) - em.mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(blob, nil) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{blob}, nil, nil) valid, err := em.checkAndInitiateBlobDownloads(context.Background(), batch, 0, data) assert.Nil(t, err) @@ -700,7 +700,7 @@ func TestPersistBatchDataWithPublicInitiateDownload(t *testing.T) { }} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}, blob) - em.mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(nil, nil) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) em.msd.On("InitiateDownloadBlob", mock.Anything, batch.Payload.TX.ID, data.ID, "ref1").Return(nil) @@ -725,7 +725,7 @@ func TestPersistBatchDataWithPublicInitiateDownloadFail(t *testing.T) { }} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}, blob) - em.mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(nil, nil) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) em.msd.On("InitiateDownloadBlob", mock.Anything, batch.Payload.TX.ID, data.ID, "ref1").Return(fmt.Errorf("pop")) @@ -750,7 +750,7 @@ func TestPersistBatchDataWithBlobGetBlobFail(t *testing.T) { }} batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data}, blob) - em.mdi.On("GetBlobMatchingHash", mock.Anything, blob.Hash).Return(nil, fmt.Errorf("pop")) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, 
mock.Anything).Return([]*core.Blob{}, nil, fmt.Errorf("pop")) valid, err := em.checkAndInitiateBlobDownloads(context.Background(), batch, 0, data) assert.Regexp(t, "pop", err) diff --git a/internal/events/blob_receiver.go b/internal/events/blob_receiver.go index 3b099bece..9f1bbf77f 100644 --- a/internal/events/blob_receiver.go +++ b/internal/events/blob_receiver.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -202,7 +202,7 @@ func (br *blobReceiver) insertNewBlobs(ctx context.Context, notifications []*blo // even if the hash of that data is the same. fb := database.BlobQueryFactory.NewFilter(ctx) filter := fb.In("hash", allHashes) - existingBlobs, _, err := br.database.GetBlobs(ctx, filter) + existingBlobs, _, err := br.database.GetBlobs(ctx, br.aggregator.namespace, filter) if err != nil { return nil, err } diff --git a/internal/events/blob_receiver_test.go b/internal/events/blob_receiver_test.go index 938bd95d2..55e3c93f4 100644 --- a/internal/events/blob_receiver_test.go +++ b/internal/events/blob_receiver_test.go @@ -31,7 +31,7 @@ func TestBlobReceiverBackgroundDispatchOK(t *testing.T) { defer em.cleanup(t) em.blobReceiver.start() - em.mdi.On("GetBlobs", mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) em.mdi.On("InsertBlobs", mock.Anything, mock.Anything).Return(nil, nil) blobHash := fftypes.NewRandB32() @@ -78,7 +78,7 @@ func TestBlobReceiverBackgroundDispatchFail(t *testing.T) { em.blobReceiver.start() done := make(chan struct{}) - em.mdi.On("GetBlobs", mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Run(func(args mock.Arguments) { + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Run(func(args mock.Arguments) { em.cancel() close(done) }) @@ -101,7 +101,7 @@ func 
TestBlobReceiverDispatchDup(t *testing.T) { blobHash := fftypes.NewRandB32() - em.mdi.On("GetBlobs", mock.Anything, mock.Anything).Return([]*core.Blob{ + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{ {Hash: blobHash, PayloadRef: "payload1"}, }, nil, nil) diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 08e8dce5c..d199c31d6 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -210,14 +210,22 @@ func (em *eventManager) privateBlobReceived(dx dataexchange.Plugin, event dataex return } + dataID, err := fftypes.ParseUUID(em.ctx, br.DataID) + if err != nil { + log.L(em.ctx).Warnf("Ignoring blob with invalid data ID '%s'", br.DataID) + return + } + // Dispatch to the blob receiver for efficient batch DB operations em.blobReceiver.blobReceived(em.ctx, &blobNotification{ blob: &core.Blob{ + Namespace: em.namespace.Name, Peer: br.PeerID, PayloadRef: br.PayloadRef, Hash: &br.Hash, Size: br.Size, Created: fftypes.Now(), + DataID: dataID, }, onComplete: func() { event.Ack() diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index 0b272a5bf..c34529a2f 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -94,7 +94,7 @@ func newMessageReceived(peerID string, transport *core.TransportWrapper, expecte return mde } -func newPrivateBlobReceivedNoAck(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string) *dataexchangemocks.DXEvent { +func newPrivateBlobReceivedNoAck(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) *dataexchangemocks.DXEvent { mde := &dataexchangemocks.DXEvent{} pathParts := strings.Split(payloadRef, "/") mde.On("PrivateBlobReceived").Return(&dataexchange.PrivateBlobReceived{ @@ -103,13 +103,14 @@ 
func newPrivateBlobReceivedNoAck(peerID string, hash *fftypes.Bytes32, size int6 Hash: *hash, Size: size, PayloadRef: payloadRef, + DataID: dataID.String(), }) mde.On("Type").Return(dataexchange.DXEventTypePrivateBlobReceived).Maybe() return mde } -func newPrivateBlobReceived(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string) *dataexchangemocks.DXEvent { - mde := newPrivateBlobReceivedNoAck(peerID, hash, size, payloadRef) +func newPrivateBlobReceived(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) *dataexchangemocks.DXEvent { + mde := newPrivateBlobReceivedNoAck(peerID, hash, size, payloadRef, dataID) mde.On("Ack").Return() return mde } @@ -415,11 +416,11 @@ func TestPrivateBlobReceivedTriggersRewindOk(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - em.mdi.On("GetBlobs", em.ctx, mock.Anything).Return([]*core.Blob{}, nil, nil) + em.mdi.On("GetBlobs", em.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) em.mdi.On("InsertBlobs", em.ctx, mock.Anything).Return(nil) done := make(chan struct{}) - mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1") + mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1", fftypes.NewUUID()) mde.On("Ack").Run(func(args mock.Arguments) { close(done) }) @@ -439,7 +440,20 @@ func TestPrivateBlobReceivedBadEvent(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - mde := newPrivateBlobReceived("", fftypes.NewRandB32(), 12345, "") + mde := newPrivateBlobReceived("", fftypes.NewRandB32(), 12345, "", fftypes.NewUUID()) + em.privateBlobReceived(mdx, mde) + mde.AssertExpectations(t) +} + +func TestPrivateBlobReceivedBadDataID(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) + + mdx := &dataexchangemocks.Plugin{} + mdx.On("Name").Return("utdx") + + mde := newPrivateBlobReceivedNoAck("peer", fftypes.NewRandB32(), 12345, "ns1", fftypes.NewUUID()) + 
mde.PrivateBlobReceived().DataID = "bad" em.privateBlobReceived(mdx, mde) mde.AssertExpectations(t) } @@ -453,11 +467,11 @@ func TestPrivateBlobReceivedInsertBlobFails(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - em.mdi.On("GetBlobs", em.ctx, mock.Anything).Return([]*core.Blob{}, nil, nil) + em.mdi.On("GetBlobs", em.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) em.mdi.On("InsertBlobs", em.ctx, mock.Anything).Return(fmt.Errorf("pop")) // no ack as we are simulating termination mid retry - mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1") + mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1", fftypes.NewUUID()) em.privateBlobReceived(mdx, mde) mde.AssertExpectations(t) @@ -472,10 +486,10 @@ func TestPrivateBlobReceivedGetBlobsFails(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - em.mdi.On("GetBlobs", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) + em.mdi.On("GetBlobs", em.ctx, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) // no ack as we are simulating termination mid retry - mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1") + mde := newPrivateBlobReceivedNoAck("peer1", hash, 12345, "ns1/path1", fftypes.NewUUID()) em.privateBlobReceived(mdx, mde) mde.AssertExpectations(t) @@ -491,7 +505,7 @@ func TestPrivateBlobReceivedWrongNS(t *testing.T) { mdx := &dataexchangemocks.Plugin{} mdx.On("Name").Return("utdx") - mde := newPrivateBlobReceived("peer1", hash, 12345, "ns1/path1") + mde := newPrivateBlobReceived("peer1", hash, 12345, "ns1/path1", fftypes.NewUUID()) em.privateBlobReceived(mdx, mde) mde.AssertExpectations(t) diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 7f511ba60..c0322f8cf 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. 
+// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -75,7 +75,7 @@ type EventManager interface { // Bound sharedstorage callbacks SharedStorageBatchDownloaded(ss sharedstorage.Plugin, payloadRef string, data []byte) (*fftypes.UUID, error) - SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string) + SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) // Bound token callbacks TokenPoolCreated(ti tokens.Plugin, pool *tokens.TokenPool) error diff --git a/internal/events/persist_batch.go b/internal/events/persist_batch.go index 2896256c4..6fdd84c0b 100644 --- a/internal/events/persist_batch.go +++ b/internal/events/persist_batch.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -166,11 +166,12 @@ func (em *eventManager) checkAndInitiateBlobDownloads(ctx context.Context, batch if data.Blob != nil && batch.Type == core.BatchTypeBroadcast { // Need to check if we need to initiate a download - blob, err := em.database.GetBlobMatchingHash(ctx, data.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := em.database.GetBlobs(ctx, em.namespace.Name, fb.And(fb.Eq("data_id", data.ID), fb.Eq("hash", data.Blob.Hash))) if err != nil { return false, err } - if blob == nil { + if len(blobs) == 0 || blobs[0] == nil { if data.Blob.Public == "" { log.L(ctx).Errorf("Invalid data entry %d id=%s in batch '%s' - missing public blob reference", i, data.ID, batch.ID) return false, nil diff --git a/internal/events/persist_batch_test.go b/internal/events/persist_batch_test.go index cd6bf4af5..49c142da6 100644 --- a/internal/events/persist_batch_test.go +++ b/internal/events/persist_batch_test.go @@ -310,7 +310,7 @@ func TestPersistBatchContentDataMissingBlobRef(t *testing.T) { }} batch := sampleBatch(t, core.BatchTypeBroadcast, 
core.TransactionTypeBatchPin, core.DataArray{data}, blob) - em.mdi.On("GetBlobMatchingHash", mock.Anything, mock.Anything).Return(nil, nil) + em.mdi.On("GetBlobs", mock.Anything, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) ok, err := em.validateAndPersistBatchContent(em.ctx, batch) assert.NoError(t, err) diff --git a/internal/events/ss_callbacks.go b/internal/events/ss_callbacks.go index 6f558a2eb..1da86c62d 100644 --- a/internal/events/ss_callbacks.go +++ b/internal/events/ss_callbacks.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -69,7 +69,7 @@ func (em *eventManager) SharedStorageBatchDownloaded(ss sharedstorage.Plugin, pa return batch.ID, nil } -func (em *eventManager) SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string) { +func (em *eventManager) SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) { l := log.L(em.ctx) l.Infof("Blob received event from public storage %s: Hash='%v'", ss.Name(), hash) @@ -77,10 +77,12 @@ func (em *eventManager) SharedStorageBlobDownloaded(ss sharedstorage.Plugin, has blobHash := hash em.blobReceiver.blobReceived(em.ctx, &blobNotification{ blob: &core.Blob{ + Namespace: em.namespace.Name, PayloadRef: payloadRef, Hash: &blobHash, Size: size, Created: fftypes.Now(), + DataID: dataID, }, }) } diff --git a/internal/events/ss_callbacks_test.go b/internal/events/ss_callbacks_test.go index 607699cd3..0a56dba9c 100644 --- a/internal/events/ss_callbacks_test.go +++ b/internal/events/ss_callbacks_test.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -141,11 +141,12 @@ func TestSharedStorageBlobDownloadedOk(t *testing.T) { mss := &sharedstoragemocks.Plugin{} mss.On("Name").Return("utsd") - em.mdi.On("GetBlobs", em.ctx, mock.Anything).Return([]*core.Blob{}, nil, nil) + em.mdi.On("GetBlobs", em.ctx, mock.Anything, mock.Anything).Return(nil, nil, nil) em.mdi.On("InsertBlobs", em.ctx, mock.Anything).Return(nil, nil) hash := fftypes.NewRandB32() - em.SharedStorageBlobDownloaded(mss, *hash, 12345, "payload1") + dataID := fftypes.NewUUID() + em.SharedStorageBlobDownloaded(mss, *hash, 12345, "payload1", dataID) brw := <-em.aggregator.rewinder.rewindRequests assert.Equal(t, rewind{hash: *hash, rewindType: rewindBlob}, brw) diff --git a/internal/orchestrator/bound_callbacks.go b/internal/orchestrator/bound_callbacks.go index addc4ab13..182f90451 100644 --- a/internal/orchestrator/bound_callbacks.go +++ b/internal/orchestrator/bound_callbacks.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -38,6 +38,6 @@ func (bc *boundCallbacks) SharedStorageBatchDownloaded(payloadRef string, data [ return bc.ei.SharedStorageBatchDownloaded(bc.ss, payloadRef, data) } -func (bc *boundCallbacks) SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string) { - bc.ei.SharedStorageBlobDownloaded(bc.ss, hash, size, payloadRef) +func (bc *boundCallbacks) SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) { + bc.ei.SharedStorageBlobDownloaded(bc.ss, hash, size, payloadRef, dataID) } diff --git a/internal/orchestrator/bound_callbacks_test.go b/internal/orchestrator/bound_callbacks_test.go index 662b4368a..3644f3255 100644 --- a/internal/orchestrator/bound_callbacks_test.go +++ b/internal/orchestrator/bound_callbacks_test.go @@ -38,6 +38,7 @@ func TestBoundCallbacks(t *testing.T) { hash := fftypes.NewRandB32() opID := fftypes.NewUUID() nsOpID := "ns1:" + opID.String() + dataID := fftypes.NewUUID() update := &core.OperationUpdate{ NamespacedOpID: nsOpID, @@ -53,8 +54,8 @@ func TestBoundCallbacks(t *testing.T) { _, err := bc.SharedStorageBatchDownloaded("payload1", []byte(`{}`)) assert.EqualError(t, err, "pop") - mei.On("SharedStorageBlobDownloaded", mss, *hash, int64(12345), "payload1").Return() - bc.SharedStorageBlobDownloaded(*hash, 12345, "payload1") + mei.On("SharedStorageBlobDownloaded", mss, *hash, int64(12345), "payload1", dataID).Return() + bc.SharedStorageBlobDownloaded(*hash, 12345, "payload1", dataID) mei.AssertExpectations(t) mss.AssertExpectations(t) diff --git a/internal/privatemessaging/message_test.go b/internal/privatemessaging/message_test.go index c27966766..ce3d54a70 100644 --- a/internal/privatemessaging/message_test.go +++ b/internal/privatemessaging/message_test.go @@ -713,7 +713,7 @@ func TestSendDataTransferBlobsFail(t *testing.T) { mim.On("GetLocalNode", pm.ctx).Return(node1, nil) mdi := pm.database.(*databasemocks.Plugin) - 
mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", pm.ctx, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) err := pm.sendData(pm.ctx, &core.TransportWrapper{ Batch: &core.Batch{ diff --git a/internal/privatemessaging/operations.go b/internal/privatemessaging/operations.go index 4db922ee7..7c5cf2174 100644 --- a/internal/privatemessaging/operations.go +++ b/internal/privatemessaging/operations.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -24,6 +24,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly/internal/coremsgs" "github.com/hyperledger/firefly/pkg/core" + "github.com/hyperledger/firefly/pkg/database" ) type transferBlobData struct { @@ -36,19 +37,28 @@ type batchSendData struct { Transport *core.TransportWrapper `json:"transport"` } -func addTransferBlobInputs(op *core.Operation, nodeID *fftypes.UUID, blobHash *fftypes.Bytes32) { +func addTransferBlobInputs(op *core.Operation, nodeID *fftypes.UUID, blobHash *fftypes.Bytes32, dataID *fftypes.UUID) { op.Input = fftypes.JSONObject{ - "node": nodeID.String(), - "hash": blobHash.String(), + "node": nodeID.String(), + "hash": blobHash.String(), + "data_id": dataID.String(), } } -func retrieveSendBlobInputs(ctx context.Context, op *core.Operation) (nodeID *fftypes.UUID, blobHash *fftypes.Bytes32, err error) { +func retrieveSendBlobInputs(ctx context.Context, op *core.Operation) (nodeID *fftypes.UUID, blobHash *fftypes.Bytes32, dataID *fftypes.UUID, err error) { nodeID, err = fftypes.ParseUUID(ctx, op.Input.GetString("node")) - if err == nil { - blobHash, err = fftypes.ParseBytes32(ctx, op.Input.GetString("hash")) + if err != nil { + return nil, nil, nil, err + } + blobHash, err = fftypes.ParseBytes32(ctx, op.Input.GetString("hash")) + if err != nil { + return nil, nil, nil, err + } + dataID, err = 
fftypes.ParseUUID(ctx, op.Input.GetString("data_id")) + if err != nil { + return nil, nil, nil, err } - return nodeID, blobHash, err + return nodeID, blobHash, dataID, err } func addBatchSendInputs(op *core.Operation, nodeID *fftypes.UUID, groupHash *fftypes.Bytes32, batchID *fftypes.UUID) { @@ -73,7 +83,7 @@ func retrieveBatchSendInputs(ctx context.Context, op *core.Operation) (nodeID *f func (pm *privateMessaging) PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) { switch op.Type { case core.OpTypeDataExchangeSendBlob: - nodeID, blobHash, err := retrieveSendBlobInputs(ctx, op) + nodeID, blobHash, dataID, err := retrieveSendBlobInputs(ctx, op) if err != nil { return nil, err } @@ -83,13 +93,14 @@ func (pm *privateMessaging) PrepareOperation(ctx context.Context, op *core.Opera } else if node == nil { return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound) } - blob, err := pm.database.GetBlobMatchingHash(ctx, blobHash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := pm.database.GetBlobs(ctx, pm.namespace.Name, fb.And(fb.Eq("data_id", dataID), fb.Eq("hash", blobHash))) if err != nil { return nil, err - } else if blob == nil { + } else if len(blobs) == 0 || blobs[0] == nil { return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound) } - return opSendBlob(op, node, blob), nil + return opSendBlob(op, node, blobs[0]), nil case core.OpTypeDataExchangeSendBatch: nodeID, groupHash, batchID, err := retrieveBatchSendInputs(ctx, op) diff --git a/internal/privatemessaging/operations_test.go b/internal/privatemessaging/operations_test.go index 4782aade4..a506f37ea 100644 --- a/internal/privatemessaging/operations_test.go +++ b/internal/privatemessaging/operations_test.go @@ -33,6 +33,7 @@ import ( func TestPrepareAndRunTransferBlob(t *testing.T) { pm, cancel := newTestPrivateMessaging(t) defer cancel() + dataID := fftypes.NewUUID() op := &core.Operation{ Type: core.OpTypeDataExchangeSendBlob, @@ -60,16 +61,18 @@ func 
TestPrepareAndRunTransferBlob(t *testing.T) { }, } blob := &core.Blob{ + Namespace: "ns1", Hash: fftypes.NewRandB32(), PayloadRef: "payload", + DataID: dataID, } - addTransferBlobInputs(op, node.ID, blob.Hash) + addTransferBlobInputs(op, node.ID, blob.Hash, dataID) mdi := pm.database.(*databasemocks.Plugin) mdx := pm.exchange.(*dataexchangemocks.Plugin) mim := pm.identity.(*identitymanagermocks.Manager) mim.On("CachedIdentityLookupByID", context.Background(), mock.Anything).Return(node, nil) - mdi.On("GetBlobMatchingHash", context.Background(), blob.Hash).Return(blob, nil) + mdi.On("GetBlobs", context.Background(), "ns1", mock.Anything).Return([]*core.Blob{blob}, nil, nil) mim.On("GetLocalNode", context.Background()).Return(localNode, nil) mdx.On("TransferBlob", context.Background(), "ns1:"+op.ID.String(), node.Profile, localNode.Profile, "payload").Return(nil) @@ -238,8 +241,9 @@ func TestPrepareOperationBlobSendNodeFail(t *testing.T) { op := &core.Operation{ Type: core.OpTypeDataExchangeSendBlob, Input: fftypes.JSONObject{ - "node": nodeID.String(), - "hash": blobHash.String(), + "node": nodeID.String(), + "hash": blobHash.String(), + "data_id": fftypes.NewUUID().String(), }, } @@ -261,8 +265,9 @@ func TestPrepareOperationBlobSendNodeNotFound(t *testing.T) { op := &core.Operation{ Type: core.OpTypeDataExchangeSendBlob, Input: fftypes.JSONObject{ - "node": nodeID.String(), - "hash": blobHash.String(), + "node": nodeID.String(), + "hash": blobHash.String(), + "data_id": fftypes.NewUUID().String(), }, } @@ -293,15 +298,16 @@ func TestPrepareOperationBlobSendBlobFail(t *testing.T) { op := &core.Operation{ Type: core.OpTypeDataExchangeSendBlob, Input: fftypes.JSONObject{ - "node": node.ID.String(), - "hash": blobHash.String(), + "node": node.ID.String(), + "hash": blobHash.String(), + "data_id": fftypes.NewUUID().String(), }, } mdi := pm.database.(*databasemocks.Plugin) mim := pm.identity.(*identitymanagermocks.Manager) mim.On("CachedIdentityLookupByID", 
context.Background(), node.ID).Return(node, nil) - mdi.On("GetBlobMatchingHash", context.Background(), blobHash).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlobs", context.Background(), mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop")) _, err := pm.PrepareOperation(context.Background(), op) assert.EqualError(t, err, "pop") @@ -328,15 +334,16 @@ func TestPrepareOperationBlobSendBlobNotFound(t *testing.T) { op := &core.Operation{ Type: core.OpTypeDataExchangeSendBlob, Input: fftypes.JSONObject{ - "node": node.ID.String(), - "hash": blobHash.String(), + "node": node.ID.String(), + "hash": blobHash.String(), + "data_id": fftypes.NewUUID().String(), }, } mdi := pm.database.(*databasemocks.Plugin) mim := pm.identity.(*identitymanagermocks.Manager) mim.On("CachedIdentityLookupByID", context.Background(), node.ID).Return(node, nil) - mdi.On("GetBlobMatchingHash", context.Background(), blobHash).Return(nil, nil) + mdi.On("GetBlobs", context.Background(), mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) _, err := pm.PrepareOperation(context.Background(), op) assert.Regexp(t, "FF10109", err) @@ -644,3 +651,50 @@ func TestOperationUpdate(t *testing.T) { defer cancel() assert.NoError(t, pm.OnOperationUpdate(context.Background(), nil, nil)) } + +func TestRetrieveBSendBlobInputs(t *testing.T) { + + nodeID := fftypes.NewUUID() + dataID := fftypes.NewUUID() + hash := fftypes.NewRandB32() + + op := &core.Operation{ + Input: fftypes.JSONObject{ + "node": nodeID.String(), + "hash": hash.String(), + "data_id": dataID.String(), + }, + } + n, h, d, err := retrieveSendBlobInputs(context.Background(), op) + assert.NoError(t, err) + assert.Equal(t, n, nodeID) + assert.Equal(t, h, hash) + assert.Equal(t, d, dataID) + + op = &core.Operation{ + Input: fftypes.JSONObject{ + "hash": hash.String(), + "data_id": dataID.String(), + }, + } + n, h, d, err = retrieveSendBlobInputs(context.Background(), op) + assert.Regexp(t, "FF00138", err) + + op = &core.Operation{ + 
Input: fftypes.JSONObject{ + "node": nodeID.String(), + "data_id": dataID.String(), + }, + } + n, h, d, err = retrieveSendBlobInputs(context.Background(), op) + assert.Regexp(t, "FF00107", err) + + op = &core.Operation{ + Input: fftypes.JSONObject{ + "node": nodeID.String(), + "hash": hash.String(), + }, + } + n, h, d, err = retrieveSendBlobInputs(context.Background(), op) + assert.Regexp(t, "FF00138", err) +} diff --git a/internal/privatemessaging/privatemessaging.go b/internal/privatemessaging/privatemessaging.go index 25b22b2d0..060d2fb99 100644 --- a/internal/privatemessaging/privatemessaging.go +++ b/internal/privatemessaging/privatemessaging.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -212,21 +212,22 @@ func (pm *privateMessaging) prepareBlobTransfers(ctx context.Context, data core. if d.Blob.Hash == nil { return i18n.NewError(ctx, coremsgs.MsgDataMissingBlobHash, d.ID) } - - blob, err := pm.database.GetBlobMatchingHash(ctx, d.Blob.Hash) + fb := database.BlobQueryFactory.NewFilter(ctx) + blobs, _, err := pm.database.GetBlobs(ctx, pm.namespace.Name, fb.And(fb.Eq("data_id", d.ID), fb.Eq("hash", d.Blob.Hash))) if err != nil { return err } - if blob == nil { - return i18n.NewError(ctx, coremsgs.MsgBlobNotFound, d.Blob) + if len(blobs) == 0 || blobs[0] == nil { + return i18n.NewError(ctx, coremsgs.MsgBlobNotFound, d.Blob.Hash) } + blob := blobs[0] op := core.NewOperation( pm.exchange, pm.namespace.Name, txid, core.OpTypeDataExchangeSendBlob) - addTransferBlobInputs(op, node.ID, blob.Hash) + addTransferBlobInputs(op, node.ID, blob.Hash, d.ID) if err = pm.operations.AddOrReuseOperation(ctx, op); err != nil { return err } diff --git a/internal/privatemessaging/privatemessaging_test.go b/internal/privatemessaging/privatemessaging_test.go index 0dc8f75f6..6218fb146 100644 --- a/internal/privatemessaging/privatemessaging_test.go +++ 
b/internal/privatemessaging/privatemessaging_test.go @@ -185,10 +185,10 @@ func TestDispatchBatchWithBlobs(t *testing.T) { }, nil) mim.On("CachedIdentityLookupByID", pm.ctx, node1.ID).Return(node1, nil).Once() mim.On("CachedIdentityLookupByID", pm.ctx, node2.ID).Return(node2, nil).Once() - mdi.On("GetBlobMatchingHash", pm.ctx, blob1).Return(&core.Blob{ + mdi.On("GetBlobs", pm.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{{ Hash: blob1, PayloadRef: "/blob/1", - }, nil) + }}, nil, nil) mom.On("AddOrReuseOperation", pm.ctx, mock.MatchedBy(func(op *core.Operation) bool { return op.Type == core.OpTypeDataExchangeSendBlob })).Return(nil, nil) @@ -422,10 +422,10 @@ func TestSendSubmitBlobTransferFail(t *testing.T) { mom := pm.operations.(*operationmocks.Manager) mom.On("AddOrReuseOperation", pm.ctx, mock.Anything).Return(nil) - mdi.On("GetBlobMatchingHash", pm.ctx, blob1).Return(&core.Blob{ - Hash: blob1, + mdi.On("GetBlobs", pm.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{{ PayloadRef: "/blob/1", - }, nil) + Hash: blob1, + }}, nil, nil) mom.On("RunOperation", pm.ctx, mock.MatchedBy(func(op *core.PreparedOperation) bool { data := op.Data.(transferBlobData) @@ -497,10 +497,10 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { return *data.Node.ID == *node2.ID })).Return(nil, nil) - mdi.On("GetBlobMatchingHash", pm.ctx, blob1).Return(&core.Blob{ + mdi.On("GetBlobs", pm.ctx, "ns1", mock.Anything).Return([]*core.Blob{{ Hash: blob1, PayloadRef: "/blob/1", - }, nil) + }}, nil, nil) mmp := pm.multiparty.(*multipartymocks.Manager) mmp.On("SubmitBatchPin", pm.ctx, mock.Anything, mock.Anything, "").Return(fmt.Errorf("pop")) @@ -516,7 +516,7 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) { }, }, Data: core.DataArray{ - {ID: fftypes.NewUUID(), Blob: &core.BlobRef{Hash: blob1}}, + {Namespace: "ns1", ID: fftypes.NewUUID(), Blob: &core.BlobRef{Hash: blob1}}, }, }) assert.Regexp(t, "pop", err) @@ -543,7 +543,7 @@ func 
TestTransferBlobsNotFound(t *testing.T) { defer cancel() mdi := pm.database.(*databasemocks.Plugin) - mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(nil, nil) + mdi.On("GetBlobs", pm.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{}, nil, nil) _, err := pm.prepareBlobTransfers(pm.ctx, core.DataArray{ {ID: fftypes.NewUUID(), Hash: fftypes.NewRandB32(), Blob: &core.BlobRef{Hash: fftypes.NewRandB32()}}, @@ -561,7 +561,7 @@ func TestTransferBlobsOpInsertFail(t *testing.T) { mdx := pm.exchange.(*dataexchangemocks.Plugin) mom := pm.operations.(*operationmocks.Manager) - mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(&core.Blob{PayloadRef: "blob/1"}, nil) + mdi.On("GetBlobs", pm.ctx, mock.Anything, mock.Anything).Return([]*core.Blob{{PayloadRef: "blob/1"}}, nil, nil) mdx.On("TransferBlob", pm.ctx, mock.Anything, "peer1", "blob/1").Return(nil) mom.On("AddOrReuseOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop")) diff --git a/internal/shareddownload/download_manager.go b/internal/shareddownload/download_manager.go index ee4ec4b12..cd18de8a5 100644 --- a/internal/shareddownload/download_manager.go +++ b/internal/shareddownload/download_manager.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -76,7 +76,7 @@ type downloadWork struct { type Callbacks interface { SharedStorageBatchDownloaded(payloadRef string, data []byte) (batchID *fftypes.UUID, err error) - SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string) + SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) } func NewDownloadManager(ctx context.Context, ns *core.Namespace, di database.Plugin, ss sharedstorage.Plugin, dx dataexchange.Plugin, om operations.Manager, cb Callbacks) (Manager, error) { diff --git a/internal/shareddownload/download_manager_test.go b/internal/shareddownload/download_manager_test.go index b1c1d9f58..373861a72 100644 --- a/internal/shareddownload/download_manager_test.go +++ b/internal/shareddownload/download_manager_test.go @@ -181,7 +181,7 @@ func TestDownloadBlobWithRetryOk(t *testing.T) { }).Once() mci := dm.callbacks.(*shareddownloadmocks.Callbacks) - mci.On("SharedStorageBlobDownloaded", *blobHash, int64(12345), "privateRef1").Return() + mci.On("SharedStorageBlobDownloaded", *blobHash, int64(12345), "privateRef1", dataID).Return() err := dm.InitiateDownloadBlob(dm.ctx, txID, dataID, "ref1") assert.NoError(t, err) diff --git a/internal/shareddownload/operations.go b/internal/shareddownload/operations.go index fac07ab99..e39d3d756 100644 --- a/internal/shareddownload/operations.go +++ b/internal/shareddownload/operations.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -154,7 +154,7 @@ func (dm *downloadManager) downloadBlob(ctx context.Context, data downloadBlobDa log.L(ctx).Infof("Transferred blob '%s' (%s) from shared storage '%s' to local data exchange '%s'", hash, units.HumanSizeWithPrecision(float64(blobSize), 2), data.PayloadRef, dxPayloadRef) // then callback to store metadata - dm.callbacks.SharedStorageBlobDownloaded(*hash, blobSize, dxPayloadRef) + dm.callbacks.SharedStorageBlobDownloaded(*hash, blobSize, dxPayloadRef, data.DataID) return getDownloadBlobOutputs(hash, blobSize, dxPayloadRef), true, nil } diff --git a/mocks/apiservermocks/ffi_swagger_gen.go b/mocks/apiservermocks/ffi_swagger_gen.go index 840eafffb..26d0e9205 100644 --- a/mocks/apiservermocks/ffi_swagger_gen.go +++ b/mocks/apiservermocks/ffi_swagger_gen.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package apiservermocks diff --git a/mocks/apiservermocks/server.go b/mocks/apiservermocks/server.go index 8e2d7858d..fa6db9eaa 100644 --- a/mocks/apiservermocks/server.go +++ b/mocks/apiservermocks/server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package apiservermocks diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 36bd9addc..74a988069 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package assetmocks diff --git a/mocks/batchmocks/manager.go b/mocks/batchmocks/manager.go index 3e987ac49..65724e877 100644 --- a/mocks/batchmocks/manager.go +++ b/mocks/batchmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package batchmocks diff --git a/mocks/blockchainmocks/callbacks.go b/mocks/blockchainmocks/callbacks.go index 024268dcd..13af1e30a 100644 --- a/mocks/blockchainmocks/callbacks.go +++ b/mocks/blockchainmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package blockchainmocks diff --git a/mocks/blockchainmocks/plugin.go b/mocks/blockchainmocks/plugin.go index 29c31808b..905e7e9b1 100644 --- a/mocks/blockchainmocks/plugin.go +++ b/mocks/blockchainmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package blockchainmocks diff --git a/mocks/broadcastmocks/manager.go b/mocks/broadcastmocks/manager.go index 770dc5361..c0e8b8742 100644 --- a/mocks/broadcastmocks/manager.go +++ b/mocks/broadcastmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package broadcastmocks diff --git a/mocks/cachemocks/manager.go b/mocks/cachemocks/manager.go index 48d54cd19..29a04f457 100644 --- a/mocks/cachemocks/manager.go +++ b/mocks/cachemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package cachemocks diff --git a/mocks/contractmocks/manager.go b/mocks/contractmocks/manager.go index 8a574a80d..2ec15c00f 100644 --- a/mocks/contractmocks/manager.go +++ b/mocks/contractmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package contractmocks diff --git a/mocks/coremocks/operation_callbacks.go b/mocks/coremocks/operation_callbacks.go index 5e0700d0b..bde5613d2 100644 --- a/mocks/coremocks/operation_callbacks.go +++ b/mocks/coremocks/operation_callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. 
DO NOT EDIT. package coremocks diff --git a/mocks/databasemocks/callbacks.go b/mocks/databasemocks/callbacks.go index 7bf2d3286..0f5033b0a 100644 --- a/mocks/databasemocks/callbacks.go +++ b/mocks/databasemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package databasemocks diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index c2796cc7b..3b594ea19 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package databasemocks @@ -67,6 +67,20 @@ func (_m *Plugin) DeleteContractListenerByID(ctx context.Context, namespace stri return r0 } +// DeleteData provides a mock function with given fields: ctx, namespace, id +func (_m *Plugin) DeleteData(ctx context.Context, namespace string, id *fftypes.UUID) error { + ret := _m.Called(ctx, namespace, id) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, *fftypes.UUID) error); ok { + r0 = rf(ctx, namespace, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DeleteNonce provides a mock function with given fields: ctx, hash func (_m *Plugin) DeleteNonce(ctx context.Context, hash *fftypes.Bytes32) error { ret := _m.Called(ctx, hash) @@ -210,36 +224,13 @@ func (_m *Plugin) GetBatches(ctx context.Context, namespace string, filter ffapi return r0, r1, r2 } -// GetBlobMatchingHash provides a mock function with given fields: ctx, hash -func (_m *Plugin) GetBlobMatchingHash(ctx context.Context, hash *fftypes.Bytes32) (*core.Blob, error) { - ret := _m.Called(ctx, hash) - - var r0 *core.Blob - if rf, ok := ret.Get(0).(func(context.Context, *fftypes.Bytes32) *core.Blob); ok { - r0 = rf(ctx, hash) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*core.Blob) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, 
*fftypes.Bytes32) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// GetBlobs provides a mock function with given fields: ctx, filter -func (_m *Plugin) GetBlobs(ctx context.Context, filter ffapi.Filter) ([]*core.Blob, *ffapi.FilterResult, error) { - ret := _m.Called(ctx, filter) +// GetBlobs provides a mock function with given fields: ctx, namespace, filter +func (_m *Plugin) GetBlobs(ctx context.Context, namespace string, filter ffapi.Filter) ([]*core.Blob, *ffapi.FilterResult, error) { + ret := _m.Called(ctx, namespace, filter) var r0 []*core.Blob - if rf, ok := ret.Get(0).(func(context.Context, ffapi.Filter) []*core.Blob); ok { - r0 = rf(ctx, filter) + if rf, ok := ret.Get(0).(func(context.Context, string, ffapi.Filter) []*core.Blob); ok { + r0 = rf(ctx, namespace, filter) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*core.Blob) @@ -247,8 +238,8 @@ func (_m *Plugin) GetBlobs(ctx context.Context, filter ffapi.Filter) ([]*core.Bl } var r1 *ffapi.FilterResult - if rf, ok := ret.Get(1).(func(context.Context, ffapi.Filter) *ffapi.FilterResult); ok { - r1 = rf(ctx, filter) + if rf, ok := ret.Get(1).(func(context.Context, string, ffapi.Filter) *ffapi.FilterResult); ok { + r1 = rf(ctx, namespace, filter) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(*ffapi.FilterResult) @@ -256,8 +247,8 @@ func (_m *Plugin) GetBlobs(ctx context.Context, filter ffapi.Filter) ([]*core.Bl } var r2 error - if rf, ok := ret.Get(2).(func(context.Context, ffapi.Filter) error); ok { - r2 = rf(ctx, filter) + if rf, ok := ret.Get(2).(func(context.Context, string, ffapi.Filter) error); ok { + r2 = rf(ctx, namespace, filter) } else { r2 = ret.Error(2) } diff --git a/mocks/dataexchangemocks/callbacks.go b/mocks/dataexchangemocks/callbacks.go index 7ecda9c04..6a60d19cc 100644 --- a/mocks/dataexchangemocks/callbacks.go +++ b/mocks/dataexchangemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. 
+// Code generated by mockery v2.16.0. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/dx_event.go b/mocks/dataexchangemocks/dx_event.go index 82dae91e1..4a66c69f4 100644 --- a/mocks/dataexchangemocks/dx_event.go +++ b/mocks/dataexchangemocks/dx_event.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package dataexchangemocks diff --git a/mocks/dataexchangemocks/plugin.go b/mocks/dataexchangemocks/plugin.go index 01672b978..b1b032284 100644 --- a/mocks/dataexchangemocks/plugin.go +++ b/mocks/dataexchangemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package dataexchangemocks @@ -53,6 +53,20 @@ func (_m *Plugin) Capabilities() *dataexchange.Capabilities { return r0 } +// DeleteBlob provides a mock function with given fields: ctx, payloadRef +func (_m *Plugin) DeleteBlob(ctx context.Context, payloadRef string) error { + ret := _m.Called(ctx, payloadRef) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, payloadRef) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DownloadBlob provides a mock function with given fields: ctx, payloadRef func (_m *Plugin) DownloadBlob(ctx context.Context, payloadRef string) (io.ReadCloser, error) { ret := _m.Called(ctx, payloadRef) diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index 4cce3323a..97ce9f6bb 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package datamocks @@ -50,6 +50,20 @@ func (_m *Manager) CheckDatatype(ctx context.Context, datatype *core.Datatype) e return r0 } +// DeleteData provides a mock function with given fields: ctx, dataID +func (_m *Manager) DeleteData(ctx context.Context, dataID string) error { + ret := _m.Called(ctx, dataID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, dataID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DownloadBlob provides a mock function with given fields: ctx, dataID func (_m *Manager) DownloadBlob(ctx context.Context, dataID string) (*core.Blob, io.ReadCloser, error) { ret := _m.Called(ctx, dataID) diff --git a/mocks/definitionsmocks/handler.go b/mocks/definitionsmocks/handler.go index 7b1ab52f0..55593eccb 100644 --- a/mocks/definitionsmocks/handler.go +++ b/mocks/definitionsmocks/handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package definitionsmocks diff --git a/mocks/definitionsmocks/sender.go b/mocks/definitionsmocks/sender.go index 44a207e4d..4ae2f8a37 100644 --- a/mocks/definitionsmocks/sender.go +++ b/mocks/definitionsmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package definitionsmocks diff --git a/mocks/eventmocks/event_manager.go b/mocks/eventmocks/event_manager.go index f580c1e6a..6f8578c37 100644 --- a/mocks/eventmocks/event_manager.go +++ b/mocks/eventmocks/event_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package eventmocks @@ -247,9 +247,9 @@ func (_m *EventManager) SharedStorageBatchDownloaded(ss sharedstorage.Plugin, pa return r0, r1 } -// SharedStorageBlobDownloaded provides a mock function with given fields: ss, hash, size, payloadRef -func (_m *EventManager) SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string) { - _m.Called(ss, hash, size, payloadRef) +// SharedStorageBlobDownloaded provides a mock function with given fields: ss, hash, size, payloadRef, dataID +func (_m *EventManager) SharedStorageBlobDownloaded(ss sharedstorage.Plugin, hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) { + _m.Called(ss, hash, size, payloadRef, dataID) } // Start provides a mock function with given fields: diff --git a/mocks/eventsmocks/callbacks.go b/mocks/eventsmocks/callbacks.go index c3e746d3a..021e755a5 100644 --- a/mocks/eventsmocks/callbacks.go +++ b/mocks/eventsmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package eventsmocks diff --git a/mocks/eventsmocks/plugin.go b/mocks/eventsmocks/plugin.go index 82e66cbab..847c2985d 100644 --- a/mocks/eventsmocks/plugin.go +++ b/mocks/eventsmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package eventsmocks diff --git a/mocks/identitymanagermocks/manager.go b/mocks/identitymanagermocks/manager.go index cbf985576..6c0241c58 100644 --- a/mocks/identitymanagermocks/manager.go +++ b/mocks/identitymanagermocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package identitymanagermocks diff --git a/mocks/identitymocks/callbacks.go b/mocks/identitymocks/callbacks.go index 1684659f0..41531fff7 100644 --- a/mocks/identitymocks/callbacks.go +++ b/mocks/identitymocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package identitymocks diff --git a/mocks/identitymocks/plugin.go b/mocks/identitymocks/plugin.go index 329ae5f13..81ab06c96 100644 --- a/mocks/identitymocks/plugin.go +++ b/mocks/identitymocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package identitymocks diff --git a/mocks/metricsmocks/manager.go b/mocks/metricsmocks/manager.go index 17141a831..7d750175e 100644 --- a/mocks/metricsmocks/manager.go +++ b/mocks/metricsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package metricsmocks diff --git a/mocks/multipartymocks/manager.go b/mocks/multipartymocks/manager.go index 80a3277b8..8020196ee 100644 --- a/mocks/multipartymocks/manager.go +++ b/mocks/multipartymocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package multipartymocks diff --git a/mocks/namespacemocks/manager.go b/mocks/namespacemocks/manager.go index bcda5fbba..8702560bb 100644 --- a/mocks/namespacemocks/manager.go +++ b/mocks/namespacemocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package namespacemocks diff --git a/mocks/networkmapmocks/manager.go b/mocks/networkmapmocks/manager.go index 5d9f7dbaf..c1acb8e8d 100644 --- a/mocks/networkmapmocks/manager.go +++ b/mocks/networkmapmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package networkmapmocks diff --git a/mocks/operationmocks/manager.go b/mocks/operationmocks/manager.go index 6fd421a1d..89c44d27b 100644 --- a/mocks/operationmocks/manager.go +++ b/mocks/operationmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package operationmocks diff --git a/mocks/orchestratormocks/orchestrator.go b/mocks/orchestratormocks/orchestrator.go index e3a7d7f80..8b265666d 100644 --- a/mocks/orchestratormocks/orchestrator.go +++ b/mocks/orchestratormocks/orchestrator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package orchestratormocks diff --git a/mocks/privatemessagingmocks/manager.go b/mocks/privatemessagingmocks/manager.go index 7400ed42d..afbadcc7a 100644 --- a/mocks/privatemessagingmocks/manager.go +++ b/mocks/privatemessagingmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package privatemessagingmocks diff --git a/mocks/shareddownloadmocks/callbacks.go b/mocks/shareddownloadmocks/callbacks.go index 67f2546cc..133f86def 100644 --- a/mocks/shareddownloadmocks/callbacks.go +++ b/mocks/shareddownloadmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package shareddownloadmocks @@ -35,9 +35,9 @@ func (_m *Callbacks) SharedStorageBatchDownloaded(payloadRef string, data []byte return r0, r1 } -// SharedStorageBlobDownloaded provides a mock function with given fields: hash, size, payloadRef -func (_m *Callbacks) SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string) { - _m.Called(hash, size, payloadRef) +// SharedStorageBlobDownloaded provides a mock function with given fields: hash, size, payloadRef, dataID +func (_m *Callbacks) SharedStorageBlobDownloaded(hash fftypes.Bytes32, size int64, payloadRef string, dataID *fftypes.UUID) { + _m.Called(hash, size, payloadRef, dataID) } type mockConstructorTestingTNewCallbacks interface { diff --git a/mocks/shareddownloadmocks/manager.go b/mocks/shareddownloadmocks/manager.go index 698246330..7c6f648d4 100644 --- a/mocks/shareddownloadmocks/manager.go +++ b/mocks/shareddownloadmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package shareddownloadmocks diff --git a/mocks/sharedstoragemocks/callbacks.go b/mocks/sharedstoragemocks/callbacks.go index cba3791df..78d65e597 100644 --- a/mocks/sharedstoragemocks/callbacks.go +++ b/mocks/sharedstoragemocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package sharedstoragemocks diff --git a/mocks/sharedstoragemocks/plugin.go b/mocks/sharedstoragemocks/plugin.go index 2c2a49278..2abb70625 100644 --- a/mocks/sharedstoragemocks/plugin.go +++ b/mocks/sharedstoragemocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package sharedstoragemocks diff --git a/mocks/spieventsmocks/manager.go b/mocks/spieventsmocks/manager.go index 0ae3113e7..3a7c203b2 100644 --- a/mocks/spieventsmocks/manager.go +++ b/mocks/spieventsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package spieventsmocks diff --git a/mocks/syncasyncmocks/bridge.go b/mocks/syncasyncmocks/bridge.go index 2deb4f70c..cf8565b3b 100644 --- a/mocks/syncasyncmocks/bridge.go +++ b/mocks/syncasyncmocks/bridge.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/syncasyncmocks/sender.go b/mocks/syncasyncmocks/sender.go index 97d17f0ca..9586bbad9 100644 --- a/mocks/syncasyncmocks/sender.go +++ b/mocks/syncasyncmocks/sender.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package syncasyncmocks diff --git a/mocks/systemeventmocks/event_interface.go b/mocks/systemeventmocks/event_interface.go index e5aa317ae..6de973cb3 100644 --- a/mocks/systemeventmocks/event_interface.go +++ b/mocks/systemeventmocks/event_interface.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package systemeventmocks diff --git a/mocks/tokenmocks/callbacks.go b/mocks/tokenmocks/callbacks.go index d2618b9b0..b00c90cd2 100644 --- a/mocks/tokenmocks/callbacks.go +++ b/mocks/tokenmocks/callbacks.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package tokenmocks diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index d7939c470..f5c2b19ef 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. 
package tokenmocks diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index 2d289053e..d4b626c7d 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package txcommonmocks diff --git a/mocks/wsmocks/ws_client.go b/mocks/wsmocks/ws_client.go index 814a6800e..474b5625a 100644 --- a/mocks/wsmocks/ws_client.go +++ b/mocks/wsmocks/ws_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.1. DO NOT EDIT. +// Code generated by mockery v2.16.0. DO NOT EDIT. package wsmocks diff --git a/pkg/core/blob.go b/pkg/core/blob.go index c6ad71407..2e1e42f50 100644 --- a/pkg/core/blob.go +++ b/pkg/core/blob.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -19,10 +19,12 @@ package core import "github.com/hyperledger/firefly-common/pkg/fftypes" type Blob struct { + Sequence int64 `json:"-"` + Namespace string `json:"namespace"` Hash *fftypes.Bytes32 `json:"hash"` - Size int64 `json:"size"` PayloadRef string `json:"payloadRef,omitempty"` - Peer string `json:"peer,omitempty"` Created *fftypes.FFTime `json:"created,omitempty"` - Sequence int64 `json:"-"` + Peer string `json:"peer,omitempty"` + Size int64 `json:"size"` + DataID *fftypes.UUID `json:"data_id"` } diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 9c16822bf..856b243e0 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -133,6 +133,9 @@ type iDataCollection interface { // GetDataRefs - Get data references only (no data) GetDataRefs(ctx context.Context, namespace string, filter ffapi.Filter) (message core.DataRefs, res *ffapi.FilterResult, err error) + + // DeleteData - Deletes a data record by ID + DeleteData(ctx context.Context, namespace string, id *fftypes.UUID) (err error) } type iBatchCollection interface { @@ -334,11 +337,8 @@ 
type iBlobCollection interface { // InsertBlobs performs a batch insert of blobs assured to be new records - fails if they already exist, so caller can fall back to upsert individually InsertBlobs(ctx context.Context, blobs []*core.Blob) (err error) - // GetBlobMatchingHash - lookup first blob batching a hash - GetBlobMatchingHash(ctx context.Context, hash *fftypes.Bytes32) (message *core.Blob, err error) - // GetBlobs - get blobs - GetBlobs(ctx context.Context, filter ffapi.Filter) (message []*core.Blob, res *ffapi.FilterResult, err error) + GetBlobs(ctx context.Context, namespace string, filter ffapi.Filter) (message []*core.Blob, res *ffapi.FilterResult, err error) // DeleteBlob - delete a blob, using its local database ID DeleteBlob(ctx context.Context, sequence int64) (err error) @@ -861,6 +861,7 @@ var BlobQueryFactory = &ffapi.QueryFields{ "size": &ffapi.Int64Field{}, "payloadref": &ffapi.StringField{}, "created": &ffapi.TimeField{}, + "data_id": &ffapi.UUIDField{}, } // TokenPoolQueryFactory filter fields for token pools diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index 81a1b15b5..48f02306c 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -89,6 +89,9 @@ type Plugin interface { // DownloadBlob streams a received blob out of storage DownloadBlob(ctx context.Context, payloadRef string) (content io.ReadCloser, err error) + // DeleteBlob deletes a blob from the local DB and DX + DeleteBlob(ctx context.Context, payloadRef string) (err error) + // SendMessage sends an in-line package of data to another network node.
// Should return as quickly as possible for parallelism, then report completion asynchronously via the operation ID SendMessage(ctx context.Context, nsOpID string, peer, sender fftypes.JSONObject, data []byte) (err error) @@ -134,6 +137,7 @@ type PrivateBlobReceived struct { Hash fftypes.Bytes32 Size int64 PayloadRef string + DataID string } // Capabilities the supported featureset of the data exchange