diff --git a/CHANGELOG.md b/CHANGELOG.md index f78f1b445c..413b6cdb7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 +* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/go.mod b/go.mod index 1f1609649c..73500b58df 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250606162055-b81ebb7e1b96 + github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index f624ce4b8c..93a72df933 100644 --- a/go.sum +++ b/go.sum @@ -1543,8 +1543,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250606162055-b81ebb7e1b96 h1:5EbDNJOxTWGpe6yzXdgcBCU63BRSrRAh0Q1oB5AVyoA= -github.com/prometheus-community/parquet-common v0.0.0-20250606162055-b81ebb7e1b96/go.mod h1:MwYpD+FKot7LWBMFaPS6FeM8oqo77u5erRlNkSSFPA0= +github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 h1:xGnXldBSTFPopLYi7ce+kJb+A1h1mPTeF4SLlRTEek0= +github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309/go.mod h1:MwYpD+FKot7LWBMFaPS6FeM8oqo77u5erRlNkSSFPA0= github.com/prometheus-community/prom-label-proxy v0.11.0 h1:IO02WiiFMfcIqvjhwMbCYnDJiTNcSHBrkCGRQ/7KDd0= github.com/prometheus-community/prom-label-proxy v0.11.0/go.mod h1:lfvrG70XqsxWDrSh1843QXBG0fSg8EbIXmAo8xGsvw8= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index bd40b0a1c6..3da40e5695 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -186,19 +186,10 @@ func NewBlocksStoreQueryable( func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { var stores BlocksStoreSet - bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, gatewayCfg.HedgedRequest.GetHedgedRoundTripper(), "querier", logger, reg) + bucketClient, err := createCachingBucketClient(context.Background(), storageCfg, gatewayCfg.HedgedRequest.GetHedgedRoundTripper(), "querier", logger, reg) if err != nil { - return nil, errors.Wrap(err, "failed to create bucket client") + return nil, err } - - // Blocks finder doesn't use chunks, but we pass config for consistency. - matchers := cortex_tsdb.NewMatchers() - cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg)) - if err != nil { - return nil, errors.Wrap(err, "create caching bucket") - } - bucketClient = cachingBucket - // Create the blocks finder. var finder BlocksFinder if storageCfg.BucketStore.BucketIndex.Enabled { diff --git a/pkg/querier/bucket.go b/pkg/querier/bucket.go new file mode 100644 index 0000000000..e293d345dc --- /dev/null +++ b/pkg/querier/bucket.go @@ -0,0 +1,31 @@ +package querier + +import ( + "context" + "net/http" + + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/extprom" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func createCachingBucketClient(ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, hedgedRoundTripper, name, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "failed to create bucket client") + } + + // Blocks finder doesn't use chunks, but we pass config for consistency. + matchers := cortex_tsdb.NewMatchers() + cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg)) + if err != nil { + return nil, errors.Wrap(err, "create caching bucket") + } + bucketClient = cachingBucket + return bucketClient, nil +} diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 3475e8cc7b..7542c148f1 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -115,11 +115,11 @@ func NewParquetQueryable( logger log.Logger, reg prometheus.Registerer, ) (storage.Queryable, error) { - bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "parquet-querier", logger, reg) - + bucketClient, err := createCachingBucketClient(context.Background(), storageCfg, nil, "parquet-querier", logger, reg) if err != nil { return nil, err } + manager, err := services.NewManager(blockStorageQueryable) if err != nil { return nil, err @@ -400,7 +400,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool hints.End = maxt if maxt < mint { - return nil + return storage.EmptySeriesSet() } remaining, parquet, err := q.getBlocks(ctx, mint, maxt) diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index c77d94cf88..36f5b54a9f 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -215,6 +215,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata cachingConfigured = true chunksCache = cache.NewTracingCache(chunksCache) cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) + cfg.CacheGetRange("parquet-chunks", chunksCache, matchers.GetParquetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg) @@ -356,6 +357,7 @@ type Matchers struct { func NewMatchers() Matchers { matcherMap := make(map[string]func(string) bool) matcherMap["chunks"] = isTSDBChunkFile + matcherMap["parquet-chunks"] = isParquetChunkFile matcherMap["metafile"] = isMetaFile matcherMap["block-index"] = isBlockIndexFile matcherMap["bucket-index"] = isBucketIndexFiles @@ -375,6 +377,10 @@ func (m *Matchers) SetChunksMatcher(f func(string) bool) { m.matcherMap["chunks"] = f } +func (m *Matchers) SetParquetChunksMatcher(f func(string) bool) { + m.matcherMap["parquet-chunks"] = f +} + func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) { m.matcherMap["block-index"] = f } @@ -399,6 +405,10 @@ func (m *Matchers) GetChunksMatcher() func(string) bool { return m.matcherMap["chunks"] } +func (m *Matchers) GetParquetChunksMatcher() func(string) bool { + return m.matcherMap["parquet-chunks"] +} + func (m *Matchers) GetMetafileMatcher() func(string) bool { return m.matcherMap["metafile"] } @@ -427,6 +437,8 @@ var chunksMatcher = regexp.MustCompile(`^.*/chunks/\d+$`) func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) } +func isParquetChunkFile(name string) bool { return strings.HasSuffix(name, "chunks.parquet") } + func isMetaFile(name string) bool { return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile) } diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go index 8f345fa0e1..53125fbba2 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go @@ -53,7 +53,7 @@ func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) { return 0, err } defer func() { _ = rc.Close() }() - n, err = rc.Read(p) + n, err = io.ReadFull(rc, p) if err == io.EOF { err = nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index b2a6887e8e..7c8f5e6e40 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -848,7 +848,7 @@ github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250606162055-b81ebb7e1b96 +# github.com/prometheus-community/parquet-common v0.0.0-20250610002942-dfd72bae1309 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/schema