From 12d7d793a3670435ae24bbd0830562f2335ebee5 Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Mon, 25 Nov 2024 18:10:35 +0100 Subject: [PATCH] fix service dependencies --- .../config/mimir.yaml | 9 +++++- .../config/mimir_override.yaml | 8 ++++++ pkg/api/api.go | 11 ++++++-- pkg/costattribution/manager.go | 13 ++++++--- pkg/costattribution/manager_test.go | 28 +++++++++---------- pkg/costattribution/tracker.go | 12 ++------ pkg/distributor/distributor_test.go | 5 ++-- pkg/ingester/ingester_test.go | 3 +- pkg/mimir/modules.go | 25 ++++++++--------- 9 files changed, 64 insertions(+), 50 deletions(-) diff --git a/development/mimir-microservices-mode/config/mimir.yaml b/development/mimir-microservices-mode/config/mimir.yaml index 5d245999115..09bc2c5a918 100644 --- a/development/mimir-microservices-mode/config/mimir.yaml +++ b/development/mimir-microservices-mode/config/mimir.yaml @@ -1,4 +1,6 @@ multitenancy_enabled: false +cost_attribution_registry_path: "/usage-metrics" +cost_attribution_eviction_interval: 10m distributor: ha_tracker: @@ -184,5 +186,10 @@ limits: ha_replica_label: ha_replica ha_max_clusters: 10 + cost_attribution_labels: "instance" + max_cost_attribution_labels_per_user: 2 + max_cost_attribution_cardinality_per_user: 100 + cost_attribution_cooldown: 20m + runtime_config: - file: ./config/runtime.yaml + file: ./config/runtime.yaml \ No newline at end of file diff --git a/development/mimir-microservices-mode/config/mimir_override.yaml b/development/mimir-microservices-mode/config/mimir_override.yaml index c7c9e8fd2c5..176894eadd6 100644 --- a/development/mimir-microservices-mode/config/mimir_override.yaml +++ b/development/mimir-microservices-mode/config/mimir_override.yaml @@ -1,4 +1,6 @@ multitenancy_enabled: false +cost_attribution_registry_path: "/usage-metrics" +cost_attribution_eviction_interval: 10m distributor: pool: @@ -180,5 +182,11 @@ limits: ha_replica_label: ha_replica ha_max_clusters: 10 + cost_attribution_labels: "instance" + max_cost_attribution_labels_per_user: 2 + max_cost_attribution_cardinality_per_user: 100 + cost_attribution_cooldown: 20m + runtime_config: file: ./config/runtime.yaml + diff --git a/pkg/api/api.go b/pkg/api/api.go index 4342a65bd52..286ea7c7e77 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -19,8 +19,6 @@ import ( "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/server" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/mimir/pkg/alertmanager" "github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb" "github.com/grafana/mimir/pkg/compactor" @@ -43,6 +41,8 @@ import ( util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/validation" "github.com/grafana/mimir/pkg/util/validation/exporter" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) type ConfigHandler func(actualCfg interface{}, defaultCfg interface{}) http.HandlerFunc @@ -278,6 +278,13 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, true, "GET") } +// Function to register the usage metrics route +func (a *API) RegisterUsageMetricsRoute(customRegistryPath string, reg *prometheus.Registry) { + // Create a Prometheus HTTP handler for the custom registry + // Register the handler with the API's routing system + a.RegisterRoute(customRegistryPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), true, false, "GET") +} + // Ingester is defined as an interface to allow for alternative implementations // of ingesters to be passed into the API.RegisterIngester() method. type Ingester interface { diff --git a/pkg/costattribution/manager.go b/pkg/costattribution/manager.go index e6ac57d3aac..b21b269f3f5 100644 --- a/pkg/costattribution/manager.go +++ b/pkg/costattribution/manager.go @@ -29,21 +29,26 @@ type Manager struct { // mu protects the trackersByUserID map mtx sync.RWMutex trackersByUserID map[string]*Tracker + reg *prometheus.Registry } // NewManager creates a new cost attribution manager. which is responsible for managing the cost attribution of series. // It will clean up inactive series and update the cost attribution of series every 3 minutes. -func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides) *Manager { - s := &Manager{ +func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) { + m := &Manager{ trackersByUserID: make(map[string]*Tracker), limits: limits, mtx: sync.RWMutex{}, inactiveTimeout: inactiveTimeout, logger: logger, + reg: reg, } - s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution manager") - return s + m.Service = services.NewTimerService(cleanupInterval, nil, m.iteration, nil).WithName("cost attribution manager") + if err := reg.Register(m); err != nil { + return nil, err + } + return m, nil } func (m *Manager) iteration(_ context.Context) error { diff --git a/pkg/costattribution/manager_test.go b/pkg/costattribution/manager_test.go index 03fe7b679a4..00f53a77d90 100644 --- a/pkg/costattribution/manager_test.go +++ b/pkg/costattribution/manager_test.go @@ -12,7 +12,6 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/util/validation" ) @@ -44,7 +43,12 @@ func getMockLimits(idx int) (*validation.Overrides, error) { func newTestManager() *Manager { logger := log.NewNopLogger() limits, _ := getMockLimits(0) - return NewManager(5*time.Second, 10*time.Second, logger, limits) + reg := prometheus.NewRegistry() + manager, err := NewManager(5*time.Second, 10*time.Second, logger, limits, reg) + if err != nil { + panic(err) + } + return manager } func Test_NewManager(t *testing.T) { @@ -63,8 +67,6 @@ func Test_EnabledForUser(t *testing.T) { func Test_CreateDeleteTracker(t *testing.T) { manager := newTestManager() - reg := prometheus.NewRegistry() - require.NoError(t, reg.Register(manager)) t.Run("Tracker existence and attributes", func(t *testing.T) { user1Tracker := manager.TrackerForUser("user1") @@ -92,7 +94,7 @@ func Test_CreateDeleteTracker(t *testing.T) { # TYPE cortex_received_attributed_samples_total counter cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1 ` - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total")) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total")) }) t.Run("Purge inactive attributions", func(t *testing.T) { @@ -102,7 +104,7 @@ func Test_CreateDeleteTracker(t *testing.T) { # TYPE cortex_discarded_attributed_samples_total counter cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="custom_attribution"} 1 ` - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total")) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total")) }) t.Run("Disabling user cost attribution", func(t *testing.T) { @@ -115,7 +117,7 @@ func Test_CreateDeleteTracker(t *testing.T) { # TYPE cortex_received_attributed_samples_total counter cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1 ` - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total")) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total")) }) t.Run("Updating user cardinality and labels", func(t *testing.T) { @@ -131,7 +133,7 @@ func Test_CreateDeleteTracker(t *testing.T) { # TYPE cortex_discarded_attributed_samples_total counter cortex_discarded_attributed_samples_total{feature="__missing__",reason="invalid-metrics-name",team="foo",tenant="user3",tracker="custom_attribution"} 1 ` - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total")) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total")) }) t.Run("When cost attribution get overflowed, all metrics are purged except overflow metrics", func(t *testing.T) { @@ -144,14 +146,12 @@ func Test_CreateDeleteTracker(t *testing.T) { # TYPE cortex_received_attributed_samples_total counter cortex_received_attributed_samples_total{feature="__overflow__",team="__overflow__",tenant="user3",tracker="custom_attribution"} 2 ` - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total")) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total")) }) } func Test_PurgeInactiveAttributionsUntil(t *testing.T) { manager := newTestManager() - reg := prometheus.NewRegistry() - require.NoError(t, reg.Register(manager)) // Simulate metrics for multiple users to set up initial state manager.TrackerForUser("user1").IncrementReceivedSamples(labels.FromStrings("team", "foo"), 1, time.Unix(1, 0)) @@ -174,7 +174,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) { metricNames := []string{ "cortex_discarded_attributed_samples_total", } - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), metricNames...)) }) t.Run("Purge after inactive timeout", func(t *testing.T) { @@ -194,7 +194,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) { metricNames := []string{ "cortex_discarded_attributed_samples_total", } - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), metricNames...)) }) t.Run("Purge all trackers", func(t *testing.T) { @@ -209,6 +209,6 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) { "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total", } - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(""), metricNames...)) + assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(""), metricNames...)) }) } diff --git a/pkg/costattribution/tracker.go b/pkg/costattribution/tracker.go index c28ce2badd4..134fb48c9b6 100644 --- a/pkg/costattribution/tracker.go +++ b/pkg/costattribution/tracker.go @@ -127,11 +127,7 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) { return } vals := t.getKeyValues(lbs, now.Unix()) - if t.isOverflow { - t.activeSeriesPerUserAttribution.WithLabelValues(overflowValue).Set(1) - } else { - t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc() - } + t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc() } func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) { @@ -139,11 +135,7 @@ func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) { return } vals := t.getKeyValues(lbs, now.Unix()) - if t.isOverflow { - t.activeSeriesPerUserAttribution.WithLabelValues(overflowValue).Set(1) - } else { - t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec() - } + t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec() } func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2850bdef553..32fec0f55cf 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2017,7 +2017,7 @@ func BenchmarkDistributor_Push(b *testing.B) { { state: "disabled", customRegistry: nil, - cfg: func(limits *validation.Limits) {}, + cfg: func(_ *validation.Limits) {}, }, } @@ -2074,8 +2074,7 @@ func BenchmarkDistributor_Push(b *testing.B) { // Initialize the cost attribution manager var cam *costattribution.Manager if caCase.customRegistry != nil { - cam = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides) - err := caCase.customRegistry.Register(cam) + cam, err = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides, caCase.customRegistry) require.NoError(b, err) } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index bafb17e68d2..10214b119e2 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3645,8 +3645,7 @@ func BenchmarkIngesterPush(b *testing.B) { var cam *costattribution.Manager if caCase.customRegistry != nil { - cam = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides) - err = caCase.customRegistry.Register(cam) + cam, err = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides, caCase.customRegistry) require.NoError(b, err) } diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 2ec2fa70323..20effe97d66 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -29,7 +29,6 @@ import ( "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/matchers/compat" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/rules" @@ -481,7 +480,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { func (t *Mimir) initDistributor() (serv services.Service, err error) { t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Registerer, t.Overrides) - return nil, nil } @@ -652,14 +650,11 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { func (t *Mimir) initCostAttributionService() (services.Service, error) { // The cost attribution service is only initilized if the custom registry path is provided. if t.Cfg.CostAttributionRegistryPath != "" { - t.CostAttributionManager = costattribution.NewManager(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides) - // if custom registry path is provided, create a custom registry and use it for cost attribution service - customRegistry := prometheus.NewRegistry() - // Register the custom registry with the provided URL. - // This allows users to expose custom metrics on a separate endpoint. - // This is useful when users want to expose metrics that are not part of the default Mimir metrics. - http.Handle(t.Cfg.CostAttributionRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry})) - err := customRegistry.Register(t.CostAttributionManager) + // If custom registry path is provided, create a custom registry and use it for cost attribution service only + reg := prometheus.NewRegistry() + var err error + t.CostAttributionManager, err = costattribution.NewManager(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, reg) + t.API.RegisterUsageMetricsRoute(t.Cfg.CostAttributionRegistryPath, reg) return t.CostAttributionManager, err } return nil, nil @@ -696,6 +691,7 @@ func (t *Mimir) initIngester() (serv services.Service, err error) { ing = ingester.NewIngesterActivityTracker(t.Ingester, t.ActivityTracker) } t.API.RegisterIngester(ing) + return nil, nil } @@ -1197,10 +1193,11 @@ func (t *Mimir) setupModuleManager() error { IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, MemberlistKV, Vault}, - Distributor: {DistributorService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, - DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, - Ingester: {IngesterService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, - IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, + Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, + DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault, CostAttributionService}, + CostAttributionService: {API, Overrides}, + Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, + IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV, CostAttributionService}, Flusher: {Overrides, API}, Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, Querier: {TenantFederation, Vault},