diff --git a/Runtime/Rest.cs b/Runtime/Rest.cs index b06e507..d7332da 100644 --- a/Runtime/Rest.cs +++ b/Runtime/Rest.cs @@ -1241,6 +1241,9 @@ UnityWebRequest.kHttpVerbPUT or } } + var serverSentEventQueue = new Queue>(); + CancellationTokenSource serverSentEventCts = null; + if (parameters is { Progress: not null } || serverSentEventHandler != null) { @@ -1262,7 +1265,7 @@ async void CallbackThread() { if (serverSentEventHandler != null) { - await SendServerEventCallback(false, requestBody).ConfigureAwait(true); + EnqueueServerSentEventCallbacks(); } if (parameters is { Progress: not null }) @@ -1317,8 +1320,34 @@ async void CallbackThread() } } + async void ServerSentEventQueue() + { + serverSentEventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + do + { + try + { + await Awaiters.UnityMainThread; + + if (serverSentEventQueue.TryDequeue(out var @event)) + { + var (sseResponse, ssEvent) = @event; + await serverSentEventHandler.Invoke(sseResponse, ssEvent); + } + } + catch (Exception e) + { + Debug.LogError(e); + } + } while (!serverSentEventCts.Token.IsCancellationRequested); + } #pragma warning disable CS4014 // We purposefully don't await this task, so it will run on a background thread. Task.Run(CallbackThread, cancellationToken); + + if (serverSentEventHandler != null) + { + Task.Run(ServerSentEventQueue, cancellationToken); + } #pragma warning restore CS4014 } @@ -1326,13 +1355,26 @@ async void CallbackThread() { await webRequest.SendWebRequest(); } - catch (Exception e) { return new Response(webRequest.url, webRequest.method, requestBody, false, $"{nameof(Rest)}.{nameof(SendAsync)}::{nameof(UnityWebRequest.SendWebRequest)} Failed!", null, -1, null, parameters, e.ToString()); } + finally + { + parameters?.Progress?.Report(new Progress(webRequest.downloadedBytes, webRequest.downloadedBytes, 100f, 0, Progress.DataUnit.b)); - parameters?.Progress?.Report(new Progress(webRequest.downloadedBytes, webRequest.downloadedBytes, 100f, 0, Progress.DataUnit.b)); + if (serverSentEventHandler != null) + { + EnqueueServerSentEventCallbacks(); + } + + if (serverSentEventCts != null) + { + await new WaitUntil(() => serverSentEventQueue.Count == 0); + serverSentEventCts?.Cancel(); + serverSentEventCts?.Dispose(); + } + } if (webRequest.result is UnityWebRequest.Result.ConnectionError or @@ -1342,23 +1384,18 @@ UnityWebRequest.Result.ConnectionError or return new Response(webRequest, requestBody, false, parameters); } - if (serverSentEventHandler != null) - { - await SendServerEventCallback(true, requestBody).ConfigureAwait(true); - } - return new Response(webRequest, requestBody, true, parameters); - async Task SendServerEventCallback(bool isEnd, string body) + void EnqueueServerSentEventCallbacks() { var allEventMessages = webRequest.downloadHandler?.text; if (string.IsNullOrWhiteSpace(allEventMessages)) { return; } var matches = sseRegex.Matches(allEventMessages!); - var stride = isEnd ? 0 : 1; parameters ??= new RestParameters(); + var eventCount = parameters.ServerSentEventCount; - for (var i = parameters.ServerSentEventCount; i < matches.Count - stride; i++) + for (var i = eventCount; i < matches.Count; i++) { ServerSentEventKind type; string value; @@ -1404,17 +1441,8 @@ async Task SendServerEventCallback(bool isEnd, string body) @event.Data = null; } - var sseResponse = new Response(webRequest, body, true, parameters, (@event.Data ?? @event.Value).ToString(Formatting.None)); - - try - { - await serverSentEventHandler.Invoke(sseResponse, @event).ConfigureAwait(true); - } - catch (Exception e) - { - Debug.LogError(e); - } - + var sseResponse = new Response(webRequest, requestBody, true, parameters, (@event.Data ?? @event.Value).ToString(Formatting.None)); + serverSentEventQueue.Enqueue(Tuple.Create(sseResponse, @event)); parameters.ServerSentEventCount++; parameters.ServerSentEvents.Add(@event); } diff --git a/package.json b/package.json index 5cf2aee..c645f72 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "displayName": "Utilities.Rest", "description": "This package contains useful RESTful utilities for the Unity Game Engine.", "keywords": [], - "version": "3.2.5", + "version": "3.3.0", "unity": "2021.3", "documentationUrl": "https://github.com/RageAgainstThePixel/com.utilities.rest#documentation", "changelogUrl": "https://github.com/RageAgainstThePixel/com.utilities.rest/releases", @@ -15,7 +15,7 @@ "author": "Stephen Hodgson", "url": "https://github.com/StephenHodgson", "dependencies": { - "com.utilities.async": "2.1.7", + "com.utilities.async": "2.2.1", "com.utilities.extensions": "1.1.16", "com.unity.modules.unitywebrequest": "1.0.0", "com.unity.modules.unitywebrequestassetbundle": "1.0.0",