diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index e860b6bd042..ca31a019c9a 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -35,7 +36,8 @@ func TestParquetFuzz(t *testing.T) { defer s.Close() consul := e2edb.NewConsulWithName("consul") - require.NoError(t, s.StartAndWaitReady(consul)) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) flags := mergeFlags( @@ -72,6 +74,11 @@ func TestParquetFuzz(t *testing.T) { "-parquet-converter.enabled": "true", // Querier "-querier.enable-parquet-queryable": "true", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), }, ) diff --git a/pkg/querier/bucket.go b/pkg/querier/bucket.go index e293d345dc7..4074abd5ec0 100644 --- a/pkg/querier/bucket.go +++ b/pkg/querier/bucket.go @@ -22,7 +22,7 @@ func createCachingBucketClient(ctx context.Context, storageCfg cortex_tsdb.Block // Blocks finder doesn't use chunks, but we pass config for consistency. matchers := cortex_tsdb.NewMatchers() - cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg)) + cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg)) if err != nil { return nil, errors.Wrap(err, "create caching bucket") } diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 8be0411f717..278fd097c26 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -186,7 +186,39 @@ func (cfg *MetadataCacheConfig) Validate() error { return cfg.BucketCacheBackend.Validate() } -func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { +type ParquetLabelsCacheConfig struct { + BucketCacheBackend `yaml:",inline"` + + SubrangeSize int64 `yaml:"subrange_size"` + MaxGetRangeRequests int `yaml:"max_get_range_requests"` + AttributesTTL time.Duration `yaml:"attributes_ttl"` + SubrangeTTL time.Duration `yaml:"subrange_ttl"` +} + +func (cfg *ParquetLabelsCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The parquet labels cache backend type. Single or Multiple cache backend can be provided. "+ + "Supported values in single cache: %s, %s, %s, and '' (disable). "+ + "Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedBucketCacheBackends, ", "))) + + cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") + cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-labels") + cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") + + f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.") + f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching parquet labels file. Zero or negative value = unlimited number of sub-requests.") + f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for parquet labels file.") + f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual subranges.") + + // In the multi level parquet labels cache, backfill TTL follows subrange TTL + cfg.MultiLevel.BackFillTTL = cfg.SubrangeTTL +} + +func (cfg *ParquetLabelsCacheConfig) Validate() error { + return cfg.BucketCacheBackend.Validate() +} + +func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false @@ -221,6 +253,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec, "") } + parquetLabelsCache, err := createBucketCache("parquet-labels-cache", &parquetLabelsConfig.BucketCacheBackend, logger, reg) + if err != nil { + return nil, errors.Wrapf(err, "parquet-labels-cache") + } + if parquetLabelsCache != nil { + cachingConfigured = true + parquetLabelsCache = cache.NewTracingCache(parquetLabelsCache) + cfg.CacheGetRange("parquet-labels", parquetLabelsCache, matchers.GetParquetLabelsMatcher(), parquetLabelsConfig.SubrangeSize, parquetLabelsConfig.AttributesTTL, parquetLabelsConfig.SubrangeTTL, parquetLabelsConfig.MaxGetRangeRequests) + } + if !cachingConfigured { // No caching is configured. return bkt, nil @@ -316,6 +358,7 @@ func NewMatchers() Matchers { matcherMap := make(map[string]func(string) bool) matcherMap["chunks"] = isTSDBChunkFile matcherMap["parquet-chunks"] = isParquetChunkFile + matcherMap["parquet-labels"] = isParquetLabelsFile matcherMap["metafile"] = isMetaFile matcherMap["block-index"] = isBlockIndexFile matcherMap["bucket-index"] = isBucketIndexFiles @@ -339,6 +382,10 @@ func (m *Matchers) SetParquetChunksMatcher(f func(string) bool) { m.matcherMap["parquet-chunks"] = f } +func (m *Matchers) SetParquetLabelsMatcher(f func(string) bool) { + m.matcherMap["parquet-labels"] = f +} + func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) { m.matcherMap["block-index"] = f } @@ -367,6 +414,10 @@ func (m *Matchers) GetParquetChunksMatcher() func(string) bool { return m.matcherMap["parquet-chunks"] } +func (m *Matchers) GetParquetLabelsMatcher() func(string) bool { + return m.matcherMap["parquet-labels"] +} + func (m *Matchers) GetMetafileMatcher() func(string) bool { return m.matcherMap["metafile"] } @@ -397,6 +448,8 @@ func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name) func isParquetChunkFile(name string) bool { return strings.HasSuffix(name, "chunks.parquet") } +func isParquetLabelsFile(name string) bool { return strings.HasSuffix(name, "labels.parquet") } + func isMetaFile(name string) bool { return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile) } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 4e0af8fde7f..7b92fe0d887 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -274,23 +274,24 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool { // BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway. type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval"` - MaxConcurrent int `yaml:"max_concurrent"` - MaxInflightRequests int `yaml:"max_inflight_requests"` - TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` - IndexCache IndexCacheConfig `yaml:"index_cache"` - ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` - MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` - IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` - IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` - IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` - BucketIndex BucketIndexConfig `yaml:"bucket_index"` - BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + IndexCache IndexCacheConfig `yaml:"index_cache"` + ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` + MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` + ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache" doc:"hidden"` + MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` + IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` + IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` + IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` + BucketIndex BucketIndexConfig `yaml:"bucket_index"` + BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -348,6 +349,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.IndexCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-cache.") cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.") cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.") + cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.") cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.") f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") @@ -403,6 +405,10 @@ func (cfg *BucketStoreConfig) Validate() error { if err != nil { return errors.Wrap(err, "metadata-cache configuration") } + err = cfg.ParquetLabelsCache.Validate() + if err != nil { + return errors.Wrap(err, "parquet-labels-cache configuration") + } if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) { return ErrInvalidBucketIndexBlockDiscoveryStrategy } diff --git a/pkg/storage/tsdb/multilevel_bucket_cache.go b/pkg/storage/tsdb/multilevel_bucket_cache.go index 8358c22ced0..83fcfddce78 100644 --- a/pkg/storage/tsdb/multilevel_bucket_cache.go +++ b/pkg/storage/tsdb/multilevel_bucket_cache.go @@ -67,6 +67,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg case "metadata-cache": itemName = "metadata_cache" metricHelpText = "metadata cache" + case "parquet-labels-cache": + itemName = "parquet_labels_cache" + metricHelpText = "parquet labels cache" default: itemName = name } diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index c44c4f53a10..1c445d25024 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -101,7 +101,7 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many // NewBucketStores makes a new BucketStores. func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { matchers := tsdb.NewMatchers() - cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg) + cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") }