8
8
"github.com/go-kit/log"
9
9
"github.com/go-kit/log/level"
10
10
lru "github.com/hashicorp/golang-lru/v2"
11
+ "github.com/opentracing/opentracing-go"
11
12
"github.com/parquet-go/parquet-go"
12
13
"github.com/pkg/errors"
13
14
"github.com/prometheus-community/parquet-common/schema"
@@ -146,14 +147,17 @@ func NewParquetQueryable(
146
147
shards := make ([]* parquet_storage.ParquetShard , len (blocks ))
147
148
errGroup := & errgroup.Group {}
148
149
150
+ span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.OpenShards" )
151
+ defer span .Finish ()
152
+
149
153
for i , block := range blocks {
150
154
errGroup .Go (func () error {
151
155
cacheKey := fmt .Sprintf ("%v-%v" , userID , block .ID )
152
156
shard := cache .Get (cacheKey )
153
157
if shard == nil {
154
158
// we always only have 1 shard - shard 0
155
159
// Use context.Background() here as the file can be cached and live after the request ends.
156
- shard , err = parquet_storage .OpenParquetShard (context .Background ( ),
160
+ shard , err = parquet_storage .OpenParquetShard (context .WithoutCancel ( ctx ),
157
161
userBkt ,
158
162
block .ID .String (),
159
163
0 ,
@@ -165,7 +169,7 @@ func NewParquetQueryable(
165
169
parquet_storage .WithOptimisticReader (true ),
166
170
)
167
171
if err != nil {
168
- return err
172
+ return errors . Wrapf ( err , "failed to open parquet shard. block: %v" , block . ID . String ())
169
173
}
170
174
cache .Set (cacheKey , shard )
171
175
}
@@ -266,6 +270,9 @@ type parquetQuerierWithFallback struct {
266
270
}
267
271
268
272
func (q * parquetQuerierWithFallback ) LabelValues (ctx context.Context , name string , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
273
+ span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.LabelValues" )
274
+ defer span .Finish ()
275
+
269
276
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
270
277
defer q .incrementOpsMetric ("LabelValues" , remaining , parquet )
271
278
if err != nil {
@@ -312,6 +319,9 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
312
319
}
313
320
314
321
func (q * parquetQuerierWithFallback ) LabelNames (ctx context.Context , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
322
+ span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.LabelNames" )
323
+ defer span .Finish ()
324
+
315
325
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
316
326
defer q .incrementOpsMetric ("LabelNames" , remaining , parquet )
317
327
if err != nil {
@@ -359,6 +369,9 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
359
369
}
360
370
361
371
func (q * parquetQuerierWithFallback ) Select (ctx context.Context , sortSeries bool , h * storage.SelectHints , matchers ... * labels.Matcher ) storage.SeriesSet {
372
+ span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.Select" )
373
+ defer span .Finish ()
374
+
362
375
userID , err := tenant .TenantID (ctx )
363
376
if err != nil {
364
377
storage .ErrSeriesSet (err )
@@ -408,6 +421,8 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
408
421
p := make (chan storage.SeriesSet , 1 )
409
422
promises = append (promises , p )
410
423
go func () {
424
+ span , _ := opentracing .StartSpanFromContext (ctx , "parquetQuerier.Select" )
425
+ defer span .Finish ()
411
426
p <- q .parquetQuerier .Select (InjectBlocksIntoContext (ctx , parquet ... ), sortSeries , & hints , matchers ... )
412
427
}()
413
428
}
0 commit comments