diff --git a/enginerequests.go b/enginerequests.go index 5baf373..9224a32 100644 --- a/enginerequests.go +++ b/enginerequests.go @@ -97,8 +97,10 @@ func (e *Engine) HandleQuery(ctx context.Context, query *sdp.Query) { var pub sdp.EncodedConnection if e.IsNATSConnected() { + span.SetAttributes(attribute.Bool("ovm.nats.connected", true)) pub = e.natsConnection } else { + span.SetAttributes(attribute.Bool("ovm.nats.connected", false)) pub = NilConnection{} } @@ -160,6 +162,8 @@ func (e *Engine) ExecuteQuerySync(ctx context.Context, q *sdp.Query) ([]*sdp.Ite return items, errs, err } +var executionPoolCount atomic.Int32 + // ExecuteQuery Executes a single Query and returns the results without any // linking. Will return an error if all sources fail, or the Query couldn't be // run. @@ -208,6 +212,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< wg := sync.WaitGroup{} for q, sources := range expanded { wg.Add(1) + executionPoolCount.Add(1) // localize values for the closure below q, sources := q, sources @@ -216,9 +221,13 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, items chan< go func() { // queue everything into the execution pool defer LogRecoverToReturn(ctx, "ExecuteQuery outer") + span.SetAttributes( + attribute.Int("ovm.discovery.executionPoolCount", int(executionPoolCount.Load())), + ) e.executionPool.Go(func() { defer LogRecoverToReturn(ctx, "ExecuteQuery inner") defer wg.Done() + defer executionPoolCount.Add(-1) var queryItems []*sdp.Item var queryErrors []*sdp.QueryError numSources.Add(1)