Skip to content

Commit

Permalink
Validate that timeout is positive in WaitForOrchestrationAsync, and e…
Browse files Browse the repository at this point in the history
…nable nullable checks (#910)
  • Loading branch information
davidmrdavid authored Jun 2, 2023
1 parent acdbf7a commit 0159481
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage
{
using System;
Expand Down Expand Up @@ -1838,15 +1837,15 @@ async Task<PurgeResult> IOrchestrationServicePurgeClient.PurgeInstanceStateAsync
purgeInstanceFilter.RuntimeStatus);
return storagePurgeHistoryResult.ToCorePurgeHistoryResult();
}

#nullable enable
/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
/// <param name="instanceId">The orchestration instance to wait for.</param>
/// <param name="executionId">The execution ID (generation) of the specified instance.</param>
/// <param name="timeout">Max timeout to wait.</param>
/// <param name="timeout">Max timeout to wait. Only positive <see cref="TimeSpan"/> values, <see cref="TimeSpan.Zero"/>, or <see cref="Timeout.InfiniteTimeSpan"/> are allowed.</param>
/// <param name="cancellationToken">Task cancellation token.</param>
public async Task<OrchestrationState> WaitForOrchestrationAsync(
public async Task<OrchestrationState?> WaitForOrchestrationAsync(
string instanceId,
string executionId,
TimeSpan timeout,
Expand All @@ -1857,20 +1856,24 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
throw new ArgumentException(nameof(instanceId));
}

bool isInfiniteTimeSpan = timeout == Timeout.InfiniteTimeSpan;
if (timeout < TimeSpan.Zero && !isInfiniteTimeSpan)
{
throw new ArgumentException($"The parameter {nameof(timeout)} cannot be negative." +
$" The value for {nameof(timeout)} was '{timeout}'." +
$" Please provide either a positive timeout value or Timeout.InfiniteTimeSpan.");
}

TimeSpan statusPollingInterval = TimeSpan.FromSeconds(2);
while (!cancellationToken.IsCancellationRequested && timeout > TimeSpan.Zero)
{
OrchestrationState state = await this.GetOrchestrationStateAsync(instanceId, executionId);
if (state == null ||
state.OrchestrationStatus == OrchestrationStatus.Running ||
state.OrchestrationStatus == OrchestrationStatus.Suspended ||
state.OrchestrationStatus == OrchestrationStatus.Pending ||
state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
await Task.Delay(statusPollingInterval, cancellationToken);
timeout -= statusPollingInterval;
}
else
while (!cancellationToken.IsCancellationRequested)
{
OrchestrationState? state = await this.GetOrchestrationStateAsync(instanceId, executionId);

if (state != null &&
state.OrchestrationStatus != OrchestrationStatus.Running &&
state.OrchestrationStatus != OrchestrationStatus.Suspended &&
state.OrchestrationStatus != OrchestrationStatus.Pending &&
state.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew)
{
if (this.settings.FetchLargeMessageDataEnabled)
{
Expand All @@ -1879,6 +1882,17 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
}
return state;
}

timeout -= statusPollingInterval;

// For a user-provided timeout of `TimeSpan.Zero`,
// we want to check the status of the orchestration once and then return.
// Therefore, we check the timeout condition after the status check.
if (!isInfiniteTimeSpan && (timeout <= TimeSpan.Zero))
{
break;
}
await Task.Delay(statusPollingInterval, cancellationToken);
}

return null;
Expand Down Expand Up @@ -1909,7 +1923,7 @@ public Task<string> DownloadBlobAsync(string blobUri)

// TODO: Change this to a sticky assignment so that partition count changes can
// be supported: https://github.com/Azure/azure-functions-durable-extension/issues/1
async Task<ControlQueue> GetControlQueueAsync(string instanceId)
async Task<ControlQueue?> GetControlQueueAsync(string instanceId)
{
uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount;
string queueName = GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex);
Expand Down Expand Up @@ -1980,12 +1994,12 @@ private static OrchestrationQueryResult ConvertFrom(DurableStatusQueryResult sta

class PendingMessageBatch
{
public string OrchestrationInstanceId { get; set; }
public string OrchestrationExecutionId { get; set; }
public string? OrchestrationInstanceId { get; set; }
public string? OrchestrationExecutionId { get; set; }

public List<MessageData> Messages { get; set; } = new List<MessageData>();

public OrchestrationRuntimeState Orchestrationstate { get; set; }
public OrchestrationRuntimeState? Orchestrationstate { get; set; }
}

class ResettableLazy<T>
Expand All @@ -1995,7 +2009,10 @@ class ResettableLazy<T>

Lazy<T> lazy;

// Supress warning because it's incorrect: the lazy variable is initialized in the constructor, in the `Reset()` method
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
public ResettableLazy(Func<T> valueFactory, LazyThreadSafetyMode mode)
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
{
this.valueFactory = valueFactory;
this.threadSafetyMode = mode;
Expand Down Expand Up @@ -2025,3 +2042,4 @@ public TaskHubQueueMessage(TaskHubQueue queue, TaskMessage message)
}
}
}
#nullable disable

0 comments on commit 0159481

Please sign in to comment.