Skip to content

Commit

Permalink
Improve the way contexts are tracked (#163)
Browse files Browse the repository at this point in the history
* Improve the way contexts are tracked

This should also address quite a large memory leak, since all results from all queries were being stored forever.

* Fixed looping issues

* Addressed comments
  • Loading branch information
dylanratcliffe authored Aug 16, 2023
1 parent 18db162 commit f29b137
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 94 deletions.
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (e *Engine) HandleCancelQuery(ctx context.Context, cancelQuery *sdp.CancelQ
return
}

if rt != nil {
if rt != nil && rt.Cancel != nil {
log.WithFields(log.Fields{
"UUID": u.String(),
}).Debug("Cancelling query")
Expand Down
4 changes: 3 additions & 1 deletion engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestNatsCancel(t *testing.T) {
}
})

t.Run("Cancelling querys", func(t *testing.T) {
t.Run("Cancelling queries", func(t *testing.T) {
conn := e.natsConnection
u := uuid.New()

Expand Down Expand Up @@ -380,6 +380,8 @@ func TestNatsCancel(t *testing.T) {
for range errs {
}

time.Sleep(250 * time.Millisecond)

if progress.NumCancelled() != 1 {
t.Errorf("Expected query to be cancelled, got\n%v", progress.String())
}
Expand Down
73 changes: 42 additions & 31 deletions enginerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (e *Engine) HandleQuery(ctx context.Context, query *sdp.Query) {
timeoutOverride = true
}

// Add the query timeout to the context stack
ctx, cancel := query.TimeoutContext(ctx)
defer cancel()

numExpandedQueries := len(e.sh.ExpandQuery(query))

// Extract and parse the UUID
Expand Down Expand Up @@ -96,27 +100,22 @@ func (e *Engine) HandleQuery(ctx context.Context, query *sdp.Query) {
pub = NilConnection{}
}

// The context we were given will be context.Background(), we want to create
// a child context from here that will be specific to the request. This will
// be passed to the responder, which will cancel that context if it's unable
// to send responses (i.e. there is no longer anyone listening)
//
ctx, cancel := context.WithCancel(ctx)

responder.Start(
ctx,
cancel,
pub,
e.Name,
)

qt := QueryTracker{
Query: query,
Engine: e,
Query: query,
Engine: e,
Context: ctx,
Cancel: cancel,
}

if uuidErr == nil {
e.TrackQuery(u, &qt)
defer e.DeleteTrackedQuery(u)
}

_, _, err := qt.Execute(ctx)
Expand Down Expand Up @@ -321,6 +320,20 @@ func (e *Engine) callSources(ctx context.Context, r *sdp.Query, relevantSources
))
defer span.End()

// Check that our context is okay before doing anything expensive
if ctx.Err() != nil {
return nil, []*sdp.QueryError{
{
UUID: r.UUID,
ErrorType: sdp.QueryError_OTHER,
ErrorString: ctx.Err().Error(),
Scope: r.Scope,
ResponderName: e.Name,
ItemType: r.Type,
},
}
}

items := make([]*sdp.Item, 0)
errs := make([]*sdp.QueryError, 0)

Expand Down Expand Up @@ -456,33 +469,31 @@ func (e *Engine) callSources(ctx context.Context, r *sdp.Query, relevantSources
var err error
var sourceDuration time.Duration

func(ctx context.Context) {
start := time.Now()
start := time.Now()

switch method {
case Get:
var newItem *sdp.Item
switch method {
case Get:
var newItem *sdp.Item

newItem, err = src.Get(ctx, r.Scope, r.Query)
newItem, err = src.Get(ctx, r.Scope, r.Query)

if err == nil {
resultItems = []*sdp.Item{newItem}
}
case List:
resultItems, err = src.List(ctx, r.Scope)
case Search:
if searchableSrc, ok := src.(SearchableSource); ok {
resultItems, err = searchableSrc.Search(ctx, r.Scope, r.Query)
} else {
err = &sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: "source is not searchable",
}
if err == nil {
resultItems = []*sdp.Item{newItem}
}
case List:
resultItems, err = src.List(ctx, r.Scope)
case Search:
if searchableSrc, ok := src.(SearchableSource); ok {
resultItems, err = searchableSrc.Search(ctx, r.Scope, r.Query)
} else {
err = &sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: "source is not searchable",
}
}
}

sourceDuration = time.Since(start)
}(ctx)
sourceDuration = time.Since(start)

span.SetAttributes(
attribute.Int("om.source.numItems", len(resultItems)),
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/goombaio/namegenerator v0.0.0-20181006234301-989e774b106e
github.com/nats-io/nats-server/v2 v2.9.21
github.com/nats-io/nats.go v1.28.0
github.com/overmindtech/sdp-go v0.43.0
github.com/overmindtech/sdp-go v0.44.1
github.com/overmindtech/sdpcache v1.5.0
github.com/sirupsen/logrus v1.9.3
github.com/sourcegraph/conc v0.3.0
Expand Down Expand Up @@ -38,11 +38,11 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
Expand Down
30 changes: 12 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/getsentry/sentry-go v0.22.0 h1:XNX9zKbv7baSEI65l+H1GEJgSeIC1c7EN5kluWaP6dM=
github.com/getsentry/sentry-go v0.22.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/getsentry/sentry-go v0.23.0 h1:dn+QRCeJv4pPt9OjVXiMcGIBIefaTJPw/h0bZWO05nE=
github.com/getsentry/sentry-go v0.23.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
Expand Down Expand Up @@ -38,8 +36,6 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k=
github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
Expand All @@ -50,10 +46,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/overmindtech/api-client v0.14.0 h1:zXyjJsIeawNqoWv7FqOjwcqgFpLrDYz7l9MWqh1G9ZQ=
github.com/overmindtech/api-client v0.14.0/go.mod h1:msdkTAQFlvDGOU4tQk2adk2P8j23uaMWkJ9YRX4wGWI=
github.com/overmindtech/sdp-go v0.42.0 h1:nS/8XLhVn2SneDiPt5wClIosbwvMB+LMGn96Vj360bc=
github.com/overmindtech/sdp-go v0.42.0/go.mod h1:s/CnUoFH5WfugwQ6+v8M+0JXvCMlq1/bz2ha2G7B92E=
github.com/overmindtech/sdp-go v0.43.0 h1:1BVry/jSKABqEYKmLMIC8XCJIcWt0iWDDHU7AVROJwE=
github.com/overmindtech/sdp-go v0.43.0/go.mod h1:s/CnUoFH5WfugwQ6+v8M+0JXvCMlq1/bz2ha2G7B92E=
github.com/overmindtech/sdp-go v0.44.1 h1:R00LfcqpXBMj/S4PYuATvyZIW/2AykQylv52XMiNK2U=
github.com/overmindtech/sdp-go v0.44.1/go.mod h1:9/yAXfMAAC0CQZ/pe3lxti/xL0cK27iOWpxYv11r5aw=
github.com/overmindtech/sdpcache v1.5.0 h1:QzHWQm1KGN9rNHPb/VZvz7WDCsyKOuVLlNUGF2CIFGc=
github.com/overmindtech/sdpcache v1.5.0/go.mod h1:GFMMle860EWMDQXbk6dhLVSQrV0YlEqqJ6/VNxINb0o=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand Down Expand Up @@ -82,24 +76,24 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/oauth2 v0.10.0 h1:zHCpF2Khkwy4mMB4bv0U37YtJdTGW8jI0glAApi0Kh8=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
1 change: 1 addition & 0 deletions performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestParallelQueryPerformance(t *testing.T) {
// goroutines running will start to make the response times non-linear which
// maybe isn't ideal but given realistic loads we probably don't care.
t.Run("Without linking", func(t *testing.T) {
RunLinearPerformanceTest(t, "1 query", 1, 0, 1)
RunLinearPerformanceTest(t, "10 queries", 10, 0, 1)
RunLinearPerformanceTest(t, "100 queries", 100, 0, 10)
RunLinearPerformanceTest(t, "1,000 queries", 1000, 0, 100)
Expand Down
36 changes: 14 additions & 22 deletions querytracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ type QueryTracker struct {
// The query to track
Query *sdp.Query

Context context.Context // The context that this query is running in
Cancel context.CancelFunc // The cancel function for the context

// The engine that this is connected to, used for sending NATS messages
Engine *Engine

Expand All @@ -35,10 +38,6 @@ type QueryTracker struct {
// The keys in this map are the GloballyUniqueName to speed up searching
linkedItems map[string]*sdp.Item
linkedItemsMutex sync.RWMutex

// cancelFunc A function that will cancel all queries when called
cancelFunc context.CancelFunc
cancelFuncMutex sync.Mutex
}

func (qt *QueryTracker) LinkedItems() []*sdp.Item {
Expand Down Expand Up @@ -96,7 +95,12 @@ func (qt *QueryTracker) startLinking(ctx context.Context) {
select {
case <-ctx.Done():
return
case unlinkedItem := <-qt.unlinkedItems:
case unlinkedItem, ok := <-qt.unlinkedItems:
if !ok {
// Return if the channel is closed
return
}

if unlinkedItem != nil {
go func(i *sdp.Item) {
defer qt.unlinkedItemsWG.Done()
Expand Down Expand Up @@ -257,6 +261,8 @@ func (qt *QueryTracker) stopLinking() {
// relevant nats subjects. Returns the full list of items, errors, and a final
// error. The final error will be populated if all sources failed, or some other
// error was encountered while trying to run the query
//
// If the context is cancelled, all query work will stop
func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryError, error) {
if qt.unlinkedItems == nil {
qt.unlinkedItems = make(chan *sdp.Item)
Expand All @@ -275,13 +281,6 @@ func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryE
errChan := make(chan error)
sdpErrs := make([]*sdp.QueryError, 0)

// Create context to enforce timeouts
ctx, cancel := qt.Query.TimeoutContext(ctx)
qt.cancelFuncMutex.Lock()
qt.cancelFunc = cancel
qt.cancelFuncMutex.Unlock()
defer cancel()

qt.startLinking(ctx)

// Run the query
Expand Down Expand Up @@ -319,6 +318,9 @@ func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryE
} else {
errs = nil
}
case <-ctx.Done():
// If the context is cancelled or times out, return what we have along with the context's error
return qt.LinkedItems(), sdpErrs, ctx.Err()
}

if items == nil && errs == nil {
Expand All @@ -341,16 +343,6 @@ func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryE
return qt.LinkedItems(), sdpErrs, ctx.Err()
}

// Cancel Cancels the currently running query
func (qt *QueryTracker) Cancel() {
qt.cancelFuncMutex.Lock()
defer qt.cancelFuncMutex.Unlock()

if qt.cancelFunc != nil {
qt.cancelFunc()
}
}

// deleteQuery Deletes an item query from a slice
func deleteQuery(queries []*sdp.LinkedItemQuery, remove *sdp.LinkedItemQuery) []*sdp.LinkedItemQuery {
finalQueries := make([]*sdp.LinkedItemQuery, 0)
Expand Down
Loading

0 comments on commit f29b137

Please sign in to comment.