Skip to content

Commit 662feec

Browse files
authored
Merge pull request #1452 from SamMayWork/query-api
feat: Add an API to allow for querying events under a subscription with additional filtering
2 parents fd542c0 + 45fc854 commit 662feec

31 files changed

+1988
-401
lines changed

docs/reference/config.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,12 @@ nav_order: 2
13901390
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
13911391
|batchTimeout|Default batch timeout|`int`|`50ms`
13921392

1393+
## subscription.events
1394+
1395+
|Key|Description|Type|Default Value|
1396+
|---|-----------|----|-------------|
1397+
|maxScanLength|The maximum number of events a search for historical events matching a subscription will index from the database|`int`|`1000`
1398+
13931399
## subscription.retry
13941400

13951401
|Key|Description|Type|Default Value|

docs/swagger/swagger.yaml

Lines changed: 393 additions & 0 deletions
Large diffs are not rendered by default.

go.work.sum

Lines changed: 121 additions & 2 deletions
Large diffs are not rendered by default.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright © 2024 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package apiserver
18+
19+
import (
20+
"fmt"
21+
"net/http"
22+
"strconv"
23+
24+
"github.com/hyperledger/firefly-common/pkg/ffapi"
25+
"github.com/hyperledger/firefly-common/pkg/i18n"
26+
"github.com/hyperledger/firefly/internal/coremsgs"
27+
"github.com/hyperledger/firefly/pkg/core"
28+
"github.com/hyperledger/firefly/pkg/database"
29+
)
30+
31+
var getSubscriptionEventsFiltered = &ffapi.Route{
32+
Name: "getSubscriptionEventsFiltered",
33+
Path: "subscriptions/{subid}/events",
34+
Method: http.MethodGet,
35+
PathParams: []*ffapi.PathParam{
36+
{Name: "subid", Description: coremsgs.APIParamsSubscriptionID},
37+
},
38+
QueryParams: []*ffapi.QueryParam{
39+
{Name: "startsequence", IsBool: false, Description: coremsgs.APISubscriptionStartSequenceID},
40+
{Name: "endsequence", IsBool: false, Description: coremsgs.APISubscriptionEndSequenceID},
41+
},
42+
FilterFactory: database.EventQueryFactory,
43+
Description: coremsgs.APIEndpointsGetSubscriptionEventsFiltered,
44+
JSONInputValue: nil,
45+
JSONOutputValue: func() interface{} { return []*core.Event{} },
46+
JSONOutputCodes: []int{http.StatusOK},
47+
Extensions: &coreExtensions{
48+
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
49+
subscription, _ := cr.or.GetSubscriptionByID(cr.ctx, r.PP["subid"])
50+
var startSeq int
51+
var endSeq int
52+
53+
if r.QP["startsequence"] != "" {
54+
startSeq, err = strconv.Atoi(r.QP["startsequence"])
55+
if err != nil {
56+
return nil, i18n.NewError(cr.ctx, coremsgs.MsgSequenceIDDidNotParseToInt, fmt.Sprintf("startsequence: %s", r.QP["startsequence"]))
57+
}
58+
} else {
59+
startSeq = -1
60+
}
61+
62+
if r.QP["endsequence"] != "" {
63+
endSeq, err = strconv.Atoi(r.QP["endsequence"])
64+
if err != nil {
65+
return nil, i18n.NewError(cr.ctx, coremsgs.MsgSequenceIDDidNotParseToInt, fmt.Sprintf("endsequence: %s", r.QP["endsequence"]))
66+
}
67+
} else {
68+
endSeq = -1
69+
}
70+
71+
return r.FilterResult(cr.or.GetSubscriptionEventsHistorical(cr.ctx, subscription, r.Filter, startSeq, endSeq))
72+
},
73+
},
74+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright © 2024 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package apiserver
18+
19+
import (
20+
"net/http/httptest"
21+
"testing"
22+
23+
"github.com/hyperledger/firefly/pkg/core"
24+
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/mock"
26+
)
27+
28+
func TestGetSubscriptionEventsFiltered(t *testing.T) {
29+
o, r := newTestAPIServer()
30+
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
31+
req := httptest.NewRequest("GET", "/api/v1/namespaces/mynamespace/subscriptions/abcd12345/events?startsequence=100&endsequence=200", nil)
32+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
33+
res := httptest.NewRecorder()
34+
35+
o.On("GetSubscriptionByID", mock.Anything, "abcd12345").
36+
Return(&core.Subscription{}, nil)
37+
o.On("GetSubscriptionEventsHistorical", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
38+
Return([]*core.EnrichedEvent{}, nil, nil)
39+
40+
r.ServeHTTP(res, req)
41+
assert.Equal(t, 200, res.Result().StatusCode)
42+
}
43+
44+
func TestGetSubscriptionEventsFilteredStartSequenceIDDoesNotParse(t *testing.T) {
45+
o, r := newTestAPIServer()
46+
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
47+
req := httptest.NewRequest("GET", "/api/v1/namespaces/mynamespace/subscriptions/abcd12345/events?startsequence=helloworld", nil)
48+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
49+
res := httptest.NewRecorder()
50+
o.On("GetSubscriptionByID", mock.Anything, "abcd12345").
51+
Return(&core.Subscription{}, nil)
52+
53+
r.ServeHTTP(res, req)
54+
assert.Equal(t, 400, res.Result().StatusCode)
55+
assert.Contains(t, res.Body.String(), "helloworld")
56+
}
57+
58+
func TestGetSubscriptionEventsFilteredEndSequenceIDDoesNotParse(t *testing.T) {
59+
o, r := newTestAPIServer()
60+
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
61+
req := httptest.NewRequest("GET", "/api/v1/namespaces/mynamespace/subscriptions/abcd12345/events?endsequence=helloworld", nil)
62+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
63+
res := httptest.NewRecorder()
64+
o.On("GetSubscriptionByID", mock.Anything, "abcd12345").
65+
Return(&core.Subscription{}, nil)
66+
67+
r.ServeHTTP(res, req)
68+
assert.Equal(t, 400, res.Result().StatusCode)
69+
assert.Contains(t, res.Body.String(), "helloworld")
70+
}
71+
72+
func TestGetSubscriptionEventsFilteredNoSequenceIDsProvided(t *testing.T) {
73+
o, r := newTestAPIServer()
74+
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
75+
req := httptest.NewRequest("GET", "/api/v1/namespaces/mynamespace/subscriptions/abcd12345/events", nil)
76+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
77+
res := httptest.NewRecorder()
78+
o.On("GetSubscriptionByID", mock.Anything, "abcd12345").
79+
Return(&core.Subscription{}, nil)
80+
o.On("GetSubscriptionEventsHistorical", mock.Anything, mock.Anything, mock.Anything, -1, -1).
81+
Return([]*core.EnrichedEvent{}, nil, nil)
82+
83+
r.ServeHTTP(res, req)
84+
assert.Equal(t, 200, res.Result().StatusCode)
85+
}

internal/apiserver/routes.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -110,6 +110,7 @@ var routes = append(
110110
getStatusBatchManager,
111111
getSubscriptionByID,
112112
getSubscriptions,
113+
getSubscriptionEventsFiltered,
113114
getTokenAccountPools,
114115
getTokenAccounts,
115116
getTokenApprovals,

internal/apiserver/server.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -23,8 +23,6 @@ import (
2323
"strings"
2424
"time"
2525

26-
"github.com/prometheus/client_golang/prometheus/promhttp"
27-
2826
"github.com/gorilla/mux"
2927
"github.com/hyperledger/firefly-common/pkg/config"
3028
"github.com/hyperledger/firefly-common/pkg/ffapi"
@@ -40,6 +38,7 @@ import (
4038
"github.com/hyperledger/firefly/internal/metrics"
4139
"github.com/hyperledger/firefly/internal/namespace"
4240
"github.com/hyperledger/firefly/internal/orchestrator"
41+
"github.com/prometheus/client_golang/prometheus/promhttp"
4342
)
4443

4544
var (

internal/coreconfig/coreconfig.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,8 @@ var (
339339
SubscriptionsRetryMaxDelay = ffc("subscription.retry.maxDelay")
340340
// SubscriptionsRetryFactor the backoff factor to use for retry of database operations
341341
SubscriptionsRetryFactor = ffc("subscription.retry.factor")
342+
// SubscriptionMaxHistoricalEventScanLength the maximum amount of historical events we scan for in the DB when indexing through old events against a subscription
343+
SubscriptionMaxHistoricalEventScanLength = ffc("subscription.events.maxScanLength")
342344
// TransactionWriterCount
343345
TransactionWriterCount = ffc("transaction.writer.count")
344346
// TransactionWriterBatchTimeout
@@ -459,6 +461,7 @@ func setDefaults() {
459461
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
460462
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
461463
viper.SetDefault(string(SubscriptionsRetryFactor), 2.0)
464+
viper.SetDefault(string(SubscriptionMaxHistoricalEventScanLength), 1000)
462465
viper.SetDefault(string(TransactionWriterBatchMaxTransactions), 100)
463466
viper.SetDefault(string(TransactionWriterBatchTimeout), "10ms")
464467
viper.SetDefault(string(TransactionWriterCount), 5)

internal/coremsgs/en_api_translations.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -133,6 +133,7 @@ var (
133133
APIEndpointsGetWebSockets = ffm("api.endpoints.getStatusWebSockets", "Gets a list of the current WebSocket connections to this node")
134134
APIEndpointsGetStatus = ffm("api.endpoints.getStatus", "Gets the status of this namespace")
135135
APIEndpointsGetSubscriptionByID = ffm("api.endpoints.getSubscriptionByID", "Gets a subscription by its ID")
136+
APIEndpointsGetSubscriptionEventsFiltered = ffm("api.endpoints.getSubscriptionEventsFiltered", "Gets a collection of events filtered by the subscription for further filtering")
136137
APIEndpointsGetSubscriptions = ffm("api.endpoints.getSubscriptions", "Gets a list of subscriptions")
137138
APIEndpointsGetTokenAccountPools = ffm("api.endpoints.getTokenAccountPools", "Gets a list of token pools that contain a given token account key")
138139
APIEndpointsGetTokenAccounts = ffm("api.endpoints.getTokenAccounts", "Gets a list of token accounts")
@@ -208,4 +209,7 @@ var (
208209
APISmartContractDetails = ffm("api.smartContractDetails", "Additional smart contract details")
209210
APISmartContractDetailsKey = ffm("api.smartContractDetailsKey", "Key")
210211
APISmartContractDetailsValue = ffm("api.smartContractDetailsValue", "Value")
212+
213+
APISubscriptionStartSequenceID = ffm("api.startsequenceid", "The sequence ID in the raw event stream to start indexing through events from. Leave blank to start indexing from the most recent events")
214+
APISubscriptionEndSequenceID = ffm("api.endsequenceid", "The sequence ID in the raw event stream to stop indexing through events at. Leave blank to start indexing from the most recent events")
211215
)

internal/coremsgs/en_config_descriptions.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,10 @@ var (
383383
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
384384
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)
385385

386-
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
387-
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
388-
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)
386+
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
387+
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
388+
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)
389+
ConfigSubscriptionMaxHistoricalEventScanLength = ffc("config.subscription.events.maxScanLength", "The maximum number of events a search for historical events matching a subscription will index from the database", i18n.IntType)
389390

390391
ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
391392
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)

0 commit comments

Comments
 (0)