Skip to content

Commit

Permalink
fix service dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Nov 25, 2024
1 parent 00d2092 commit 12d7d79
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 50 deletions.
9 changes: 8 additions & 1 deletion development/mimir-microservices-mode/config/mimir.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
ha_tracker:
Expand Down Expand Up @@ -184,5 +186,10 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "instance"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml
file: ./config/runtime.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
pool:
Expand Down Expand Up @@ -180,5 +182,11 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "instance"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml

11 changes: 9 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/server"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/mimir/pkg/alertmanager"
"github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb"
"github.com/grafana/mimir/pkg/compactor"
Expand All @@ -43,6 +41,8 @@ import (
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/grafana/mimir/pkg/util/validation/exporter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type ConfigHandler func(actualCfg interface{}, defaultCfg interface{}) http.HandlerFunc
Expand Down Expand Up @@ -278,6 +278,13 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, true, "GET")
}

// RegisterUsageMetricsRoute serves the metrics gathered from reg over HTTP GET
// at customRegistryPath, going through the API's own route registration.
func (a *API) RegisterUsageMetricsRoute(customRegistryPath string, reg *prometheus.Registry) {
	// Build a handler that exposes only what reg gathers, with default handler options.
	usageMetricsHandler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})

	// NOTE(review): the two booleans are positional RegisterRoute arguments whose
	// meaning is defined by RegisterRoute (not visible here) — confirm before changing.
	a.RegisterRoute(customRegistryPath, usageMetricsHandler, true, false, "GET")
}

// Ingester is defined as an interface to allow for alternative implementations
// of ingesters to be passed into the API.RegisterIngester() method.
type Ingester interface {
Expand Down
13 changes: 9 additions & 4 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,26 @@ type Manager struct {
// mtx protects the trackersByUserID map
mtx sync.RWMutex
trackersByUserID map[string]*Tracker
reg *prometheus.Registry
}

// NewManager creates a new cost attribution manager, which is responsible for managing the cost attribution of series.
// It cleans up inactive series and updates the cost attribution of series once per cleanupInterval (Mimir's module wiring passes 3 minutes).
func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides) *Manager {
s := &Manager{
func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) {
m := &Manager{
trackersByUserID: make(map[string]*Tracker),
limits: limits,
mtx: sync.RWMutex{},
inactiveTimeout: inactiveTimeout,
logger: logger,
reg: reg,
}

s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution manager")
return s
m.Service = services.NewTimerService(cleanupInterval, nil, m.iteration, nil).WithName("cost attribution manager")
if err := reg.Register(m); err != nil {
return nil, err
}
return m, nil
}

func (m *Manager) iteration(_ context.Context) error {
Expand Down
28 changes: 14 additions & 14 deletions pkg/costattribution/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -44,7 +43,12 @@ func getMockLimits(idx int) (*validation.Overrides, error) {
func newTestManager() *Manager {
logger := log.NewNopLogger()
limits, _ := getMockLimits(0)
return NewManager(5*time.Second, 10*time.Second, logger, limits)
reg := prometheus.NewRegistry()
manager, err := NewManager(5*time.Second, 10*time.Second, logger, limits, reg)
if err != nil {
panic(err)
}
return manager
}

func Test_NewManager(t *testing.T) {
Expand All @@ -63,8 +67,6 @@ func Test_EnabledForUser(t *testing.T) {

func Test_CreateDeleteTracker(t *testing.T) {
manager := newTestManager()
reg := prometheus.NewRegistry()
require.NoError(t, reg.Register(manager))

t.Run("Tracker existence and attributes", func(t *testing.T) {
user1Tracker := manager.TrackerForUser("user1")
Expand Down Expand Up @@ -92,7 +94,7 @@ func Test_CreateDeleteTracker(t *testing.T) {
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total"))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total"))
})

t.Run("Purge inactive attributions", func(t *testing.T) {
Expand All @@ -102,7 +104,7 @@ func Test_CreateDeleteTracker(t *testing.T) {
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="custom_attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
})

t.Run("Disabling user cost attribution", func(t *testing.T) {
Expand All @@ -115,7 +117,7 @@ func Test_CreateDeleteTracker(t *testing.T) {
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
})

t.Run("Updating user cardinality and labels", func(t *testing.T) {
Expand All @@ -131,7 +133,7 @@ func Test_CreateDeleteTracker(t *testing.T) {
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{feature="__missing__",reason="invalid-metrics-name",team="foo",tenant="user3",tracker="custom_attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
})

t.Run("When cost attribution get overflowed, all metrics are purged except overflow metrics", func(t *testing.T) {
Expand All @@ -144,14 +146,12 @@ func Test_CreateDeleteTracker(t *testing.T) {
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{feature="__overflow__",team="__overflow__",tenant="user3",tracker="custom_attribution"} 2
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
})
}

func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
manager := newTestManager()
reg := prometheus.NewRegistry()
require.NoError(t, reg.Register(manager))

// Simulate metrics for multiple users to set up initial state
manager.TrackerForUser("user1").IncrementReceivedSamples(labels.FromStrings("team", "foo"), 1, time.Unix(1, 0))
Expand All @@ -174,7 +174,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
metricNames := []string{
"cortex_discarded_attributed_samples_total",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), metricNames...))
})

t.Run("Purge after inactive timeout", func(t *testing.T) {
Expand All @@ -194,7 +194,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
metricNames := []string{
"cortex_discarded_attributed_samples_total",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), metricNames...))
})

t.Run("Purge all trackers", func(t *testing.T) {
Expand All @@ -209,6 +209,6 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
"cortex_discarded_attributed_samples_total",
"cortex_received_attributed_samples_total",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(""), metricNames...))
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(""), metricNames...))
})
}
12 changes: 2 additions & 10 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,15 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
return
}
vals := t.getKeyValues(lbs, now.Unix())
if t.isOverflow {
t.activeSeriesPerUserAttribution.WithLabelValues(overflowValue).Set(1)
} else {
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc()
}
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc()
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
if t == nil {
return
}
vals := t.getKeyValues(lbs, now.Unix())
if t.isOverflow {
t.activeSeriesPerUserAttribution.WithLabelValues(overflowValue).Set(1)
} else {
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec()
}
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec()
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
{
state: "disabled",
customRegistry: nil,
cfg: func(limits *validation.Limits) {},
cfg: func(_ *validation.Limits) {},
},
}

Expand Down Expand Up @@ -2074,8 +2074,7 @@ func BenchmarkDistributor_Push(b *testing.B) {
// Initialize the cost attribution manager
var cam *costattribution.Manager
if caCase.customRegistry != nil {
cam = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides)
err := caCase.customRegistry.Register(cam)
cam, err = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides, caCase.customRegistry)
require.NoError(b, err)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3645,8 +3645,7 @@ func BenchmarkIngesterPush(b *testing.B) {

var cam *costattribution.Manager
if caCase.customRegistry != nil {
cam = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides)
err = caCase.customRegistry.Register(cam)
cam, err = costattribution.NewManager(5*time.Second, 10*time.Second, nil, overrides, caCase.customRegistry)
require.NoError(b, err)
}

Expand Down
25 changes: 11 additions & 14 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/prometheus/alertmanager/featurecontrol"
"github.com/prometheus/alertmanager/matchers/compat"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/rules"
Expand Down Expand Up @@ -481,7 +480,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {

// initDistributor hooks the distributor into the API by calling
// API.RegisterDistributor with the distributor, its config, the registerer
// and the overrides. It builds no service of its own, so it always returns a
// nil service and a nil error.
func (t *Mimir) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Registerer, t.Overrides)

return nil, nil
}

Expand Down Expand Up @@ -652,14 +650,11 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) {
func (t *Mimir) initCostAttributionService() (services.Service, error) {
// The cost attribution service is only initialized if the custom registry path is provided.
if t.Cfg.CostAttributionRegistryPath != "" {
t.CostAttributionManager = costattribution.NewManager(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides)
// if custom registry path is provided, create a custom registry and use it for cost attribution service
customRegistry := prometheus.NewRegistry()
// Register the custom registry with the provided URL.
// This allows users to expose custom metrics on a separate endpoint.
// This is useful when users want to expose metrics that are not part of the default Mimir metrics.
http.Handle(t.Cfg.CostAttributionRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry}))
err := customRegistry.Register(t.CostAttributionManager)
// If custom registry path is provided, create a custom registry and use it for cost attribution service only
reg := prometheus.NewRegistry()
var err error
t.CostAttributionManager, err = costattribution.NewManager(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, reg)
t.API.RegisterUsageMetricsRoute(t.Cfg.CostAttributionRegistryPath, reg)
return t.CostAttributionManager, err
}
return nil, nil
Expand Down Expand Up @@ -696,6 +691,7 @@ func (t *Mimir) initIngester() (serv services.Service, err error) {
ing = ingester.NewIngesterActivityTracker(t.Ingester, t.ActivityTracker)
}
t.API.RegisterIngester(ing)

return nil, nil
}

Expand Down Expand Up @@ -1197,10 +1193,11 @@ func (t *Mimir) setupModuleManager() error {
IngesterPartitionRing: {MemberlistKV, IngesterRing, API},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, MemberlistKV, Vault},
Distributor: {DistributorService, API, ActiveGroupsCleanupService, CostAttributionService, Vault},
DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault},
Ingester: {IngesterService, API, ActiveGroupsCleanupService, CostAttributionService, Vault},
IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV},
Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault},
DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault, CostAttributionService},
CostAttributionService: {API, Overrides},
Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault},
IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV, CostAttributionService},
Flusher: {Overrides, API},
Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV},
Querier: {TenantFederation, Vault},
Expand Down

0 comments on commit 12d7d79

Please sign in to comment.