From 11fe8e8cce33e7c54943b258c58d6e124a96e184 Mon Sep 17 00:00:00 2001 From: Adam Hathcock Date: Fri, 3 Jan 2025 14:25:29 +0000 Subject: [PATCH] Use CancellationSource to properly dispose of threads in PriorityScheduler (#197) * Use CancellationSource to properly dispose of threads in PriorityScheduler * formatting --- .../V2/Send/PriorityScheduler.cs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs b/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs index 3e254d1e..c38a3df3 100644 --- a/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs +++ b/src/Speckle.Sdk/Serialisation/V2/Send/PriorityScheduler.cs @@ -4,10 +4,17 @@ namespace Speckle.Sdk.Serialisation.V2.Send; public sealed class PriorityScheduler(ThreadPriority priority, int maximumConcurrencyLevel) : TaskScheduler, IDisposable { + private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly BlockingCollection _tasks = new(); private Thread[]? _threads; - public void Dispose() => _tasks.Dispose(); + public void Dispose() + { + _tasks.CompleteAdding(); + _cancellationTokenSource.Cancel(); + _tasks.Dispose(); + _cancellationTokenSource.Dispose(); + } public override int MaximumConcurrencyLevel => maximumConcurrencyLevel; @@ -24,9 +31,26 @@ protected override void QueueTask(Task task) { _threads[i] = new Thread(() => { - foreach (Task t in _tasks.GetConsumingEnumerable()) + try + { + foreach (Task t in _tasks.GetConsumingEnumerable(_cancellationTokenSource.Token)) + { + if (_cancellationTokenSource.IsCancellationRequested) + { + break; + } + TryExecuteTask(t); + if (_cancellationTokenSource.IsCancellationRequested) + { + break; + } + } + } +#pragma warning disable CA1031 + catch (Exception) +#pragma warning restore CA1031 { - TryExecuteTask(t); + // ignored } }) {