Skip to content

Commit

Permalink
Merge pull request #60 from houseofcat/main
Browse files Browse the repository at this point in the history
Release v4.1.5
  • Loading branch information
houseofcat authored May 2, 2024
2 parents 539fb84 + 12d24ee commit 06a0819
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 124 deletions.
9 changes: 0 additions & 9 deletions src/HouseofCat.Dataflows/IWorkState.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using OpenTelemetry.Trace;
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

Expand All @@ -9,17 +8,9 @@ public interface IWorkState
{
IDictionary<string, object> Data { get; set; }

// Routing Logic
IDictionary<string, bool> StepSuccess { get; set; }
string StepIdentifier { get; set; }

// Error Handling
bool IsFaulted { get; set; }
ExceptionDispatchInfo EDI { get; set; }

// Outbound
ReadOnlyMemory<byte> SendData { get; set; }

// RootSpan or ChildSpan derived from TraceParentHeader
TelemetrySpan WorkflowSpan { get; set; }
}
17 changes: 13 additions & 4 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class Consumer : IConsumer<IReceivedMessage>, IDisposable
public IChannelPool ChannelPool { get; }
public bool Started { get; private set; }

protected JsonSerializerOptions _defaultOptions;

public Consumer(
RabbitOptions options,
string consumerName,
Expand Down Expand Up @@ -395,8 +397,6 @@ protected void EnrichSpanWithTags(TelemetrySpan span, IReceivedMessage receivedM
}
}

protected JsonSerializerOptions _defaultOptions;

protected virtual void AutoDeserialize(ReceivedMessage receivedMessage)
{
if (receivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType
Expand All @@ -407,15 +407,24 @@ protected virtual void AutoDeserialize(ReceivedMessage receivedMessage)
case Constants.HeaderValueForContentTypeJson:
try
{
receivedMessage.Message = JsonSerializer.Deserialize<Message>(receivedMessage.Body.Span, _defaultOptions);
receivedMessage.Message = JsonSerializer
.Deserialize<Message>(
receivedMessage.Body.Span,
_defaultOptions);

receivedMessage.Body = default;
}
catch
{ receivedMessage.FailedToDeserialize = true; }
break;
case Constants.HeaderValueForContentTypeMessagePack:
try
{
receivedMessage.Message = MessagePackProvider.GlobalDeserialize<Message>(receivedMessage.Body);
receivedMessage.Message = MessagePackProvider
.GlobalDeserialize<Message>(
receivedMessage.Body);

receivedMessage.Body = default;
}
catch
{ receivedMessage.FailedToDeserialize = true; }
Expand Down
80 changes: 59 additions & 21 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace HouseofCat.RabbitMQ.Dataflows;
public interface IConsumerDataflow<TState> where TState : class, IRabbitWorkState, new()
{
string WorkflowName { get; }
string TimeFormat { get; set; }

TState BuildState(IReceivedMessage receivedMessage);

Expand Down Expand Up @@ -92,6 +93,8 @@ public string WorkflowName
}
}

public string TimeFormat { get; set; } = TimeHelpers.Formats.RFC3339Long;

public ConsumerDataflow(
IRabbitService rabbitService,
ConsumerOptions consumerOptions,
Expand Down Expand Up @@ -393,7 +396,7 @@ public ConsumerDataflow<TState> WithDefaultFinalization(
return this;
}

protected static readonly string _defaultFinalizationMessage = "Message [{0}] finished processing. Acking message.";
protected static readonly string _defaultFinalizationMessage = "Message [Id: {0}] finished processing. Acking message.";

protected void DefaultFinalization(TState state)
{
Expand Down Expand Up @@ -433,7 +436,17 @@ public ConsumerDataflow<TState> WithDecryptionStep(
executionOptions,
false,
x => x.ReceivedMessage.Encrypted,
GetSpanName("decrypt"));
GetSpanName("receive_decrypt"),
(state) =>
{
if (state?.ReceivedMessage?.Message?.Metadata?.Fields is null) return;
state.ReceivedMessage.Encrypted = false;
state.ReceivedMessage.EncryptionType = null;
state.ReceivedMessage.EncryptedDateTime = default;
state.ReceivedMessage.Message.Metadata.Fields[Constants.HeaderForEncrypted] = false;
state.ReceivedMessage.Message.Metadata.Fields.Remove(Constants.HeaderForEncryption);
state.ReceivedMessage.Message.Metadata.Fields.Remove(Constants.HeaderForEncryptDate);
});
}
return this;
}
Expand All @@ -454,7 +467,15 @@ public ConsumerDataflow<TState> WithDecompressionStep(
executionOptions,
false,
x => x.ReceivedMessage.Compressed,
GetSpanName("decompress"));
GetSpanName("receive_decompress"),
(state) =>
{
if (state?.ReceivedMessage?.Message?.Metadata?.Fields is null) return;
state.ReceivedMessage.Compressed = false;
state.ReceivedMessage.CompressionType = null;
state.ReceivedMessage.Message.Metadata.Fields[Constants.HeaderForCompressed] = false;
state.ReceivedMessage.Message.Metadata.Fields.Remove(Constants.HeaderForCompression);
});
}

return this;
Expand All @@ -470,7 +491,7 @@ public ConsumerDataflow<TState> WithCreateSendMessage(
if (_createSendMessage is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create send message"));
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("send_create"));
}
return this;
}
Expand All @@ -485,7 +506,7 @@ public ConsumerDataflow<TState> WithCreateSendMessage(
if (_createSendMessage is null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create send message"));
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("send_create"));
}
return this;
}
Expand All @@ -505,8 +526,14 @@ public ConsumerDataflow<TState> WithSendCompressedStep(
_compressionProvider.Compress,
executionOptions,
true,
x => !x.ReceivedMessage.Compressed,
GetSpanName("compress send message"));
x => !x.SendMessage.Metadata.Compressed(),
GetSpanName("send_compress"),
(state) =>
{
if (state?.SendMessage?.Metadata?.Fields is null) return;
state.SendMessage.Metadata.Fields[Constants.HeaderForCompressed] = true;
state.SendMessage.Metadata.Fields[Constants.HeaderForCompression] = _compressionProvider.Type;
});
}
return this;
}
Expand All @@ -526,8 +553,15 @@ public ConsumerDataflow<TState> WithSendEncryptedStep(
_encryptionProvider.Encrypt,
executionOptions,
true,
x => !x.ReceivedMessage.Encrypted,
GetSpanName("encrypt send message"));
x => !x.SendMessage.Metadata.Encrypted(),
GetSpanName("send_encrypt"),
(state) =>
{
if (state?.SendMessage?.Metadata?.Fields is null) return;
state.SendMessage.Metadata.Fields[Constants.HeaderForEncrypted] = true;
state.SendMessage.Metadata.Fields[Constants.HeaderForEncryption] = _encryptionProvider.Type;
state.SendMessage.Metadata.Fields[Constants.HeaderForEncryptDate] = TimeHelpers.GetDateTimeNow(TimeFormat);
});
}
return this;
}
Expand Down Expand Up @@ -714,7 +748,8 @@ protected TransformBlock<TState, TState> GetByteManipulationTransformBlock(
ExecutionDataflowBlockOptions options,
bool outbound,
Predicate<TState> predicate,
string spanName)
string spanName,
Action<TState> callback = null)
{
TState WrapAction(TState state)
{
Expand All @@ -730,16 +765,17 @@ TState WrapAction(TState state)
}
else if (predicate.Invoke(state))
{
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType)
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType
&& state.ReceivedMessage.Message is not null)
{
if (state.ReceivedMessage.Message is null)
{ state.ReceivedMessage.Message = _serializationProvider.Deserialize<Message>(state.ReceivedMessage.Body); }

state.ReceivedMessage.Message.Body = action(state.ReceivedMessage.Message.Body);
}
else
{ state.ReceivedMessage.Body = action(state.ReceivedMessage.Body); }
}

if (callback is not null)
{ callback(state); }
}
catch (Exception ex)
{
Expand All @@ -761,7 +797,8 @@ protected TransformBlock<TState, TState> GetByteManipulationTransformBlock(
ExecutionDataflowBlockOptions options,
bool outbound,
Predicate<TState> predicate,
string spanName)
string spanName,
Action<TState> callback = null)
{
async Task<TState> WrapActionAsync(TState state)
{
Expand All @@ -778,16 +815,18 @@ async Task<TState> WrapActionAsync(TState state)
}
else if (predicate.Invoke(state))
{
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType)
if (state.ReceivedMessage.ObjectType == Constants.HeaderValueForMessageObjectType
&& state.ReceivedMessage.Message is not null)
{
if (state.ReceivedMessage.Message is null)
{ state.ReceivedMessage.Message = _serializationProvider.Deserialize<Message>(state.ReceivedMessage.Body); }

state.ReceivedMessage.Message.Body = await action(state.ReceivedMessage.Message.Body).ConfigureAwait(false);
state.ReceivedMessage.Message.Body = await action(state.ReceivedMessage.Message.Body)
.ConfigureAwait(false);
}
else
{ state.ReceivedMessage.Body = await action(state.ReceivedMessage.Body).ConfigureAwait(false); }
}

if (callback is not null)
{ callback(state); }
}
catch (Exception ex)
{
Expand Down Expand Up @@ -818,7 +857,6 @@ async Task<TState> WrapPublishAsync(TState state)
try
{
await rabbitService.Publisher.QueueMessageAsync(state.SendMessage).ConfigureAwait(false);
state.SendMessageSent = true;
}
// Shutdown is likely in progress, so we can't publish this message.
catch (InvalidOperationException ex) when (ex.Message.StartsWith("AutoPublisher"))
Expand Down
8 changes: 3 additions & 5 deletions src/HouseofCat.RabbitMQ/Dataflows/RabbitWorkState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace HouseofCat.RabbitMQ.Dataflows;
public interface IRabbitWorkState : IWorkState
{
IReceivedMessage ReceivedMessage { get; set; }

ReadOnlyMemory<byte> SendData { get; set; }
IMessage SendMessage { get; set; }
bool SendMessageSent { get; set; }
}

public abstract class RabbitWorkState : IRabbitWorkState
Expand All @@ -20,14 +21,11 @@ public abstract class RabbitWorkState : IRabbitWorkState
public virtual IReceivedMessage ReceivedMessage { get; set; }

public virtual ReadOnlyMemory<byte> SendData { get; set; }

public virtual IMessage SendMessage { get; set; }
public virtual bool SendMessageSent { get; set; }

public virtual IDictionary<string, object> Data { get; set; }

public virtual IDictionary<string, bool> StepSuccess { get; set; }
public virtual string StepIdentifier { get; set; }

public bool IsFaulted { get; set; }
public ExceptionDispatchInfo EDI { get; set; }

Expand Down
20 changes: 10 additions & 10 deletions src/HouseofCat.RabbitMQ/Messages/ReceivedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ public interface IReceivedMessage

string ObjectType { get; }

bool Encrypted { get; }
string EncryptionType { get; }
DateTime EncryptedDateTime { get; }
bool Encrypted { get; set; }
string EncryptionType { get; set; }
DateTime EncryptedDateTime { get; set; }

bool Compressed { get; }
string CompressionType { get; }
bool Compressed { get; set; }
string CompressionType { get; set; }

public string TraceParentHeader { get; }
public SpanContext? ParentSpanContext { get; set; }
Expand Down Expand Up @@ -52,12 +52,12 @@ public class ReceivedMessage : IReceivedMessage, IDisposable

public string ObjectType { get; private set; }

public bool Encrypted { get; private set; }
public string EncryptionType { get; private set; }
public DateTime EncryptedDateTime { get; private set; }
public bool Encrypted { get; set; }
public string EncryptionType { get; set; }
public DateTime EncryptedDateTime { get; set; }

public bool Compressed { get; private set; }
public string CompressionType { get; private set; }
public bool Compressed { get; set; }
public string CompressionType { get; set; }

public string TraceParentHeader { get; private set; }
public SpanContext? ParentSpanContext { get; set; }
Expand Down
Loading

0 comments on commit 06a0819

Please sign in to comment.