@@ -68,7 +68,7 @@ func getBlockStoreType(ctx context.Context, defaultBlockStoreType blockStoreType
68
68
69
69
type parquetQueryableFallbackMetrics struct {
70
70
blocksQueriedTotal * prometheus.CounterVec
71
- selectCount * prometheus.CounterVec
71
+ operationsTotal * prometheus.CounterVec
72
72
}
73
73
74
74
func newParquetQueryableFallbackMetrics (reg prometheus.Registerer ) * parquetQueryableFallbackMetrics {
@@ -77,10 +77,10 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery
77
77
Name : "cortex_parquet_queryable_blocks_queried_total" ,
78
78
Help : "Total number of blocks found to query." ,
79
79
}, []string {"type" }),
80
- selectCount : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
81
- Name : "cortex_parquet_queryable_selects_queried_total " ,
82
- Help : "Total number of selects ." ,
83
- }, []string {"type" }),
80
+ operationsTotal : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
81
+ Name : "cortex_parquet_queryable_operations_total " ,
82
+ Help : "Total number of Operations ." ,
83
+ }, []string {"type" , "method" }),
84
84
}
85
85
}
86
86
@@ -267,6 +267,7 @@ type parquetQuerierWithFallback struct {
267
267
268
268
func (q * parquetQuerierWithFallback ) LabelValues (ctx context.Context , name string , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
269
269
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
270
+ defer q .incrementOpsMetric ("LabelValues" , remaining , parquet )
270
271
if err != nil {
271
272
return nil , nil , err
272
273
}
@@ -312,6 +313,7 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
312
313
313
314
func (q * parquetQuerierWithFallback ) LabelNames (ctx context.Context , hints * storage.LabelHints , matchers ... * labels.Matcher ) ([]string , annotations.Annotations , error ) {
314
315
remaining , parquet , err := q .getBlocks (ctx , q .minT , q .maxT )
316
+ defer q .incrementOpsMetric ("LabelNames" , remaining , parquet )
315
317
if err != nil {
316
318
return nil , nil , err
317
319
}
@@ -356,7 +358,7 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
356
358
return result , rAnnotations , nil
357
359
}
358
360
359
- func (q * parquetQuerierWithFallback ) Select (ctx context.Context , sortSeries bool , hints * storage.SelectHints , matchers ... * labels.Matcher ) storage.SeriesSet {
361
+ func (q * parquetQuerierWithFallback ) Select (ctx context.Context , sortSeries bool , h * storage.SelectHints , matchers ... * labels.Matcher ) storage.SeriesSet {
360
362
userID , err := tenant .TenantID (ctx )
361
363
if err != nil {
362
364
storage .ErrSeriesSet (err )
@@ -366,68 +368,101 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
366
368
uLogger := util_log .WithUserID (userID , q .logger )
367
369
level .Warn (uLogger ).Log ("msg" , "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage" )
368
370
369
- return q .blocksStoreQuerier .Select (ctx , sortSeries , hints , matchers ... )
371
+ return q .blocksStoreQuerier .Select (ctx , sortSeries , h , matchers ... )
370
372
}
371
373
372
- mint , maxt , limit := q .minT , q .maxT , 0
374
+ hints := storage.SelectHints {
375
+ Start : q .minT ,
376
+ End : q .maxT ,
377
+ }
373
378
374
- if hints != nil {
379
+ mint , maxt , limit := q .minT , q .maxT , 0
380
+ if h != nil {
381
+ // let copy the hints here as we wanna potentially modify it
382
+ hints = * h
375
383
mint , maxt , limit = hints .Start , hints .End , hints .Limit
376
384
}
377
385
386
+ maxt = q .adjustMaxT (maxt )
387
+ hints .End = maxt
388
+
389
+ if maxt < mint {
390
+ return nil
391
+ }
392
+
378
393
remaining , parquet , err := q .getBlocks (ctx , mint , maxt )
394
+ defer q .incrementOpsMetric ("Select" , remaining , parquet )
395
+
379
396
if err != nil {
380
397
return storage .ErrSeriesSet (err )
381
398
}
382
399
383
- serieSets := []storage.SeriesSet {}
384
-
385
400
// Lets sort the series to merge
386
401
if len (parquet ) > 0 && len (remaining ) > 0 {
387
402
sortSeries = true
388
403
}
389
404
405
+ promises := make ([]chan storage.SeriesSet , 0 , 2 )
406
+
390
407
if len (parquet ) > 0 {
391
- serieSets = append (serieSets , q .parquetQuerier .Select (InjectBlocksIntoContext (ctx , parquet ... ), sortSeries , hints , matchers ... ))
408
+ p := make (chan storage.SeriesSet , 1 )
409
+ promises = append (promises , p )
410
+ go func () {
411
+ p <- q .parquetQuerier .Select (InjectBlocksIntoContext (ctx , parquet ... ), sortSeries , & hints , matchers ... )
412
+ }()
392
413
}
393
414
394
415
if len (remaining ) > 0 {
395
- serieSets = append (serieSets , q .blocksStoreQuerier .Select (InjectBlocksIntoContext (ctx , remaining ... ), sortSeries , hints , matchers ... ))
416
+ p := make (chan storage.SeriesSet , 1 )
417
+ promises = append (promises , p )
418
+ go func () {
419
+ p <- q .blocksStoreQuerier .Select (InjectBlocksIntoContext (ctx , remaining ... ), sortSeries , & hints , matchers ... )
420
+ }()
396
421
}
397
422
398
- if len (serieSets ) == 1 {
399
- return serieSets [0 ]
423
+ if len (promises ) == 1 {
424
+ return <- promises [0 ]
400
425
}
401
426
402
- return storage .NewMergeSeriesSet (serieSets , limit , storage .ChainedSeriesMerge )
403
- }
427
+ seriesSets := make ([]storage.SeriesSet , len (promises ))
428
+ for i , promise := range promises {
429
+ seriesSets [i ] = <- promise
430
+ }
404
431
405
- func (q * parquetQuerierWithFallback ) Close () error {
406
- mErr := multierror.MultiError {}
407
- mErr .Add (q .parquetQuerier .Close ())
408
- mErr .Add (q .blocksStoreQuerier .Close ())
409
- return mErr .Err ()
432
+ return storage .NewMergeSeriesSet (seriesSets , limit , storage .ChainedSeriesMerge )
410
433
}
411
434
412
- func (q * parquetQuerierWithFallback ) getBlocks ( ctx context. Context , minT , maxT int64 ) ([] * bucketindex. Block , [] * bucketindex. Block , error ) {
435
+ func (q * parquetQuerierWithFallback ) adjustMaxT ( maxt int64 ) int64 {
413
436
// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
414
437
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
415
438
// optimization is particularly important for the blocks storage because can be used to skip
416
439
// querying most recent not-compacted-yet blocks from the storage.
417
440
if q .queryStoreAfter > 0 {
418
441
now := time .Now ()
419
- maxT = min (maxT , util .TimeToMillis (now .Add (- q .queryStoreAfter )))
420
-
421
- if maxT < minT {
422
- return nil , nil , nil
423
- }
442
+ maxt = min (maxt , util .TimeToMillis (now .Add (- q .queryStoreAfter )))
424
443
}
444
+ return maxt
445
+ }
446
+
447
+ func (q * parquetQuerierWithFallback ) Close () error {
448
+ mErr := multierror.MultiError {}
449
+ mErr .Add (q .parquetQuerier .Close ())
450
+ mErr .Add (q .blocksStoreQuerier .Close ())
451
+ return mErr .Err ()
452
+ }
425
453
454
+ func (q * parquetQuerierWithFallback ) getBlocks (ctx context.Context , minT , maxT int64 ) ([]* bucketindex.Block , []* bucketindex.Block , error ) {
426
455
userID , err := tenant .TenantID (ctx )
427
456
if err != nil {
428
457
return nil , nil , err
429
458
}
430
459
460
+ maxT = q .adjustMaxT (maxT )
461
+
462
+ if maxT < minT {
463
+ return nil , nil , nil
464
+ }
465
+
431
466
blocks , _ , err := q .finder .GetBlocks (ctx , userID , minT , maxT )
432
467
if err != nil {
433
468
return nil , nil , err
@@ -446,17 +481,18 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
446
481
447
482
q .metrics .blocksQueriedTotal .WithLabelValues ("parquet" ).Add (float64 (len (parquetBlocks )))
448
483
q .metrics .blocksQueriedTotal .WithLabelValues ("tsdb" ).Add (float64 (len (remaining )))
484
+ return remaining , parquetBlocks , nil
485
+ }
449
486
487
+ func (q * parquetQuerierWithFallback ) incrementOpsMetric (method string , remaining []* bucketindex.Block , parquetBlocks []* bucketindex.Block ) {
450
488
switch {
451
489
case len (remaining ) > 0 && len (parquetBlocks ) > 0 :
452
- q .metrics .selectCount .WithLabelValues ("mixed" ).Inc ()
490
+ q .metrics .operationsTotal .WithLabelValues ("mixed" , method ).Inc ()
453
491
case len (remaining ) > 0 && len (parquetBlocks ) == 0 :
454
- q .metrics .selectCount .WithLabelValues ("tsdb" ).Inc ()
492
+ q .metrics .operationsTotal .WithLabelValues ("tsdb" , method ).Inc ()
455
493
case len (remaining ) == 0 && len (parquetBlocks ) > 0 :
456
- q .metrics .selectCount .WithLabelValues ("parquet" ).Inc ()
494
+ q .metrics .operationsTotal .WithLabelValues ("parquet" , method ).Inc ()
457
495
}
458
-
459
- return remaining , parquetBlocks , nil
460
496
}
461
497
462
498
type cacheInterface [T any ] interface {
0 commit comments