diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index 83a11e8e..59385f94 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -70,6 +70,9 @@ public sealed class ConcurrentLfu : ICache, IAsyncCache, IBoun private readonly IScheduler scheduler; + private readonly MpmcBoundedBuffer>> cleanBufferQueue = new MpmcBoundedBuffer>>(Environment.ProcessorCount); + private readonly MpmcBoundedBuffer>> deleteBufferQueue = new MpmcBoundedBuffer>>(Environment.ProcessorCount); + private readonly LfuNode[] drainBuffer; /// @@ -181,6 +184,7 @@ public void Trim(int itemCount) { // flush all buffers Maintenance(); + DeleteAllBufferedItems(); // walk in lru order, get itemCount keys to evict TakeCandidatesInLruOrder(this.probationLru, candidates, itemCount); @@ -334,8 +338,49 @@ private void AfterWrite(LfuNode node) } TryScheduleDrain(); + TryDeleteBufferedItems(); } + ////////////////////////////////////////////////////////////////////////////////////////////////// + + //while (true) + //{ + // bool wasTaken = false; + // Monitor.TryEnter(this.maintenanceLock, ref wasTaken); + // try + // { + // if (wasTaken) + // { + // // aggressively try to exit the lock early before doing full maintenance + // var status = BufferStatus.Contended; + // while (status != BufferStatus.Full) + // { + // status = writeBuffer.TryAdd(node); + + // if (status == BufferStatus.Success) + // { + // ScheduleAfterWrite(); + // return; + // } + // } + + // Maintenance(node); + // break; + // } + // } + // finally + // { + // if (wasTaken) + // { + // Monitor.Exit(this.maintenanceLock); + // } + // } + //} + + //DeleteAllBufferedItems(); + + ////////////////////////////////////////////////////////////////////////////////////////////////// + lock (this.maintenanceLock) { // aggressively try to exit the lock early before doing full maintenance @@ -354,6 +399,9 @@ private void AfterWrite(LfuNode node) // if the write was dropped from the buffer, explicitly pass it to maintenance Maintenance(node); } + + // remove outside the lock + DeleteAllBufferedItems(); } private void ScheduleAfterWrite() @@ -433,6 +481,9 @@ private void DrainBuffers() done = Maintenance(); } + // remove outside the lock + DeleteAllBufferedItems(); + // don't run continuous foreground maintenance if (!scheduler.IsBackground) { @@ -450,6 +501,7 @@ private bool Maintenance(LfuNode droppedWrite = null) { this.drainStatus.Set(DrainStatus.ProcessingToIdle); + // extract to a buffer before doing book keeping work, ~2x faster int readCount = readBuffer.DrainTo(this.drainBuffer); @@ -480,6 +532,8 @@ private bool Maintenance(LfuNode droppedWrite = null) } EvictEntries(); + QueueDeleteBuffer(); + this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); ReFitProtected(); @@ -496,6 +550,119 @@ private bool Maintenance(LfuNode droppedWrite = null) return done; } + private void TryAddToRemoveList(LfuNode node) + { + deleteBuffer.Add(node); + + if (deleteBuffer.Count >= deleteBufferSize) + { + // int attempts = 0; + + while (true) + { + if (this.deleteBufferQueue.TryAdd(deleteBuffer) == BufferStatus.Success) + { + if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + { + return; + } + + deleteBuffer = new List>(deleteBufferSize); + return; + } + } + + //foreach (var n in deleteBuffer) + //{ + // this.dictionary.TryRemove(n.Key, out var _); + // Disposer.Dispose(n.Value); + //} + + //deleteBuffer.Clear(); + } + } + + private void QueueDeleteBuffer() + { + if (deleteBuffer.Count == 0) + { + return; + } + + while (true) + { + if (this.deleteBufferQueue.TryAdd(deleteBuffer) == BufferStatus.Success) + { + if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + { + return; + } + + deleteBuffer = new List>(deleteBufferSize); + return; + } + } + + //if (this.deleteBufferQueue.TryAdd(deleteBuffer) != BufferStatus.Success) + //{ + // foreach (var n in deleteBuffer) + // { + // this.dictionary.TryRemove(n.Key, out var _); + // Disposer.Dispose(n.Value); + // } + + // deleteBuffer.Clear(); + //} + //else + //{ + // if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + // { + // return; + // } + + // deleteBuffer = new List>(deleteBufferSize); + //} + } + + const int deleteBufferSize = 128; + private List> deleteBuffer = new List>(deleteBufferSize); + + private void TryDeleteBufferedItems() + { + if (deleteBufferQueue.TryTake(out var buffer) == BufferStatus.Success) + { + foreach (var n in buffer) + { + this.dictionary.TryRemove(n.Key, out var _); + Disposer.Dispose(n.Value); + } + + buffer.Clear(); + this.cleanBufferQueue.TryAdd(buffer); + } + } + + private void DeleteAllBufferedItems() + { + var spinner = new SpinWait(); + while (deleteBufferQueue.TryTake(out var buffer) != BufferStatus.Empty) + { + if (buffer != null) + { + foreach (var n in buffer) + { + this.dictionary.TryRemove(n.Key, out var _); + Disposer.Dispose(n.Value); + } + + buffer.Clear(); + this.cleanBufferQueue.TryAdd(buffer); + } + + spinner.SpinOnce(); + } + } + private void OnAccess(LfuNode node) { // there was a cache hit even if the item was removed or is not yet added. @@ -709,9 +876,10 @@ private bool AdmitCandidate(K candidateKey, K victimKey) private void Evict(LfuNode evictee) { - this.dictionary.TryRemove(evictee.Key, out var _); + //this.dictionary.TryRemove(evictee.Key, out var _); + TryAddToRemoveList(evictee); evictee.list.Remove(evictee); - Disposer.Dispose(evictee.Value); +// Disposer.Dispose(evictee.Value); this.metrics.evictedCount++; }