
Commit

Batch by size, Closures and Detached items are computed differently (#164)

* Can debug dependencies

* Different exceptions

* Uses root id only after we found it to signal the end

* DataChunks are created later and need to be accounted for

* format

* use app ids in tests and references

* check sqlite cache after serialize

* use dummy to go through channels to end

* fmt

* Extend channel lib to batch by size

* fmt

* build fix

* adjust limits

* Fix sending

* Optimize reference generation

* more

* remove ToList

* rework closures to be constant; the serializer only deals with the current object, and reference bases are cached

* fix chunk creation

* another bug fix

* clean up with factories

* add deserializer factory

* Needed to reference interface

* move around streamId

* some clean up

* Use StringBuilder pool on serialization to reduce memory pressure

* remove extra

* remove extra clears

* Fix a flaw in batch size

* use default complete

* format

* loader should use 1 writer that is batched

* remove redundant ref gen

* Fix graphql commands by adding project id
adamhathcock authored Nov 12, 2024
1 parent 43445bc commit 715bb72
Showing 33 changed files with 396 additions and 327 deletions.
17 changes: 4 additions & 13 deletions src/Speckle.Sdk.Dependencies/Pools.cs
@@ -1,4 +1,5 @@
using Microsoft.Extensions.ObjectPool;
using System.Text;
using Microsoft.Extensions.ObjectPool;

namespace Speckle.Sdk.Dependencies;

@@ -17,16 +18,6 @@ public bool Return(Dictionary<string, object?> obj)
}
}

public static Pool<List<string>> ListString { get; } = new(new ListStringPolicy());

private sealed class ListStringPolicy : IPooledObjectPolicy<List<string>>
{
public List<string> Create() => new(20);

public bool Return(List<string> obj)
{
obj.Clear();
return true;
}
}
public static Pool<StringBuilder> StringBuilders { get; } =
new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 });
}
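
For context, a minimal sketch of the rent/return pattern the new StringBuilder pool enables (the BuildJson helper below is hypothetical; only Pools.StringBuilders.Get and Return come from this diff):

using System.Text;
using Speckle.Sdk.Dependencies;

// Hypothetical helper illustrating the pooled rent/return pattern.
public static class PooledJsonExample
{
  public static string BuildJson(string id)
  {
    StringBuilder sb = Pools.StringBuilders.Get(); // rent instead of allocating a fresh builder
    try
    {
      sb.Append("{\"id\":\"").Append(id).Append("\"}");
      return sb.ToString();
    }
    finally
    {
      // Hand the builder back; the configured policy caps retained capacity at 100 MB.
      Pools.StringBuilders.Return(sb);
    }
  }
}
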
21 changes: 21 additions & 0 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs
@@ -0,0 +1,21 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public static class ChannelExtensions
{
public static BatchingChannelReader<BaseItem, List<BaseItem>> BatchBySize(
this ChannelReader<BaseItem> source,
int batchSize,
bool singleReader = false,
bool allowSynchronousContinuations = false
) =>
new SizeBatchingChannelReader(
source ?? throw new ArgumentNullException(nameof(source)),
batchSize,
singleReader,
allowSynchronousContinuations
);
}
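
A rough usage sketch, assuming a producer is already writing BaseItem values to the channel (the 25 MB figure mirrors ChannelSaver below; the consumer loop is illustrative):

using System.Threading.Channels;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;

static async Task ConsumeInBatches(ChannelReader<BaseItem> reader, CancellationToken ct)
{
  // Batches close once the accumulated Json byte count reaches ~25 MB,
  // rather than after a fixed item count.
  var batches = reader.BatchBySize(25_000_000);
  await foreach (List<BaseItem> batch in batches.ReadAllAsync(ct))
  {
    // e.g. send the batch to the server here
  }
}
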
12 changes: 8 additions & 4 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs
@@ -7,23 +7,27 @@ public abstract class ChannelLoader
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private static readonly int MAX_CACHE_PARALLELISM = Environment.ProcessorCount;
private static readonly int MAX_READ_CACHE_PARALLELISM = Environment.ProcessorCount;
private const int MAX_SAVE_CACHE_BATCH = 200;
private const int MAX_SAVE_CACHE_PARALLELISM = 1;

protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken = default) =>
await allChildrenIds
.ToChannel(cancellationToken: cancellationToken)
.Pipe(MAX_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
.Pipe(MAX_READ_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
.Filter(x => x is not null)
.Batch(HTTP_GET_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(MAX_PARALLELISM_HTTP, async x => await Download(x).ConfigureAwait(false), -1, false, cancellationToken)
.Join()
.ReadAllConcurrently(MAX_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.Batch(MAX_SAVE_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ConfigureAwait(false);

public abstract string? CheckCache(string id);

public abstract Task<List<BaseItem>> Download(List<string?> ids);

public abstract void SaveToCache(BaseItem x);
public abstract void SaveToCache(List<BaseItem> x);
}
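
A sketch of what a concrete loader now implements, assuming ChannelLoader and BaseItem share the Speckle.Sdk.Dependencies.Serialization namespace and that these are its only abstract members (the bodies are placeholders; the real implementation is ObjectLoader):

using Speckle.Sdk.Dependencies.Serialization;

public sealed class ExampleLoader : ChannelLoader // illustrative subclass only
{
  // Return the id when it still needs downloading, or null when it is already cached,
  // so the Filter step drops it before the HTTP batch is formed.
  public override string? CheckCache(string id)
  {
    bool alreadyCached = false; // placeholder for a real cache lookup
    return alreadyCached ? null : id;
  }

  public override async Task<List<BaseItem>> Download(List<string?> ids)
  {
    await Task.Yield(); // placeholder for the HTTP fetch of up to 500 ids per request
    return new List<BaseItem>();
  }

  // Now receives batches of up to 200 items from a single writer,
  // so one cache transaction can cover a whole batch.
  public override void SaveToCache(List<BaseItem> batch)
  {
    // placeholder for a bulk insert into the local cache
  }
}
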
63 changes: 17 additions & 46 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
@@ -1,82 +1,53 @@
using System.Threading.Channels;
using System.Text;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;

namespace Speckle.Sdk.Dependencies.Serialization;

public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage);
public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage)
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json);
}

public abstract class ChannelSaver
{
private const int HTTP_SEND_CHUNK_SIZE = 500;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 100;
private const string DUMMY = "dummy";
private const int MAX_CACHE_BATCH = 200;

private readonly Channel<BaseItem> _checkCacheChannel = Channel.CreateUnbounded<BaseItem>();

public Task Start(string streamId, CancellationToken cancellationToken = default)
public Task Start(CancellationToken cancellationToken = default)
{
var t = _checkCacheChannel
.Reader.Batch(HTTP_SEND_CHUNK_SIZE)
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServerInternal(streamId, x, cancellationToken).ConfigureAwait(false),
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
-1,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCacheInternal, cancellationToken);
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);
return t;
}

public async Task Save(BaseItem item, CancellationToken cancellationToken = default) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);

private async Task<List<BaseItem>> SendToServerInternal(
string streamId,
List<BaseItem> batch,
CancellationToken cancellationToken = default
)
{
var ending = batch.Select(x => x.Id).Contains(DUMMY);
if (ending)
{
batch.RemoveAll(x => x.Id == DUMMY);
}
var results = await SendToServer(streamId, batch, cancellationToken).ConfigureAwait(false);
if (ending)
{
results.Add(new BaseItem(DUMMY, DUMMY, false));
}
return results;
}

public abstract Task<List<BaseItem>> SendToServer(
string streamId,
List<BaseItem> batch,
CancellationToken cancellationToken
);

public async Task Done() => await Save(new BaseItem(DUMMY, DUMMY, false)).ConfigureAwait(false);
public abstract Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken);

private void SaveToCacheInternal(List<BaseItem> batch)
public Task Done()
{
var ending = batch.Select(x => x.Id).Contains(DUMMY);
if (ending)
{
batch.RemoveAll(x => x.Id == DUMMY);
}
SaveToCache(batch);
if (ending)
{
_checkCacheChannel.Writer.Complete();
}
_checkCacheChannel.Writer.Complete();
return Task.CompletedTask;
}

public abstract void SaveToCache(List<BaseItem> item);
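
With the DUMMY sentinel gone, completion is driven by the caller; a minimal sketch of the intended call sequence (the wrapper method and item source are illustrative, the ChannelSaver members are the ones shown above):

using Speckle.Sdk.Dependencies.Serialization;

static async Task RunSaver(ChannelSaver saver, IEnumerable<BaseItem> items, CancellationToken ct)
{
  Task pipeline = saver.Start(ct); // wires BatchBySize -> SendToServer -> SaveToCache
  foreach (BaseItem item in items)
  {
    await saver.Save(item, ct); // enqueue onto the internal channel
  }
  await saver.Done(); // completes the writer; replaces the old DUMMY item
  await pipeline; // finishes once every batch has been sent and cached
}
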
36 changes: 36 additions & 0 deletions SizeBatchingChannelReader.cs (new file)
@@ -0,0 +1,36 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public class SizeBatchingChannelReader(
ChannelReader<BaseItem> source,
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<BaseItem, List<BaseItem>>(source, batchSize, singleReader, syncCont)
{
private readonly int _batchSize = batchSize;

protected override List<BaseItem> CreateBatch(int capacity) => new();

protected override void TrimBatch(List<BaseItem> batch) => batch.TrimExcess();

protected override void AddBatchItem(List<BaseItem> batch, BaseItem item) => batch.Add(item);

protected override int GetBatchSize(List<BaseItem> batch)
{
int size = 0;
foreach (BaseItem item in batch)
{
size += item.Size;
}

if (size >= _batchSize)
{
return _batchSize;
}
return size;
}
}
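
The clamp at the end appears to be how the base reader is told a batch is full: once the summed Json byte counts reach the threshold, the reported size equals the configured batch size and the batch is emitted (this reading of the Open.ChannelExtensions contract is inferred from the override, not from its documentation). A small worked example with placeholder items:

using Speckle.Sdk.Dependencies.Serialization;

// Size is the UTF-8 byte count of Json, so new string('x', 12_000_000) gives Size == 12_000_000.
var a = new BaseItem("idA", new string('x', 12_000_000), true);
var b = new BaseItem("idB", new string('x', 9_000_000), true);
var c = new BaseItem("idC", new string('x', 6_000_000), true);
// With batchSize = 25_000_000 the running total is 27_000_000 after the third item,
// GetBatchSize clamps it to 25_000_000, and the batch is closed; a 21_000_000-byte
// batch would report its true size and keep accepting items.
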
6 changes: 3 additions & 3 deletions src/Speckle.Sdk/Api/GraphQL/Inputs/VersionInputs.cs
@@ -1,10 +1,10 @@
namespace Speckle.Sdk.Api.GraphQL.Inputs;

public sealed record UpdateVersionInput(string versionId, string? message);
public sealed record UpdateVersionInput(string versionId, string projectId, string? message);

public sealed record MoveVersionsInput(string targetModelName, IReadOnlyList<string> versionIds);
public sealed record MoveVersionsInput(string projectId, string targetModelName, IReadOnlyList<string> versionIds);

public sealed record DeleteVersionsInput(IReadOnlyList<string> versionIds);
public sealed record DeleteVersionsInput(IReadOnlyList<string> versionIds, string projectId);

public sealed record CreateVersionInput(
string objectId,
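
Callers of these version mutations now supply the owning project id as well; for instance (the ids below are placeholders):

using Speckle.Sdk.Api.GraphQL.Inputs;

var update = new UpdateVersionInput("someVersionId", "someProjectId", "updated message");
var delete = new DeleteVersionsInput(new[] { "someVersionId" }, "someProjectId");
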
12 changes: 6 additions & 6 deletions src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
@@ -2,8 +2,6 @@
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Receive;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;
@@ -26,10 +24,12 @@ public async Task<Base> Receive2(

try
{
var sqliteTransport = new SQLiteReceiveCacheManager(streamId);
var serverObjects = new ServerObjectManager(speckleHttp, activityFactory, url, authorizationToken);
var o = new ObjectLoader(sqliteTransport, serverObjects, streamId, onProgressAction);
var process = new DeserializeProcess(onProgressAction, o);
var process = serializeProcessFactory.CreateDeserializeProcess(
url,
streamId,
authorizationToken,
onProgressAction
);
var result = await process.Deserialize(objectId, cancellationToken).ConfigureAwait(false);
receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return result;
16 changes: 2 additions & 14 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
@@ -4,8 +4,6 @@
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;
@@ -26,18 +24,8 @@ public partial class Operations

try
{
var sqliteTransport = new SQLiteSendCacheManager(streamId);
var serverObjects = new ServerObjectManager(speckleHttp, activityFactory, url, authorizationToken);
var process = new SerializeProcess(
onProgressAction,
sqliteTransport,
serverObjects,
speckleBaseChildFinder,
speckleBasePropertyGatherer
);
var (rootObjId, convertedReferences) = await process
.Serialize(streamId, value, cancellationToken)
.ConfigureAwait(false);
var process = serializeProcessFactory.CreateSerializeProcess(url, streamId, authorizationToken, onProgressAction);
var (rootObjId, convertedReferences) = await process.Serialize(value, cancellationToken).ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return new(rootObjId, convertedReferences);
7 changes: 2 additions & 5 deletions src/Speckle.Sdk/Api/Operations/Operations.cs
@@ -1,8 +1,7 @@
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Serialisation.V2;

namespace Speckle.Sdk.Api;

@@ -14,9 +13,7 @@ namespace Speckle.Sdk.Api;
[GenerateAutoInterface]
public partial class Operations(
ILogger<Operations> logger,
ISpeckleHttp speckleHttp,
ISdkActivityFactory activityFactory,
ISdkMetricsFactory metricsFactory,
ISpeckleBaseChildFinder speckleBaseChildFinder,
ISpeckleBasePropertyGatherer speckleBasePropertyGatherer
ISerializeProcessFactory serializeProcessFactory
) : IOperations;
9 changes: 7 additions & 2 deletions src/Speckle.Sdk/Helpers/SerializerIdWriter.cs
@@ -1,4 +1,6 @@
using Speckle.Newtonsoft.Json;
using System.Text;
using Speckle.Newtonsoft.Json;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Serialisation;

namespace Speckle.Sdk.Helpers;
@@ -9,12 +11,14 @@ public sealed class SerializerIdWriter : JsonWriter
#pragma warning disable CA2213
private readonly JsonWriter _jsonIdWriter;
private readonly StringWriter _idWriter;
private readonly StringBuilder _stringBuilder;
#pragma warning restore CA2213

public SerializerIdWriter(JsonWriter jsonWriter)
{
_jsonWriter = jsonWriter;
_idWriter = new StringWriter();
_stringBuilder = Pools.StringBuilders.Get();
_idWriter = new StringWriter(_stringBuilder);
_jsonIdWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(_idWriter);
}

@@ -23,6 +27,7 @@ public SerializerIdWriter(JsonWriter jsonWriter)
_jsonIdWriter.WriteEndObject();
_jsonIdWriter.Flush();
var json = _idWriter.ToString();
Pools.StringBuilders.Return(_stringBuilder);
return (json, _jsonWriter);
}

3 changes: 3 additions & 0 deletions src/Speckle.Sdk/Serialisation/SpeckleObjectSerializer.cs
@@ -6,6 +6,7 @@
using Speckle.DoubleNumerics;
using Speckle.Newtonsoft.Json;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.Utilities;
@@ -249,10 +250,12 @@ private void SerializeProperty(
_parentClosures.Add(closure);
}

var stringBuilder = Pools.StringBuilders.Get();
using var writer = new StringWriter();
using var jsonWriter = SpeckleObjectSerializerPool.Instance.GetJsonTextWriter(writer);
string id = SerializeBaseObject(baseObj, jsonWriter, closure);
var json = writer.ToString();
Pools.StringBuilders.Return(stringBuilder);

if (computeClosures || inheritedDetachInfo.IsDetachable || baseObj is Blob)
{
ReferenceGenerator.cs
Expand Up @@ -3,7 +3,7 @@
public static class ReferenceGenerator
{
private const string REFERENCE_JSON_START = "{\"speckle_type\":\"reference\",\"referencedId\":\"";
private const string REFERENCE_JSON_END = "\",\"__closure\":null}";
private const string REFERENCE_JSON_END = "\",\"__closure\":null}"; //TODO: remove null but ID calculation changes

public static string CreateReference(string id) => REFERENCE_JSON_START + id + REFERENCE_JSON_END;
}
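
For a given id the generated reference JSON is just the two constants concatenated around it; for example (placeholder id, namespace/using directives omitted):

string json = ReferenceGenerator.CreateReference("exampleObjectId");
// json == "{\"speckle_type\":\"reference\",\"referencedId\":\"exampleObjectId\",\"__closure\":null}"
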
(remaining file diffs not loaded)
