Skip to content

Commit

Permalink
Merge pull request #1697 from Cratis/fix/event-constraint-append-many
Browse files Browse the repository at this point in the history
Fix/event constraint append many
  • Loading branch information
woksin authored Feb 7, 2025
2 parents 704b617 + cc35265 commit 6b39b98
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Concepts.Events;
using Cratis.Chronicle.EventSequences;
using Cratis.Chronicle.Integration.Base;
using context = Cratis.Chronicle.Integration.Orleans.InProcess.for_EventSequence.when_appending.many_with_first_event_violating_unique_constraint.context;

namespace Cratis.Chronicle.Integration.Orleans.InProcess.for_EventSequence.when_appending;

[Collection(GlobalCollection.Name)]
public class many_with_first_event_violating_unique_constraint(context context) : Given<context>(context)
{
public class context(GlobalFixture globalFixture) : IntegrationSpecificationContext(globalFixture)
{
public override IEnumerable<Type> ConstraintTypes => [typeof(UniqueUserConstraint)];
public override IEnumerable<Type> EventTypes => [typeof(UserOnboardingStarted), typeof(UserRemoved)];

public UserOnboardingStarted Event { get; private set; }

public AppendManyResult Result { get; private set; }

public async Task Establish()
{
Event = new UserOnboardingStarted(Guid.NewGuid().ToString(), Guid.NewGuid().ToString());
await EventStore.EventLog.Append(Guid.NewGuid().ToString(), Event);
}

public async Task Because()
{
Result = await EventStore.EventLog.AppendMany(Guid.NewGuid().ToString(), [Event, new UserRemoved()]);
}
}

[Fact] void should_not_succeed_on_second_attempt() => Context.Result.IsSuccess.ShouldBeFalse();
[Fact] void should_not_commit_any_of_the_two_events() => Context.EventLogSequenceGrain.GetTailSequenceNumber().Result.Value.ShouldEqual(EventSequenceNumber.First.Value);

Check warning on line 36 in Integration/Orleans.InProcess/for_EventSequence/when_appending/many_with_first_event_violating_unique_constraint.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 36 in Integration/Orleans.InProcess/for_EventSequence/when_appending/many_with_first_event_violating_unique_constraint.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 36 in Integration/Orleans.InProcess/for_EventSequence/when_appending/many_with_first_event_violating_unique_constraint.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 36 in Integration/Orleans.InProcess/for_EventSequence/when_appending/many_with_first_event_violating_unique_constraint.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 36 in Integration/Orleans.InProcess/for_EventSequence/when_appending/many_with_first_event_violating_unique_constraint.cs

View workflow job for this annotation

GitHub Actions / integration

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)
}

240 changes: 168 additions & 72 deletions Source/Kernel/Grains/EventSequences/EventSequence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Immutable;
using System.Dynamic;
using System.Text.Json.Nodes;
using Cratis.Chronicle.Compliance;
using Cratis.Chronicle.Concepts;
Expand Down Expand Up @@ -149,75 +150,19 @@ public async Task<AppendResult> Append(
{
try
{
var eventSchema = await EventTypesStorage.GetFor(eventType.Id, eventType.Generation);

var compliantEvent = await jsonComplianceManagerProvider.Apply(_eventSequenceKey.EventStore, _eventSequenceKey.Namespace, eventSchema.Schema, eventSourceId, content);
var compliantEventAsExpandoObject = expandoObjectConverter.ToExpandoObject(compliantEvent, eventSchema.Schema);

var constraintContext = _constraints!.Establish(eventSourceId, eventType.Id, compliantEventAsExpandoObject);
var constraintValidationResult = await constraintContext.Validate();
if (!constraintValidationResult.IsValid)
{
_metrics?.ConstraintViolation(eventSourceId, eventType.Id);
return AppendResult.Failed(correlationId, constraintValidationResult.Violations);
}

Result<AppendedEvent, AppendEventError>? appendResult = null;

var identity = await IdentityStorage.GetFor(causedBy);
do
var getValidAndCompliantEvent = await GetValidAndCompliantEvent(eventSourceType, eventSourceId, eventStreamId, eventType, content, correlationId);
if (getValidAndCompliantEvent.TryGetError(out var error))
{
await HandleFailedAppendResult(appendResult, eventType, eventSourceId, eventType.Id);
var occurred = DateTimeOffset.UtcNow;
logger.Appending(
_eventSequenceKey.EventStore,
_eventSequenceKey.Namespace,
_eventSequenceId,
eventType,
eventSourceId,
State.SequenceNumber);

appendResult = await EventSequenceStorage.Append(
State.SequenceNumber,
eventSourceType,
eventSourceId,
eventStreamType,
eventStreamId,
eventType,
correlationId,
causation,
identity,
occurred,
compliantEventAsExpandoObject);
return error;
}
while(!appendResult.IsSuccess);

var appendedSequenceNumber = State.SequenceNumber;
State.SequenceNumber = appendedSequenceNumber.Next();
State.TailSequenceNumberPerEventType[eventType.Id] = appendedSequenceNumber;
await WriteStateAsync();
var (compliantEvent, constraintContext) = getValidAndCompliantEvent.AsT0;

_metrics?.AppendedEvent(eventSourceId, eventType.Id);
var appendedEvents = new[] { (AppendedEvent)appendResult }.ToList();
await (_appendedEventsQueues?.Enqueue(appendedEvents) ?? Task.CompletedTask);
await constraintContext.Update(State.SequenceNumber);

return AppendResult.Success(correlationId, appendedSequenceNumber);
return await AppendValidAndCompliantEvent(eventSourceType, eventSourceId, eventStreamType, eventStreamId, eventType, correlationId, causation, causedBy, compliantEvent, constraintContext);
}
catch (Exception ex)
{
_metrics?.FailedAppending(eventSourceId, eventType.Id);
logger.ErrorAppending(
_eventSequenceKey.EventStore,
_eventSequenceKey.Namespace,
eventType,
eventStreamId.Value,
eventSourceType,
eventSourceId,
State.SequenceNumber,
ex);

return AppendResult.Failed(correlationId, [ex.Message]);
return HandleAppendEventException(ex, eventSourceType, eventSourceId, eventType, eventStreamId, correlationId);
}
}

Expand All @@ -228,20 +173,48 @@ public async Task<AppendManyResult> AppendMany(
IEnumerable<Causation> causation,
Identity causedBy)
{
var results = new List<AppendResult>();
var tasks = events.Select(async e =>
{
var result = await GetValidAndCompliantEvent(e.EventSourceType, e.EventSourceId, e.eventStreamId, e.EventType, e.Content, correlationId);
return (Event: e, Result: result);
});

var getValidAndCompliantEvents = await Task.WhenAll(tasks);
var failedEvents = getValidAndCompliantEvents.Where(eventAndResult => !eventAndResult.Result.IsSuccess).ToList();

if (failedEvents.Count != 0)
{
return new()
{
CorrelationId = correlationId,
ConstraintViolations = failedEvents.SelectMany(r => r.Result.AsT1.ConstraintViolations).ToImmutableList(),
Errors = failedEvents.SelectMany(r => r.Result.AsT1.Errors).ToImmutableList(),
};
}

foreach (var @event in events)
var results = new List<AppendResult>();
foreach (var (eventToAppend, validAndCompliantEvent) in getValidAndCompliantEvents)
{
results.Add(await Append(
@event.EventSourceType,
@event.EventSourceId,
@event.eventStreamType,
@event.eventStreamId,
@event.EventType,
@event.Content,
var (compliantEvent, constraintContext) = validAndCompliantEvent.AsT0;
var appendResult = await AppendValidAndCompliantEvent(
eventToAppend.EventSourceType,
eventToAppend.EventSourceId,
eventToAppend.eventStreamType,
eventToAppend.eventStreamId,
eventToAppend.EventType,
correlationId,
causation,
causedBy));
causedBy,
compliantEvent,
constraintContext);
results.Add(appendResult);
if (appendResult.IsSuccess)
{
continue;
}

logger.FailedDuringAppendingManyEvents();
break;
}

return new()
Expand Down Expand Up @@ -323,6 +296,129 @@ await IdentityStorage.GetFor(causedBy),
await RewindPartitionForAffectedObservers(eventSourceId, affectedEventTypes);
}

async Task<AppendResult> AppendValidAndCompliantEvent(
EventSourceType eventSourceType,
EventSourceId eventSourceId,
EventStreamType eventStreamType,
EventStreamId eventStreamId,
EventType eventType,
CorrelationId correlationId,
IEnumerable<Causation> causation,
Identity causedBy,
ExpandoObject compliantEvent,
ConstraintValidationContext constraintContext)
{
try
{
Result<AppendedEvent, AppendEventError>? appendResult = null;

var identity = await IdentityStorage.GetFor(causedBy);
do
{
await HandleFailedAppendResult(appendResult, eventType, eventSourceId, eventType.Id);
var occurred = DateTimeOffset.UtcNow;
logger.Appending(
_eventSequenceKey.EventStore,
_eventSequenceKey.Namespace,
_eventSequenceId,
eventType,
eventSourceId,
State.SequenceNumber);

appendResult = await EventSequenceStorage.Append(
State.SequenceNumber,
eventSourceType,
eventSourceId,
eventStreamType,
eventStreamId,
eventType,
correlationId,
causation,
identity,
occurred,
compliantEvent);
}
while(!appendResult.IsSuccess);

var appendedSequenceNumber = State.SequenceNumber;
State.SequenceNumber = appendedSequenceNumber.Next();
State.TailSequenceNumberPerEventType[eventType.Id] = appendedSequenceNumber;
await WriteStateAsync();

_metrics?.AppendedEvent(eventSourceId, eventType.Id);
var appendedEvents = new[] { (AppendedEvent)appendResult }.ToList();
await (_appendedEventsQueues?.Enqueue(appendedEvents) ?? Task.CompletedTask);
await constraintContext.Update(State.SequenceNumber);

return AppendResult.Success(correlationId, appendedSequenceNumber);
}
catch (Exception ex)
{
return HandleAppendEventException(ex, eventSourceType, eventSourceId, eventType, eventStreamId, correlationId);
}
}

async Task<Result<(ExpandoObject CompliantEvent, ConstraintValidationContext ConstraintValidationContext), AppendResult>> GetValidAndCompliantEvent(
EventSourceType eventSourceType,
EventSourceId eventSourceId,
EventStreamId eventStreamId,
EventType eventType,
JsonObject content,
CorrelationId correlationId)
{
try
{
var compliantEventAsExpandoObject = await MakeEventCompliant(eventSourceId, eventType, content);
var checkConstraintViolation = await CheckConstraintViolation(eventSourceId, eventType, correlationId, compliantEventAsExpandoObject);
if (checkConstraintViolation.TryGetError(out var error))
{
return error;
}

return (compliantEventAsExpandoObject, (ConstraintValidationContext)checkConstraintViolation);
}
catch (Exception ex)
{
return HandleAppendEventException(ex, eventSourceType, eventSourceId, eventType, eventStreamId, correlationId);
}
}

AppendResult HandleAppendEventException(Exception ex, EventSourceType eventSourceType, EventSourceId eventSourceId, EventType eventType, EventStreamId eventStreamId, CorrelationId correlationId)
{
_metrics?.FailedAppending(eventSourceId, eventType.Id);
logger.ErrorAppending(
_eventSequenceKey.EventStore,
_eventSequenceKey.Namespace,
eventType,
eventStreamId.Value,
eventSourceType,
eventSourceId,
State.SequenceNumber,
ex);

return AppendResult.Failed(correlationId, [ex.Message]);
}

async Task<ExpandoObject> MakeEventCompliant(EventSourceId eventSourceId, EventType eventType, JsonObject content)
{
var eventSchema = await EventTypesStorage.GetFor(eventType.Id, eventType.Generation);

var compliantEvent = await jsonComplianceManagerProvider.Apply(_eventSequenceKey.EventStore, _eventSequenceKey.Namespace, eventSchema.Schema, eventSourceId, content);
return expandoObjectConverter.ToExpandoObject(compliantEvent, eventSchema.Schema);
}

async Task<Result<ConstraintValidationContext, AppendResult>> CheckConstraintViolation(EventSourceId eventSourceId, EventType eventType, CorrelationId correlationId, ExpandoObject compliantEventAsExpandoObject)
{
var constraintContext = _constraints!.Establish(eventSourceId, eventType.Id, compliantEventAsExpandoObject);
var constraintValidationResult = await constraintContext.Validate();
if (constraintValidationResult.IsValid)
{
return constraintContext;
}
_metrics?.ConstraintViolation(eventSourceId, eventType.Id);
return AppendResult.Failed(correlationId, constraintValidationResult.Violations);
}

async Task HandleFailedAppendResult(Result<AppendedEvent, AppendEventError>? appendResult, EventType eventType, EventSourceId eventSourceId, string eventName)
{
if (appendResult is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ internal static partial class EventSequenceLogMessages
[LoggerMessage(LogLevel.Information, "Redacting events with event source id {EventSourceId} and event types {EventTypes} in event sequence {EventSequenceId} for event store '{EventStore}' on namespace {Namespace}")]
internal static partial void RedactingMultiple(this ILogger<EventSequence> logger, EventStoreName eventStore, EventStoreNamespaceName @namespace, EventSequenceId eventSequenceId, EventSourceId eventSourceId, IEnumerable<EventType> eventTypes);

[LoggerMessage(LogLevel.Critical, "A critical error occurred while appending many events after successfully validating all event constraints. One or more events was committed while they should have been rolled back and not been written to the event store")]
internal static partial void FailedDuringAppendingManyEvents(this ILogger<EventSequence> logger);

[LoggerMessage(LogLevel.Debug, "Getting tail sequence number for event types from event sequence {EventSequenceId} for event store '{EventStore}' on namespace {Namespace} for event types {EventTypes}")]
internal static partial void GettingTailSequenceNumberForEventTypes(this ILogger<EventSequence> logger, EventStoreName eventStore, EventStoreNamespaceName @namespace, EventSequenceId eventSequenceId, IEnumerable<EventType> eventTypes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public bool CanValidate(ConstraintValidationContext context) =>
/// <inheritdoc/>
public async Task<ConstraintValidationResult> Validate(ConstraintValidationContext context)
{
var propertiesWithValues = definition.GetPropertiesAndValues(context);
if (!propertiesWithValues.Any())
var propertiesWithValues = definition.GetPropertiesAndValues(context).ToList();
if (propertiesWithValues.Count == 0)
{
return ConstraintValidationResult.Success;
}
Expand Down

0 comments on commit 6b39b98

Please sign in to comment.