Skip to content

Commit

Permalink
Serialize now waits for the scheduler to be completed before returning (
Browse files Browse the repository at this point in the history
#225)

* Serialize now waits for the scheduler to be completed before returning

* formatting

* wait in deserialize too

* Pass cancellation token to download, refactor how things are passed/created

* responses are now streamed and cancel them earlier

* format

* add manual empty message for sending that definitely knows when the channels are empty

* Fix configure awaits

* more configure await false
  • Loading branch information
adamhathcock authored Feb 17, 2025
1 parent 988599f commit e5a0915
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 134 deletions.
16 changes: 10 additions & 6 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,27 @@ public Task Start(CancellationToken cancellationToken) =>
);

public async ValueTask Save(T item, CancellationToken cancellationToken) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(true);
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);

public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
private async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
{
await SendToServer((Batch<T>)batch).ConfigureAwait(false);
return batch;
}

public abstract Task SendToServer(Batch<T> batch);

public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();
public abstract void SaveToCache(List<T> item);

public Task DoneTraversing()
{
_checkCacheChannel.Writer.TryComplete();
return Task.CompletedTask;
}

public async Task DoneSaving()
{
await _checkCacheChannel.Reader.Completion.ConfigureAwait(true);
await _checkCacheChannel.Reader.Completion.ConfigureAwait(false);
lock (_exceptions)
{
if (_exceptions.Count > 0)
Expand All @@ -103,6 +109,4 @@ public async Task DoneSaving()
}
}
}

public abstract void SaveToCache(List<T> item);
}
19 changes: 11 additions & 8 deletions src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ CancellationToken cancellationToken
metricsFactory.CreateCounter<long>("Receive").Add(1);

receiveActivity?.SetTag("objectId", objectId);

var process = serializeProcessFactory.CreateDeserializeProcess(
url,
streamId,
authorizationToken,
onProgressAction,
cancellationToken
);
try
{
using var process = serializeProcessFactory.CreateDeserializeProcess(
url,
streamId,
authorizationToken,
onProgressAction,
cancellationToken
);
var result = await process.Deserialize(objectId).ConfigureAwait(false);
receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return result;
Expand All @@ -41,6 +40,10 @@ CancellationToken cancellationToken
receiveActivity?.RecordException(ex);
throw;
}
finally
{
await process.DisposeAsync().ConfigureAwait(false);
}
}

/// <summary>
Expand Down
18 changes: 11 additions & 7 deletions src/Speckle.Sdk/Api/Operations/Operations.Send.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ CancellationToken cancellationToken
using var receiveActivity = activityFactory.Start("Operations.Send");
metricsFactory.CreateCounter<long>("Send").Add(1);

var process = serializeProcessFactory.CreateSerializeProcess(
url,
streamId,
authorizationToken,
onProgressAction,
cancellationToken
);
try
{
using var process = serializeProcessFactory.CreateSerializeProcess(
url,
streamId,
authorizationToken,
onProgressAction,
cancellationToken
);
var results = await process.Serialize(value).ConfigureAwait(false);

receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
Expand All @@ -43,6 +43,10 @@ CancellationToken cancellationToken
receiveActivity?.RecordException(ex);
throw;
}
finally
{
await process.DisposeAsync().ConfigureAwait(false);
}
}

/// <summary>
Expand Down
19 changes: 19 additions & 0 deletions src/Speckle.Sdk/Serialisation/V2/PriorityScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,23 @@ protected override void QueueTask(Task task)
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; // we might not want to execute task that should schedule as high or low priority inline

public Task WaitForCompletion()
{
_tasks.CompleteAdding();
return Task
.Factory.StartNew(
async () =>
{
while (_threads != null && _threads.Any(x => x.IsAlive))
{
await Task.Delay(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
}
},
CancellationToken.None,
TaskCreationOptions.AttachedToParent,
TaskScheduler.Default
)
.Unwrap();
}
}
36 changes: 30 additions & 6 deletions src/Speckle.Sdk/Serialisation/V2/Receive/DeserializeProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Models;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;

namespace Speckle.Sdk.Serialisation.V2.Receive;
Expand All @@ -16,24 +17,47 @@ public record DeserializeProcessOptions(
bool SkipServer = false
);

public partial interface IDeserializeProcess : IDisposable;
public partial interface IDeserializeProcess : IAsyncDisposable;

[GenerateAutoInterface]
public sealed class DeserializeProcess(
IProgress<ProgressArgs>? progress,
IObjectLoader objectLoader,
IProgress<ProgressArgs>? progress,
IBaseDeserializer baseDeserializer,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken,
DeserializeProcessOptions? options = null
) : IDeserializeProcess
{
public DeserializeProcess(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IProgress<ProgressArgs>? progress,
IBaseDeserializer baseDeserializer,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken,
DeserializeProcessOptions? options = null
)
:
#pragma warning disable CA2000
this(
new ObjectLoader(sqLiteJsonCacheManager, serverObjectManager, progress, cancellationToken),
progress,
baseDeserializer,
loggerFactory,
cancellationToken,
options
)
#pragma warning restore CA2000
{ }

private readonly PriorityScheduler _belowNormal = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.BelowNormal,
Environment.ProcessorCount * 2,
cancellationToken
);

private readonly DeserializeProcessOptions _options = options ?? new();

private readonly ConcurrentDictionary<Id, (Json, IReadOnlyCollection<Id>)> _closures = new();
Expand All @@ -44,10 +68,11 @@ public sealed class DeserializeProcess(
public long Total { get; private set; }

[AutoInterfaceIgnore]
public void Dispose()
public async ValueTask DisposeAsync()
{
objectLoader.Dispose();
_belowNormal.Dispose();
await _belowNormal.WaitForCompletion().ConfigureAwait(false);
}

/// <summary>
Expand All @@ -57,9 +82,7 @@ public void Dispose()

public async Task<Base> Deserialize(string rootId)
{
var (rootJson, childrenIds) = await objectLoader
.GetAndCache(rootId, _options, cancellationToken)
.ConfigureAwait(false);
var (rootJson, childrenIds) = await objectLoader.GetAndCache(rootId, _options).ConfigureAwait(false);
var root = new Id(rootId);
//childrenIds is already frozen but need to just add root?
_allIds = childrenIds.Concat([root]).Freeze();
Expand All @@ -68,6 +91,7 @@ public async Task<Base> Deserialize(string rootId)
_closures.TryAdd(root, (rootJson, childrenIds));
progress?.Report(new(ProgressEvent.DeserializeObject, _baseCache.Count, childrenIds.Count));
await Traverse(root).ConfigureAwait(false);
await _belowNormal.WaitForCompletion().ConfigureAwait(false);
return _baseCache[root];
}

Expand Down
17 changes: 10 additions & 7 deletions src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public partial interface IObjectLoader : IDisposable;
public sealed class ObjectLoader(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IProgress<ProgressArgs>? progress
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) : ChannelLoader<BaseItem>, IObjectLoader
{
private int? _allChildrenCount;
Expand All @@ -28,11 +29,7 @@ public sealed class ObjectLoader(
[AutoInterfaceIgnore]
public void Dispose() => sqLiteJsonCacheManager.Dispose();

public async Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(
string rootId,
DeserializeProcessOptions options,
CancellationToken cancellationToken
)
public async Task<(Json, IReadOnlyCollection<Id>)> GetAndCache(string rootId, DeserializeProcessOptions options)
{
_options = options;
string? rootJson;
Expand Down Expand Up @@ -97,9 +94,14 @@ public override async Task<List<BaseItem>> Download(List<string?> ids)
{
var toCache = new List<BaseItem>();
await foreach (
var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default)
var (id, json) in serverObjectManager.DownloadObjects(
ids.Select(x => x.NotNull()).ToList(),
progress,
cancellationToken
)
)
{
cancellationToken.ThrowIfCancellationRequested();
Interlocked.Increment(ref _downloaded);
progress?.Report(new(ProgressEvent.DownloadObjects, _downloaded, _totalToDownload));
toCache.Add(new(new(id), new(json), true, null));
Expand All @@ -119,6 +121,7 @@ public override void SaveToCache(List<BaseItem> batch)
{
if (!_options.SkipCache)
{
cancellationToken.ThrowIfCancellationRequested();
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount));
Expand Down
34 changes: 25 additions & 9 deletions src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Common;
Expand All @@ -22,7 +23,7 @@ public readonly record struct SerializeProcessResults(
IReadOnlyDictionary<Id, ObjectReference> ConvertedReferences
);

public partial interface ISerializeProcess : IDisposable;
public partial interface ISerializeProcess : IAsyncDisposable;

[GenerateAutoInterface]
public sealed class SerializeProcess(
Expand All @@ -36,12 +37,17 @@ public sealed class SerializeProcess(
SerializeProcessOptions? options = null
) : ChannelSaver<BaseItem>, ISerializeProcess
{
//async dispose
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed")]
private readonly PriorityScheduler _highest = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.Highest,
2,
cancellationToken
);

//async dispose
[SuppressMessage("Usage", "CA2213:Disposable fields should be disposed")]
private readonly PriorityScheduler _belowNormal = new(
loggerFactory.CreateLogger<PriorityScheduler>(),
ThreadPriority.BelowNormal,
Expand All @@ -67,11 +73,18 @@ public sealed class SerializeProcess(
private long _cached;

[AutoInterfaceIgnore]
public void Dispose()
public async ValueTask DisposeAsync()
{
_highest.Dispose();
_belowNormal.Dispose();
sqLiteJsonCacheManager.Dispose();
await WaitForSchedulerCompletion().ConfigureAwait(false);
}

private async Task WaitForSchedulerCompletion()
{
await _highest.WaitForCompletion().ConfigureAwait(false);
await _belowNormal.WaitForCompletion().ConfigureAwait(false);
}

public async Task<SerializeProcessResults> Serialize(Base root)
Expand All @@ -88,10 +101,13 @@ public async Task<SerializeProcessResults> Serialize(Base root)
_highest
);
}
await Traverse(root).ConfigureAwait(true);
DoneTraversing();
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(true);
await DoneSaving().ConfigureAwait(true);
await Traverse(root).ConfigureAwait(false);
await DoneTraversing().ConfigureAwait(false);
await Task.WhenAll(findTotalObjectsTask, channelTask).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
await DoneSaving().ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
await WaitForSchedulerCompletion().ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
return new(root.id.NotNull(), baseSerializer.ObjectReferences.Freeze());
}
Expand All @@ -116,7 +132,7 @@ private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
cancellationToken.ThrowIfCancellationRequested();
var t = Task
.Factory.StartNew(
async () => await Traverse(tmp).ConfigureAwait(true),
async () => await Traverse(tmp).ConfigureAwait(false),
cancellationToken,
TaskCreationOptions.AttachedToParent | TaskCreationOptions.PreferFairness,
_belowNormal
Expand All @@ -128,7 +144,7 @@ private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
Dictionary<Id, NodeInfo>[] taskClosures = [];
if (tasks.Count > 0)
{
taskClosures = await Task.WhenAll(tasks).ConfigureAwait(true);
taskClosures = await Task.WhenAll(tasks).ConfigureAwait(false);
}
var childClosures = _childClosurePool.Get();
foreach (var childClosure in taskClosures)
Expand All @@ -150,7 +166,7 @@ private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj)
if (item.NeedsStorage)
{
Interlocked.Increment(ref _objectsSerialized);
await Save(item, cancellationToken).ConfigureAwait(true);
await Save(item, cancellationToken).ConfigureAwait(false);
}

if (!currentClosures.ContainsKey(item.Id))
Expand Down
15 changes: 9 additions & 6 deletions src/Speckle.Sdk/Serialisation/V2/SerializeProcessFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ public IDeserializeProcess CreateDeserializeProcess(
{
var sqLiteJsonCacheManager = sqLiteJsonCacheManagerFactory.CreateFromStream(streamId);
var serverObjectManager = serverObjectManagerFactory.Create(url, streamId, authorizationToken);

#pragma warning disable CA2000
//owned by process, refactor later
var objectLoader = new ObjectLoader(sqLiteJsonCacheManager, serverObjectManager, progress);
#pragma warning restore CA2000
return new DeserializeProcess(progress, objectLoader, baseDeserializer, loggerFactory, cancellationToken, options);
return new DeserializeProcess(
sqLiteJsonCacheManager,
serverObjectManager,
progress,
baseDeserializer,
loggerFactory,
cancellationToken,
options
);
}
}
Loading

0 comments on commit e5a0915

Please sign in to comment.