Skip to content

Commit

Permalink
Add a semaphore for receiving data through WebSocket (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
occluder authored Dec 14, 2023
1 parent 5077248 commit c16f570
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
5 changes: 5 additions & 0 deletions MiniTwitch.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# MiniTwitch.Common Changelog

## Upcoming version

- Minor: Improved handling of WebSocket reads
8 changes: 7 additions & 1 deletion MiniTwitch.Common/Internal/Models/ByteBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ public ByteBucket(short tempBufferSize = 2048)
/// <returns><see langword="true"/> if the bucket should be drained</returns>
public async ValueTask<bool> FillFrom(ClientWebSocket source, CancellationToken cToken)
{
ValueTask<ValueWebSocketReceiveResult> valueTask = source.ReceiveAsync(_temp, cToken);
if (valueTask.IsCanceled || cToken.IsCancellationRequested)
{
return false;
}

// Receive bytes into _temp
// This overwrites previous data
ValueWebSocketReceiveResult result = await source.ReceiveAsync(_temp, cToken);
ValueWebSocketReceiveResult result = await valueTask.ConfigureAwait(false);
// Copy the received bytes into the bucket
_temp[..result.Count].CopyTo(_bucket[_reached..(result.Count + _reached)]);
_reached += result.Count;
Expand Down
17 changes: 12 additions & 5 deletions MiniTwitch.Common/WebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public sealed class WebSocketClient : IAsyncDisposable
#endregion

#region Fields
private readonly SemaphoreSlim _sendLock = new(1);
private readonly SemaphoreSlim _reconnectionLock = new(0);
private readonly SemaphoreSlim _receiveLock = new(1);
private readonly SemaphoreSlim _sendLock = new(1);
private readonly TimeSpan _reconnectDelay;
private readonly ByteBucket _bucket;
private CancellationTokenSource _cts;
Expand Down Expand Up @@ -148,6 +149,7 @@ private async Task Receive()
{
try
{
await _receiveLock.WaitAsync(_cts.Token);
// Continue if not at the end of message
if (!await _bucket.FillFrom(_client, _cts.Token))
continue;
Expand All @@ -157,7 +159,8 @@ private async Task Receive()
}
catch (WebSocketException wse)
{
Log(LogLevel.Critical, "An error occurred while receiving data from the WebSocket connection: {msg}", wse.Message);
Log(LogLevel.Critical, "An error occurred while receiving data from the WebSocket connection: {msg}",
wse.Message);
break;
}
catch (InvalidOperationException)
Expand All @@ -174,6 +177,10 @@ private async Task Receive()
{
LogException(ex, "Exception caught in data receiver: ");
}
finally
{
_ = _receiveLock.Release();
}
}

// Don't restart if it's a user disconnect
Expand All @@ -186,16 +193,16 @@ public async ValueTask SendAsync(string data, bool sensitive = false, Cancellati
var (written, bytes) = StringToBytes(data);
if (!await _sendLock.WaitAsync(TimeSpan.FromSeconds(10), cancellationToken).ConfigureAwait(false))
{
Log(LogLevel.Warning, "{method} timed out after 10 seconds.", nameof(SendAsync));
Log(LogLevel.Warning, "{Method} timed out after 10 seconds.", nameof(SendAsync));
return;
}
else if (!this.IsConnected)
{
Log(LogLevel.Warning, "Cannot send data in non-connect state. ({state})", _client.State);
Log(LogLevel.Warning, "Cannot send data in non-connect state. ({State})", _client.State);
}

if (!sensitive)
Log(LogLevel.Debug, "Sending data: {msg}", data);
Log(LogLevel.Debug, "Sending data: {Message}", data);

try
{
Expand Down

0 comments on commit c16f570

Please sign in to comment.