fix the distributor crashloop
ying-jeanne committed Nov 27, 2024
1 parent 12d7d79 commit e0525e1
Showing 5 changed files with 77 additions and 18 deletions.
2 changes: 1 addition & 1 deletion development/mimir-microservices-mode/config/mimir.yaml
@@ -186,7 +186,7 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "instance"
cost_attribution_labels: "container"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m
5 changes: 3 additions & 2 deletions pkg/api/api.go
@@ -19,6 +19,9 @@ import (
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/grafana/mimir/pkg/alertmanager"
"github.com/grafana/mimir/pkg/alertmanager/alertmanagerpb"
"github.com/grafana/mimir/pkg/compactor"
@@ -41,8 +44,6 @@ import (
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/grafana/mimir/pkg/util/validation/exporter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type ConfigHandler func(actualCfg interface{}, defaultCfg interface{}) http.HandlerFunc
4 changes: 2 additions & 2 deletions pkg/costattribution/manager.go
@@ -79,7 +79,7 @@ func (m *Manager) TrackerForUser(userID string) *Tracker {

// if a tracker doesn't exist for this user yet, create a new one
if _, exists := m.trackersByUserID[userID]; !exists {
m.trackersByUserID[userID], _ = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID))
m.trackersByUserID[userID], _ = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
}
return m.trackersByUserID[userID]
}
@@ -180,7 +180,7 @@ func (m *Manager) purgeInactiveObservationsForUser(userID string, deadline int64
// if the tracked labels have changed, just reinitialize the tracker with the new configuration
if !CompareCALabels(cat.CALabels(), newTrackedLabels) {
m.mtx.Lock()
m.trackersByUserID[userID], _ = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID))
m.trackersByUserID[userID], _ = newTracker(userID, m.limits.CostAttributionLabels(userID), m.limits.MaxCostAttributionCardinalityPerUser(userID), m.limits.CostAttributionCooldown(userID), m.logger)
// use the newly created tracker
cat = m.trackersByUserID[userID]
m.mtx.Unlock()
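
Both manager.go hunks above follow the same pattern: the per-tenant tracker is created lazily, and it is rebuilt from the current limits whenever the configured cost-attribution labels change. A minimal sketch of that pattern, using illustrative stand-in types rather than the real Mimir ones:

package main

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
)

// tracker is a hypothetical stand-in for costattribution.Tracker.
type tracker struct {
	labels []string
}

type manager struct {
	mtx      sync.Mutex
	trackers map[string]*tracker
}

// syncTracker creates the tracker on first use and reinitializes it when the
// configured labels differ from the ones it was built with, mirroring
// TrackerForUser and purgeInactiveObservationsForUser above.
func (m *manager) syncTracker(userID string, configured []string) *tracker {
	sort.Strings(configured) // keep label order deterministic, as newTracker does

	m.mtx.Lock()
	defer m.mtx.Unlock()

	t, ok := m.trackers[userID]
	if !ok || !reflect.DeepEqual(t.labels, configured) {
		// First use, or the limits changed: start over with a fresh tracker.
		t = &tracker{labels: append([]string(nil), configured...)}
		m.trackers[userID] = t
	}
	return t
}

func main() {
	m := &manager{trackers: map[string]*tracker{}}
	fmt.Println(m.syncTracker("tenant-1", []string{"container"}).labels)              // [container]
	fmt.Println(m.syncTracker("tenant-1", []string{"namespace", "container"}).labels) // [container namespace]
}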
70 changes: 65 additions & 5 deletions pkg/costattribution/tracker.go
@@ -4,9 +4,11 @@ package costattribution

import (
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
@@ -34,13 +36,17 @@ type Tracker struct {
obseveredMtx sync.RWMutex
observed map[uint64]*Observation

activeSerieMtx sync.RWMutex
activeSeriesAttributionMap map[string]*atomic.Int64

hashBuffer []byte
isOverflow bool
cooldownUntil *atomic.Int64
cooldownDuration int64
logger log.Logger
}

func newTracker(userID string, trackedLabels []string, limit int, cooldown time.Duration) (*Tracker, error) {
func newTracker(userID string, trackedLabels []string, limit int, cooldown time.Duration, logger log.Logger) (*Tracker, error) {
// keep tracked labels sorted for consistent metric labels
sort.Slice(trackedLabels, func(i, j int) bool {
return trackedLabels[i] < trackedLabels[j]
@@ -69,8 +75,11 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
Help: "The total number of active series per user and attribution.",
ConstLabels: prometheus.Labels{TrackerLabel: "custom_attribution"},
}, append(trackedLabels, TenantLabel)),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
activeSerieMtx: sync.RWMutex{},
activeSeriesAttributionMap: map[string]*atomic.Int64{},
}
return m, nil
}
@@ -96,10 +105,25 @@ func (t *Tracker) CooldownDuration() int64 {
return t.cooldownDuration
}

// sep is used to separate the labels in the key; it is not a valid label character
const sep = rune(0x80)

func (t *Tracker) cleanupTrackerAttribution(vals []string) {
if t == nil {
return
}

var sb strings.Builder
for i, v := range vals {
if i > 0 {
sb.WriteRune(sep)
}
sb.WriteString(v)
}
t.activeSerieMtx.Lock()
delete(t.activeSeriesAttributionMap, sb.String())
t.activeSerieMtx.Unlock()

t.activeSeriesPerUserAttribution.DeleteLabelValues(vals...)
t.receivedSamplesAttribution.DeleteLabelValues(vals...)

@@ -116,6 +140,9 @@ func (t *Tracker) cleanupTracker(userID string) {
if t == nil {
return
}
t.activeSerieMtx.Lock()
t.activeSeriesAttributionMap = map[string]*atomic.Int64{}
t.activeSerieMtx.Unlock()
filter := prometheus.Labels{TenantLabel: userID}
t.activeSeriesPerUserAttribution.DeletePartialMatch(filter)
t.receivedSamplesAttribution.DeletePartialMatch(filter)
@@ -127,15 +154,39 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
return
}
vals := t.getKeyValues(lbs, now.Unix())
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc()
var sb strings.Builder
for i, v := range vals {
if i > 0 {
sb.WriteRune(sep)
}
sb.WriteString(v)
}
t.activeSerieMtx.Lock()
if cnt, ok := t.activeSeriesAttributionMap[sb.String()]; !ok {
t.activeSeriesAttributionMap[sb.String()] = atomic.NewInt64(1)
} else {
cnt.Inc()
}
t.activeSerieMtx.Unlock()
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
if t == nil {
return
}
vals := t.getKeyValues(lbs, now.Unix())
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec()
var sb strings.Builder
for i, v := range vals {
if i > 0 {
sb.WriteRune(sep)
}
sb.WriteString(v)
}
t.activeSerieMtx.Lock()
if cnt, ok := t.activeSeriesAttributionMap[sb.String()]; ok {
cnt.Dec()
}
t.activeSerieMtx.Unlock()
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
@@ -163,6 +214,13 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) {
if t == nil {
return
}
t.activeSerieMtx.Lock()
for key, c := range t.activeSeriesAttributionMap {
if c != nil {
t.activeSeriesPerUserAttribution.WithLabelValues(strings.Split(key, string(sep))...).Set(float64(c.Load()))
}
}
t.activeSerieMtx.Unlock()
t.activeSeriesPerUserAttribution.Collect(out)
t.receivedSamplesAttribution.Collect(out)
t.discardedSampleAttribution.Collect(out)
@@ -205,6 +263,7 @@ func (t *Tracker) overflow(stream uint64, values []string, ts int64) bool {
return false
}

t.obseveredMtx.Lock()
// we store up to 2 * maxCardinality observations; if we have seen the stream before, we update the last update time
if o, known := t.observed[stream]; known && o.lastUpdate != nil && o.lastUpdate.Load() < ts {
o.lastUpdate.Store(ts)
@@ -214,6 +273,7 @@
lastUpdate: atomic.NewInt64(ts),
}
}
t.obseveredMtx.Unlock()

// If the maximum cardinality is hit, all streams become `__overflow__` and the function returns true.
// The observed time of the original labels is not updated, but the overflow hash is.
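
The tracker.go changes above stop calling Inc/Dec on the active-series gauge vector directly: the hot path updates an in-memory map of per-attribution counters, keyed by the label values joined with the 0x80 separator, and Collect splits each key back into label values and writes the counts into the gauge vector. The same key-building loop appears in IncrementActiveSeries, DecrementActiveSeries, and cleanupTrackerAttribution, so it could be factored into a helper. A minimal sketch of that encode/count/decode pattern, with illustrative names rather than Mimir's API:

package main

import (
	"fmt"
	"strings"
	"sync"

	"go.uber.org/atomic"
)

// sep separates label values inside a map key; 0x80 is not a valid label character.
const sep = rune(0x80)

// encodeKey joins label values into a single map key, like the strings.Builder
// loops in the tracker above.
func encodeKey(vals []string) string {
	var sb strings.Builder
	for i, v := range vals {
		if i > 0 {
			sb.WriteRune(sep)
		}
		sb.WriteString(v)
	}
	return sb.String()
}

// counts is a hypothetical stand-in for activeSeriesAttributionMap: the hot
// increment/decrement path only touches this map, not the Prometheus gauges.
type counts struct {
	mtx sync.Mutex
	m   map[string]*atomic.Int64
}

func (c *counts) inc(vals []string) {
	key := encodeKey(vals)
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if cnt, ok := c.m[key]; ok {
		cnt.Inc()
		return
	}
	c.m[key] = atomic.NewInt64(1)
}

func (c *counts) dec(vals []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if cnt, ok := c.m[encodeKey(vals)]; ok {
		cnt.Dec()
	}
}

// materialize mimics Collect: it decodes each key back into label values with
// strings.Split and reports the current count (the real code sets a gauge).
func (c *counts) materialize() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	for key, cnt := range c.m {
		fmt.Printf("%v => %d\n", strings.Split(key, string(sep)), cnt.Load())
	}
}

func main() {
	c := &counts{m: map[string]*atomic.Int64{}}
	c.inc([]string{"distributor", "tenant-1"})
	c.inc([]string{"distributor", "tenant-1"})
	c.dec([]string{"distributor", "tenant-1"})
	c.materialize() // [distributor tenant-1] => 1
}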
14 changes: 6 additions & 8 deletions pkg/ingester/activeseries/active_series.go
@@ -76,7 +76,6 @@ type seriesStripe struct {
activeMatchingNativeHistograms []uint32 // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers.
activeNativeHistogramBuckets uint32 // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistogramBuckets []uint32 // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers.
buf labels.ScratchBuilder
}

// seriesEntry holds a timestamp for single series.
@@ -451,7 +450,6 @@ func (s *seriesStripe) reinitialize(
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.cat = cat
s.buf = labels.NewScratchBuilder(128)
}

func (s *seriesStripe) purge(keepUntil time.Time, idx tsdb.IndexReader) {
@@ -472,6 +470,7 @@ func (s *seriesStripe) purge(keepUntil time.Time, idx tsdb.IndexReader) {
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(s.activeMatchingNativeHistogramBuckets), s.activeMatchingNativeHistogramBuckets)

oldest := int64(math.MaxInt64)
buf := labels.NewScratchBuilder(128)
for ref, entry := range s.refs {
ts := entry.nanos.Load()
if ts < keepUntilNanos {
@@ -480,12 +479,11 @@
}

if idx != nil {
if err := idx.Series(ref, &s.buf, nil); err != nil {
if err := idx.Series(ref, &buf, nil); err != nil {
//TODO: think about what to do here
_ = err
}
s.cat.DecrementActiveSeries(s.buf.Labels(), keepUntil)
s.buf.Reset()
s.cat.DecrementActiveSeries(buf.Labels(), keepUntil)
}
delete(s.refs, ref)
continue
@@ -535,12 +533,12 @@ func (s *seriesStripe) remove(ref storage.SeriesRef, idx tsdb.IndexReader) {

s.active--
if idx != nil {
if err := idx.Series(ref, &s.buf, nil); err != nil {
buf := labels.NewScratchBuilder(10)
if err := idx.Series(ref, &buf, nil); err != nil {
//TODO: think about what to do here
_ = err
}
s.cat.DecrementActiveSeries(s.buf.Labels(), time.Now())
defer s.buf.Reset()
s.cat.DecrementActiveSeries(buf.Labels(), time.Now())
}
if entry.numNativeHistogramBuckets >= 0 {
s.activeNativeHistograms--
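
The active_series.go hunks drop the shared seriesStripe.buf field and build a fresh labels.ScratchBuilder locally in purge and remove, plausibly to avoid reusing one builder across code paths; a ScratchBuilder is not safe for concurrent use. A minimal sketch of the local-buffer pattern (buildLabels and its inputs are illustrative, not Mimir code):

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/prometheus/model/labels"
)

// buildLabels builds a label set into a builder that is local to the call, so
// no state is shared between callers or goroutines.
func buildLabels(pairs map[string]string) labels.Labels {
	b := labels.NewScratchBuilder(len(pairs))
	for name, value := range pairs {
		b.Add(name, value)
	}
	b.Sort() // label sets must be sorted by name
	return b.Labels()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = buildLabels(map[string]string{"container": "distributor", "tenant": "1"})
		}()
	}
	wg.Wait()
	fmt.Println(buildLabels(map[string]string{"container": "distributor"}))
}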
