Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poison message handling for DT.AS V2 #1130

Open
wants to merge 3 commits into
base: azure-storage-v12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100;

/// <summary>
/// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message".
/// The default value is 20.
/// </summary>
public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
public int PoisonMessageDequeueCountThreshold { get; set; } = 20;


/// <summary>
/// Gets or sets the maximum number of concurrent storage operations that can be executed in the context
/// of a single orchestration instance.
Expand Down
48 changes: 30 additions & 18 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,27 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
MessageData messageData;
try
{
// Try to de-serialize the message
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

// If successful, check if it's a poison message. If so, we handle it
// and log metadata about it as the de-serialization succeeded.
await this.HandleIfPoisonMessageAsync(messageData);
}
catch (Exception e)
catch (Exception exception)
{
// We have limited information about the details of the message
// since we failed to deserialize it.
this.settings.Logger.MessageFailure(
this.storageAccountName,
this.settings.TaskHubName,
queueMessage.MessageId /* MessageId */,
string.Empty /* InstanceId */,
string.Empty /* ExecutionId */,
this.storageQueue.Name,
string.Empty /* EventType */,
0 /* TaskEventId */,
e.ToString());

// Abandon the message so we can try it again later.
// Note: We will fetch the message again from the queue before retrying, so no need to read the receipt
_ = await this.AbandonMessageAsync(queueMessage);
// Deserialization errors can be persistent, so we check if this is a poison message.
bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
if (isPoisonMessage)
{
// we have already handled the poison message, so we move on.
return;
}

// This is not a poison message (at least not yet), so we abandon it to retry later.
await this.AbandonMessageAsync(queueMessage, exception);
return;
}

Expand Down Expand Up @@ -192,8 +191,21 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
}

// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
public Task<UpdateReceipt?> AbandonMessageAsync(QueueMessage queueMessage)
public Task<UpdateReceipt?> AbandonMessageAsync(QueueMessage queueMessage, Exception exception)
{
// We have limited information about the details of the message
// since we failed to deserialize it.
this.settings.Logger.MessageFailure(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abandoning a message is not necessarily an error. There are other cases, such as out-of-order messaging race conditions where we're able to recover gracefully by just abandoning a message once.

Consider putting this error message inside of an if (exception != null) block.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this overload is only called in cases where there's an exception, as you can see by the comment in line 193:

// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
. In that case, doesn't that mean the exception will never be null, @cgillum ?

this.storageAccountName,
this.settings.TaskHubName,
queueMessage.MessageId /* MessageId */,
string.Empty /* InstanceId */,
string.Empty /* ExecutionId */,
this.storageQueue.Name,
string.Empty /* EventType */,
0 /* TaskEventId */,
exception.ToString());

this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _);
return base.AbandonMessageAsync(
queueMessage,
Expand Down
92 changes: 92 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ namespace DurableTask.AzureStorage.Messaging
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Data.Tables;
using Azure.Storage.Queues.Models;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;
using DurableTask.Core.History;


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra newline

abstract class TaskHubQueue
{
static long messageSequenceNumber;
Expand Down Expand Up @@ -150,6 +152,96 @@ await this.storageQueue.AddMessageAsync(
return data;
}

public async Task HandleIfPoisonMessageAsync(MessageData messageData)
{
QueueMessage queueMessage = messageData.OriginalQueueMessage;
int maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;

if (queueMessage.DequeueCount > maxThreshold)
{
// Create the poison message table if it doesn't exist
string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
await poisonMessagesTable.CreateIfNotExistsAsync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to consider caching to remember if we've already created this table. Otherwise, we may end up spamming the storage account with a lot of these calls.


// provide guidance, which is backend-specific
string guidance = $"Queue message ID '{queueMessage.MessageId}' was dequeued {queueMessage.DequeueCount} times," +
$" which is greater than the threshold poison message threshold ({maxThreshold}). " +
$"The message has been moved to the '{poisonMessageTableName}' table for manual review. " +
$"This will fail the consuming orchestrator, activity, or entity";
messageData.TaskMessage.Event.PoisonGuidance = guidance;

// Add the message to the poison message table
TableEntity tableEntity = new TableEntity(queueMessage.MessageId, this.Name)
{
["RawMessage"] = queueMessage.Body,
["Reason"] = guidance
};

await poisonMessagesTable.InsertEntityAsync(tableEntity);

// Delete the message from the queue
await this.storageQueue.DeleteMessageAsync(queueMessage);

// Since isPoison is `true`, we'll override the deserialized message
messageData.TaskMessage.Event.IsPoison = true;

this.settings.Logger.PoisonMessageDetected(
this.storageAccountName,
this.settings.TaskHubName,
messageData.TaskMessage.Event.EventType.ToString(),
messageData.TaskMessage.Event.EventId,
messageData.OriginalQueueMessage.MessageId,
messageData.TaskMessage.OrchestrationInstance.InstanceId,
messageData.TaskMessage.OrchestrationInstance.ExecutionId,
this.Name,
messageData.OriginalQueueMessage.DequeueCount);
}
}

public async Task<bool> TryHandlingDeserializationPoisonMessage(QueueMessage queueMessage, Exception deserializationException)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there's a lot of redundancy between these two methods. Is it possible to refactor them to share some logic?

{
var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
bool isPoisonMessage = queueMessage.DequeueCount > maxThreshold;

if (isPoisonMessage)
{
isPoisonMessage = true;
string guidance = $"Queue message ID '{queueMessage.MessageId}' was dequeued {queueMessage.DequeueCount} times," +
$" which is greater than the threshold poison message threshold ({maxThreshold}). " +
$"A de-serialization error ocurred: \n {deserializationException}";

// Create poison message table if it doesn't exist and add the poison message
TableEntity tableEntity = new TableEntity(queueMessage.MessageId, this.Name)
{
["RawMessage"] = queueMessage.Body,
["Reason"] = guidance
};

string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
await poisonMessagesTable.CreateIfNotExistsAsync();

await poisonMessagesTable.InsertEntityAsync(tableEntity);

// Delete the message from the queue
await this.storageQueue.DeleteMessageAsync(queueMessage);

this.settings.Logger.PoisonMessageDetected(
this.storageAccountName,
this.settings.TaskHubName,
string.Empty,
0,
string.Empty,
string.Empty,
string.Empty,
this.Name,
queueMessage.DequeueCount);
}
return isPoisonMessage;
}


static string? GetSerializableTraceContext(TaskMessage taskMessage)
{
TraceContextBase traceContext = CorrelationTraceContext.Current;
Expand Down
25 changes: 20 additions & 5 deletions src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Messaging
{
using System;
Expand All @@ -31,26 +32,40 @@ public WorkItemQueue(

protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout;

public async Task<MessageData> GetMessageAsync(CancellationToken cancellationToken)
public async Task<MessageData?> GetMessageAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
QueueMessage queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
QueueMessage? queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);

if (queueMessage == null)
{
await this.backoffHelper.WaitAsync(cancellationToken);
continue;
}

MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
try
{
MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

this.backoffHelper.Reset();
return data;
this.backoffHelper.Reset();
return data;
}
catch (Exception exception)
{
// Deserialization errors can be persistent, so we check if this is a poison message.
bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
if (isPoisonMessage)
{
// we have already handled the poison message, so we move on.
continue;
}
}

}
catch (Exception e)
{
Expand Down
11 changes: 11 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,16 @@ protected HistoryEvent(int eventId)
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
public ExtensionDataObject? ExtensionData { get; set; }

/// <summary>
/// Gets or sets whether this is a poison message.
/// </summary>
public bool IsPoison { get; set; } = false;

/// <summary>
/// Gets or sets user-facing details for why a message was labeled as poison.
/// This is to be set by each storage provider.
/// </summary>
public string PoisonGuidance { get; set; } = "";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not comfortable with adding these properties to every DTFx history event, especially if we don't have a plan or design for whether/how to implement poison message handling for other backend providers. For the purposes of this PR, I'd prefer we make changes only to DurableTask.AzureStorage unless we can get broad agreement from across the team that this is the right approach.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Just to add more context here (may be obvious) - the reason for the DTFx.Core change here is solely that it allows us to easily fail the function (activity or orchestrator) with the poison message. In the activity/orchestrator dispatchers, we are checking if isPoison is true and, if so, we're replacing the history event of the poison message with a failure event.

I assume that if instead of doing this, we replace the message itself with a failure event in the DTFx.AS level, we should be able to get away without this "isPoison" property. I recall trying to do this, and not finding it to be super obvious, but it's worth trying again. @cgillum if you have suggestions, I'm all ears!

}
}
15 changes: 15 additions & 0 deletions src/DurableTask.Core/TaskActivityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace DurableTask.Core
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Core.Tracing;
using DurableTask.Core.Serializing;

/// <summary>
/// Dispatcher for task activities to handle processing and renewing of work items
Expand Down Expand Up @@ -190,6 +191,20 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>

try
{
if (scheduledEvent.IsPoison)
{
// If the activity is "poison", then we should not executed again. Instead, we'll manually fail the activity
// by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance
// on how to deal with the poison message.
// We need to account for all possible deserialization modes, so we construct an exception valid in all modes.
// TODO: revise - this is clunky
var exception = new Exception(scheduledEvent.PoisonGuidance);
var failureDetails = new FailureDetails(exception);
var details = Utils.SerializeCause(exception, JsonDataConverter.Default);
var taskFailure = new TaskFailureException(details, exception, details).WithFailureDetails(failureDetails);
throw taskFailure;
}

string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
}
Expand Down
16 changes: 16 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ void ProcessEvents(IEnumerable<HistoryEvent> events)

void ProcessEvent(HistoryEvent historyEvent)
{
if (historyEvent.IsPoison)
{
// If the message is labeled as "poison", then we should avoid processing it again.
// Therefore, we replace the event "in place" with an "ExecutionTerminatedEvent", so the
// orchestrator stops immediately.

var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance);
historyEvent = terminationEvent;

// since replay is not guaranteed, we need to populate `this.result`
// with a completed task
var taskCompletionSource = new TaskCompletionSource<string>();
taskCompletionSource.SetResult("");
this.result = taskCompletionSource.Task;
}

bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{
Expand Down