Skip to content

Commit

Permalink
Ensure inbound buffer is always flushed to outbound (#44)
Browse files Browse the repository at this point in the history
* Ensure inbound buffer is always flushed to outbound
* Minor refactoring for easier reading + add comment
  • Loading branch information
stevejgordon authored Feb 5, 2024
1 parent 5f560a0 commit f93c967
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
32 changes: 19 additions & 13 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
where TChannelOptions : ChannelOptionsBase<TEvent, TResponse>
where TResponse : class, new()
{
private readonly Task _inThread;
private readonly Task _outThread;
private readonly Task _inTask;
private readonly Task _outTask;
private readonly SemaphoreSlim _throttleTasks;
private readonly CountdownEvent? _signal;

Expand Down Expand Up @@ -122,13 +122,13 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb

InboundBuffer = new InboundBuffer<TEvent>(maxOut, BufferOptions.OutboundBufferMaxLifetime);

_outThread = Task.Factory.StartNew(async () =>
_outTask = Task.Factory.StartNew(async () =>
await ConsumeOutboundEventsAsync().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
TaskScheduler.Default);

_inThread = Task.Factory.StartNew(async () =>
_inTask = Task.Factory.StartNew(async () =>
await ConsumeInboundEventsAsync(maxOut, BufferOptions.OutboundBufferMaxLifetime).ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness,
Expand Down Expand Up @@ -297,6 +297,7 @@ private async Task ExportBufferAsync(ArraySegment<TEvent> items, IOutboundBuffer
private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan maxInterval)
{
_callbacks.InboundChannelStartedCallback?.Invoke();

while (await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false))
{
if (TokenSource.Token.IsCancellationRequested) break;
Expand All @@ -310,21 +311,26 @@ private async Task ConsumeInboundEventsAsync(int maxQueuedMessages, TimeSpan max
break;
}

if (InboundBuffer.NoThresholdsHit) continue;
if (InboundBuffer.ThresholdsHit)
await FlushBufferAsync().ConfigureAwait(false);
}

// It's possible to break out of the above while loop before a threshold was met to flush the buffer.
// This ensures we flush if there are any items left in the inbound buffer.
if (InboundBuffer.Count > 0)
await FlushBufferAsync().ConfigureAwait(false);

OutChannel.Writer.TryComplete();

async Task FlushBufferAsync()
{
var outboundBuffer = new OutboundBuffer<TEvent>(InboundBuffer);

if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
_callbacks.PublishToOutboundChannelCallback?.Invoke();
else
_callbacks.PublishToOutboundChannelFailureCallback?.Invoke();
}

#if DEBUG
Console.WriteLine("Exiting consume inbound loop.");
#endif

OutChannel.Writer.TryComplete();
}

private ValueTask<bool> PublishAsync(IOutboundBuffer<TEvent> buffer)
Expand Down Expand Up @@ -366,15 +372,15 @@ public virtual void Dispose()
}
try
{
_inThread.Dispose();
_inTask.Dispose();
}
catch
{
// ignored
}
try
{
_outThread.Dispose();
_outTask.Dispose();
}
catch
{
Expand Down
3 changes: 3 additions & 0 deletions src/Elastic.Channels/Buffers/InboundBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ internal class InboundBuffer<TEvent> : IWriteTrackingBuffer, IDisposable
public int Count => _count;
public TimeSpan? DurationSinceFirstWrite => DateTimeOffset.UtcNow - TimeOfFirstWrite;
public TimeSpan? DurationSinceFirstWaitToRead => DateTimeOffset.UtcNow - TimeOfFirstWaitToRead;

public bool NoThresholdsHit => Count == 0
|| (Count < _maxBufferSize && DurationSinceFirstWaitToRead <= _forceFlushAfter);

public bool ThresholdsHit => !NoThresholdsHit;

public InboundBuffer(int maxBufferSize, TimeSpan forceFlushAfter)
{
_maxBufferSize = maxBufferSize;
Expand Down

0 comments on commit f93c967

Please sign in to comment.