Skip to content

Commit

Permalink
[Event Hubs] GeoDR - OffsetString (#47133)
Browse files Browse the repository at this point in the history
* [Event Hubs] GeoDR - OffsetString

The focus of these changes is to introduce members for the string-based offset
format and mark the numeric-based offset members as obsolete.  This is part of
the Event Hubs GeoDR feature design for the service, where replication-based
offsets will be composite strings and no longer parsable to numeric values.

Not included in these changes is the error handling for the new error for an
invalid reader position after failover.  The error details are still being
finalized by the service and will be included in a future change set.

* Fixing AOT warning

* Fixing build error from rebase and regenerating net8.0 API listings.

* Fixing comments

---------

Co-authored-by: Madalyn Redding <[email protected]>
  • Loading branch information
jsquire and m-redding authored Dec 3, 2024
1 parent 7efe3d1 commit 1d41111
Show file tree
Hide file tree
Showing 96 changed files with 1,644 additions and 881 deletions.
1 change: 1 addition & 0 deletions eng/ApiListing.exclude-attributes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ T:System.Runtime.CompilerServices.AsyncStateMachineAttribute
T:System.Runtime.CompilerServices.CompilerGeneratedAttribute
T:System.Runtime.CompilerServices.NullableContextAttribute
T:System.Runtime.CompilerServices.NullableAttribute
T:System.Runtime.CompilerServices.IsReadOnlyAttribute
T:Azure.Core.CodeGenSuppressAttribute
T:Azure.Core.CodeGenModelAttribute
T:Azure.Core.CodeGenMemberAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ EventData eventData = EventHubsModelFactory.EventData(
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offsetString: "1500:1:3344.1",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

// This creates a new instance of ProcessEventArgs to pass into the handler directly.
Expand Down Expand Up @@ -110,7 +110,7 @@ TimerCallback dispatchEvent = async _ =>
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offsetString: "1500:1:1111",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

ProcessEventArgs eventArgs = new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.12.0-beta.2</Version>
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually. -->
<ApiCompatVersion>5.11.5</ApiCompatVersion>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
Expand All @@ -16,7 +16,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!-- TEMP TEMP -->
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
<!-- PackageReference Include="Azure.Messaging.EventHubs" /-->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ partial void InvalidCheckpointFound(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with the checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with the checkpoint.</param>
/// <param name="offset">The offset associated with the checkpoint.</param>
/// <param name="exception">The exception that occurred.</param>
///
Expand All @@ -107,10 +106,9 @@ partial void UpdateCheckpointError(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset,
Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand All @@ -122,7 +120,6 @@ partial void UpdateCheckpointError(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointComplete(string partitionId,
Expand All @@ -131,9 +128,8 @@ partial void UpdateCheckpointComplete(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to create/update a checkpoint has started.
Expand All @@ -145,7 +141,6 @@ partial void UpdateCheckpointComplete(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointStart(string partitionId,
Expand All @@ -154,9 +149,8 @@ partial void UpdateCheckpointStart(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to retrieve claim partition ownership has completed.
Expand Down
Loading

0 comments on commit 1d41111

Please sign in to comment.