feat: Add an API to allow for querying events under a subscription with additional filtering #1452
Conversation
Codecov Report: all modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@            Coverage Diff            @@
##             main    #1452    +/-   ##
========================================
  Coverage   99.99%   99.99%
========================================
  Files         321      322      +1
  Lines       23175    23270     +95
========================================
+ Hits        23173    23268     +95
  Misses          1        1
  Partials        1        1
WRT
To my knowledge the only way around these problems would be:
This is great @SamMayWork - very helpful addition.
Returned with some thoughts on what I hope are small additions before it merges.
internal/events/event_manager.go (outdated)
@@ -303,3 +305,104 @@ func (em *eventManager) EnrichEvent(ctx context.Context, event *core.Event) (*co
func (em *eventManager) QueueBatchRewind(batchID *fftypes.UUID) {
	em.aggregator.queueBatchRewind(batchID)
}

func (em *eventManager) FilterEventsOnSubscription(events []*core.EnrichedEvent, subscription *core.Subscription) []*core.EnrichedEvent {
I see this function added, but I don't see the old code (which I assume was in a place that wasn't already in a function) removed. Can you help me with where you've moved this from?
Want to make sure we don't have a duplicate copy of the same filtering code.
The code was duplicated; I've pulled it out and pushed it down now. The complexity here is that we've got three variations of a subscription, and the existing filtering code being used here lives on the event-delivery kind of subscription (which does not use the underlying fields).
In commit de6d46b, I've pushed the filtering code down into the internal event-delivery kind of subscription, then parsed the inbound subscription to that type and performed the filtering - which means filtering is now done in a single place.
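A rough sketch of the resulting shape - the types and field names here (enrichedEvent, subscriptionFilter) are simplified stand-ins for illustration, not the actual FireFly identifiers. The point is that both the delivery path and the new query API go through one matching helper, so the filter semantics cannot drift apart:

package events

import "regexp"

// enrichedEvent is a minimal stand-in for core.EnrichedEvent.
type enrichedEvent struct {
	Type  string
	Topic string
}

// subscriptionFilter is a stand-in for the compiled event-delivery filter.
type subscriptionFilter struct {
	eventType *regexp.Regexp // compiled from the subscription's "events" filter
	topic     *regexp.Regexp // compiled from the subscription's "topic" filter
}

// matches is the single place where the filter semantics live - used
// per-event by the delivery path, and over a slice by the new query API.
func (f *subscriptionFilter) matches(ev *enrichedEvent) bool {
	if f.eventType != nil && !f.eventType.MatchString(ev.Type) {
		return false
	}
	if f.topic != nil && !f.topic.MatchString(ev.Topic) {
		return false
	}
	return true
}

// filterEvents is the slice-level wrapper the new API would use.
func filterEvents(f *subscriptionFilter, events []*enrichedEvent) []*enrichedEvent {
	matched := make([]*enrichedEvent, 0, len(events))
	for _, ev := range events {
		if f.matches(ev) {
			matched = append(matched, ev)
		}
	}
	return matched
}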
func (or *orchestrator) GetSubscriptionEventsHistorical(ctx context.Context, subscription *core.Subscription, filter ffapi.AndFilter) ([]*core.EnrichedEvent, *ffapi.FilterResult, error) {

	// Internally we need to know the limit/count from the inbound filter
	inboundFilterOptions, err := filter.Finalize()
Sorry @SamMayWork - I'm missing where we pass this filter into our DB filter. I see ssFilter.And(), but it's not ssFilter.And(filter) - so the pre-filtering the user has supplied on which thing they're looking for in the andFilter seems to be missed.
Addressed in 33294f1, but it will need a quick retest.
Edit: Doing some basic filtering using the options seems to work as expected now! 🚀
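For illustration, the shape of that fix - the builder names (fb, startSeq) are assumptions based on the snippet above, not verbatim from the PR:

// Before - only the internal paging bound reaches the database:
//
//	ssFilter := fb.And(fb.Gt("sequence", startSeq))
//
// After - the caller-supplied filter is ANDed in, so the user's
// pre-filtering (time ranges, specific event types, ...) is applied
// by the DB query itself:
//
//	ssFilter := fb.And(fb.Gt("sequence", startSeq), filter)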
internalSkip := 0
ssFilter.Limit(uint64(internalLimit))

for len(subscriptionFilteredEvents) < int(finalDesiredOffset) {
I think the easiest solution to the O(n) safety issue (which is a significant problem) is to have a server-configurable (not API-configurable) limit on the max scan length for this. I would suggest something like 1000 by default.
If the query reaches this limit before either:
- reaching the end of the table, or
- reaching the full limit specified
then it returns with an error result. For example:
Event scan limit reached after matching X events and skipping Y events. Please restrict your query to a narrower range
It's far from a perfect solution, but I think it gives a solution in the vast majority of cases. Either:
- you have lots of events that match your subscription, so your page size will find 25 (or whatever) events in the last 1000 events, or
- you are hunting for one specific event, so the pre-filter (discussed in my comment above) will sub-select to fewer than 1000 records - either with a time range, or with a spear-fishing approach (like looking for one specific event).
A sketch of such a guard follows below.
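A minimal sketch of that guard, reusing the enrichedEvent stand-in from the sketch above; the function names and paging shape are illustrative, not FireFly's actual code:

package events

import "fmt"

const maxScanLength = 1000 // server-configurable, not exposed on the API

// scanForMatches pages through events until it fills `limit` matches or
// reaches the end of the table - and errors out, rather than returning a
// partial result, once the scan budget is exhausted.
func scanForMatches(fetchPage func(skip, pageSize int) ([]*enrichedEvent, error), match func(*enrichedEvent) bool, limit int) ([]*enrichedEvent, error) {
	matched := []*enrichedEvent{}
	scanned := 0
	for {
		page, err := fetchPage(scanned, 50)
		if err != nil {
			return nil, err
		}
		if len(page) == 0 {
			return matched, nil // reached the end of the table
		}
		for _, ev := range page {
			scanned++
			if match(ev) {
				matched = append(matched, ev)
				if len(matched) >= limit {
					return matched, nil // filled the requested page
				}
			}
			if scanned >= maxScanLength {
				return nil, fmt.Errorf("event scan limit reached after matching %d events and skipping %d events - please restrict your query to a narrower range",
					len(matched), scanned-len(matched))
			}
		}
	}
}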
The implementation for these changes is split across commits de6d46b and 7e22a2c.
This approach means that:
- Supplying a skip/limit in the initial query now applies specifically to the number of unfiltered events that are processed on the backend, and you cannot control how many results you get from the API *1
- If the number of records being scanned exceeds the total configured on the namespace, no results are returned to the user; instead they get an error indicating that they need to refine their query
Crucially, this approach presents a new problem worth noting: how do I, as the caller of the API, know how to adjust my skip/limit so that I can receive the next sequentially filtered event?
Suppose 1000 unfiltered events, with two matching events at positions [0:1] and one matching event at [999]. My first query might be skip: 0, limit: 100, which would return events 0 and 1 - but how would I get the third matching event, which sits at unfiltered index 999?
Currently the answer to this is 'keep adjusting the skip/limit until you find it', which is fair, but depending on the use-case could be a frustrating answer. It's difficult for us to supply information to make this easier for users, because FireFly doesn't have the intelligence required to know where that index is.
*1 - The skip/limit allows the user to configure how many unfiltered events are scanned, and then returns all of the matching events within that bound; we assume the user will then limit this down on their side to the number of events they want to search.
Spotted a problem during testing that might be a bit tricky to solve. Given there are likely to be X number of events on the chain, where X is a high number, being restricted to skipping 1000 records (as is the default API behaviour) is super restrictive. There's an entirely valid use-case for 'skip 90,000 records and then return me 200 records matching my filter', but this would not work as of right now.
I've pushed changes in commit 52c191e which allow for a custom (and ✨ configurable ✨) number of events to be skipped, buuuuuut this doesn't play nicely with the swagger generation, so the generated API documentation isn't updated to reflect the new value. This means there's a disconnect right now between what our backend supports and what we say it supports in the swagger. Needs more thinking for a workaround...
EDIT: Commit 80ddde2 has a nifty but imperfect workaround...
Nifty: After the swagger is generated by the core libraries, we grab the params and overwrite the description on the skip field with our own description, carrying the actually correct value for the maximum supported skip.
Imperfect: If another API has the same issue as this API, it'll need to duplicate the code I've added to change the values on its API. What would be great would be if the values for skip/limit were configurable per route and that then got read through into the Swagger, but this is likely a larger item of work.
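Roughly, the workaround looks like this - a sketch that treats the generated spec as plain JSON and rewrites one description; the actual hook point in FireFly's route handling, and the route path, may differ:

package events

import (
	"encoding/json"
	"fmt"
)

// patchSkipDescription rewrites the generated description of the "skip"
// query parameter on the new route so it states the configured maximum.
func patchSkipDescription(specJSON []byte, maxSkip int) ([]byte, error) {
	var spec map[string]interface{}
	if err := json.Unmarshal(specJSON, &spec); err != nil {
		return nil, err
	}
	paths, _ := spec["paths"].(map[string]interface{})
	route, _ := paths["/subscriptions/{subid}/events"].(map[string]interface{})
	get, _ := route["get"].(map[string]interface{})
	params, _ := get["parameters"].([]interface{})
	for _, p := range params {
		if param, ok := p.(map[string]interface{}); ok && param["name"] == "skip" {
			param["description"] = fmt.Sprintf("The number of records to skip (maximum: %d)", maxSkip)
		}
	}
	return json.Marshal(spec)
}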
In discussion w/ @nguyer, we've landed on some further changes for this PR, given that we're concerned about the usefulness of this API if we don't solve the N+1 events problem. We've agreed on the following changes:
This means that, assuming you hit the limit of matching events before you hit the end of the sequence range, you can get the next event in the sequence by calling the API again, providing the sequence ID of the last record as the starting sequence ID. The hope is that the DB should be way faster indexing by sequence ID, so performance should become significantly less of an issue. These changes semantically change how we're using some of the values, so in the new world this is what we have:
EDIT: Will also need to make the new configuration options make sense with the changes listed above; probably going to remove both of the new options and have a configurable limit on the difference between the start and end sequence IDs. Changes on the way 🚀
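To make the new semantics concrete, here is a hypothetical client-side paging loop. The route, the query parameter name (startsequence), and the response shape are assumptions for illustration, not the final API:

package apiclient

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// pageAllEvents walks a subscription's matching events by repeatedly
// resuming from the sequence ID of the last record returned.
func pageAllEvents(baseURL, subID string) error {
	startSeq := int64(0)
	for {
		url := fmt.Sprintf("%s/api/v1/subscriptions/%s/events?startsequence=%d&limit=25",
			baseURL, subID, startSeq)
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		var events []struct {
			Sequence int64 `json:"sequence"`
		}
		err = json.NewDecoder(resp.Body).Decode(&events)
		resp.Body.Close()
		if err != nil {
			return err
		}
		if len(events) == 0 {
			return nil // caught up - no more matching events in range
		}
		// Resume one past the sequence ID of the last record we saw.
		startSeq = events[len(events)-1].Sequence + 1
	}
}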
cols := append([]string{}, eventColumns...)
cols = append(cols, s.SequenceColumn())

query := sq.Select(cols...).FromSelect(sq.Select(cols...).From(eventsTable).Where(sq.GtOrEq{
Calling this out as something potentially worthy of discussion: initially I wanted to use the BETWEEN operator referencing the sequence ID, but squirrel (the underlying SQL construction lib we're using) does not support building queries with BETWEEN.
The other option was to use a sub-query and then apply a limit and offset, but this was not good from a performance perspective, so I've gone for a sub-query and a WHERE, which should jump straight to the correct sequence ID. Comparing the use of OFFSET against this approach showed roughly a ~140ms difference in speed:
~440ms for a query using OFFSET, searching for 1000 records between sequence IDs 100,000 and 101,000
~300ms for a query using WHERE, searching for 1000 records between sequence IDs 100,000 and 101,000
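For reference, the two query shapes being compared, built with squirrel - column and table names are simplified here, and the actual PR uses a sub-select over the real event columns:

package events

import sq "github.com/Masterminds/squirrel"

// offsetQuery forces the DB to walk and discard 100,000 rows before it
// can return anything (~440ms in the benchmark above).
func offsetQuery() sq.SelectBuilder {
	return sq.Select("seq", "type", "topic").
		From("events").
		OrderBy("seq").
		Offset(100000).
		Limit(1000)
}

// whereQuery lets an index seek jump straight to the starting sequence ID
// (~300ms). squirrel has no BETWEEN helper, so the range is expressed as
// two ANDed comparisons instead.
func whereQuery() sq.SelectBuilder {
	return sq.Select("seq", "type", "topic").
		From("events").
		Where(sq.GtOrEq{"seq": 100000}).
		Where(sq.LtOrEq{"seq": 101000}).
		OrderBy("seq").
		Limit(1000)
}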
See the comment above about the SQL performance of this query syntax - but I think we're in a good place for a re-review now 🚀
Looks like codecov is just having trouble talking to GitHub. I confirmed by checking out this branch myself that it does not lower test coverage.
Sorry for all the back and forth on this - I think it's ending up in a really good spot though. One more question: as the user of this API, if I don't know the current sequence number (for the most recent event), how do I start using this API?
I get how to use it once we have a sequence number as a reference, and we can go higher or lower from there. I think most of the time, though, users are going to be interested in the most recent n events and won't know where to start from.
Some initial thoughts on this here:
I think option 2 here is just bad, since we'd need to make another query to the DB to find out the current highest index, and then tack that onto a request the user might not want to make. I.e. if I were a user and wanted the most recent event, I'd need to make a request I don't care about first, to get the index, so that I could then make the request I do care about - that doesn't make sense. Option 1 is the way forward, and it's consistent with the
So one little wrinkle is making sure that the Swagger reflects this behaviour, and that the user is informed that if they don't provide a value, they'll get the most recent events. A sketch of the defaulting behaviour is below.
(*1) - Default max range is 1000 records; through configuration this value could be changed.
EDIT: I think this should be addressed now in 0395957
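A sketch of that defaulting behaviour - the names are illustrative, and this assumes the newest sequence ID is already known to the server-side query path: when the caller supplies no sequence ID, anchor the range at the newest event and work backwards over at most the configured max range.

package events

// resolveSequenceRange picks the scan window. If the caller gave no end
// sequence, anchor at the newest event, so that "no value supplied" means
// "give me the most recent events".
func resolveSequenceRange(requestedEnd *int64, newestSeq, maxRange int64) (start, end int64) {
	end = newestSeq
	if requestedEnd != nil {
		end = *requestedEnd
	}
	start = end - maxRange // default max range 1000, configurable
	if start < 0 {
		start = 0
	}
	return start, end
}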
Right, so it looks like there's a flaky e2e at the moment, which I've seen pass and fail on different commits on this branch. I'll dig into it here for a little bit, but I'd be surprised if the changes I have made were causing this test to fail, given the test is failing during multi-party set-up; an initial look suggests that one of the nodes is failing to register its org.
EDIT: Spent some time looking at this, and it looks like the tests are a bit flaky on main too. I think for now we should re-run the failing E2E on this PR, and it should work. Probably worth a separate item to look into the E2E failures.
Note: Could not get the E2Es running on my M1 Mac - looking in the Dockerfile there's a lot of AMD64 stuff. I could spend some cycles looking into this if required, but I'm not sure it's worth the time given we know this is a flaky test.
All requested changes have been addressed
ref: #1443
Adds a new fancy API:
subscriptions/{subid}/events
My current concern is that this query is an expensive operation, because by necessity it is O(n): we have to go through all of the events to find those that match the subscription type. Additionally, the skip provided by the user does not help us here, as we don't know how many of the events actually match our filter. If the provided filter matches no events, we will iterate through all of the events in the DB.
A local benchmark processing ~93,000 events in FireFly took ~30 seconds on this API with a subscription that matches no events (i.e. going through all of the events and running checks on them). An internal limit of <100 takes roughly 30-45 seconds to complete; higher figures tend more towards ~30 seconds. Increasing the internal limit past 200 does not substantially decrease the time required for the query.