Skip to content

Commit

Permalink
Merge pull request hyperledger#1132 from kaleido-io/delete-data
Browse files Browse the repository at this point in the history
Add DELETE method for data API
  • Loading branch information
peterbroadhurst authored Jan 14, 2023
2 parents b69d4f2 + 6398aed commit cac029c
Show file tree
Hide file tree
Showing 93 changed files with 1,139 additions and 307 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
BEGIN;

DROP INDEX blobs_namespace_data_id;
DROP INDEX blobs_payload_ref;
ALTER TABLE blobs DROP COLUMN namespace;
ALTER TABLE blobs DROP COLUMN data_id;
CREATE INDEX blob_hash ON blobs(hash);

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
BEGIN;

CREATE TABLE temp_blobs (
seq SERIAL PRIMARY KEY,
namespace VARCHAR(64) NOT NULL,
hash CHAR(64) NOT NULL,
payload_ref VARCHAR(1024) NOT NULL,
created BIGINT NOT NULL,
peer VARCHAR(256) NOT NULL,
size BIGINT,
data_id UUID NOT NULL
);
INSERT INTO temp_blobs (namespace, data_id, hash, payload_ref, created, peer, size)
SELECT DISTINCT data.namespace, data.id, data.blob_hash, blobs.payload_ref, blobs.created, blobs.peer, blobs.size
FROM data
LEFT JOIN blobs ON blobs.hash = data.blob_hash
WHERE data.blob_hash IS NOT NULL
;
DROP INDEX blobs_hash;
DROP TABLE blobs;
ALTER TABLE temp_blobs RENAME TO blobs;
CREATE INDEX blobs_namespace_data_id ON blobs(namespace, data_id);
CREATE INDEX blobs_payload_ref ON blobs(payload_ref);

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP INDEX blobs_namespace_data_id;
DROP INDEX blobs_payload_ref;
ALTER TABLE blobs DROP COLUMN namespace;
ALTER TABLE blobs DROP COLUMN data_id;
CREATE INDEX blob_hash ON blobs(hash);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TABLE temp_blobs (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
namespace VARCHAR(64) NOT NULL,
hash CHAR(64) NOT NULL,
payload_ref VARCHAR(1024) NOT NULL,
created BIGINT NOT NULL,
peer VARCHAR(256) NOT NULL,
size BIGINT,
data_id UUID NOT NULL
);
INSERT INTO temp_blobs (namespace, data_id, hash, payload_ref, created, peer, size)
SELECT DISTINCT data.namespace, data.id, data.blob_hash, blobs.payload_ref, blobs.created, blobs.peer, blobs.size
FROM data
LEFT JOIN blobs ON blobs.hash = data.blob_hash
WHERE data.blob_hash IS NOT NULL
;
DROP INDEX blobs_hash;
DROP TABLE blobs;
ALTER TABLE temp_blobs RENAME TO blobs;
CREATE INDEX blobs_namespace_data_id ON blobs(namespace, data_id);
CREATE INDEX blobs_payload_ref ON blobs(payload_ref);
63 changes: 61 additions & 2 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5055,6 +5055,32 @@ paths:
tags:
- Default Namespace
/data/{dataid}:
delete:
description: Deletes a data item by its ID, including metadata about this item
operationId: deleteData
parameters:
- description: The data item ID
in: path
name: dataid
required: true
schema:
type: string
- description: Server-side request timeout (milliseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 2m0s
type: string
responses:
"204":
content:
application/json: {}
description: Success
default:
description: ""
tags:
- Default Namespace
get:
description: Gets a data item by its ID, including metadata about this item
operationId: getDataByID
Expand Down Expand Up @@ -5151,7 +5177,7 @@ paths:
description: Downloads the original file that was previously uploaded or received
operationId: getDataBlob
parameters:
- description: The blob ID
- description: The data item ID
in: path
name: dataid
required: true
Expand Down Expand Up @@ -15549,6 +15575,39 @@ paths:
tags:
- Non-Default Namespace
/namespaces/{ns}/data/{dataid}:
delete:
description: Deletes a data item by its ID, including metadata about this item
operationId: deleteDataNamespace
parameters:
- description: The data item ID
in: path
name: dataid
required: true
schema:
type: string
- description: The namespace which scopes this request
in: path
name: ns
required: true
schema:
example: default
type: string
- description: Server-side request timeout (milliseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 2m0s
type: string
responses:
"204":
content:
application/json: {}
description: Success
default:
description: ""
tags:
- Non-Default Namespace
get:
description: Gets a data item by its ID, including metadata about this item
operationId: getDataByIDNamespace
Expand Down Expand Up @@ -15652,7 +15711,7 @@ paths:
description: Downloads the original file that was previously uploaded or received
operationId: getDataBlobNamespace
parameters:
- description: The blob ID
- description: The data item ID
in: path
name: dataid
required: true
Expand Down
44 changes: 44 additions & 0 deletions internal/apiserver/route_date_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestDeleteDataByID(t *testing.T) {
o, r := newTestAPIServer()
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
dmm := &datamocks.Manager{}
o.On("Data").Return(dmm)
id := fftypes.NewUUID()
req := httptest.NewRequest("DELETE", "/api/v1/namespaces/mynamespace/data/"+id.String(), nil)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

dmm.On("DeleteData", mock.Anything, id.String()).
Return(nil, nil)
r.ServeHTTP(res, req)

assert.Equal(t, 204, res.Result().StatusCode)
}
44 changes: 44 additions & 0 deletions internal/apiserver/route_delete_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly/internal/coremsgs"
)

var deleteData = &ffapi.Route{
Name: "deleteData",
Path: "data/{dataid}",
Method: http.MethodDelete,
PathParams: []*ffapi.PathParam{
{Name: "dataid", Description: coremsgs.APIParamsDataID},
},
QueryParams: nil,
Description: coremsgs.APIEndpointsDeleteData,
JSONInputValue: nil,
JSONOutputValue: nil,
JSONOutputCodes: []int{http.StatusNoContent},
Extensions: &coreExtensions{
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
err = cr.or.Data().DeleteData(cr.ctx, r.PP["dataid"])
return nil, err
},
},
}
4 changes: 2 additions & 2 deletions internal/apiserver/route_get_data_blob.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -32,7 +32,7 @@ var getDataBlob = &ffapi.Route{
Path: "data/{dataid}/blob",
Method: http.MethodGet,
PathParams: []*ffapi.PathParam{
{Name: "dataid", Description: coremsgs.APIParamsBlobID},
{Name: "dataid", Description: coremsgs.APIParamsDataID},
},
QueryParams: nil,
FilterFactory: database.MessageQueryFactory,
Expand Down
3 changes: 2 additions & 1 deletion internal/apiserver/routes.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -53,6 +53,7 @@ var routes = append(
}),
namespacedRoutes([]*ffapi.Route{
deleteContractListener,
deleteData,
deleteSubscription,
getBatchByID,
getBatches,
Expand Down
9 changes: 5 additions & 4 deletions internal/broadcast/manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -203,14 +203,15 @@ func (bm *broadcastManager) uploadDataBlob(ctx context.Context, tx *fftypes.UUID
return err
}

blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash)
fb := database.BlobQueryFactory.NewFilter(ctx)
blobs, _, err := bm.database.GetBlobs(ctx, bm.namespace.Name, fb.And(fb.Eq("data_id", d.ID), fb.Eq("hash", d.Blob.Hash)))
if err != nil {
return err
} else if blob == nil {
} else if len(blobs) == 0 || blobs[0] == nil {
return i18n.NewError(ctx, coremsgs.MsgBlobNotFound, d.Blob.Hash)
}

_, err = bm.operations.RunOperation(ctx, opUploadBlob(op, d, blob))
_, err = bm.operations.RunOperation(ctx, opUploadBlob(op, d, blobs[0]))
return err
}

Expand Down
10 changes: 5 additions & 5 deletions internal/broadcast/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestDispatchBatchBlobsFail(t *testing.T) {
mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil)

mdi := bm.database.(*databasemocks.Plugin)
mdi.On("GetBlobMatchingHash", bm.ctx, blobHash).Return(nil, fmt.Errorf("pop"))
mdi.On("GetBlobs", mock.Anything, bm.namespace.Name, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))

err := bm.dispatchBatch(bm.ctx, state)
assert.EqualError(t, err, "pop")
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestUploadBlobPublishFail(t *testing.T) {
ctx := context.Background()
mtx.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeDataPublish, core.IdempotencyKey("idem1")).Return(fftypes.NewUUID(), nil)
mdi.On("GetDataByID", ctx, "ns1", d.ID, true).Return(d, nil)
mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil)
mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{blob}, nil, nil)
mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil)
mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool {
data := op.Data.(uploadBlobData)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestUploadBlobsGetBlobFail(t *testing.T) {
dataID := fftypes.NewUUID()

ctx := context.Background()
mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, fmt.Errorf("pop"))
mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))

mom := bm.operations.(*operationmocks.Manager)
mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestUploadBlobsGetBlobNotFound(t *testing.T) {
dataID := fftypes.NewUUID()

ctx := context.Background()
mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(nil, nil)
mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{}, nil, nil)

mom := bm.operations.(*operationmocks.Manager)
mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -596,7 +596,7 @@ func TestUploadBlobOK(t *testing.T) {

ctx := context.Background()
mdi.On("GetDataByID", ctx, "ns1", d.ID, true).Return(d, nil)
mdi.On("GetBlobMatchingHash", ctx, blob.Hash).Return(blob, nil)
mdi.On("GetBlobs", ctx, bm.namespace.Name, mock.Anything).Return([]*core.Blob{blob}, nil, nil)
mom.On("AddOrReuseOperation", mock.Anything, mock.Anything).Return(nil)
mom.On("RunOperation", mock.Anything, mock.MatchedBy(func(op *core.PreparedOperation) bool {
data := op.Data.(uploadBlobData)
Expand Down
9 changes: 5 additions & 4 deletions internal/broadcast/operations.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -114,13 +114,14 @@ func (bm *broadcastManager) PrepareOperation(ctx context.Context, op *core.Opera
} else if d == nil || d.Blob == nil {
return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound)
}
blob, err := bm.database.GetBlobMatchingHash(ctx, d.Blob.Hash)
fb := database.BlobQueryFactory.NewFilter(ctx)
blobs, _, err := bm.database.GetBlobs(ctx, bm.namespace.Name, fb.And(fb.Eq("data_id", dataID), fb.Eq("hash", d.Blob.Hash)))
if err != nil {
return nil, err
} else if blob == nil {
} else if len(blobs) == 0 || blobs[0] == nil {
return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound)
}
return opUploadBlob(op, d, blob), nil
return opUploadBlob(op, d, blobs[0]), nil

case core.OpTypeSharedStorageUploadValue:
dataID, err := retrieveUploadValueInputs(ctx, op)
Expand Down
Loading

0 comments on commit cac029c

Please sign in to comment.