Skip to content

Commit

Permalink
com.utilities.rest 3.3.0 (#90)
Browse files Browse the repository at this point in the history
- Fixed duplicate server sent events being invoked. They are now enqueued and processed in the order they are received.
- com.utilities.async -> 2.2.1
  • Loading branch information
StephenHodgson authored Nov 4, 2024
1 parent ea6b89f commit 669a2b5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
72 changes: 50 additions & 22 deletions Runtime/Rest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,9 @@ UnityWebRequest.kHttpVerbPUT or
}
}

var serverSentEventQueue = new Queue<Tuple<Response, ServerSentEvent>>();
CancellationTokenSource serverSentEventCts = null;

if (parameters is { Progress: not null } ||
serverSentEventHandler != null)
{
Expand All @@ -1262,7 +1265,7 @@ async void CallbackThread()
{
if (serverSentEventHandler != null)
{
await SendServerEventCallback(false, requestBody).ConfigureAwait(true);
EnqueueServerSentEventCallbacks();
}

if (parameters is { Progress: not null })
Expand Down Expand Up @@ -1317,22 +1320,61 @@ async void CallbackThread()
}
}

async void ServerSentEventQueue()
{
serverSentEventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
do
{
try
{
await Awaiters.UnityMainThread;

if (serverSentEventQueue.TryDequeue(out var @event))
{
var (sseResponse, ssEvent) = @event;
await serverSentEventHandler.Invoke(sseResponse, ssEvent);
}
}
catch (Exception e)
{
Debug.LogError(e);
}
} while (!serverSentEventCts.Token.IsCancellationRequested);
}
#pragma warning disable CS4014 // We purposefully don't await this task, so it will run on a background thread.
Task.Run(CallbackThread, cancellationToken);

if (serverSentEventHandler != null)
{
Task.Run(ServerSentEventQueue, cancellationToken);
}
#pragma warning restore CS4014
}

try
{
await webRequest.SendWebRequest();
}

catch (Exception e)
{
return new Response(webRequest.url, webRequest.method, requestBody, false, $"{nameof(Rest)}.{nameof(SendAsync)}::{nameof(UnityWebRequest.SendWebRequest)} Failed!", null, -1, null, parameters, e.ToString());
}
finally
{
parameters?.Progress?.Report(new Progress(webRequest.downloadedBytes, webRequest.downloadedBytes, 100f, 0, Progress.DataUnit.b));

parameters?.Progress?.Report(new Progress(webRequest.downloadedBytes, webRequest.downloadedBytes, 100f, 0, Progress.DataUnit.b));
if (serverSentEventHandler != null)
{
EnqueueServerSentEventCallbacks();
}

if (serverSentEventCts != null)
{
await new WaitUntil(() => serverSentEventQueue.Count == 0);
serverSentEventCts?.Cancel();
serverSentEventCts?.Dispose();
}
}

if (webRequest.result is
UnityWebRequest.Result.ConnectionError or
Expand All @@ -1342,23 +1384,18 @@ UnityWebRequest.Result.ConnectionError or
return new Response(webRequest, requestBody, false, parameters);
}

if (serverSentEventHandler != null)
{
await SendServerEventCallback(true, requestBody).ConfigureAwait(true);
}

return new Response(webRequest, requestBody, true, parameters);

async Task SendServerEventCallback(bool isEnd, string body)
void EnqueueServerSentEventCallbacks()
{
var allEventMessages = webRequest.downloadHandler?.text;
if (string.IsNullOrWhiteSpace(allEventMessages)) { return; }

var matches = sseRegex.Matches(allEventMessages!);
var stride = isEnd ? 0 : 1;
parameters ??= new RestParameters();
var eventCount = parameters.ServerSentEventCount;

for (var i = parameters.ServerSentEventCount; i < matches.Count - stride; i++)
for (var i = eventCount; i < matches.Count; i++)
{
ServerSentEventKind type;
string value;
Expand Down Expand Up @@ -1404,17 +1441,8 @@ async Task SendServerEventCallback(bool isEnd, string body)
@event.Data = null;
}

var sseResponse = new Response(webRequest, body, true, parameters, (@event.Data ?? @event.Value).ToString(Formatting.None));

try
{
await serverSentEventHandler.Invoke(sseResponse, @event).ConfigureAwait(true);
}
catch (Exception e)
{
Debug.LogError(e);
}

var sseResponse = new Response(webRequest, requestBody, true, parameters, (@event.Data ?? @event.Value).ToString(Formatting.None));
serverSentEventQueue.Enqueue(Tuple.Create(sseResponse, @event));
parameters.ServerSentEventCount++;
parameters.ServerSentEvents.Add(@event);
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"displayName": "Utilities.Rest",
"description": "This package contains useful RESTful utilities for the Unity Game Engine.",
"keywords": [],
"version": "3.2.5",
"version": "3.3.0",
"unity": "2021.3",
"documentationUrl": "https://github.com/RageAgainstThePixel/com.utilities.rest#documentation",
"changelogUrl": "https://github.com/RageAgainstThePixel/com.utilities.rest/releases",
Expand All @@ -15,7 +15,7 @@
"author": "Stephen Hodgson",
"url": "https://github.com/StephenHodgson",
"dependencies": {
"com.utilities.async": "2.1.7",
"com.utilities.async": "2.2.1",
"com.utilities.extensions": "1.1.16",
"com.unity.modules.unitywebrequest": "1.0.0",
"com.unity.modules.unitywebrequestassetbundle": "1.0.0",
Expand Down

0 comments on commit 669a2b5

Please sign in to comment.