Introduce a regex tenant resolver #6713

Status: Open. Wants to merge 1 commit into `master`.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,9 @@
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It is automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
* [FEATURE] Ruler: Add support for group labels. #6665
* [FEATURE] Query federation: Introduce a regex tenant resolver to allow a regex in the `X-Scope-OrgID` value. #6713
  - Add an experimental `tenant-federation.regex-matcher-enabled` flag. When enabled, users can pass a regex as the `X-Scope-OrgID` value, and all matching tenant IDs are automatically included (see the usage sketch after this list). User discovery is based on scanning block storage, so a new user becomes queryable after its first block is uploaded (generally within 2h).
  - Add an experimental `tenant-federation.user-sync-interval` flag, which specifies how frequently to scan for users. The scanned users are used to compute the matched tenant IDs.
* [FEATURE] Experimental Parquet format support: Implement a parquet converter service to convert TSDB blocks into Parquet, and a Parquet Queryable. #6716 #6743
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
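
For illustration, a minimal sketch of querying across tenants with the new resolver, assuming a query-frontend reachable at `localhost:9009` with `-tenant-federation.enabled=true` and `-tenant-federation.regex-matcher-enabled=true` (the endpoint and port are assumptions, not part of this PR):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Query all tenants whose IDs match the regex "user-.+".
	q := url.Values{"query": {"up"}}
	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:9009/prometheus/api/v1/query?"+q.Encode(), nil)
	if err != nil {
		panic(err)
	}
	// With the regex matcher enabled, the header value is treated as a
	// regex and expanded to all matching tenant IDs.
	req.Header.Set("X-Scope-OrgID", "user-.+")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```

Pushes must still target a single tenant: as the integration test below asserts, a push carrying a regex `X-Scope-OrgID` fails.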
16 changes: 16 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -181,6 +181,22 @@ tenant_federation:
  # CLI flag: -tenant-federation.max-tenant
  [max_tenant: <int> | default = 0]

  # [Experimental] If enabled, the `X-Scope-OrgID` header value can be a regex,
  # and all matching tenant IDs are automatically included. Regex matching
  # follows the Prometheus rules; see
  # https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions
  # for details. User discovery is based on scanning block storage, so a new
  # user becomes queryable after its first block is uploaded (generally within
  # 2h).
  # CLI flag: -tenant-federation.regex-matcher-enabled
  [regex_matcher_enabled: <boolean> | default = false]

  # [Experimental] When the regex matcher is enabled, this specifies how
  # frequently to scan for users. The scanned users are used to compute the
  # matched tenant IDs. The scanning strategy depends on
  # `-blocks-storage.users-scanner.strategy`.
  # CLI flag: -tenant-federation.user-sync-interval
  [user_sync_interval: <duration> | default = 5m]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]
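
For orientation, the same settings expressed as a config-file fragment, a minimal sketch assuming the defaults documented above:

```yaml
tenant_federation:
  # Enable federation and the experimental regex resolver.
  enabled: true
  regex_matcher_enabled: true
  # Rescan block storage for users every 5 minutes (the default).
  user_sync_interval: 5m
```

Equivalently via flags: `-tenant-federation.enabled=true -tenant-federation.regex-matcher-enabled=true`.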

3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -66,6 +66,9 @@ Currently experimental features are:
- The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporary and will be removed in future versions
- Blocks storage user index
- Querier: tenant federation
  - `-tenant-federation.enabled`
  - `-tenant-federation.regex-matcher-enabled`
  - `-tenant-federation.user-sync-interval`
- The thanosconvert tool for converting Thanos block metadata to Cortex
- HA Tracker: cleanup of old replicas from KV Store.
- Instance limits in ingester and distributor
230 changes: 230 additions & 0 deletions integration/querier_tenant_federation_test.go
@@ -28,25 +28,255 @@ type querierTenantFederationConfig struct {

func TestQuerierTenantFederation(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{})
	runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{})
}

func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		querySchedulerEnabled: true,
	})
	runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
		querySchedulerEnabled: true,
	})
}

func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		shuffleShardingEnabled: true,
	})
	runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
		shuffleShardingEnabled: true,
	})
}

func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) {
	runQuerierTenantFederationTest(t, querierTenantFederationConfig{
		querySchedulerEnabled:  true,
		shuffleShardingEnabled: true,
	})
	runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{
		querySchedulerEnabled:  true,
		shuffleShardingEnabled: true,
	})
}

func TestRegexResolver_NewlyCreatedTenant(t *testing.T) {
	const blockRangePeriod = 5 * time.Second

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	consul := e2edb.NewConsulWithName("consul")
	require.NoError(t, s.StartAndWaitReady(consul))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-querier.cache-results":                   "true",
		"-querier.split-queries-by-interval":       "24h",
		"-querier.query-ingesters-within":          "12h", // Required by the test on query /series out of ingesters time range
		"-tenant-federation.enabled":               "true",
		"-tenant-federation.regex-matcher-enabled": "true",

		// To upload blocks quickly.
		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
		"-blocks-storage.tsdb.ship-interval":       "1s",
		"-blocks-storage.tsdb.retention-period":    ((blockRangePeriod * 2) - 1).String(),

		// Store-gateway.
		"-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(),
		"-querier.max-fetched-series-per-query":      "1",
	})

	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	// Start ingester and distributor.
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(ingester, distributor))

	// Wait until the distributor has updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Start the query-frontend.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	// Start the querier.
	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")

	require.NoError(t, s.StartAndWaitReady(querier))
	require.NoError(t, s.WaitReady(queryFrontend))

	now := time.Now()
	series, expectedVector := generateSeries("series_1", now)

	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	result, err := c.Query("series_1", now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	require.Equal(t, expectedVector, result.(model.Vector))
}

func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) {
	const numUsers = 10
	const blockRangePeriod = 5 * time.Second

	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	memcached := e2ecache.NewMemcached()
	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(consul, memcached))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-querier.cache-results":                   "true",
		"-querier.split-queries-by-interval":       "24h",
		"-querier.query-ingesters-within":          "12h", // Required by the test on query /series out of ingesters time range
		"-frontend.memcached.addresses":            "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
		"-tenant-federation.enabled":               "true",
		"-tenant-federation.regex-matcher-enabled": "true",
		"-tenant-federation.user-sync-interval":    "1s",

		// To upload blocks quickly.
		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
		"-blocks-storage.tsdb.ship-interval":       "1s",
		"-blocks-storage.tsdb.retention-period":    ((blockRangePeriod * 2) - 1).String(),

		// Store-gateway.
		"-blocks-storage.bucket-store.sync-interval": blockRangePeriod.String(),
		"-querier.max-fetched-series-per-query":      "1",
	})

	// Start the query-scheduler if enabled.
	var queryScheduler *e2ecortex.CortexService
	if cfg.querySchedulerEnabled {
		queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
		require.NoError(t, s.StartAndWaitReady(queryScheduler))
		flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
		flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint()
	}

	if cfg.shuffleShardingEnabled {
		// Use only a single querier for each user.
		flags["-frontend.max-queriers-per-tenant"] = "1"
	}

	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	// Start ingester and distributor.
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(ingester, distributor))

	// Wait until the distributor has updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Push a series for each user to Cortex.
	now := time.Now()
	expectedVectors := make([]model.Vector, numUsers)
	tenantIDs := make([]string, numUsers)

	for u := 0; u < numUsers; u++ {
		tenantIDs[u] = fmt.Sprintf("user-%d", u)
		c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u])
		require.NoError(t, err)

		var series []prompb.TimeSeries
		series, expectedVectors[u] = generateSeries("series_1", now)
		// Push a second, later series so the block containing series_1 gets shipped.
		series2, _ := generateSeries("series_2", now.Add(blockRangePeriod*2))

		res, err := c.Push(series)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)

		res, err = c.Push(series2)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// Start the query-frontend.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	if !cfg.querySchedulerEnabled {
		flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()
	}

	// Start the querier and store-gateway.
	storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

	var querier2 *e2ecortex.CortexService
	if cfg.shuffleShardingEnabled {
		querier2 = e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	}

	require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
	require.NoError(t, s.WaitReady(queryFrontend))
	if cfg.shuffleShardingEnabled {
		require.NoError(t, s.StartAndWaitReady(querier2))
	}

	// Wait until the querier and store-gateway have updated the ring.
	require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
	if cfg.shuffleShardingEnabled {
		require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
	}

	// Wait for blocks to be uploaded.
	require.NoError(t, ingester.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_ingester_shipper_uploads_total"}, e2e.WaitMissingMetrics))

	// Wait for the resolver to update its known users.
	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics))
	if cfg.shuffleShardingEnabled {
		require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics))
	}

	// Query all tenants matching the regex.
	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+")
	require.NoError(t, err)

	result, err := c.Query("series_1", now)
	require.NoError(t, err)

	assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

	// Ensure a push to multiple tenants fails.
	series, _ := generateSeries("series_1", now)
	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 500, res.StatusCode)

	// Check metric label values for total queries in the query-frontend.
	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"),
		labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))

	// Check metric label values for the query queue length in either the query-frontend or the query-scheduler.
	queueComponent := queryFrontend
	queueMetricName := "cortex_query_frontend_queue_length"
	if cfg.querySchedulerEnabled {
		queueComponent = queryScheduler
		queueMetricName = "cortex_query_scheduler_queue_length"
	}
	require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"))))
}

func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) {
31 changes: 31 additions & 0 deletions pkg/cortex/modules.go
@@ -51,6 +51,7 @@ import (
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/modules"
@@ -282,10 +283,30 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
		// single tenant. This allows for a less impactful enabling of tenant
		// federation.
		byPassForSingleQuerier := true

		t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
		t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer)
		t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)

		if t.Cfg.TenantFederation.RegexMatcherEnabled {
			util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled")

			bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
				return bucket.NewClient(ctx, t.Cfg.BlocksStorage.Bucket, nil, "regex-resolver", util_log.Logger, prometheus.DefaultRegisterer)
			}

			regexResolver, err := tenantfederation.NewRegexResolver(t.Cfg.BlocksStorage.UsersScanner, prometheus.DefaultRegisterer, bucketClientFactory, t.Cfg.TenantFederation.UserSyncInterval, util_log.Logger)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize regex resolver: %v", err)
			}
			tenant.WithDefaultResolver(regexResolver)

			return regexResolver, nil
		}

		return nil, nil
	}

	return nil, nil
}

@@ -497,6 +518,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
	shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
	instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)

	if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
		// If the regex matcher is enabled, use the regex validator to pass the regex through to the querier.
		tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
	}

	queryRangeMiddlewares, cache, err := queryrange.Middlewares(
		t.Cfg.QueryRange,
		util_log.Logger,
@@ -776,6 +802,11 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) {
}

func (t *Cortex) initQueryScheduler() (services.Service, error) {
	if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled {
		// If the regex matcher is enabled, use the regex validator to pass the regex through to the querier.
		tenant.WithDefaultResolver(tenantfederation.NewRegexValidator())
	}

	s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
	if err != nil {
		return nil, errors.Wrap(err, "query-scheduler init")
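
To make the resolver's behavior concrete, here is a minimal sketch of the matching idea (not the PR's actual implementation; `regexMatcher`, `setUsers`, and `match` are illustrative names). Following Prometheus semantics, the pattern is fully anchored before matching against the known-user list that the periodic bucket scan maintains:

```go
package tenantfederation

import (
	"regexp"
	"sync"
)

// regexMatcher resolves an `X-Scope-OrgID` regex against a periodically
// refreshed list of known users.
type regexMatcher struct {
	mtx        sync.RWMutex
	knownUsers []string // refreshed every user-sync-interval by a bucket scan
}

// setUsers replaces the known-user list after a scan.
func (m *regexMatcher) setUsers(users []string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.knownUsers = users
}

// match returns every known tenant ID matched by the fully anchored regex.
func (m *regexMatcher) match(orgID string) ([]string, error) {
	re, err := regexp.Compile("^(?:" + orgID + ")$")
	if err != nil {
		return nil, err
	}
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	var matched []string
	for _, u := range m.knownUsers {
		if re.MatchString(u) {
			matched = append(matched, u)
		}
	}
	return matched, nil
}
```

With this anchoring, `user-1` matches only the literal tenant `user-1`, while `user-.+` expands to every known tenant ID with that prefix, which is what the integration test above relies on.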