From fa91187ba87d7b4f7e6d2085f2bce72e0a697c1d Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Thu, 24 Oct 2024 15:32:24 +0200 Subject: [PATCH] udpate tests --- pkg/costattribution/manager_test.go | 274 ++++++++++++++++++ pkg/costattribution/tracker_test.go | 50 ++++ pkg/distributor/distributor_test.go | 6 +- pkg/distributor/validate_test.go | 12 +- .../activeseries/active_labels_test.go | 6 +- .../active_native_histogram_postings_test.go | 32 +- .../activeseries/active_postings_test.go | 21 +- .../activeseries/active_series_test.go | 205 +++++++------ pkg/ingester/ingester.go | 78 ++--- .../ingester_early_compaction_test.go | 2 +- pkg/ingester/ingester_ingest_storage_test.go | 2 +- pkg/ingester/ingester_test.go | 6 +- .../benchmarks/comparison_test.go | 2 +- 13 files changed, 515 insertions(+), 181 deletions(-) create mode 100644 pkg/costattribution/manager_test.go create mode 100644 pkg/costattribution/tracker_test.go diff --git a/pkg/costattribution/manager_test.go b/pkg/costattribution/manager_test.go new file mode 100644 index 00000000000..8bdf56b5bc5 --- /dev/null +++ b/pkg/costattribution/manager_test.go @@ -0,0 +1,274 @@ +package costattribution + +// func newTestManager() *Manager { +// logger := log.NewNopLogger() +// limits, _ := validation.NewOverrides(validation.Limits{}, validation.NewMockTenantLimits(map[string]*validation.Limits{ +// "user1": { +// MaxCostAttributionPerUser: 5, +// CostAttributionLabel: "team", +// }, +// "user2": { +// MaxCostAttributionPerUser: 2, +// CostAttributionLabel: "", +// }, +// "user3": { +// MaxCostAttributionPerUser: 2, +// CostAttributionLabel: "department", +// }, +// })) +// inactiveTimeout := 2 * time.Minute +// cooldownTimeout := 1 * time.Minute +// cleanupInterval := 1 * time.Minute +// return NewManager(cleanupInterval, inactiveTimeout, cooldownTimeout, logger, limits) +// } + +// func Test_NewManager(t *testing.T) { +// manager := newTestManager() +// assert.NotNil(t, manager, "Expected manager to be initialized") +// assert.NotNil(t, manager.attributionTracker, "Expected attribution tracker to be initialized") +// assert.Equal(t, "__overflow__", manager.invalidValue, "Expected invalidValue to be initialized") +// } + +// func Test_EnabledForUser(t *testing.T) { +// manager := newTestManager() +// assert.True(t, manager.EnabledForUser("user1"), "Expected cost attribution to be enabled for user1") +// assert.False(t, manager.EnabledForUser("user2"), "Expected cost attribution to be disabled for user2") +// assert.False(t, manager.EnabledForUser("user4"), "Expected cost attribution to be disabled for user4") +// } + +// func Test_UserAttributionLabel(t *testing.T) { +// manager := newTestManager() +// assert.Equal(t, "team", manager.UserAttributionLabel("user1")) +// assert.Equal(t, "", manager.UserAttributionLabel("user2")) +// assert.Equal(t, "department", manager.UserAttributionLabel("user3")) +// assert.Equal(t, 2, len(manager.attributionTracker.trackersByUserID)) +// assert.Equal(t, "team", manager.attributionTracker.trackersByUserID["user1"].trackedLabel) +// assert.Equal(t, "department", manager.attributionTracker.trackersByUserID["user3"].trackedLabel) +// } + +// func Test_UserAttributionLimit(t *testing.T) { +// manager := newTestManager() +// assert.Equal(t, 5, manager.UserAttributionLimit("user1")) +// assert.Equal(t, 0, manager.UserAttributionLimit("user2")) +// assert.Equal(t, 0, manager.UserAttributionLimit("user4")) +// } + +// func Test_UpdateAttributionTimestamp(t *testing.T) { +// manager := newTestManager() + +// lbls := labels.NewBuilder(labels.EmptyLabels()) +// tm1, tm2, tm3 := "bar", "foo", "baz" +// t.Run("Should update the timestamp when limit not reached for the user attribution", func(t *testing.T) { +// lbls.Set("department", tm1) +// isOutdated, result := manager.UpdateAttributionTimestamp("user3", "department", lbls.Labels(), time.Unix(0, 0)) +// assert.False(t, isOutdated, "Expected label to be the same as the one in the cache") +// assert.Equal(t, tm1, result, "Expected attribution to be returned since user is enabled for cost attribution, and limit is not reached") +// assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].observed[tm1]) +// assert.Equal(t, int64(0), manager.attributionTracker.trackersByUserID["user3"].observed[tm1].Load()) + +// lbls.Set("department", tm2) +// isOutdated, result = manager.UpdateAttributionTimestamp("user3", "department", lbls.Labels(), time.Unix(1, 0)) +// assert.False(t, isOutdated) +// assert.Equal(t, tm2, result, "Expected attribution to be returned since user is enabled for cost attribution, and limit is not reached") +// assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].observed[tm2]) +// assert.Equal(t, int64(1), manager.attributionTracker.trackersByUserID["user3"].observed[tm2].Load()) +// }) + +// t.Run("Should only update the timestamp of invalide when limit reached for the user attribution", func(t *testing.T) { +// lbls.Set("department", tm3) +// isOutdated, result := manager.UpdateAttributionTimestamp("user3", "department", lbls.Labels(), time.Unix(2, 0)) +// assert.False(t, isOutdated) +// assert.Equal(t, manager.invalidValue, result, "Expected invalidValue to be returned since user has reached the limit of cost attribution labels") +// assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].observed[manager.invalidValue]) +// assert.Equal(t, int64(2), manager.attributionTracker.trackersByUserID["user3"].observed[manager.invalidValue].Load()) + +// lbls.Set("department", tm1) +// isOutdated, result = manager.UpdateAttributionTimestamp("user3", "department", lbls.Labels(), time.Unix(3, 0)) +// assert.False(t, isOutdated) +// assert.Equal(t, manager.invalidValue, result, "Expected invalidValue to be returned since user has reached the limit of cost attribution labels") +// assert.Equal(t, int64(3), manager.attributionTracker.trackersByUserID["user3"].observed[manager.invalidValue].Load()) +// }) +// } + +// func Test_SetActiveSeries(t *testing.T) { +// manager := newTestManager() +// reg := prometheus.NewRegistry() +// err := reg.Register(manager) +// require.NoError(t, err) +// userID := "user1" + +// lbls := labels.NewBuilder(labels.EmptyLabels()) + +// t.Run("Should set the active series gauge for the given user and attribution", func(t *testing.T) { +// lbls.Set("team", "foo") +// isOutdated, val := manager.UpdateAttributionTimestamp(userID, "team", lbls.Labels(), time.Unix(0, 0)) +// assert.False(t, isOutdated) +// manager.SetActiveSeries(userID, "team", val, 1.0) +// expectedMetrics := ` +// # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. +// # TYPE cortex_ingester_active_series_attribution gauge +// cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 +// ` +// metricNames := []string{ +// "cortex_ingester_active_series_attribution", +// } +// assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) +// }) + +// t.Run("Should set the active series gauge for all users and attributions enabled and ignore disabled user", func(t *testing.T) { +// userID = "user3" +// lbls.Set("department", "bar") +// isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(0, 0)) +// assert.False(t, isOutdated) +// manager.SetActiveSeries(userID, "department", val, 2.0) + +// lbls.Set("department", "baz") +// isOutdated, val = manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(1, 0)) +// assert.False(t, isOutdated) +// manager.SetActiveSeries(userID, "department", val, 3.0) + +// expectedMetrics := ` +// # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. +// # TYPE cortex_ingester_active_series_attribution gauge +// cortex_ingester_active_series_attribution{department="bar",user="user3"} 2 +// cortex_ingester_active_series_attribution{department="baz",user="user3"} 3 +// cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 +// ` +// metricNames := []string{ +// "cortex_ingester_active_series_attribution", +// } +// assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) +// }) + +// t.Run("Cleanup the active series gauge for the given user and attribution when cost attribution disabled", func(t *testing.T) { +// limits := manager.attributionTracker.limits +// defer func() { manager.attributionTracker.limits = limits }() +// userID = "user3" +// lbls.Set("department", "baz") + +// overrides, _ := validation.NewOverrides(validation.Limits{}, validation.NewMockTenantLimits(map[string]*validation.Limits{ +// userID: { +// MaxCostAttributionPerUser: 2, +// CostAttributionLabel: "", +// }, +// })) +// manager.attributionTracker.limits = overrides +// isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(5, 0)) +// assert.False(t, isOutdated) +// manager.SetActiveSeries(userID, val, "department", 3.0) + +// expectedMetrics := ` +// # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. +// # TYPE cortex_ingester_active_series_attribution gauge +// cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 +// ` +// metricNames := []string{ +// "cortex_ingester_active_series_attribution", +// } +// assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) +// }) + +// t.Run("Should ignore setting the active series gauge for disabled user", func(t *testing.T) { +// userID = "user2" +// lbls.Set("department", "bar") +// isOutdated, val := manager.UpdateAttributionTimestamp(userID, "department", lbls.Labels(), time.Unix(0, 0)) +// assert.False(t, isOutdated) +// manager.SetActiveSeries(userID, val, "department", 4.0) + +// expectedMetrics := ` +// # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. +// # TYPE cortex_ingester_active_series_attribution gauge +// cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 +// ` +// metricNames := []string{ +// "cortex_ingester_active_series_attribution", +// } +// assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) +// }) +// } + +// func TestUpdateAttributionTimestampForUser(t *testing.T) { +// cooldownTimeout := 10 * time.Second +// t.Run("Should not update the timestamp for the user if attribution lable is not set", func(t *testing.T) { +// // Create mock limits +// limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "", MaxCostAttributionPerUser: 5}, nil) +// assert.NoError(t, err) +// trackerGroup := newAttributionTrackerGroup(limiter, cooldownTimeout) +// assert.NotNil(t, trackerGroup) + +// ts := time.Unix(1, 0) +// trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "platformA", ts) +// trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "teamB", ts) + +// assert.Equal(t, 0, len(trackerGroup.trackersByUserID)) +// }) + +// t.Run("Should not update the timestamp for the user if max cost attribution per user is 0", func(t *testing.T) { +// // Create mock limits +// limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 0}, nil) +// assert.NoError(t, err) + +// trackerGroup := newAttributionTrackerGroup(limiter, cooldownTimeout) +// assert.NotNil(t, trackerGroup) + +// ts := time.Unix(1, 0) +// trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "platformA", ts) +// trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "teamB", ts) + +// assert.Equal(t, 0, len(trackerGroup.trackersByUserID)) +// }) + +// t.Run("Should update the timestamp for the user attribution", func(t *testing.T) { +// // Create mock limits +// limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) +// assert.NoError(t, err) + +// trackerGroup := newAttributionTrackerGroup(limiter, cooldownTimeout) +// assert.NotNil(t, trackerGroup) + +// ts := time.Unix(1, 0) +// trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", ts) +// trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "barA", ts) + +// assert.Equal(t, 2, len(trackerGroup.trackersByUserID)) +// fmt.Println(trackerGroup.trackersByUserID) +// assert.NotNil(t, trackerGroup.trackersByUserID["tenantA"]) +// assert.NotNil(t, trackerGroup.trackersByUserID["tenantA"].observed["fooA"]) +// assert.Equal(t, int64(1), trackerGroup.trackersByUserID["tenantA"].observed["fooA"].Load()) + +// trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "barA", ts.Add(time.Second)) +// assert.Equal(t, int64(2), trackerGroup.trackersByUserID["tenantB"].observed["barA"].Load()) +// }) +// } + +// func TestUserAttributionLabel(t *testing.T) { +// cooldownTimeout := 10 * time.Second +// t.Run("Should return the cost attribution label for the user", func(t *testing.T) { +// // Create mock limits +// limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) +// assert.NoError(t, err) + +// trackerGroup := newAttributionTrackerGroup(limiter, cooldownTimeout) +// assert.NotNil(t, trackerGroup) +// trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", time.Unix(0, 0)) + +// assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) +// }) + +// t.Run("Should return the default cost attribution label for the user if it is in cache", func(t *testing.T) { +// // Create mock limits +// limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) +// assert.NoError(t, err) + +// trackerGroup := newAttributionTrackerGroup(limiter, cooldownTimeout) +// assert.NotNil(t, trackerGroup) + +// assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) + +// // update the timestamp for the user, so cache is updated +// trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", time.Unix(0, 0)) + +// // still read the cost attribution label from cache until cache is updated by timed service +// assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) +// }) +// } diff --git a/pkg/costattribution/tracker_test.go b/pkg/costattribution/tracker_test.go new file mode 100644 index 00000000000..38063880119 --- /dev/null +++ b/pkg/costattribution/tracker_test.go @@ -0,0 +1,50 @@ +package costattribution + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_NewTracker(t *testing.T) { + reg := prometheus.NewRegistry() + + // Initialize a new Tracker + trackedLabel := []string{"platform"} + cat, err := newTracker(trackedLabel, 5) + require.NoError(t, err) + err = reg.Register(cat) + require.NoError(t, err) + + // Simulate some values in the metrics + vals := []string{"foo", "user1"} + cat.activeSeriesPerUserAttribution.WithLabelValues(vals...).Set(1.0) + cat.receivedSamplesAttribution.WithLabelValues(vals...).Add(5) + cat.discardedSampleAttribution.WithLabelValues(vals...).Add(2) + + expectedMetrics := ` + # HELP cortex_discarded_samples_attribution_total The total number of samples that were discarded per attribution. + # TYPE cortex_discarded_samples_attribution_total counter + cortex_discarded_samples_attribution_total{platform="foo",user="user1"} 2 + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. + # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{platform="foo",user="user1"} 1 + # HELP cortex_received_samples_attribution_total The total number of samples that were received per attribution. + # TYPE cortex_received_samples_attribution_total counter + cortex_received_samples_attribution_total{platform="foo",user="user1"} 5 + ` + + metricNames := []string{ + "cortex_discarded_samples_attribution_total", + "cortex_received_samples_attribution_total", + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + + // Clean the tracker for the user attribution + cat.cleanupTrackerAttribution(vals) +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index e71fbf24762..e826af69a67 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2035,7 +2035,7 @@ func BenchmarkDistributor_Push(b *testing.B) { require.NoError(b, err) // Start the distributor. - distributor, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), distributor)) @@ -5323,7 +5323,7 @@ func prepare(t testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []* require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - d, err := New(distributorCfg, clientConfig, overrides, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger()) + d, err := New(distributorCfg, clientConfig, overrides, nil, nil, ingestersRing, partitionsRing, true, reg, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, d)) t.Cleanup(func() { @@ -7957,7 +7957,7 @@ func TestCheckStartedMiddleware(t *testing.T) { overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - distributor, err := New(distributorConfig, clientConfig, overrides, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) + distributor, err := New(distributorConfig, clientConfig, overrides, nil, nil, ingestersRing, nil, true, nil, log.NewNopLogger()) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "user") diff --git a/pkg/distributor/validate_test.go b/pkg/distributor/validate_test.go index 2ee061ded13..b2b83335088 100644 --- a/pkg/distributor/validate_test.go +++ b/pkg/distributor/validate_test.go @@ -55,6 +55,7 @@ func (vm validateMetadataCfg) MaxMetadataLength(_ string) int { } func TestValidateLabels(t *testing.T) { + ts := time.Now() reg := prometheus.NewPedanticRegistry() s := newSampleValidationMetrics(reg) @@ -154,7 +155,7 @@ func TestValidateLabels(t *testing.T) { nil, }, } { - err := validateLabels(s, cfg, userID, "custom label", mimirpb.FromMetricsToLabelAdapters(c.metric), c.skipLabelNameValidation) + err := validateLabels(s, cfg, userID, "custom label", mimirpb.FromMetricsToLabelAdapters(c.metric), c.skipLabelNameValidation, nil, ts) assert.Equal(t, c.err, err, "wrong error") } @@ -347,6 +348,7 @@ func TestValidateMetadata(t *testing.T) { } func TestValidateLabelDuplication(t *testing.T) { + ts := time.Now() var cfg validateLabelsCfg cfg.maxLabelNameLength = 10 cfg.maxLabelNamesPerSeries = 10 @@ -357,7 +359,7 @@ func TestValidateLabelDuplication(t *testing.T) { actual := validateLabels(newSampleValidationMetrics(nil), cfg, userID, "", []mimirpb.LabelAdapter{ {Name: model.MetricNameLabel, Value: "a"}, {Name: model.MetricNameLabel, Value: "b"}, - }, false) + }, false, nil, ts) expected := fmt.Errorf( duplicateLabelMsgFormat, model.MetricNameLabel, @@ -374,7 +376,7 @@ func TestValidateLabelDuplication(t *testing.T) { {Name: model.MetricNameLabel, Value: "a"}, {Name: "a", Value: "a"}, {Name: "a", Value: "a"}, - }, false) + }, false, nil, ts) expected = fmt.Errorf( duplicateLabelMsgFormat, "a", @@ -533,7 +535,7 @@ func TestMaxNativeHistorgramBuckets(t *testing.T) { cfg.maxNativeHistogramBuckets = limit ls := []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "a"}, {Name: "a", Value: "a"}} - _, err := validateSampleHistogram(metrics, model.Now(), cfg, "user-1", "group-1", ls, &h) + _, err := validateSampleHistogram(metrics, model.Now(), cfg, "user-1", "group-1", ls, &h, nil) if limit == 1 { require.Error(t, err) @@ -580,7 +582,7 @@ func TestInvalidNativeHistogramSchema(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { hist.Schema = testCase.schema - _, err := validateSampleHistogram(metrics, model.Now(), cfg, "user-1", "group-1", labels, hist) + _, err := validateSampleHistogram(metrics, model.Now(), cfg, "user-1", "group-1", labels, hist, nil) require.Equal(t, testCase.expectedError, err) }) } diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go index aa7f928d7dd..6fdf3e00bc4 100644 --- a/pkg/ingester/activeseries/active_labels_test.go +++ b/pkg/ingester/activeseries/active_labels_test.go @@ -41,7 +41,7 @@ func TestIsLabelValueActive(t *testing.T) { labels.FromStrings("a", "5"), } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) memPostings := index.NewMemPostings() for i, l := range series { @@ -51,10 +51,10 @@ func TestIsLabelValueActive(t *testing.T) { // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) require.True(t, valid) result, err := IsLabelValueActive(ctx, reader, activeSeries, "a", "1") diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go index 665f5787c61..2b95020c68d 100644 --- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -26,7 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -34,10 +34,10 @@ func TestNativeHistogramPostings_Expand(t *testing.T) { if i+1 == 3 || i+1 == 4 { buckets = 10 // Native histogram with 10 buckets. } - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -62,7 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -70,10 +70,10 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { if i == 2 || i == 3 { buckets = i * 10 // Native histogram with i*10 buckets. } - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 5, allActive) @@ -106,17 +106,18 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) + // Update each series at a different time according to its index. for i := range allStorageRefs { buckets := i * 10 if i+1 == 4 { buckets = -1 // Make ref==4 not a native histogram to check that Seek skips it. } - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -145,14 +146,15 @@ func TestNativeHistogramPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) + // Update each series at a different time according to its index. for i := range allStorageRefs { buckets := i * 10 - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -181,14 +183,14 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), 10) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), 10, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 0, allActive) diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index a2345841d11..84c71634e72 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -26,13 +26,14 @@ func TestPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) + // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -57,13 +58,14 @@ func TestPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) + // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 2, allActive) @@ -88,13 +90,14 @@ func TestPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl)) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), nil) + // Update each series at a different time according to its index. for i := range allStorageRefs { - activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1) + activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), -1, nil) } - valid := activeSeries.Purge(mockedTime) + valid := activeSeries.Purge(mockedTime, nil) allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers() require.True(t, valid) require.Equal(t, 0, allActive) diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index cf821c5bca5..af0cde2ff8f 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -38,9 +38,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4") ref5 := storage.SeriesRef(5) // will be used for ls1 again. - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout) - - valid := c.Purge(time.Now()) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, nil) + valid := c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) @@ -50,8 +49,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 0, allActiveBuckets) assert.Empty(t, activeMatchingBuckets) - c.UpdateSeries(ls1, ref1, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -62,8 +61,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls1, ref1, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -74,8 +73,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls2, ref2, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls2, ref2, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) @@ -86,8 +85,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls3, ref3, time.Now(), 5) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), 5, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 3, allActive) @@ -98,8 +97,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 1, allActiveHistograms) assert.Equal(t, 5, allActiveBuckets) - c.UpdateSeries(ls4, ref4, time.Now(), 3) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls4, ref4, time.Now(), 3, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -111,8 +110,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 8, allActiveBuckets) // more buckets for a histogram - c.UpdateSeries(ls3, ref3, time.Now(), 7) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), 7, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -124,8 +123,8 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 10, allActiveBuckets) // changing a metric from histogram to float - c.UpdateSeries(ls4, ref4, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls4, ref4, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -150,7 +149,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 7, allActiveBuckets) // Doesn't change after purging. - valid = c.Purge(time.Now()) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -162,7 +161,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 7, allActiveBuckets) // ref5 is created with the same labelset as ls1, it shouldn't be accounted as different series. - c.UpdateSeries(ls1, ref5, time.Now(), -1) + c.UpdateSeries(ls1, ref5, time.Now(), -1, nil) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) assert.Equal(t, 1, allActiveHistograms) @@ -173,7 +172,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { assert.Equal(t, 7, allActiveBuckets) // Doesn't change after purging. - valid = c.Purge(time.Now()) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, allActiveHistograms, _, allActiveBuckets, _ = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -204,19 +203,19 @@ func TestActiveSeries_ContainsRef(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, nil) // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1, nil) } - c.purge(time.Unix(int64(ttl), 0)) + c.purge(time.Unix(int64(ttl), 0), nil) // The expected number of series is the total number of series minus the ttl // because the first ttl series should be purged exp := len(series) - (ttl) - valid := c.Purge(mockedTime) + valid := c.Purge(mockedTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) @@ -231,7 +230,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout) + c := NewActiveSeries(asm, DefaultTimeout, nil) testUpdateSeries(t, c) } @@ -243,7 +242,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { ref5, ls5 := storage.SeriesRef(5), labels.FromStrings("a", "5") ref6 := storage.SeriesRef(6) // same as ls2 - valid := c.Purge(time.Now()) + valid := c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) @@ -257,8 +256,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls1, ref1, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls1, ref1, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -272,8 +271,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls2, ref2, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls2, ref2, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) @@ -287,8 +286,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls3, ref3, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 3, allActive) @@ -302,8 +301,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls3, ref3, time.Now(), -1) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), -1, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 3, allActive) @@ -317,8 +316,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 0, allActiveHistograms) assert.Equal(t, 0, allActiveBuckets) - c.UpdateSeries(ls4, ref4, time.Now(), 3) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls4, ref4, time.Now(), 3, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 4, allActive) @@ -332,8 +331,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 1, allActiveHistograms) assert.Equal(t, 3, allActiveBuckets) - c.UpdateSeries(ls5, ref5, time.Now(), 5) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls5, ref5, time.Now(), 5, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -348,8 +347,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 8, allActiveBuckets) // changing a metric from float to histogram - c.UpdateSeries(ls3, ref3, time.Now(), 6) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls3, ref3, time.Now(), 6, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -364,8 +363,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 14, allActiveBuckets) // fewer (zero) buckets for a histogram - c.UpdateSeries(ls4, ref4, time.Now(), 0) - valid = c.Purge(time.Now()) + c.UpdateSeries(ls4, ref4, time.Now(), 0, nil) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -397,7 +396,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 11, allActiveBuckets) // Don't change after purging. - valid = c.Purge(time.Now()) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -412,7 +411,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 11, allActiveBuckets) // ls2 is pushed again, this time with ref6 - c.UpdateSeries(ls2, ref6, time.Now(), -1) + c.UpdateSeries(ls2, ref6, time.Now(), -1, nil) // Numbers don't change. allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -427,7 +426,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { assert.Equal(t, 11, allActiveBuckets) // Don't change after purging. - valid = c.Purge(time.Now()) + valid = c.Purge(time.Now(), nil) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets = c.ActiveWithMatchers() assert.Equal(t, 5, allActive) @@ -448,7 +447,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { func TestActiveSeries_UpdateSeries_Clear(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout) + c := NewActiveSeries(asm, DefaultTimeout, nil) testUpdateSeries(t, c) c.Clear() @@ -489,11 +488,11 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ls1, ls2 := labelsWithHashCollision() ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout) - c.UpdateSeries(ls1, ref1, time.Now(), -1) - c.UpdateSeries(ls2, ref2, time.Now(), -1) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, nil) + c.UpdateSeries(ls1, ref1, time.Now(), -1, nil) + c.UpdateSeries(ls2, ref2, time.Now(), -1, nil) - valid := c.Purge(time.Now()) + valid := c.Purge(time.Now(), nil) assert.True(t, valid) allActive, _, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 2, allActive) @@ -517,22 +516,22 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, nil) for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1, nil) } c.PostDeletion(map[chunks.HeadSeriesRef]labels.Labels{ deletedRef: deletedLabels, }) - c.purge(time.Unix(int64(ttl), 0)) + c.purge(time.Unix(int64(ttl), 0), nil) // call purge twice, just to hit "quick" path. It doesn't really do anything. - c.purge(time.Unix(int64(ttl), 0)) + c.purge(time.Unix(int64(ttl), 0), nil) exp := len(series) - (ttl) // Purge is not intended to purge - valid := c.Purge(mockedTime) + valid := c.Purge(mockedTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) @@ -563,13 +562,13 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(asm, 5*time.Minute) + c := NewActiveSeries(asm, 5*time.Minute, nil) exp := len(series) - ttl expMatchingSeries := 0 for i, s := range series { - c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) + c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1, nil) // if this series is matching, and they're within the ttl tmp := asm.Matches(s) @@ -578,11 +577,11 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { } } - c.purge(time.Unix(int64(ttl), 0)) + c.purge(time.Unix(int64(ttl), 0), nil) // call purge twice, just to hit "quick" path. It doesn't really do anything. - c.purge(time.Unix(int64(ttl), 0)) + c.purge(time.Unix(int64(ttl), 0), nil) - valid := c.Purge(mockedTime) + valid := c.Purge(mockedTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, exp, allActive) @@ -596,28 +595,28 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) currentTime := time.Now() - c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second) + c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, nil) - c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) - c.UpdateSeries(ls2, ref2, currentTime, -1) + c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1, nil) + c.UpdateSeries(ls2, ref2, currentTime, -1, nil) - valid := c.Purge(currentTime) + valid := c.Purge(currentTime, nil) assert.True(t, valid) allActive, _, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 1, allActive) - c.UpdateSeries(ls1, ref1, currentTime.Add(-1*time.Minute), -1) - c.UpdateSeries(ls2, ref2, currentTime, -1) + c.UpdateSeries(ls1, ref1, currentTime.Add(-1*time.Minute), -1, nil) + c.UpdateSeries(ls2, ref2, currentTime, -1, nil) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, _, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) // This will *not* update the series, since there is already newer timestamp. - c.UpdateSeries(ls2, ref2, currentTime.Add(-1*time.Minute), -1) + c.UpdateSeries(ls2, ref2, currentTime.Add(-1*time.Minute), -1, nil) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, _, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -632,30 +631,30 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`})) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout) + c := NewActiveSeries(asm, DefaultTimeout, nil) - valid := c.Purge(currentTime) + valid := c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0}, activeMatching) - c.UpdateSeries(ls1, ref1, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) assert.Equal(t, []int{1}, activeMatching) c.ReloadMatchers(asm, currentTime) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.False(t, valid) // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls1, ref1, currentTime, -1) - c.UpdateSeries(ls2, ref2, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1, nil) + c.UpdateSeries(ls2, ref2, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 2, allActive) @@ -666,8 +665,8 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls3, ref3, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls3, ref3, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -681,8 +680,8 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - c.UpdateSeries(ls4, ref4, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls4, ref4, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -698,15 +697,15 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { })) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout) - valid := c.Purge(currentTime) + c := NewActiveSeries(asm, DefaultTimeout, nil) + valid := c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0, 0}, activeMatching) - c.UpdateSeries(ls1, ref1, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -717,10 +716,10 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { })) c.ReloadMatchers(asm, currentTime) - c.purge(time.Time{}) + c.purge(time.Time{}, nil) // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 0, allActive) @@ -737,15 +736,15 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout) - valid := c.Purge(currentTime) + c := NewActiveSeries(asm, DefaultTimeout, nil) + valid := c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(t, 0, allActive) assert.Equal(t, []int{0, 0}, activeMatching) - c.UpdateSeries(ls1, ref1, currentTime, -1) - valid = c.Purge(currentTime) + c.UpdateSeries(ls1, ref1, currentTime, -1, nil) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 1, allActive) @@ -757,11 +756,11 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { })) c.ReloadMatchers(asm, currentTime) - c.purge(time.Time{}) + c.purge(time.Time{}, nil) // Adding timeout time to make Purge results valid. currentTime = currentTime.Add(DefaultTimeout) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(t, valid) allActive, activeMatching, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(t, 0, allActive) @@ -790,7 +789,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo var ( // Run the active series tracker with an active timeout = 0 so that the Purge() will always // purge the series. - c = NewActiveSeries(&asmodel.Matchers{}, 0) + c = NewActiveSeries(&asmodel.Matchers{}, 0, nil) updateGroup = &sync.WaitGroup{} purgeGroup = &sync.WaitGroup{} start = make(chan struct{}) @@ -841,7 +840,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo case <-stopPurge: return default: - c.Purge(future()) + c.Purge(future(), nil) } // Throttle, but keep high pressure from Purge(). @@ -928,10 +927,10 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c := NewActiveSeries(asm, DefaultTimeout) + c := NewActiveSeries(asm, DefaultTimeout, nil) for round := 0; round <= tt.nRounds; round++ { for ix := 0; ix < tt.nSeries; ix++ { - c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1) + c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1, nil) now++ } } @@ -953,7 +952,7 @@ func benchmarkPurge(b *testing.B, twice bool) { const numExpiresSeries = numSeries / 25 currentTime := time.Now() - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, nil) series := [numSeries]labels.Labels{} refs := [numSeries]storage.SeriesRef{} @@ -968,13 +967,13 @@ func benchmarkPurge(b *testing.B, twice bool) { // Prepare series for ix, s := range series { if ix < numExpiresSeries { - c.UpdateSeries(s, refs[ix], currentTime.Add(-DefaultTimeout), -1) + c.UpdateSeries(s, refs[ix], currentTime.Add(-DefaultTimeout), -1, nil) } else { - c.UpdateSeries(s, refs[ix], currentTime, -1) + c.UpdateSeries(s, refs[ix], currentTime, -1, nil) } } - valid := c.Purge(currentTime) + valid := c.Purge(currentTime, nil) assert.True(b, valid) allActive, _, _, _, _, _ := c.ActiveWithMatchers() assert.Equal(b, numSeries, allActive) @@ -982,13 +981,13 @@ func benchmarkPurge(b *testing.B, twice bool) { // Purge is going to purge everything currentTime = currentTime.Add(DefaultTimeout) - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(b, valid) allActive, _, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(b, numSeries-numExpiresSeries, allActive) if twice { - valid = c.Purge(currentTime) + valid = c.Purge(currentTime, nil) assert.True(b, valid) allActive, _, _, _, _, _ = c.ActiveWithMatchers() assert.Equal(b, numSeries-numExpiresSeries, allActive) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0a0b7ea8eda..4c552d2456b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -770,6 +770,15 @@ func (i *Ingester) replaceMatchers(asm *asmodel.Matchers, userDB *userTSDB, now userDB.activeSeries.ReloadMatchers(asm, now) } +// getCATrackerForUser returns the cost attribution tracker for the user. +// If the cost attribution manager is nil or the user is not enabled for cost attribution, it returns nil. +func getCATrackerForUser(userID string, cam *costattribution.Manager) *costattribution.Tracker { + if cam == nil { + return nil + } + return cam.TrackerForUser(userID) +} + func (i *Ingester) updateActiveSeries(now time.Time) { for _, userID := range i.getTSDBUsers() { userDB := i.getTSDB(userID) @@ -778,7 +787,7 @@ func (i *Ingester) updateActiveSeries(now time.Time) { } newMatchersConfig := i.limits.ActiveSeriesCustomTrackersConfig(userID) - newCostAttributionTracker := i.costAttributionMgr.TrackerForUser(userID) + newCostAttributionTracker := getCATrackerForUser(userID, i.costAttributionMgr) if newMatchersConfig.String() != userDB.activeSeries.CurrentConfig().String() || newCostAttributionTracker != userDB.activeSeries.CurrentCostAttributionTracker() { i.replaceMatchers(asmodel.NewMatchers(newMatchersConfig), userDB, now) } @@ -1286,10 +1295,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error { // Return true if handled as soft error, and we can ingest more series. // get the cost attribution value for the series - var caTracker *costattribution.Tracker - if i.costAttributionMgr != nil { - caTracker = i.costAttributionMgr.TrackerForUser(userID) - } + + cat := getCATrackerForUser(userID, i.costAttributionMgr) handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool { stats.failedSamplesCount++ @@ -1300,8 +1307,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre // we actually ingested all samples which haven't failed. switch { case errors.Is(err, storage.ErrOutOfBounds): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfBounds, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfBounds, startAppend) } stats.sampleOutOfBoundsCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { @@ -1310,8 +1317,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrOutOfOrderSample): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfOrder, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfOrder, startAppend) } stats.sampleOutOfOrderCount++ updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError { @@ -1320,8 +1327,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrTooOldSample): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooOld, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooOld, startAppend) } stats.sampleTooOldCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError { @@ -1330,8 +1337,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, globalerror.SampleTooFarInFuture): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooFarInFuture, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooFarInFuture, startAppend) } stats.sampleTooFarInFutureCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError { @@ -1340,8 +1347,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrDuplicateSampleForTimestamp): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonNewValueForTimestamp, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonNewValueForTimestamp, startAppend) } stats.newValueForTimestampCount++ updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError { @@ -1350,8 +1357,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, globalerror.MaxSeriesPerUser): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerUserSeriesLimit, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerUserSeriesLimit, startAppend) } stats.perUserSeriesLimitCount++ updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError { @@ -1360,8 +1367,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, globalerror.MaxSeriesPerMetric): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerMetricSeriesLimit, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerMetricSeriesLimit, startAppend) } stats.perMetricSeriesLimitCount++ updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError { @@ -1371,8 +1378,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre // Map TSDB native histogram validation errors to soft errors. case errors.Is(err, histogram.ErrHistogramCountMismatch): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { @@ -1380,8 +1387,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre }) return true case errors.Is(err, histogram.ErrHistogramCountNotBigEnough): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { @@ -1389,8 +1396,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre }) return true case errors.Is(err, histogram.ErrHistogramNegativeBucketCount): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { @@ -1398,8 +1405,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre }) return true case errors.Is(err, histogram.ErrHistogramSpanNegativeOffset): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { @@ -1407,8 +1414,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre }) return true case errors.Is(err, histogram.ErrHistogramSpansBucketsMismatch): - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { @@ -1448,8 +1455,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) + len(ts.Histograms) stats.sampleOutOfBoundsCount += len(ts.Samples) + len(ts.Histograms) - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleOutOfBounds, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleOutOfBounds, startAppend) } var firstTimestamp int64 if len(ts.Samples) > 0 { @@ -1471,8 +1478,8 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) stats.sampleOutOfBoundsCount += len(ts.Samples) - if caTracker != nil { - caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)), reasonSampleOutOfBounds, startAppend) + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)), reasonSampleOutOfBounds, startAppend) } firstTimestamp := ts.Samples[0].TimestampMs @@ -2683,10 +2690,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD ownedSeriedStateShardSize = i.ownedSeriesService.ringStrategy.shardSizeForUser(userID) } - var cat *costattribution.Tracker - if i.costAttributionMgr != nil { - cat = i.costAttributionMgr.TrackerForUser(userID) - } + cat := getCATrackerForUser(userID, i.costAttributionMgr) userDB := &userTSDB{ userID: userID, activeSeries: activeseries.NewActiveSeries( diff --git a/pkg/ingester/ingester_early_compaction_test.go b/pkg/ingester/ingester_early_compaction_test.go index b64e3586e96..0f75e94cb76 100644 --- a/pkg/ingester/ingester_early_compaction_test.go +++ b/pkg/ingester/ingester_early_compaction_test.go @@ -130,7 +130,7 @@ func TestIngester_compactBlocksToReduceInMemorySeries_ShouldTriggerCompactionOnl require.Len(t, listBlocksInDir(t, userBlocksDir), 0) // Use a trick to track all series we've written so far as "inactive". - ingester.getTSDB(userID).activeSeries.Purge(now.Add(30 * time.Minute)) + ingester.getTSDB(userID).activeSeries.Purge(now.Add(30*time.Minute), nil) // Pre-condition check. require.Equal(t, uint64(10), ingester.getTSDB(userID).Head().NumSeries()) diff --git a/pkg/ingester/ingester_ingest_storage_test.go b/pkg/ingester/ingester_ingest_storage_test.go index 4a529321155..fcf79dd4bc7 100644 --- a/pkg/ingester/ingester_ingest_storage_test.go +++ b/pkg/ingester/ingester_ingest_storage_test.go @@ -650,7 +650,7 @@ func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, over require.NoError(t, services.StopAndAwaitTerminated(ctx, prw)) }) - ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, reg, util_test.NewTestingLogger(t)) + ingester, err := New(*ingesterCfg, overrides, nil, prw, nil, nil, reg, util_test.NewTestingLogger(t)) require.NoError(t, err) return ingester, kafkaCluster, prw diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5b8db4c663f..bd307443840 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5997,7 +5997,7 @@ func prepareIngesterWithBlockStorageAndOverridesAndPartitionRing(t testing.TB, i ingestersRing = createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()) } - ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr) + ingester, err := New(ingesterCfg, overrides, ingestersRing, partitionsRing, nil, nil, registerer, noDebugNoopLogger{}) // LOGGING: log.NewLogfmtLogger(os.Stderr) if err != nil { return nil, err } @@ -6203,7 +6203,7 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { // setup the tsdbs dir testData.setup(t, tempDir) - ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) require.NoError(t, err) startErr := services.StartAndAwaitRunning(context.Background(), ingester) @@ -7363,7 +7363,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost" ingesterCfg.BlocksStorageConfig.TSDB.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted. - ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, log.NewNopLogger()) + ingester, err := New(ingesterCfg, overrides, createAndStartRing(t, ingesterCfg.IngesterRing.ToRingConfig()), nil, nil, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) diff --git a/pkg/streamingpromql/benchmarks/comparison_test.go b/pkg/streamingpromql/benchmarks/comparison_test.go index 2678f0d5c46..1ec18d108d3 100644 --- a/pkg/streamingpromql/benchmarks/comparison_test.go +++ b/pkg/streamingpromql/benchmarks/comparison_test.go @@ -237,7 +237,7 @@ func createIngesterQueryable(t testing.TB, address string) storage.Queryable { overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) - d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, ingestersRing, nil, false, nil, logger) + d, err := distributor.New(distributorCfg, clientCfg, overrides, nil, nil, ingestersRing, nil, false, nil, logger) require.NoError(t, err) queryMetrics := stats.NewQueryMetrics(nil)