Skip to content

Commit

Permalink
set back exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Nov 26, 2024
1 parent b82f550 commit 9b8ae76
Showing 1 changed file with 50 additions and 51 deletions.
101 changes: 50 additions & 51 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Tracker struct {
obseveredMtx sync.RWMutex
observed map[uint64]*Observation

activeSerieMtx sync.RWMutex
activeSeriesAttributionMap map[string]*LabelData

hashBuffer []byte
isOverflow bool
cooldownUntil *atomic.Int64
Expand Down Expand Up @@ -76,10 +73,10 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
Help: "The total number of active series per user and attribution.",
ConstLabels: prometheus.Labels{TrackerLabel: "custom_attribution"},
}, append(trackedLabels, TenantLabel)),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
activeSerieMtx: sync.RWMutex{},
activeSeriesAttributionMap: map[string]*LabelData{},
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
// activeSerieMtx: sync.RWMutex{},
// activeSeriesAttributionMap: map[string]*LabelData{},
}
return m, nil
}
Expand Down Expand Up @@ -117,9 +114,9 @@ func (t *Tracker) cleanupTrackerAttribution(vals []string) {
}
key = append(key, v...)
}
t.activeSerieMtx.Lock()
delete(t.activeSeriesAttributionMap, string(key))
t.activeSerieMtx.Unlock()
// t.activeSerieMtx.Lock()
// delete(t.activeSeriesAttributionMap, string(key))
// t.activeSerieMtx.Unlock()

t.activeSeriesPerUserAttribution.DeleteLabelValues(vals...)
t.receivedSamplesAttribution.DeleteLabelValues(vals...)
Expand All @@ -137,9 +134,9 @@ func (t *Tracker) cleanupTracker(userID string) {
if t == nil {
return
}
t.activeSerieMtx.Lock()
t.activeSeriesAttributionMap = map[string]*LabelData{}
t.activeSerieMtx.Unlock()
// t.activeSerieMtx.Lock()
// t.activeSeriesAttributionMap = map[string]*LabelData{}
// t.activeSerieMtx.Unlock()
filter := prometheus.Labels{TenantLabel: userID}
t.activeSeriesPerUserAttribution.DeletePartialMatch(filter)
t.receivedSamplesAttribution.DeletePartialMatch(filter)
Expand All @@ -154,44 +151,46 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
return
}
vals := t.getKeyValues(lbs, now.Unix())
var keyBuf [4096]byte
key := keyBuf[:0]
for i, v := range vals {
if i > 0 {
key = append(key, sep)
}
key = append(key, v...)
}
t.activeSerieMtx.Lock()
if cnt, ok := t.activeSeriesAttributionMap[string(key)]; !ok {
t.activeSeriesAttributionMap[string(key)] = &LabelData{
Labels: vals,
Count: atomic.NewInt64(1),
}
} else {
cnt.Count.Inc()
}
t.activeSerieMtx.Unlock()
// var keyBuf [4096]byte
// key := keyBuf[:0]
// for i, v := range vals {
// if i > 0 {
// key = append(key, sep)
// }
// key = append(key, v...)
// }
// t.activeSerieMtx.Lock()
// if cnt, ok := t.activeSeriesAttributionMap[string(key)]; !ok {
// t.activeSeriesAttributionMap[string(key)] = &LabelData{
// Labels: vals,
// Count: atomic.NewInt64(1),
// }
// } else {
// cnt.Count.Inc()
// }
// t.activeSerieMtx.Unlock()
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc()
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
if t == nil {
return
}
vals := t.getKeyValues(lbs, now.Unix())
var keyBuf [4096]byte
key := keyBuf[:0]
for i, v := range vals {
if i > 0 {
key = append(key, sep)
}
key = append(key, v...)
}
t.activeSerieMtx.Lock()
if cnt, ok := t.activeSeriesAttributionMap[string(key)]; ok {
cnt.Count.Dec()
}
t.activeSerieMtx.Unlock()
t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec()
// var keyBuf [4096]byte
// key := keyBuf[:0]
// for i, v := range vals {
// if i > 0 {
// key = append(key, sep)
// }
// key = append(key, v...)
// }
// t.activeSerieMtx.Lock()
// if cnt, ok := t.activeSeriesAttributionMap[string(key)]; ok {
// cnt.Count.Dec()
// }
// t.activeSerieMtx.Unlock()
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
Expand Down Expand Up @@ -219,13 +218,13 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) {
if t == nil {
return
}
t.activeSerieMtx.RLock()
for _, c := range t.activeSeriesAttributionMap {
if c != nil {
t.activeSeriesPerUserAttribution.WithLabelValues(c.Labels...).Set(float64(c.Count.Load()))
}
}
t.activeSerieMtx.RUnlock()
// t.activeSerieMtx.RLock()
// for _, c := range t.activeSeriesAttributionMap {
// if c != nil {
// t.activeSeriesPerUserAttribution.WithLabelValues(c.Labels...).Set(float64(c.Count.Load()))
// }
// }
// t.activeSerieMtx.RUnlock()

t.activeSeriesPerUserAttribution.Collect(out)
t.receivedSamplesAttribution.Collect(out)
Expand Down

0 comments on commit 9b8ae76

Please sign in to comment.