diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs b/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs
index 6e9b7cc4..a217ea03 100644
--- a/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs
+++ b/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs
@@ -99,6 +99,12 @@ public abstract partial class OfflineDbContext : DbContext
///
internal OperationsQueueManager QueueManager { get; }
+ ///
+ /// An event delegate that allows the app to monitor synchronization events.
+ ///
+ /// This event can be called from background threads.
+ public event EventHandler? SynchronizationProgress;
+
///
/// Initializes a new instance of the class. The
/// method will be called to
@@ -561,6 +567,15 @@ public async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, bool add
return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken).ConfigureAwait(false);
}
+ ///
+ /// Sends a synchronization event to the consumers.
+ ///
+ /// The event arguments.
+ internal void SendSynchronizationEvent(SynchronizationEventArgs eventArgs)
+ {
+ SynchronizationProgress?.Invoke(this, eventArgs);
+ }
+
#region IDisposable
///
/// Ensure that the context has not been disposed.
diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs
index 22578310..8341ec6b 100644
--- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs
+++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs
@@ -14,6 +14,7 @@
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
+using static CommunityToolkit.Datasync.Client.Offline.Operations.PullOperationManager;
namespace CommunityToolkit.Datasync.Client.Offline.Operations;
@@ -53,61 +54,87 @@ public async Task ExecuteAsync(IEnumerable requests, Pu
QueueHandler databaseUpdateQueue = new(1, async pullResponse =>
{
- DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
- foreach (object item in pullResponse.Items)
+ if (pullResponse.Items.Any())
{
- EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
- object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);
-
- if (originalEntity is null && !metadata.Deleted)
- {
- _ = context.Add(item);
- result.IncrementAdditions();
- }
- else if (originalEntity is not null && metadata.Deleted)
+ DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false);
+ foreach (object item in pullResponse.Items)
{
- _ = context.Remove(originalEntity);
- result.IncrementDeletions();
- }
- else if (originalEntity is not null && !metadata.Deleted)
- {
- // Gather properties marked with [JsonIgnore]
- HashSet ignoredProps = pullResponse.EntityType
- .GetProperties(BindingFlags.Public | BindingFlags.Instance)
- .Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
- .Select(p => p.Name)
- .ToHashSet();
-
- EntityEntry originalEntry = context.Entry(originalEntity);
- EntityEntry newEntry = context.Entry(item);
-
- // Only copy properties that are not marked with [JsonIgnore]
- foreach (IProperty property in originalEntry.Metadata.GetProperties())
+ EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType);
+ object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false);
+
+ if (originalEntity is null && !metadata.Deleted)
+ {
+ _ = context.Add(item);
+ result.IncrementAdditions();
+ }
+ else if (originalEntity is not null && metadata.Deleted)
+ {
+ _ = context.Remove(originalEntity);
+ result.IncrementDeletions();
+ }
+ else if (originalEntity is not null && !metadata.Deleted)
{
- if (!ignoredProps.Contains(property.Name))
+ // Gather properties marked with [JsonIgnore]
+ HashSet ignoredProps = pullResponse.EntityType
+ .GetProperties(BindingFlags.Public | BindingFlags.Instance)
+ .Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true))
+ .Select(p => p.Name)
+ .ToHashSet();
+
+ EntityEntry originalEntry = context.Entry(originalEntity);
+ EntityEntry newEntry = context.Entry(item);
+
+ // Only copy properties that are not marked with [JsonIgnore]
+ foreach (IProperty property in originalEntry.Metadata.GetProperties())
{
- originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
+ if (!ignoredProps.Contains(property.Name))
+ {
+ originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue;
+ }
}
+
+ result.IncrementReplacements();
}
- result.IncrementReplacements();
+ if (metadata.UpdatedAt > lastSynchronization)
+ {
+ lastSynchronization = metadata.UpdatedAt.Value;
+ bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
+ if (isAdded)
+ {
+ // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
+ _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
+ }
+ }
}
- if (metadata.UpdatedAt > lastSynchronization)
+ if (pullOptions.SaveAfterEveryServiceRequest)
{
- lastSynchronization = metadata.UpdatedAt.Value;
- bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false);
- if (isAdded)
- {
- // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails.
- _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
- }
+ _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
}
+
+ context.SendSynchronizationEvent(new SynchronizationEventArgs()
+ {
+ EventType = SynchronizationEventType.ItemsCommitted,
+ EntityType = pullResponse.EntityType,
+ ItemsProcessed = pullResponse.TotalItemsProcessed,
+ ItemsTotal = pullResponse.TotalRequestItems,
+ QueryId = pullResponse.QueryId
+ });
}
- if (pullOptions.SaveAfterEveryServiceRequest)
+ if (pullResponse.Completed)
{
- _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false);
+ context.SendSynchronizationEvent(new SynchronizationEventArgs()
+ {
+ EventType = SynchronizationEventType.PullEnded,
+ EntityType = pullResponse.EntityType,
+ ItemsProcessed = pullResponse.TotalItemsProcessed,
+ ItemsTotal = pullResponse.TotalRequestItems,
+ QueryId = pullResponse.QueryId,
+ Exception = pullResponse.Exception,
+ ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null
+ });
}
});
@@ -116,14 +143,34 @@ public async Task ExecuteAsync(IEnumerable requests, Pu
Uri endpoint = ExecutableOperation.MakeAbsoluteUri(pullRequest.HttpClient.BaseAddress, pullRequest.Endpoint);
Uri requestUri = new UriBuilder(endpoint) { Query = pullRequest.QueryDescription.ToODataQueryString() }.Uri;
Type pageType = typeof(Page<>).MakeGenericType(pullRequest.EntityType);
+ long itemsProcessed = 0;
+ long totalCount = 0;
try
{
bool completed = false;
+ // Signal we started the pull operation.
+ context.SendSynchronizationEvent(new SynchronizationEventArgs()
+ {
+ EventType = SynchronizationEventType.PullStarted,
+ EntityType = pullRequest.EntityType,
+ QueryId = pullRequest.QueryId
+ });
do
{
Page