Skip to content

Commit

Permalink
Merge pull request #278 from overmindtech/mehtrics
Browse files Browse the repository at this point in the history
Add additional metrics to capture execution pool load
  • Loading branch information
DavidS-ovm committed May 15, 2024
2 parents 39e51f6 + 2140377 commit e099e3d
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions enginerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ func (e *Engine) HandleQuery(ctx context.Context, query *sdp.Query) {
var pub sdp.EncodedConnection

if e.IsNATSConnected() {
span.SetAttributes(attribute.Bool("ovm.nats.connected", true))
pub = e.natsConnection
} else {
span.SetAttributes(attribute.Bool("ovm.nats.connected", false))
pub = NilConnection{}
}

Expand Down Expand Up @@ -160,6 +162,8 @@ func (e *Engine) ExecuteQuerySync(ctx context.Context, q *sdp.Query) ([]*sdp.Ite
return items, errs, err
}

var executionPoolCount atomic.Int32

// ExecuteQuery Executes a single Query and returns the results without any
// linking. Will return an error if all sources fail, or the Query couldn't be
// run.
Expand Down Expand Up @@ -208,6 +212,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
wg := sync.WaitGroup{}
for q, sources := range expanded {
wg.Add(1)
executionPoolCount.Add(1)
// localize values for the closure below
q, sources := q, sources

Expand All @@ -216,9 +221,13 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan<
go func() {
// queue everything into the execution pool
defer LogRecoverToReturn(ctx, "ExecuteQuery outer")
span.SetAttributes(
attribute.Int("ovm.discovery.executionPoolCount", int(executionPoolCount.Load())),
)
e.executionPool.Go(func() {
defer LogRecoverToReturn(ctx, "ExecuteQuery inner")
defer wg.Done()
defer executionPoolCount.Add(-1)
var queryItems []*sdp.Item
var queryErrors []*sdp.QueryError
numSources.Add(1)
Expand Down

0 comments on commit e099e3d

Please sign in to comment.