From 714ccd61e8966c979ac47140248a5d1534b7ed74 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Tue, 31 Dec 2024 14:08:49 +0000 Subject: [PATCH] TBS: flatten badger when compaction is not running --- x-pack/apm-server/sampling/processor.go | 82 ++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 9402408b299..9a9c96685e7 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -43,6 +43,8 @@ const ( var ( // gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload gcCh = make(chan struct{}, 1) + // flattenCh works like a global mutex to protect flatten from running concurrently when 2 TBS processors are active during a hot reload + flattenCh = make(chan struct{}, 1) ) // Processor is a tail-sampling event processor. @@ -390,6 +392,34 @@ func (p *Processor) Run() error { } } }) + g.Go(func() error { + select { + case <-p.stopping: + return nil + case flattenCh <- struct{}{}: + } + defer func() { + <-flattenCh + }() + ticker := time.NewTicker(p.config.TTL / 2) + defer ticker.Stop() + for { + select { + case <-p.stopping: + return nil + case <-ticker.C: + if p.eventStore.storageLimitTracker.isStuck() { + p.eventStore.storageLimitTracker.flattenMu.Lock() + err = p.config.DB.Flatten(3) + if err != nil { + p.logger.With(logp.Error(err)).Warnf("fail to flatten db") + } + p.eventStore.storageLimitTracker.flattenMu.Unlock() + } + p.eventStore.storageLimitTracker.reset() + } + } + }) g.Go(func() error { // Protect this goroutine from running concurrently when 2 TBS processors are active // as badger GC is not concurrent safe. @@ -411,6 +441,7 @@ func (p *Processor) Run() error { case <-p.stopping: return nil case <-ticker.C: + p.eventStore.storageLimitTracker.flattenMu.RLock() const discardRatio = 0.5 var err error for err == nil { @@ -419,8 +450,10 @@ func (p *Processor) Run() error { err = p.config.DB.RunValueLogGC(discardRatio) } if err != nil && err != badger.ErrNoRewrite { + p.eventStore.storageLimitTracker.flattenMu.RUnlock() return err } + p.eventStore.storageLimitTracker.flattenMu.RUnlock() } } }) @@ -623,10 +656,36 @@ const ( storageLimitThreshold = 0.90 // Allow 90% of the quota to be used. ) +var ( + ErrFlattenInProgress = errors.New("flatten in progress, writes are discarded") +) + +type storageLimitTracker struct { + anyWrite, anyWriteSuccess atomic.Bool + flattenMu sync.RWMutex +} + +func (s *storageLimitTracker) reset() { + s.anyWrite.Store(false) + s.anyWriteSuccess.Store(false) +} + +func (s *storageLimitTracker) recordError(isErr bool) { + s.anyWrite.Store(true) + if !isErr { + s.anyWriteSuccess.Store(true) + } +} + +func (s *storageLimitTracker) isStuck() bool { + return s.anyWrite.Load() && !s.anyWriteSuccess.Load() +} + // wrappedRW wraps configurable write options for global ShardedReadWriter type wrappedRW struct { - rw *eventstorage.ShardedReadWriter - writerOpts eventstorage.WriterOpts + rw *eventstorage.ShardedReadWriter + writerOpts eventstorage.WriterOpts + storageLimitTracker storageLimitTracker } // Stored entries expire after ttl. @@ -654,11 +713,23 @@ func (s *wrappedRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error { // WriteTraceEvents calls ShardedReadWriter.WriteTraceEvents using the configured WriterOpts func (s *wrappedRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error { - return s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts) + ok := s.storageLimitTracker.flattenMu.TryRLock() + if !ok { + return ErrFlattenInProgress + } + defer s.storageLimitTracker.flattenMu.RUnlock() + err := s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts) + s.storageLimitTracker.recordError(errors.Is(err, eventstorage.ErrLimitReached)) + return err } // WriteTraceSampled calls ShardedReadWriter.WriteTraceSampled using the configured WriterOpts func (s *wrappedRW) WriteTraceSampled(traceID string, sampled bool) error { + ok := s.storageLimitTracker.flattenMu.TryRLock() + if !ok { + return ErrFlattenInProgress + } + defer s.storageLimitTracker.flattenMu.RUnlock() return s.rw.WriteTraceSampled(traceID, sampled, s.writerOpts) } @@ -669,6 +740,11 @@ func (s *wrappedRW) IsTraceSampled(traceID string) (bool, error) { // DeleteTraceEvent calls ShardedReadWriter.DeleteTraceEvent func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error { + ok := s.storageLimitTracker.flattenMu.TryRLock() + if !ok { + return ErrFlattenInProgress + } + defer s.storageLimitTracker.flattenMu.RUnlock() return s.rw.DeleteTraceEvent(traceID, id) }