Skip to content

Commit

Permalink
Remove in-engine linking (#265)
Browse files Browse the repository at this point in the history
* Remove in-engine linking

We are having performance issues which I believe
is due to the linking inside the engine. Since
this is done by the gateway now there is no real
reason why this should be done in the engine too
  • Loading branch information
dylanratcliffe authored Apr 16, 2024
1 parent 09bf389 commit 4507d28
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 446 deletions.
79 changes: 2 additions & 77 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,6 @@ func newStartedEngine(t *testing.T, name string, no *auth.NATSOptions, sources .
return e
}

func TestDeleteQuery(t *testing.T) {
one := &sdp.LinkedItemQuery{Query: &sdp.Query{
Scope: "one",
Method: sdp.QueryMethod_LIST,
Query: "",
}}
two := &sdp.LinkedItemQuery{Query: &sdp.Query{
Scope: "two",
Method: sdp.QueryMethod_SEARCH,
Query: "2",
}}
irs := []*sdp.LinkedItemQuery{
one,
two,
}

deleted := deleteQuery(irs, two)

if len(deleted) > 1 {
t.Errorf("Item not successfully deleted: %v", irs)
}
}

func TestTrackQuery(t *testing.T) {
t.Run("With normal query", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -274,33 +251,6 @@ func TestNats(t *testing.T) {
}
})

t.Run("Handling a deeply linking query", func(t *testing.T) {
t.Cleanup(func() {
src.ClearCalls()
e.ClearCache()
})

req := sdp.NewQueryProgress(&sdp.Query{
Type: "person",
Method: sdp.QueryMethod_GET,
Query: "deeplink",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 10,
},
Scope: "test",
})

_, _, err := req.Execute(context.Background(), e.natsConnection)

if err != nil {
t.Error(err)
}

if len(src.GetCalls) != 11 {
t.Errorf("expected 11 get calls, got %v: %v", len(src.GetCalls), src.GetCalls)
}
})

t.Run("stopping", func(t *testing.T) {
err := e.Stop()

Expand Down Expand Up @@ -330,7 +280,7 @@ func TestNatsCancel(t *testing.T) {
e.MaxParallelExecutions = 1

src := SpeedTestSource{
QueryDelay: 250 * time.Millisecond,
QueryDelay: 2 * time.Second,
ReturnType: "person",
ReturnScopes: []string{"test"},
}
Expand Down Expand Up @@ -369,7 +319,7 @@ func TestNatsCancel(t *testing.T) {
t.Error(err)
}

time.Sleep(1 * time.Second)
time.Sleep(250 * time.Millisecond)

conn.Publish(context.Background(), "cancel.all", &sdp.CancelQuery{
UUID: u[:],
Expand Down Expand Up @@ -680,31 +630,6 @@ func TestNatsAuth(t *testing.T) {
}
})

t.Run("Handling a deeply linking query", func(t *testing.T) {
t.Cleanup(func() {
src.ClearCalls()
e.ClearCache()
})

_, _, err := sdp.NewQueryProgress(&sdp.Query{
Type: "person",
Method: sdp.QueryMethod_GET,
Query: "deeplink",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 10,
},
Scope: "test",
}).Execute(context.Background(), e.natsConnection)

if err != nil {
t.Error(err)
}

if len(src.GetCalls) != 11 {
t.Errorf("expected 11 get calls, got %v: %v", len(src.GetCalls), src.GetCalls)
}
})

t.Run("stopping", func(t *testing.T) {
err := e.Stop()

Expand Down
25 changes: 2 additions & 23 deletions enginerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,29 +221,6 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
queryItems, queryErrors = e.Execute(ctx, q, sources)

for _, i := range queryItems {
// If the main query had a linkDepth of greater than zero it means we
// need to keep linking, this means that we need to pass down all of the
// subject info along with the number of remaining links. If the link
// depth is zero then we just pass then back in their normal form as we
// won't be executing them
if query.RecursionBehaviour.GetLinkDepth() > 0 {
for _, liq := range i.LinkedItemQueries {
liq.Query.RecursionBehaviour = &sdp.Query_RecursionBehaviour{
LinkDepth: query.RecursionBehaviour.GetLinkDepth() - 1,
FollowOnlyBlastPropagation: query.RecursionBehaviour.GetFollowOnlyBlastPropagation(),
}
if query.RecursionBehaviour.GetFollowOnlyBlastPropagation() && !liq.BlastPropagation.GetOut() {
// we're only following blast propagation, so do not link this item further
// TODO: we might want to drop the link completely if this returns too much
// information, but that could risk missing revlinks
liq.Query.RecursionBehaviour.LinkDepth = 0
}
liq.Query.IgnoreCache = query.IgnoreCache
liq.Query.Deadline = query.Deadline
liq.Query.UUID = query.UUID
}
}

// Assign the source query
if i.Metadata != nil {
i.Metadata.SourceQuery = query
Expand Down Expand Up @@ -306,6 +283,8 @@ func (e *Engine) callSources(ctx context.Context, q *sdp.Query, relevantSources

// Check that our context is okay before doing anything expensive
if ctx.Err() != nil {
span.RecordError(ctx.Err())

return nil, []*sdp.QueryError{
{
UUID: q.UUID,
Expand Down
5 changes: 0 additions & 5 deletions enginerequests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func TestExecuteQuery(t *testing.T) {
}

item := items[0]
query := item.LinkedItemQueries[0].Query

if ld := query.GetRecursionBehaviour().GetLinkDepth(); ld != 2 {
t.Errorf("expected linked item depth to be 1 less than the query (2), got %v", ld)
}

if item.Metadata.SourceQuery != q {
t.Error("source query mismatch")
Expand Down
6 changes: 3 additions & 3 deletions meta_sources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestTypeSource(t *testing.T) {
s := &TypeSource{
sh: newTestSourceHost(t),
sh: newTestSourceHost(),
}

t.Run("satisfies Source interface", func(t *testing.T) {
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestTypeSource(t *testing.T) {

func TestScopeSource(t *testing.T) {
s := &ScopeSource{
sh: newTestSourceHost(t),
sh: newTestSourceHost(),
}

t.Run("satisfies Source interface", func(t *testing.T) {
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestScopeSource(t *testing.T) {
})
}

func newTestSourceHost(t *testing.T) *SourceHost {
func newTestSourceHost() *SourceHost {
sh := NewSourceHost()
sh.AddSources(
&TestSource{
Expand Down
4 changes: 2 additions & 2 deletions nats_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func SkipWithoutNats(t *testing.T) {
var err error

for _, url := range NatsTestURLs {
err := testURL(url)
err = testURL(url)

if err == nil {
return
Expand All @@ -46,7 +46,7 @@ func SkipWithoutNatsAuth(t *testing.T) {
var err error

for _, url := range NatsAuthTestURLs {
err := testURL(url)
err = testURL(url)

if err == nil {
return
Expand Down
7 changes: 0 additions & 7 deletions performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ func TestParallelQueryPerformance(t *testing.T) {
RunLinearPerformanceTest(t, "100 queries", 100, 0, 10)
RunLinearPerformanceTest(t, "1,000 queries", 1000, 0, 100)
})

t.Run("With linking", func(t *testing.T) {
RunLinearPerformanceTest(t, "1 query 3 depth", 1, 3, 1)
RunLinearPerformanceTest(t, "1 query 3 depth", 1, 3, 100)
RunLinearPerformanceTest(t, "1 query 5 depth", 1, 5, 100)
RunLinearPerformanceTest(t, "10 queries 5 depth", 10, 5, 100)
})
}

// RunLinearPerformanceTest Runs a test with a given number in input queries,
Expand Down
Loading

0 comments on commit 4507d28

Please sign in to comment.