Serialize using a Channel (#146)
* Use a stack channel for deserialization

* multi-threaded

* add object dictionary pool

* more pooling

* adjust sqlite transport

* format

* Optimize IsPropNameValid

* object loader first pass

* save test

* add cache pre check

* save better deserialize

* mostly works

* uses tasks but slower at end

* rework to make more sense

* add check to avoid multi-deserialize

* modify max parallelism

* async enqueuing of tasks

* switch to more IAsyncEnumerable

* fmt

* fmt

* cleanup sqlite

* make ServerObjectManager

* revert change

* add ability to skip cache check

* cache json to know what is loaded

* testing

* clean up usage

* clean up and added new op

* Fix exception handling

* fixing progress

* remove codejam

* Hides ObjectPool dependency

* fmt

* Use the 1.0 BCL async to try to be more compatible

* rename to dependencies

* Move Polly to internal dependencies

* format

* remove more old references

* remove stackchannel

* fixes for registration

* remove console writeline

* add cache check shortcut for root object

* start refactoring send

* Receive2 benchmark

* add test for deserialize new

* use channels for sending

* test and fixes

* Use same AsyncInterfaces as Dynamo. Merge fixes

* clean up

* fix download object progress

* put back from bad merge

* intermediate commit: separating get child function from serializer

* send didn't error

* add channels

* Use net48, netstandard2.1 and net8

* remove collection special case

* have to make a tree of tasks even though it may serialize things twice

* pre-id changing during serialize

* need AsyncInterfaces for net48 :(

* options changes

* revert to netstandard2.0 and net8.0

* fix totals

* revert httpcontext changes

* format

* clean up

* active tasks works when accounting for id not being stable

* add id tests

* more fixes

* works

* format

* Convert to BaseItem and use single SQLite checks to avoid locks

* use locks and batch sqlite operations

* hook up and handle null ids

* remove unused parameter

* remove progress from serializer itself

* invert has objects call

* re-add object references

* format

* fix tests

* remove active tasks check

* bug fix for json cache

* remove locks from sqlite

* General Send test

* add childclosures

* redo extract all to be enumerable

* group tests in projects

* caching json does matter

* cache checking should be managed by channels

* format

* Merge pull request #152 from specklesystems/new-json-test

Uses a new objects test in Revit for serialization tests

* add skip

* add new roundtrip test

* fix finish

* clean up tests

* check happens in serialize...don't do it twice

* better progress reporting

* fix progress reporting

* only use detached properties when gathering children

* move detached tests

* add detached tests

* fix merge

* Fix progress change

* fix more tests

---------

Co-authored-by: Jedd Morgan <[email protected]>
Co-authored-by: Claire Kuang <[email protected]>
3 people authored Nov 5, 2024
1 parent 8a148b8 commit 4cc78c4
Showing 44 changed files with 1,728 additions and 138 deletions.
26 changes: 11 additions & 15 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs
@@ -1,33 +1,29 @@
using Open.ChannelExtensions;

namespace Speckle.Sdk.Serialisation.V2.Receive;
namespace Speckle.Sdk.Dependencies.Serialization;

public abstract class ChannelLoader
{
private const int HTTP_ID_CHUNK_SIZE = 500;
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private static readonly int MAX_CACHE_PARALLELISM = Environment.ProcessorCount;

protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken = default) =>
await allChildrenIds
.ToChannel(cancellationToken: cancellationToken)
.Pipe(Environment.ProcessorCount, CheckCache, cancellationToken: cancellationToken)
.Pipe(MAX_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
.Filter(x => x is not null)
.Batch(HTTP_ID_CHUNK_SIZE)
.WithTimeout(TimeSpan.FromSeconds(2))
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await DownloadAndCache(x).ConfigureAwait(false),
-1,
false,
cancellationToken
)
.Batch(HTTP_GET_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(MAX_PARALLELISM_HTTP, async x => await Download(x).ConfigureAwait(false), -1, false, cancellationToken)
.Join()
.ReadAllConcurrently(Environment.ProcessorCount, SaveToCache, cancellationToken)
.ReadAllConcurrently(MAX_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ConfigureAwait(false);

public abstract string? CheckCache(string id);

public abstract Task<List<(string, string)>> DownloadAndCache(List<string?> ids);
public abstract Task<List<BaseItem>> Download(List<string?> ids);

public abstract void SaveToCache((string, string) x);
public abstract void SaveToCache(BaseItem x);
}
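
For context, here is a minimal sketch of how the pipeline above is driven: GetAndCache fans ids out through CheckCache, filters out hits, batches the misses, downloads each batch via Download, and writes the results back through SaveToCache. The InMemoryChannelLoader class, its dictionary cache, and the placeholder "{}" JSON are assumptions for illustration only; only the abstract members mirror the class in this diff.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Speckle.Sdk.Dependencies.Serialization;

// Hypothetical loader used only to illustrate the channel pipeline; the real
// implementation (ObjectLoader, further down in this diff) talks to SQLite and
// the Speckle server.
public sealed class InMemoryChannelLoader : ChannelLoader
{
  private readonly ConcurrentDictionary<string, string> _cache = new();

  // Return the id when the object is missing so it flows on to Download;
  // return null so the Filter stage drops ids that are already cached.
  public override string? CheckCache(string id) => _cache.ContainsKey(id) ? null : id;

  // Called once per batch of up to HTTP_GET_CHUNK_SIZE ids.
  public override Task<List<BaseItem>> Download(List<string?> ids) =>
    Task.FromResult(ids.Select(id => new BaseItem(id!, "{}", true)).ToList());

  // Runs concurrently on the reader side of the joined channel.
  public override void SaveToCache(BaseItem x) => _cache[x.Id] = x.Json;

  // Public wrapper over the protected pipeline entry point.
  public Task LoadAsync(IEnumerable<string> ids, CancellationToken ct = default) =>
    GetAndCache(ids, ct);
}
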
46 changes: 46 additions & 0 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
@@ -0,0 +1,46 @@
using System.Threading.Channels;
using Open.ChannelExtensions;

namespace Speckle.Sdk.Dependencies.Serialization;

public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage);

public abstract class ChannelSaver
{
private const int HTTP_SEND_CHUNK_SIZE = 500;
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private const int MAX_PARALLELISM_HTTP = 4;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 100;

private readonly Channel<BaseItem> _checkCacheChannel = Channel.CreateUnbounded<BaseItem>();

public Task Start(string streamId, CancellationToken cancellationToken = default) =>
_checkCacheChannel
.Reader.Batch(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(streamId, x, cancellationToken).ConfigureAwait(false),
-1,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);

public async Task Save(BaseItem item, CancellationToken cancellationToken = default) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);

public void Done() => _checkCacheChannel.Writer.TryComplete();

public abstract Task<List<BaseItem>> SendToServer(
string streamId,
List<BaseItem> batch,
CancellationToken cancellationToken
);

public abstract void SaveToCache(List<BaseItem> item);
}
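
The intended call pattern is producer/consumer; the sketch below (SendAllAsync, the saver instance, and the serializedItems source are all assumptions, not code from this commit) shows it: start the pipeline, push each serialized object through Save while the serializer walks the object tree, then complete the writer with Done and await Start so the trailing partial batches flush to the server and the cache.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Speckle.Sdk.Dependencies.Serialization;

// Illustrative driver; `saver` is some concrete ChannelSaver subclass.
async Task SendAllAsync(
  ChannelSaver saver,
  string streamId,
  IAsyncEnumerable<BaseItem> serializedItems,
  CancellationToken ct)
{
  // Consumer side: batches go to SendToServer, results are written via SaveToCache.
  Task pipeline = saver.Start(streamId, ct);

  // Producer side: write into the unbounded channel as items become available.
  await foreach (BaseItem item in serializedItems.WithCancellation(ct))
  {
    await saver.Save(item, ct);
  }

  saver.Done();    // complete the writer so the last partial batch can flush
  await pipeline;  // wait for server sends and cache writes to finish
}
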
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
@@ -26,7 +26,7 @@ public async Task<Base> Receive2(

try
{
var sqliteTransport = new SQLiteCacheManager(streamId);
var sqliteTransport = new SQLiteReceiveCacheManager(streamId);
var serverObjects = new ServerObjectManager(speckleHttp, activityFactory, url, authorizationToken);
var o = new ObjectLoader(sqliteTransport, serverObjects, streamId, onProgressAction);
var process = new DeserializeProcess(onProgressAction, o);
40 changes: 40 additions & 0 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
@@ -4,12 +4,52 @@
using Speckle.Sdk.Logging;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation;
using Speckle.Sdk.Serialisation.V2;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Api;

public partial class Operations
{
public async Task<(string rootObjId, IReadOnlyDictionary<string, ObjectReference> convertedReferences)> Send2(
Uri url,
string streamId,
string? authorizationToken,
Base value,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
)
{
using var receiveActivity = activityFactory.Start("Operations.Send");
metricsFactory.CreateCounter<long>("Send").Add(1);

try
{
var sqliteTransport = new SQLiteSendCacheManager(streamId);
var serverObjects = new ServerObjectManager(speckleHttp, activityFactory, url, authorizationToken);
var process = new SerializeProcess(
onProgressAction,
sqliteTransport,
serverObjects,
speckleBaseChildFinder,
speckleBasePropertyGatherer
);
var (rootObjId, convertedReferences) = await process
.Serialize(streamId, value, cancellationToken)
.ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return new(rootObjId, convertedReferences);
}
catch (Exception ex)
{
receiveActivity?.SetStatus(SdkActivityStatusCode.Error);
receiveActivity?.RecordException(ex);
throw;
}
}

/// <summary>
/// Sends a Speckle Object to the provided <paramref name="transport"/> and (optionally) the default local cache
/// </summary>
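
A hedged usage sketch of the new Send2 overload follows; the server URL, project/stream id, token, and root object are placeholders, SendExampleAsync is invented for illustration, and the IOperations instance is assumed to come from the SDK's generated interface and service registration.

using System;
using System.Threading;
using System.Threading.Tasks;
using Speckle.Sdk.Api;
using Speckle.Sdk.Models;

// Illustrative only; every value passed in is made up.
async Task SendExampleAsync(IOperations operations, Base rootObject, CancellationToken ct)
{
  var (rootObjId, convertedReferences) = await operations.Send2(
    new Uri("https://example.speckle.server"), // server url (placeholder)
    "project-or-stream-id",                    // streamId (placeholder)
    "auth-token",                              // authorizationToken (placeholder)
    rootObject,
    onProgressAction: null,
    cancellationToken: ct
  );

  Console.WriteLine($"Sent root {rootObjId} with {convertedReferences.Count} converted references");
}
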
5 changes: 4 additions & 1 deletion src/Speckle.Sdk/Api/Operations/Operations.cs
@@ -2,6 +2,7 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Serialisation.V2.Send;

namespace Speckle.Sdk.Api;

@@ -15,5 +16,7 @@ public partial class Operations(
ILogger<Operations> logger,
ISpeckleHttp speckleHttp,
ISdkActivityFactory activityFactory,
ISdkMetricsFactory metricsFactory
ISdkMetricsFactory metricsFactory,
ISpeckleBaseChildFinder speckleBaseChildFinder,
ISpeckleBasePropertyGatherer speckleBasePropertyGatherer
) : IOperations;
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Credentials/AccountManager.cs
@@ -807,7 +807,7 @@ private async Task<TokenExchangeResponse> GetRefreshedToken(string refreshToken,
/// <param name="server">Server endpoint to get header</param>
/// <returns><see langword="true"/> if response contains FE2 header and the value was <see langword="true"/></returns>
/// <exception cref="SpeckleException">response contained FE2 header, but the value was <see langword="null"/>, empty, or not parseable to a <see cref="Boolean"/></exception>
/// <exception cref="HttpRequestException">Request to <paramref name="server"/> failed to send or response was not successful</exception>
/// <exception cref="System.Net.Http.HttpRequestException">Request to <paramref name="server"/> failed to send or response was not successful</exception>
private async Task<bool> IsFrontend2Server(Uri server)
{
using var httpClient = speckleHttp.CreateHttpClient();
2 changes: 2 additions & 0 deletions src/Speckle.Sdk/Credentials/AuthFlowException.cs
@@ -1,6 +1,8 @@
namespace Speckle.Sdk.Credentials;

#pragma warning disable CA2237
public sealed class AuthFlowException : Exception
#pragma warning restore CA2237
{
public AuthFlowException(string? message, Exception? innerException)
: base(message, innerException) { }
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Helpers/Http.cs
@@ -13,7 +13,7 @@ public class SpeckleHttp(ILogger<SpeckleHttp> logger, ISpeckleHttpClientHandlerF
/// Sends a <c>GET</c> request to the provided <paramref name="uri"/>
/// </summary>
/// <param name="uri">The URI that should be pinged</param>
/// <exception cref="HttpRequestException">Request to <paramref name="uri"/> failed</exception>
/// <exception cref="System.Net.Http.HttpRequestException">Request to <paramref name="uri"/> failed</exception>
public async Task<HttpResponseMessage> HttpPing(Uri uri)
{
try
19 changes: 19 additions & 0 deletions src/Speckle.Sdk/Serialisation/IdGenerator.cs
@@ -0,0 +1,19 @@
using System.Diagnostics.Contracts;
using Speckle.Sdk.Helpers;
using Speckle.Sdk.Models;

namespace Speckle.Sdk.Serialisation;

public static class IdGenerator
{
[Pure]
public static string ComputeId(string serialized)
{
#if NET6_0_OR_GREATER
string hash = Crypt.Sha256(serialized.AsSpan(), length: HashUtility.HASH_LENGTH);
#else
string hash = Crypt.Sha256(serialized, length: HashUtility.HASH_LENGTH);
#endif
return hash;
}
}
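
Usage is a single call; in the sketch below the JSON value is a placeholder, and the hash length comes from HashUtility.HASH_LENGTH as in the extracted method.

using Speckle.Sdk.Serialisation;

// Both the existing serializer and the new send pipeline can derive an
// object's id from its serialized JSON the same way:
string json = "{\"speckle_type\":\"Base\"}";  // placeholder payload
string id = IdGenerator.ComputeId(json);      // truncated SHA-256 of the JSON
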
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Serialisation/SpeckleObjectDeserializer.cs
@@ -135,7 +135,7 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
var closures = ClosureParser.GetClosures(reader);
if (closures.Any())
{
_total = closures.Select(x => x.Item1).Concat(_deserializedObjects.Keys).Distinct().Count();
_total = 0;
foreach (var closure in closures)
{
string objId = closure.Item1;
14 changes: 1 addition & 13 deletions src/Speckle.Sdk/Serialisation/SpeckleObjectSerializer.cs
@@ -1,6 +1,5 @@
using System.Collections;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Drawing;
using System.Globalization;
using System.Reflection;
@@ -359,7 +358,7 @@ private string SerializeBaseObject(Base baseObj, JsonWriter writer, IReadOnlyDic
if (writer is SerializerIdWriter serializerIdWriter)
{
(var json, writer) = serializerIdWriter.FinishIdWriter();
id = ComputeId(json);
id = IdGenerator.ComputeId(json);
}
else
{
@@ -434,17 +433,6 @@ private void UpdateParentClosures(string objectId)
}
}

[Pure]
private static string ComputeId(string serialized)
{
#if NET6_0_OR_GREATER
string hash = Crypt.Sha256(serialized.AsSpan(), length: HashUtility.HASH_LENGTH);
#else
string hash = Crypt.Sha256(serialized, length: HashUtility.HASH_LENGTH);
#endif
return hash;
}

private void StoreObject(string objectId, string objectJson)
{
_stopwatch.Stop();
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Data;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Transports;
@@ -89,7 +90,7 @@ private async Task Traverse(string id, CancellationToken cancellationToken)
var json = objectLoader.LoadId(id);
if (json == null)
{
throw new InvalidOperationException();
throw new MissingPrimaryKeyException($"Missing object id in SQLite cache: {id}");
}
var childrenIds = ClosureParser.GetClosures(json).OrderByDescending(x => x.Item2).Select(x => x.Item1).ToList();
closures = (json, childrenIds);
33 changes: 16 additions & 17 deletions src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs
@@ -1,13 +1,14 @@
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Serialisation.V2.Receive;

[GenerateAutoInterface]
public sealed class ObjectLoader(
ISQLiteCacheManager sqLiteCacheManager,
ISQLiteReceiveCacheManager sqliteReceiveCacheManager,
IServerObjectManager serverObjectManager,
string streamId,
IProgress<ProgressArgs>? progress
@@ -28,7 +29,7 @@ CancellationToken
string? rootJson;
if (!options.SkipCache)
{
rootJson = sqLiteCacheManager.GetObject(rootId);
rootJson = sqliteReceiveCacheManager.GetObject(rootId);
if (rootJson != null)
{
//assume everything exists as the root is there.
@@ -50,7 +51,10 @@ CancellationToken
await GetAndCache(allChildrenIds, cancellationToken).ConfigureAwait(false);

//save the root last to shortcut later
sqLiteCacheManager.SaveObjectSync(rootId, rootJson);
if (!options.SkipCache)
{
sqliteReceiveCacheManager.SaveObject(new(rootId, rootJson, true));
}
return (rootJson, allChildrenIds);
}

@@ -59,7 +63,7 @@ CancellationToken
{
_checkCache++;
progress?.Report(new(ProgressEvent.CacheCheck, _checkCache, _allChildrenCount));
if (!_options.SkipCache && !sqLiteCacheManager.HasObject(id))
if (!_options.SkipCache && !sqliteReceiveCacheManager.HasObject(id))
{
return id;
}
@@ -68,11 +72,9 @@ CancellationToken
}

[AutoInterfaceIgnore]
public override async Task<List<(string, string)>> DownloadAndCache(List<string?> ids)
public override async Task<List<BaseItem>> Download(List<string?> ids)
{
var count = 0L;
progress?.Report(new(ProgressEvent.DownloadObject, count, _allChildrenCount));
var toCache = new List<(string, string)>();
var toCache = new List<BaseItem>();
await foreach (
var (id, json) in serverObjectManager.DownloadObjects(
streamId,
@@ -82,25 +84,22 @@ CancellationToken
)
)
{
count++;
progress?.Report(new(ProgressEvent.DownloadObject, count, _allChildrenCount));
toCache.Add((id, json));
toCache.Add(new(id, json, true));
}

return toCache;
}

[AutoInterfaceIgnore]
public override void SaveToCache((string, string) x)
public override void SaveToCache(BaseItem x)
{
if (!_options.SkipCache)
{
sqLiteCacheManager.SaveObjectSync(x.Item1, x.Item2);
sqliteReceiveCacheManager.SaveObject(x);
_cached++;
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount));
}

_cached++;
progress?.Report(new(ProgressEvent.Cached, _cached, _allChildrenCount));
}

public string? LoadId(string id) => sqLiteCacheManager.GetObject(id);
public string? LoadId(string id) => sqliteReceiveCacheManager.GetObject(id);
}