Skip to content

Commit

Permalink
Custom task scheduler for serialization, fix batch size calc (#194)
Browse files Browse the repository at this point in the history
* Add a custom task scheduler

* Better usage, don't wait to enqueue to save to channels

* Completely pre-cal batch size to avoid spinning issues

* Try to fix cache counting

* properly dispose things

* format

* clean up

* adjust count and save on current thread

* move batch it's own file

* update a few packages

* fix build and add batch tests
  • Loading branch information
adamhathcock authored Jan 3, 2025
1 parent a1b9030 commit 1fe1a54
Show file tree
Hide file tree
Showing 25 changed files with 274 additions and 164 deletions.
7 changes: 3 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<!-- Keep at exactly 7.0.5 for side by side with V2 -->
<PackageVersion Include="Microsoft.Data.Sqlite" Version="7.0.5" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="8.0.10" />
<PackageVersion Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
Expand All @@ -19,17 +19,16 @@
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageVersion Include="NUnit" Version="4.2.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.2.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="8.5.0" />
<PackageVersion Include="Open.ChannelExtensions" Version="8.6.0" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageVersion Include="Shouldly" Version="4.2.1" />
<PackageVersion Include="Speckle.Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="Speckle.DoubleNumerics" Version="4.0.1" />
<PackageVersion Include="SimpleExec" Version="12.0.0" />
<PackageVersion Include="Speckle.Objects" Version="3.1.0-dev.109" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<GlobalPackageReference Include="PolySharp" Version="1.14.1" />
<GlobalPackageReference Include="PolySharp" Version="1.15.0" />
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="GitVersion.MsBuild" Version="5.12.0" />
<GlobalPackageReference Include="Speckle.InterfaceGenerator" Version="0.9.6" />
Expand Down
6 changes: 3 additions & 3 deletions build/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
},
"PolySharp": {
"type": "Direct",
"requested": "[1.14.1, )",
"resolved": "1.14.1",
"contentHash": "mOOmFYwad3MIOL14VCjj02LljyF1GNw1wP0YVlxtcPvqdxjGGMNdNJJxHptlry3MOd8b40Flm8RPOM8JOlN2sQ=="
"requested": "[1.15.0, )",
"resolved": "1.15.0",
"contentHash": "FbU0El+EEjdpuIX4iDbeS7ki1uzpJPx8vbqOzEtqnl1GZeAGJfq+jCbxeJL2y0EPnUNk8dRnnqR2xnYXg9Tf+g=="
},
"SimpleExec": {
"type": "Direct",
Expand Down
12 changes: 6 additions & 6 deletions src/Speckle.Objects/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
},
"PolySharp": {
"type": "Direct",
"requested": "[1.14.1, )",
"resolved": "1.14.1",
"contentHash": "mOOmFYwad3MIOL14VCjj02LljyF1GNw1wP0YVlxtcPvqdxjGGMNdNJJxHptlry3MOd8b40Flm8RPOM8JOlN2sQ=="
"requested": "[1.15.0, )",
"resolved": "1.15.0",
"contentHash": "FbU0El+EEjdpuIX4iDbeS7ki1uzpJPx8vbqOzEtqnl1GZeAGJfq+jCbxeJL2y0EPnUNk8dRnnqR2xnYXg9Tf+g=="
},
"Speckle.InterfaceGenerator": {
"type": "Direct",
Expand Down Expand Up @@ -333,9 +333,9 @@
},
"PolySharp": {
"type": "Direct",
"requested": "[1.14.1, )",
"resolved": "1.14.1",
"contentHash": "mOOmFYwad3MIOL14VCjj02LljyF1GNw1wP0YVlxtcPvqdxjGGMNdNJJxHptlry3MOd8b40Flm8RPOM8JOlN2sQ=="
"requested": "[1.15.0, )",
"resolved": "1.15.0",
"contentHash": "FbU0El+EEjdpuIX4iDbeS7ki1uzpJPx8vbqOzEtqnl1GZeAGJfq+jCbxeJL2y0EPnUNk8dRnnqR2xnYXg9Tf+g=="
},
"Speckle.InterfaceGenerator": {
"type": "Direct",
Expand Down
19 changes: 18 additions & 1 deletion src/Speckle.Sdk.Dependencies/Pools.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text;
using System.Collections.Concurrent;
using System.Text;
using Microsoft.Extensions.ObjectPool;

namespace Speckle.Sdk.Dependencies;
Expand Down Expand Up @@ -33,6 +34,19 @@ public bool Return(Dictionary<TKey, TValue> obj)
}
}

private sealed class ObjectConcurrentDictionaryPolicy<TKey, TValue>
: IPooledObjectPolicy<ConcurrentDictionary<TKey, TValue>>
where TKey : notnull
{
public ConcurrentDictionary<TKey, TValue> Create() => new(Environment.ProcessorCount, 50);

public bool Return(ConcurrentDictionary<TKey, TValue> obj)
{
obj.Clear();
return true;
}
}

private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);
Expand All @@ -48,4 +62,7 @@ public bool Return(List<T> obj)

public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());

public static Pool<ConcurrentDictionary<TKey, TValue>> CreateConcurrentDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectConcurrentDictionaryPolicy<TKey, TValue>());
}
25 changes: 25 additions & 0 deletions src/Speckle.Sdk.Dependencies/Serialization/Batch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Speckle.Sdk.Serialisation.V2.Send;

public class Batch<T>(int capacity) : IHasSize
where T : IHasSize
{
#pragma warning disable IDE0032
private readonly List<T> _items = new(capacity);
private int _batchSize;
#pragma warning restore IDE0032

public void Add(T item)
{
_items.Add(item);
_batchSize += item.Size;
}

public void TrimExcess()
{
_items.TrimExcess();
_batchSize = _items.Sum(x => x.Size);
}

public int Size => _batchSize;
public List<T> Items => _items;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Speckle.Sdk.Serialisation.V2.Send;

public static class ChannelExtensions
{
public static BatchingChannelReader<T, List<T>> BatchBySize<T>(
public static BatchingChannelReader<T, Batch<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
Expand Down
82 changes: 26 additions & 56 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ namespace Speckle.Sdk.Dependencies.Serialization;
public abstract class ChannelSaver<T>
where T : IHasSize
{
private const int SEND_CAPACITY = 50;
private const int SEND_CAPACITY = 500;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4;
private const int HTTP_CAPACITY = 50;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int HTTP_CAPACITY = 500;
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
private const int MAX_CACHE_BATCH = 200;

private bool _enabled;

private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
Expand All @@ -29,59 +27,31 @@ public abstract class ChannelSaver<T>
_ => throw new NotImplementedException("Dropping items not supported.")
);

public Task<long> Start(
bool enableServerSending = true,
bool enableCacheSaving = true,
CancellationToken cancellationToken = default
)
{
ValueTask<long> t = new(Task.FromResult(0L));
if (enableServerSending)
{
_enabled = true;
var tChannelReader = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
);
if (enableCacheSaving)
{
t = new(
tChannelReader
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
);
}
else
{
t = tChannelReader.ReadUntilCancelledAsync(cancellationToken, (list, l) => new ValueTask());
}
}

return t.AsTask();
}

public async ValueTask Save(T item, CancellationToken cancellationToken = default)
{
if (_enabled)
{
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}

public abstract Task<List<T>> SendToServer(List<T> batch, CancellationToken cancellationToken);

public ValueTask Done()
public Task Start(CancellationToken cancellationToken = default) =>
_checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);

public ValueTask Save(T item, CancellationToken cancellationToken = default) =>
_checkCacheChannel.Writer.WriteAsync(item, cancellationToken);

public abstract Task<List<T>> SendToServer(Batch<T> batch, CancellationToken cancellationToken);

public Task Done()
{
_checkCacheChannel.Writer.Complete();
return new(Task.CompletedTask);
return Task.CompletedTask;
}

public abstract void SaveToCache(List<T> item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,20 @@ public class SizeBatchingChannelReader<T>(
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, Batch<T>>(x => new(x), source, batchSize, singleReader, syncCont)
where T : IHasSize
{
private readonly int _batchSize = batchSize;
protected override Batch<T> CreateBatch(int capacity) => new(capacity);

protected override List<T> CreateBatch(int capacity) => new();

protected override void TrimBatch(List<T> batch) => batch.TrimExcess();

protected override void AddBatchItem(List<T> batch, T item) => batch.Add(item);

protected override int GetBatchSize(List<T> batch)
protected override void TrimBatch(ref Batch<T> batch, bool isVerifiedFull)
{
int size = 0;
foreach (T item in batch)
{
size += item.Size;
}

if (size >= _batchSize)
if (!isVerifiedFull)
{
return _batchSize;
batch.TrimExcess();
}
return size;
}

protected override void AddBatchItem(Batch<T> batch, T item) => batch.Add(item);

protected override int GetBatchSize(Batch<T> batch) => batch.Size;
}
36 changes: 18 additions & 18 deletions src/Speckle.Sdk.Dependencies/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.10, )",
"resolved": "8.0.10",
"contentHash": "u7gAG7JgxF8VSJUGPSudAcPxOt+ymJKQCSxNRxiuKV+klCQbHljQR75SilpedCTfhPWDhtUwIJpnDVtspr9nMg=="
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
Expand All @@ -44,9 +44,9 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.5.0, )",
"resolved": "8.5.0",
"contentHash": "dKD2iNfUYw+aOvwM2vCnD+q6JCtHiabkufKM1GateedRzcgv0RrtA4MoJI+7Y8N21R5A+wUA+j6P88g6mXPavA==",
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "8.0.0",
"System.Collections.Immutable": "8.0.0",
Expand Down Expand Up @@ -76,9 +76,9 @@
},
"PolySharp": {
"type": "Direct",
"requested": "[1.14.1, )",
"resolved": "1.14.1",
"contentHash": "mOOmFYwad3MIOL14VCjj02LljyF1GNw1wP0YVlxtcPvqdxjGGMNdNJJxHptlry3MOd8b40Flm8RPOM8JOlN2sQ=="
"requested": "[1.15.0, )",
"resolved": "1.15.0",
"contentHash": "FbU0El+EEjdpuIX4iDbeS7ki1uzpJPx8vbqOzEtqnl1GZeAGJfq+jCbxeJL2y0EPnUNk8dRnnqR2xnYXg9Tf+g=="
},
"Speckle.InterfaceGenerator": {
"type": "Direct",
Expand Down Expand Up @@ -185,9 +185,9 @@
},
"Microsoft.Extensions.ObjectPool": {
"type": "Direct",
"requested": "[8.0.10, )",
"resolved": "8.0.10",
"contentHash": "u7gAG7JgxF8VSJUGPSudAcPxOt+ymJKQCSxNRxiuKV+klCQbHljQR75SilpedCTfhPWDhtUwIJpnDVtspr9nMg=="
"requested": "[8.0.11, )",
"resolved": "8.0.11",
"contentHash": "6ApKcHNJigXBfZa6XlDQ8feJpq7SG1ogZXg6M4FiNzgd6irs3LUAzo0Pfn4F2ZI9liGnH1XIBR/OtSbZmJAV5w=="
},
"Microsoft.SourceLink.GitHub": {
"type": "Direct",
Expand All @@ -201,9 +201,9 @@
},
"Open.ChannelExtensions": {
"type": "Direct",
"requested": "[8.5.0, )",
"resolved": "8.5.0",
"contentHash": "dKD2iNfUYw+aOvwM2vCnD+q6JCtHiabkufKM1GateedRzcgv0RrtA4MoJI+7Y8N21R5A+wUA+j6P88g6mXPavA=="
"requested": "[8.6.0, )",
"resolved": "8.6.0",
"contentHash": "g5axz417bA6FXifJaBlB0l62gV7dYmknXx0n8lT/LSA3+7isaGMsOjJp5J+H/yXDRe4r+KZrE+bzQcs4Ets2kA=="
},
"Polly": {
"type": "Direct",
Expand All @@ -228,9 +228,9 @@
},
"PolySharp": {
"type": "Direct",
"requested": "[1.14.1, )",
"resolved": "1.14.1",
"contentHash": "mOOmFYwad3MIOL14VCjj02LljyF1GNw1wP0YVlxtcPvqdxjGGMNdNJJxHptlry3MOd8b40Flm8RPOM8JOlN2sQ=="
"requested": "[1.15.0, )",
"resolved": "1.15.0",
"contentHash": "FbU0El+EEjdpuIX4iDbeS7ki1uzpJPx8vbqOzEtqnl1GZeAGJfq+jCbxeJL2y0EPnUNk8dRnnqR2xnYXg9Tf+g=="
},
"Speckle.InterfaceGenerator": {
"type": "Direct",
Expand Down
7 changes: 6 additions & 1 deletion src/Speckle.Sdk/Api/Operations/Operations.Send.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ public async Task<SerializeProcessResults> Send2(

try
{
var process = serializeProcessFactory.CreateSerializeProcess(url, streamId, authorizationToken, onProgressAction);
using var process = serializeProcessFactory.CreateSerializeProcess(
url,
streamId,
authorizationToken,
onProgressAction
);
var results = await process.Serialize(value, cancellationToken).ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
Expand Down
Loading

0 comments on commit 1fe1a54

Please sign in to comment.