Skip to content

Commit

Permalink
Make wait strategy configurable in perf tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ocoanet committed Dec 1, 2024
1 parent 63ec917 commit 6b7e026
Show file tree
Hide file tree
Showing 19 changed files with 208 additions and 325 deletions.
2 changes: 0 additions & 2 deletions src/Disruptor.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ public static void Main(string[] args)
config: DefaultConfig.Instance.WithOption(ConfigOptions.JoinSummary, true),
args: args
);

Console.ReadLine();
}

public static async Task MainTests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ public class OneWaySequencedLatencyTest : ILatencyTest, IDisposable
{
private const int _bufferSize = 1024;
private const long _iterations = 100 * 1000 * 30;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);
private readonly ProgramOptions _options;
private readonly Disruptor<PerfEvent> _disruptor;
private readonly Handler _handler;

public OneWaySequencedLatencyTest()
public OneWaySequencedLatencyTest(ProgramOptions options)
{
_disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new BlockingWaitStrategy());
_handler = new Handler();
_options = options;
_disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy());
_handler = new Handler(options.GetCustomCpu(1));
_disruptor.HandleEventsWith(_handler);
_disruptor.Start();
}
Expand All @@ -32,6 +34,8 @@ public void Run(LatencySessionContext sessionContext)
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest);

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

Expand Down Expand Up @@ -71,16 +75,30 @@ public void Dispose()

private class Handler : IEventHandler<PerfEvent>
{
private readonly int? _cpu;
private HistogramBase _histogram;
private ThreadAffinityScope _affinityScope;

public Handler(int? cpu)
{
_cpu = cpu;
}

public ManualResetEventSlim Started { get; } = new();
public ManualResetEventSlim Completed { get; } = new();

public void OnStart()
{
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest);

Started.Set();
}

public void OnShutdown()
{
_affinityScope.Dispose();
}

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ public class OneWaySequencedLatencyTest_BatchHandler : ILatencyTest, IDisposable
private const long _iterations = 100 * 1000 * 30;
private static readonly long _pause = StopwatchUtil.GetTimestampFromMicroseconds(10);

private readonly ProgramOptions _options;
private readonly Disruptor<PerfEvent> _disruptor;
private readonly Handler _handler;

public OneWaySequencedLatencyTest_BatchHandler()
public OneWaySequencedLatencyTest_BatchHandler(ProgramOptions options)
{
_disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new BlockingWaitStrategy());
// _disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy());
_handler = new Handler();
_options = options;
_disruptor = new Disruptor<PerfEvent>(() => new PerfEvent(), _bufferSize, new YieldingWaitStrategy());
_handler = new Handler(options.GetCustomCpu(1));
_disruptor.HandleEventsWith(_handler);
_disruptor.Start();
}
Expand All @@ -33,6 +34,8 @@ public void Run(LatencySessionContext sessionContext)
_handler.Initialize(sessionContext.Histogram);
_handler.Started.Wait();

using var _ = ThreadAffinityUtil.SetThreadAffinity(_options.GetCustomCpu(0), ThreadPriority.Highest);

var pause = _pause;
var next = Stopwatch.GetTimestamp() + pause;

Expand Down Expand Up @@ -72,16 +75,29 @@ public void Dispose()

private class Handler : IBatchEventHandler<PerfEvent>
{
private readonly int? _cpu;
private HistogramBase _histogram;
private ThreadAffinityScope _affinityScope;

public Handler(int? cpu)
{
_cpu = cpu;
}

public ManualResetEventSlim Started { get; } = new();
public ManualResetEventSlim Completed { get; } = new();

public void OnStart()
{
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest);
Started.Set();
}

public void OnShutdown()
{
_affinityScope.Dispose();
}

public void Initialize(HistogramBase histogram)
{
_histogram = histogram;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public PingPongSequencedLatencyTest(ProgramOptions options)
var pingBarrier = pingBuffer.NewBarrier();
var pongBarrier = pongBuffer.NewBarrier();

_pinger = new Pinger(pingBuffer, options.GetCustomCpu(0), _iterations, _pauseNanos);
_pinger = new Pinger(pingBuffer, options.GetCustomCpu(0));
_ponger = new Ponger(pongBuffer, options.GetCustomCpu(1));

_pingProcessor = EventProcessorFactory.Create(pongBuffer,pongBarrier, _pinger);
Expand Down Expand Up @@ -73,32 +73,28 @@ private class Pinger : IEventHandler<PerfEvent>
{
private readonly RingBuffer<PerfEvent> _buffer;
private readonly int? _cpu;
private readonly long _maxEvents;
private readonly long _pauseTimeNs;
private readonly long _pauseTimeTicks;
private HistogramBase _histogram;
private long _t0;
private long _counter;
private CountdownEvent _globalSignal;
private ManualResetEvent _signal;
private ThreadAffinityUtil.Scope _affinityScope;
private ThreadAffinityScope _affinityScope;

public Pinger(RingBuffer<PerfEvent> buffer, int? cpu, long maxEvents, long pauseTimeNs)
public Pinger(RingBuffer<PerfEvent> buffer, int? cpu)
{
_buffer = buffer;
_cpu = cpu;
_maxEvents = maxEvents;
_pauseTimeNs = pauseTimeNs;
_pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs);
_pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos);
}

public void OnEvent(PerfEvent data, long sequence, bool endOfBatch)
{
var t1 = Stopwatch.GetTimestamp();

_histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs);
_histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos);

if (data.Value < _maxEvents)
if (data.Value < _iterations)
{
while (_pauseTimeTicks > (Stopwatch.GetTimestamp() - t1))
{
Expand All @@ -125,8 +121,7 @@ private void Send()

public void OnStart()
{
if (_cpu != null)
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest);
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest);

_globalSignal.Signal();
_globalSignal.Wait();
Expand All @@ -138,8 +133,7 @@ public void OnStart()

public void OnShutdown()
{
if (_cpu != null)
_affinityScope.Dispose();
_affinityScope.Dispose();
}

public void Reset(CountdownEvent globalSignal, ManualResetEvent signal, HistogramBase histogram)
Expand All @@ -157,7 +151,7 @@ private class Ponger : IEventHandler<PerfEvent>
private readonly RingBuffer<PerfEvent> _buffer;
private readonly int? _cpu;
private CountdownEvent _globalSignal;
private ThreadAffinityUtil.Scope _affinityScope;
private ThreadAffinityScope _affinityScope;

public Ponger(RingBuffer<PerfEvent> buffer, int? cpu)
{
Expand All @@ -174,17 +168,15 @@ public void OnEvent(PerfEvent data, long sequence, bool endOfBatch)

public void OnStart()
{
if (_cpu != null)
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu.Value, ThreadPriority.Highest);
_affinityScope = ThreadAffinityUtil.SetThreadAffinity(_cpu, ThreadPriority.Highest);

_globalSignal.Signal();
_globalSignal.Wait();
}

public void OnShutdown()
{
if (_cpu != null)
_affinityScope.Dispose();
_affinityScope.Dispose();
}

public void Reset(CountdownEvent globalSignal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public PingPongSequencedLatencyTest_AsyncBatchHandler()
var pingBarrier = pingBuffer.NewAsyncBarrier();
var pongBarrier = pongBuffer.NewAsyncBarrier();

_pinger = new Pinger(pongBuffer, _pauseNanos);
_pinger = new Pinger(pongBuffer);
_ponger = new Ponger(pingBuffer);

_pingProcessor = EventProcessorFactory.Create(pingBuffer, pingBarrier, _pinger);
Expand Down Expand Up @@ -77,27 +77,25 @@ private class PingPongEvent
private class Pinger : IAsyncBatchEventHandler<PingPongEvent>
{
private readonly RingBuffer<PingPongEvent> _buffer;
private readonly long _pauseTimeNs;
private readonly long _pauseTimeTicks;
private HistogramBase _histogram;
private CountdownEvent _startCountdown;
private ManualResetEvent _completedSignal;
private long _t0;
private long _counter;

public Pinger(RingBuffer<PingPongEvent> buffer, long pauseTimeNs)
public Pinger(RingBuffer<PingPongEvent> buffer)
{
_buffer = buffer;
_pauseTimeNs = pauseTimeNs;
_pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(pauseTimeNs);
_pauseTimeTicks = StopwatchUtil.GetTimestampFromNanoseconds(_pauseNanos);
}

public ValueTask OnBatch(EventBatch<PingPongEvent> batch, long sequence)
{
foreach (var data in batch)
{
var t1 = Stopwatch.GetTimestamp();
_histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseTimeNs);
_histogram.RecordValueWithExpectedInterval(StopwatchUtil.ToNanoseconds(t1 - _t0), _pauseNanos);

if (data.Counter == _iterations)
{
Expand Down
Loading

0 comments on commit 6b7e026

Please sign in to comment.