Skip to content

Commit

Permalink
Allow consumers to choose BoundedChannelFullMode (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Apr 10, 2024
1 parent 78cef91 commit a05bf2c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
10 changes: 8 additions & 2 deletions examples/playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information

using System.Text.Json.Serialization;
using System.Threading.Channels;
using Elastic.Channels;
using Elastic.Elasticsearch.Ephemeral;
using Elastic.Ingest.Elasticsearch;
Expand All @@ -13,7 +14,7 @@
var ctxs = new CancellationTokenSource();
var parallelOpts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token };
const int numDocs = 1_000_000;
var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs, OutboundBufferMaxSize = 10_000 };
var bufferOptions = new BufferOptions { InboundBufferMaxSize = 1000, OutboundBufferMaxSize = 100, ExportMaxConcurrency = 1, BoundedChannelFullMode = BoundedChannelFullMode.Wait };
var config = new EphemeralClusterConfiguration("8.13.0");
using var cluster = new EphemeralCluster(config);
using var channel = SetupElasticsearchChannel();
Expand Down Expand Up @@ -48,10 +49,15 @@ async Task PushToChannel(DataStreamChannel<EcsDocument> c)
if (c == null) throw new ArgumentNullException(nameof(c));

await c.BootstrapElasticsearchAsync(BootstrapMethod.Failure);

foreach (var i in Enumerable.Range(0, numDocs))
await DoChannelWrite(i, ctxs.Token);

/*
await Parallel.ForEachAsync(Enumerable.Range(0, numDocs), parallelOpts, async (i, ctx) =>
{
await DoChannelWrite(i, ctx);
});
});*/

async Task DoChannelWrite(int i, CancellationToken cancellationToken)
{
Expand Down
10 changes: 10 additions & 0 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Threading;
using System.Threading.Channels;

namespace Elastic.Channels;

Expand Down Expand Up @@ -62,4 +63,13 @@ public class BufferOptions
/// Allows you to inject a <see cref="CountdownEvent"/> to wait for N number of buffers to flush.
/// </summary>
public CountdownEvent? WaitHandle { get; set; }

/// <summary>
/// <inheritdoc cref="BoundedChannelFullMode" path="summary" />
/// <para>Defaults to <see cref="BoundedChannelFullMode.Wait"/>, this will use more memory as overproducing will need to wait to enqueue data</para>
/// <para>Use <see cref="BoundedChannelFullMode.DropWrite"/> to minimize memory consumption at the expense of more likely to drop data</para>
/// <para>You might need to tweak <see cref="InboundBufferMaxSize"/> and <see cref="OutboundBufferMaxSize"/> to ensure sufficient allocations are available </para>
/// <para>The defaults for both <see cref="InboundBufferMaxSize"/> adn <see cref="OutboundBufferMaxSize"/> are quite liberal already though.</para>
/// </summary>
public BoundedChannelFullMode BoundedChannelFullMode { get; set; } = BoundedChannelFullMode.Wait;
}
4 changes: 2 additions & 2 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
AllowSynchronousContinuations = true,
// wait does not block it simply signals that Writer.TryWrite should return false and be retried
// DropWrite will make `TryWrite` always return true, which is not what we want.
FullMode = BoundedChannelFullMode.Wait
FullMode = options.BufferOptions.BoundedChannelFullMode
});
OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
new BoundedChannelOptions(maxOut)
new BoundedChannelOptions(_maxConcurrency * 2)
{
SingleReader = false,
SingleWriter = true,
Expand Down

0 comments on commit a05bf2c

Please sign in to comment.