TBS: flatten badger when compaction is not running
carsonip committed Dec 31, 2024
1 parent c81896c commit 714ccd6
Showing 1 changed file with 79 additions and 3 deletions.
82 changes: 79 additions & 3 deletions x-pack/apm-server/sampling/processor.go
@@ -43,6 +43,8 @@ const (
var (
// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
gcCh = make(chan struct{}, 1)
// flattenCh works like a global mutex to protect flatten from running concurrently when 2 TBS processors are active during a hot reload
flattenCh = make(chan struct{}, 1)
)

// Processor is a tail-sampling event processor.
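
For context (not part of the commit): a capacity-1 buffered channel such as gcCh or flattenCh acts as a process-wide mutex that can be abandoned while waiting, because acquisition is a send that only succeeds when the channel is empty, release is a receive, and a select lets the goroutine give up as soon as shutdown is signalled. A minimal standalone sketch of the pattern, with illustrative names (sem, runExclusive, stop) that are not from this commit:

package main

import (
    "fmt"
    "time"
)

// sem is a buffered channel of capacity 1 used as a global mutex: only one
// goroutine can hold the "token" at a time, and acquisition can be abandoned
// cleanly when a stop signal arrives.
var sem = make(chan struct{}, 1)

func runExclusive(stop <-chan struct{}, name string) {
    select {
    case <-stop:
        return // shutting down before the lock was acquired
    case sem <- struct{}{}: // acquire: succeeds only if the channel is empty
    }
    defer func() { <-sem }() // release on exit

    fmt.Println(name, "holds the lock")
    time.Sleep(100 * time.Millisecond)
}

func main() {
    stop := make(chan struct{})
    go runExclusive(stop, "processor A")
    go runExclusive(stop, "processor B")
    time.Sleep(300 * time.Millisecond)
    close(stop)
}
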
@@ -390,6 +392,34 @@ func (p *Processor) Run() error {
}
}
})
g.Go(func() error {
select {
case <-p.stopping:
return nil
case flattenCh <- struct{}{}:
}
defer func() {
<-flattenCh
}()
ticker := time.NewTicker(p.config.TTL / 2)
defer ticker.Stop()
for {
select {
case <-p.stopping:
return nil
case <-ticker.C:
if p.eventStore.storageLimitTracker.isStuck() {
p.eventStore.storageLimitTracker.flattenMu.Lock()
if err := p.config.DB.Flatten(3); err != nil {
p.logger.With(logp.Error(err)).Warnf("failed to flatten db")
}
p.eventStore.storageLimitTracker.flattenMu.Unlock()
}
p.eventStore.storageLimitTracker.reset()
}
}
})
g.Go(func() error {
// Protect this goroutine from running concurrently when 2 TBS processors are active
// as badger GC is not concurrent safe.
@@ -411,6 +441,7 @@ func (p *Processor) Run() error {
case <-p.stopping:
return nil
case <-ticker.C:
p.eventStore.storageLimitTracker.flattenMu.RLock()
const discardRatio = 0.5
var err error
for err == nil {
@@ -419,8 +450,10 @@
err = p.config.DB.RunValueLogGC(discardRatio)
}
if err != nil && err != badger.ErrNoRewrite {
p.eventStore.storageLimitTracker.flattenMu.RUnlock()
return err
}
p.eventStore.storageLimitTracker.flattenMu.RUnlock()
}
}
})
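
Taken together, the two goroutines above coordinate through flattenMu: the flatten loop takes the write lock before calling DB.Flatten, while the value-log GC loop (and, further down, every write path) holds a read lock, so a flatten never overlaps with value-log GC or with writes. Below is a self-contained sketch of that shape, assuming badger v2's API, where DB.Flatten(workers int) forces compaction of the LSM tree and DB.RunValueLogGC(discardRatio float64) returns badger.ErrNoRewrite once there is nothing left to rewrite; the names maintainDB and stuck are illustrative, not from the commit.

package main

import (
    "log"
    "os"
    "sync"
    "time"

    badger "github.com/dgraph-io/badger/v2" // import path/version is an assumption
)

// maintainDB sketches the two maintenance loops: a flatten loop that runs under
// the write lock when writes appear stuck, and a value-log GC loop that runs
// under a read lock so it never overlaps with a flatten.
func maintainDB(db *badger.DB, mu *sync.RWMutex, interval time.Duration, stuck func() bool, stop <-chan struct{}) {
    go func() { // flatten loop
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-stop:
                return
            case <-ticker.C:
                if stuck() {
                    mu.Lock()
                    if err := db.Flatten(3); err != nil { // 3 compaction workers
                        log.Printf("failed to flatten db: %v", err)
                    }
                    mu.Unlock()
                }
            }
        }
    }()

    go func() { // value-log GC loop
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-stop:
                return
            case <-ticker.C:
                mu.RLock()
                var err error
                for err == nil { // rewrite value-log files until nothing is left to rewrite
                    err = db.RunValueLogGC(0.5)
                }
                if err != badger.ErrNoRewrite {
                    log.Printf("value log GC failed: %v", err)
                }
                mu.RUnlock()
            }
        }
    }()
}

func main() {
    dir, err := os.MkdirTemp("", "badger-flatten-example")
    if err != nil {
        log.Fatal(err)
    }
    defer os.RemoveAll(dir)

    db, err := badger.Open(badger.DefaultOptions(dir))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    stop := make(chan struct{})
    var mu sync.RWMutex
    maintainDB(db, &mu, time.Second, func() bool { return true }, stop)
    time.Sleep(3 * time.Second)
    close(stop)
}
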
@@ -623,10 +656,36 @@ const (
storageLimitThreshold = 0.90 // Allow 90% of the quota to be used.
)

var (
ErrFlattenInProgress = errors.New("flatten in progress, writes are discarded")
)

type storageLimitTracker struct {
anyWrite, anyWriteSuccess atomic.Bool
flattenMu sync.RWMutex
}

func (s *storageLimitTracker) reset() {
s.anyWrite.Store(false)
s.anyWriteSuccess.Store(false)
}

func (s *storageLimitTracker) recordError(isErr bool) {
s.anyWrite.Store(true)
if !isErr {
s.anyWriteSuccess.Store(true)
}
}

func (s *storageLimitTracker) isStuck() bool {
return s.anyWrite.Load() && !s.anyWriteSuccess.Load()
}

// wrappedRW wraps configurable write options for global ShardedReadWriter
type wrappedRW struct {
rw *eventstorage.ShardedReadWriter
writerOpts eventstorage.WriterOpts
storageLimitTracker storageLimitTracker
}

// Stored entries expire after ttl.
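
The tracker above is intentionally coarse: recordError is called once per write with a flag saying whether that write failed against the storage limit, and isStuck reports true only when at least one write happened since the last reset and none of them succeeded, which is the signal the flatten loop checks every TTL/2 before resetting the interval. Below is an illustrative re-declaration of the same semantics (not the commit's code) with a small usage example:

package main

import (
    "fmt"
    "sync/atomic"
)

// tracker mirrors the semantics of storageLimitTracker: it is "stuck" only if
// every write observed since the last reset failed against the storage limit.
type tracker struct {
    anyWrite, anyWriteSuccess atomic.Bool
}

func (t *tracker) recordError(isErr bool) {
    t.anyWrite.Store(true)
    if !isErr {
        t.anyWriteSuccess.Store(true)
    }
}

func (t *tracker) isStuck() bool {
    return t.anyWrite.Load() && !t.anyWriteSuccess.Load()
}

func (t *tracker) reset() {
    t.anyWrite.Store(false)
    t.anyWriteSuccess.Store(false)
}

func main() {
    var t tracker
    fmt.Println(t.isStuck()) // false: no writes observed yet

    t.recordError(true) // a write failed with the storage-limit error
    t.recordError(true)
    fmt.Println(t.isStuck()) // true: writes happened, none succeeded

    t.recordError(false) // one write got through
    fmt.Println(t.isStuck()) // false: at least one success this interval

    t.reset()                // start of the next interval
    fmt.Println(t.isStuck()) // false again until new failures accumulate
}
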
@@ -654,11 +713,23 @@ func (s *wrappedRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error {

// WriteTraceEvent calls ShardedReadWriter.WriteTraceEvent using the configured WriterOpts
func (s *wrappedRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error {
ok := s.storageLimitTracker.flattenMu.TryRLock()
if !ok {
return ErrFlattenInProgress
}
defer s.storageLimitTracker.flattenMu.RUnlock()
err := s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts)
s.storageLimitTracker.recordError(errors.Is(err, eventstorage.ErrLimitReached))
return err
}

// WriteTraceSampled calls ShardedReadWriter.WriteTraceSampled using the configured WriterOpts
func (s *wrappedRW) WriteTraceSampled(traceID string, sampled bool) error {
ok := s.storageLimitTracker.flattenMu.TryRLock()
if !ok {
return ErrFlattenInProgress
}
defer s.storageLimitTracker.flattenMu.RUnlock()
return s.rw.WriteTraceSampled(traceID, sampled, s.writerOpts)
}

Expand All @@ -669,6 +740,11 @@ func (s *wrappedRW) IsTraceSampled(traceID string) (bool, error) {

// DeleteTraceEvent calls ShardedReadWriter.DeleteTraceEvent
func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error {
ok := s.storageLimitTracker.flattenMu.TryRLock()
if !ok {
return ErrFlattenInProgress
}
defer s.storageLimitTracker.flattenMu.RUnlock()
return s.rw.DeleteTraceEvent(traceID, id)
}
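
On the write path the wrappers never block behind a flatten: sync.RWMutex.TryRLock (available since Go 1.18) fails immediately while the flatten goroutine holds the write lock, so the event is dropped with ErrFlattenInProgress rather than stalling ingestion, a reasonable trade-off given that a flatten is only triggered when every write in the last interval was already failing against the storage limit. A minimal sketch of that behaviour, with illustrative names (guardedWrite, doWrite) that are not from this commit:

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

var errFlattenInProgress = errors.New("flatten in progress, writes are discarded")

// guardedWrite drops the write instead of blocking when an exclusive
// maintenance operation (e.g. a flatten) holds the write lock.
func guardedWrite(mu *sync.RWMutex, doWrite func() error) error {
    if !mu.TryRLock() {
        return errFlattenInProgress
    }
    defer mu.RUnlock()
    return doWrite()
}

func main() {
    var mu sync.RWMutex

    // Normal case: no maintenance running, the write goes through.
    err := guardedWrite(&mu, func() error { fmt.Println("write ok"); return nil })
    fmt.Println(err) // <nil>

    // Maintenance case: hold the write lock as the flatten loop would.
    mu.Lock()
    go func() {
        time.Sleep(50 * time.Millisecond)
        mu.Unlock()
    }()
    err = guardedWrite(&mu, func() error { return nil })
    fmt.Println(errors.Is(err, errFlattenInProgress)) // true: write discarded, not blocked

    time.Sleep(100 * time.Millisecond)
}
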
