Skip to content

Commit

Permalink
Make operation scheduler async internally (#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo authored Jun 13, 2024
1 parent 61c6411 commit 5839dc3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
18 changes: 8 additions & 10 deletions Cognite.Common/OperationScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public OperationScheduler(IEnumerable<T> initialItems, TaskThrottler throttler,
/// <param name="chunk">Chunk to handle</param>
/// <param name="token">Cancellation token</param>
/// <returns>New elements</returns>
protected abstract IEnumerable<T> HandleTaskResult(IChunk<T> chunk, CancellationToken token);
protected abstract Task<IEnumerable<T>> HandleTaskResult(IChunk<T> chunk, CancellationToken token);

/// <summary>
/// Called if the scheduler is aborted before it finishes running.
Expand All @@ -118,7 +118,7 @@ public OperationScheduler(IEnumerable<T> initialItems, TaskThrottler throttler,
/// </summary>
/// <param name="chunk">Chunk to free</param>
/// <param name="token">Cancellation token</param>
protected abstract void AbortChunk(IChunk<T> chunk, CancellationToken token);
protected abstract Task AbortChunk(IChunk<T> chunk, CancellationToken token);

/// <summary>
/// Called on each iteration of the scheduler loop, for reporting.
Expand Down Expand Up @@ -195,19 +195,17 @@ private async Task ConsumeChunkInternal(IChunk<T> chunk, CancellationToken token
/// Start the scheduler loop.
/// </summary>
/// <returns> Task which terminates when the scheduler is finished</returns>
public Task RunAsync()
public async Task RunAsync()
{
return Task.Run(() => Run(), CancellationToken.None);
await Run().ConfigureAwait(false);
}



/// <summary>
/// Runs in a single thread, so that logic in the ThreadScheduler is thread safe.
/// </summary>
private void Run()
private async Task Run()
{
var capacity = GetCapacity(_activeItems.Count(), true).Result;
var capacity = await GetCapacity(_activeItems.Count(), true).ConfigureAwait(false);
var chunks = GetNextChunks(_activeItems, capacity, out _activeItems).ToList();
foreach (var chunk in chunks)
{
Expand Down Expand Up @@ -248,7 +246,7 @@ private void Run()
{
int numFinished = 0;

var next = HandleTaskResult(chunk, TokenSource.Token);
var next = await HandleTaskResult(chunk, TokenSource.Token).ConfigureAwait(false);
foreach (var newItem in next)
{
newItems.Add(newItem);
Expand Down Expand Up @@ -314,7 +312,7 @@ private void Run()
{
foreach (var chunk in chunks)
{
AbortChunk(chunk, TokenSource.Token);
await AbortChunk(chunk, TokenSource.Token).ConfigureAwait(false);
}
}
if (_numPending > 0)
Expand Down
11 changes: 7 additions & 4 deletions ExtractorUtils.Test/unit/SchedulerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ public TestScheduler(
}


protected override void AbortChunk(IChunk<SchedulerItem> chunk, CancellationToken token)
protected override Task AbortChunk(IChunk<SchedulerItem> chunk, CancellationToken token)
{
Aborted++;
return Task.CompletedTask;
}

protected override async Task ConsumeChunk(IChunk<SchedulerItem> chunk, CancellationToken token)
Expand All @@ -66,9 +67,10 @@ protected override IChunk<SchedulerItem> GetChunk(IEnumerable<SchedulerItem> ite
};
}

protected override IEnumerable<SchedulerItem> HandleTaskResult(IChunk<SchedulerItem> chunk, CancellationToken token)
protected override Task<IEnumerable<SchedulerItem>> HandleTaskResult(IChunk<SchedulerItem> chunk, CancellationToken token)
{
CountChunks++;
var res = new List<SchedulerItem>();
foreach (var item in chunk.Items)
{
item.NumRemaining--;
Expand All @@ -78,16 +80,17 @@ protected override IEnumerable<SchedulerItem> HandleTaskResult(IChunk<SchedulerI
{
item.FoundChildren++;
CountItems++;
yield return new SchedulerItem
res.Add(new SchedulerItem
{
DepthRemaining = item.DepthRemaining - 1,
NumChildren = item.NumChildren,
NumRemaining = item.MaxRemaining,
MaxRemaining = item.MaxRemaining
};
});
}
}
}
return Task.FromResult(res as IEnumerable<SchedulerItem>);
}

protected override void OnIteration(int pending, int operations, int finished, int total)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.23.4
1.24.0

0 comments on commit 5839dc3

Please sign in to comment.