From d70d3ea51e2cfc57bfcb2c9619be0ca78c4fb30f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Wed, 20 Jul 2022 17:35:26 +0600 Subject: [PATCH 01/17] new lease keep alive method added --- dotnet-etcd/LeaseExpiredException.cs | 34 +++++++++ dotnet-etcd/dotnet-etcd.csproj | 1 + dotnet-etcd/leaseClient.cs | 106 +++++++++++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 dotnet-etcd/LeaseExpiredException.cs diff --git a/dotnet-etcd/LeaseExpiredException.cs b/dotnet-etcd/LeaseExpiredException.cs new file mode 100644 index 0000000..962deec --- /dev/null +++ b/dotnet-etcd/LeaseExpiredException.cs @@ -0,0 +1,34 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Runtime.Serialization; + +namespace dotnet_etcd +{ + [Serializable] + public class LeaseExpiredException : Exception + { + public LeaseExpiredException() + { + } + + public LeaseExpiredException(string message) : base(message) + { + } + + public LeaseExpiredException(string message, Exception inner) : base( + message, + inner) + { + } + + protected LeaseExpiredException( + SerializationInfo info, + StreamingContext context) : base( + info, + context) + { + } + } +} diff --git a/dotnet-etcd/dotnet-etcd.csproj b/dotnet-etcd/dotnet-etcd.csproj index 492f4a9..4f70654 100644 --- a/dotnet-etcd/dotnet-etcd.csproj +++ b/dotnet-etcd/dotnet-etcd.csproj @@ -65,6 +65,7 @@ Advanced uses take advantage of the consistency guarantees to implement database all runtime; build; native; contentfiles; analyzers + diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 5e41452..7e7a169 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -4,6 +4,7 @@ using System; using System.IO; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Etcdserverpb; @@ -109,6 +110,111 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok } }).ConfigureAwait(false); + + /// + /// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client + /// to the server and streaming keep alive responses from the server to the client. + /// + /// + /// + /// throws an exception if no response + /// is received within the lease TTL or + public async Task LeaseKeepAlive(long leaseId, long leaseRemainigTTL, CancellationToken cancellationToken) + { + int retryCount = 0; + while (true) + { + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var attemptCancellationToken = cts.Token; + try + { + var connection = _balancer.GetConnection(); + using (AsyncDuplexStreamingCall leaser = + connection._leaseClient.LeaseKeepAlive(cancellationToken: attemptCancellationToken)) + { + var requestsChannel = Channel.CreateUnbounded(); + await requestsChannel.Writer.WriteAsync( + new LeaseKeepAliveRequest() { ID = leaseId }, + attemptCancellationToken); + var requestsTask = Task.Run( + async + () => + { + await foreach (LeaseKeepAliveRequest lkReq in requestsChannel.Reader.ReadAllAsync( + attemptCancellationToken)) + { + await leaser.RequestStream.WriteAsync(lkReq); + } + }, + attemptCancellationToken); + var responsesTask = Task.Run( + async + () => + { + var leaseEndOfLiveAlert = Task.Delay( + (int)leaseRemainigTTL * 1000, + attemptCancellationToken); + while (true) + { + var nextResponsePromise = leaser.ResponseStream.MoveNext(attemptCancellationToken); + + await Task.WhenAny( + leaseEndOfLiveAlert, + nextResponsePromise).Unwrap(); + attemptCancellationToken.ThrowIfCancellationRequested(); + if (leaseEndOfLiveAlert.IsCompleted) + { + throw new LeaseExpiredException(); + } + + if (nextResponsePromise.IsCompleted) + { + if (nextResponsePromise.Result == false) + { + break; + } + + retryCount = 0; + + var rsp = leaser.ResponseStream.Current; + leaseEndOfLiveAlert = Task.Delay( + (int)rsp.TTL * 1000, + attemptCancellationToken); + + var never = Task.Delay( + (int)rsp.TTL * 1000 / 3, + cancellationToken).ContinueWith( + async t => + { + await requestsChannel.Writer.WriteAsync( + new LeaseKeepAliveRequest() { ID = rsp.ID }, + cancellationToken); + }, + cancellationToken); + } + } + }, + attemptCancellationToken); + + await Task.WhenAny( + requestsTask, + responsesTask).Unwrap().ConfigureAwait(false); + await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); + } + } + catch (RpcException ex) + { + retryCount++; + if (retryCount >= _balancer._numNodes) + { + throw; + } + + cts.Cancel(); + } + } + } + /// /// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client /// to the server and streaming keep alive responses from the server to the client. From ae9fd93d090480b748cf29a714461410d43319c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Wed, 3 Aug 2022 13:27:56 +0600 Subject: [PATCH 02/17] =?UTF-8?q?=D1=82=D0=B5=D1=81=D1=82=D1=8B=20=D0=B8?= =?UTF-8?q?=20=D1=82=D0=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/DevelopmentSandbox.csproj | 20 ++++++ DevelopmentSandbox/Program.cs | 75 ++++++++++++++++++++ dotnet-etcd.sln | 6 ++ dotnet-etcd/etcdClient.cs | 2 +- dotnet-etcd/leaseClient.cs | 3 +- dotnet-etcd/multiplexer/Balancer.cs | 2 +- tests/Integration/DevelopmentProcessTests.cs | 57 +++++++++++++-- tests/Integration/Framework.cs | 25 +++---- 8 files changed, 169 insertions(+), 21 deletions(-) create mode 100644 DevelopmentSandbox/DevelopmentSandbox.csproj create mode 100644 DevelopmentSandbox/Program.cs diff --git a/DevelopmentSandbox/DevelopmentSandbox.csproj b/DevelopmentSandbox/DevelopmentSandbox.csproj new file mode 100644 index 0000000..2f45200 --- /dev/null +++ b/DevelopmentSandbox/DevelopmentSandbox.csproj @@ -0,0 +1,20 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs new file mode 100644 index 0000000..98f0574 --- /dev/null +++ b/DevelopmentSandbox/Program.cs @@ -0,0 +1,75 @@ +using System.Net; +using dotnet_etcd; +using DotnetNiceGrpcLogs; +using Etcdserverpb; +using Serilog; +using Serilog.Sinks.SystemConsole.Themes; + +namespace DevelopmentSandbox // Note: actual namespace depends on the project name. +{ + internal class Program + { + static void Main(string[] args) + { + ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( + theme: SystemConsoleTheme.Literate, + outputTemplate: + "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") + .Enrich.FromLogContext() + .CreateLogger(); + + HttpMessageHandler handler = new SocketsHttpHandler + { + + // ConnectTimeout = default, + KeepAlivePingDelay = TimeSpan.FromMilliseconds(1000), + KeepAlivePingTimeout = TimeSpan.FromMilliseconds(1000), + KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests, + // + // PooledConnectionIdleTimeout = default, + // PooledConnectionLifetime = default, + // ResponseDrainTimeout = default + }; + EtcdClient client = new EtcdClient( + "http://127.0.0.1:2379", //todo: вытащить в конфигурацию + handler: handler, + useLegacyRpcExceptionForCancellation: false, + interceptors: new GrpcLogsInterceptor( + logger, + new LogsInterceptorOptions + { + //LoggerName = null, + IncludeLogData = true + })); + + + Task.Run( + async () => + { + while (true) + { + await Task.Delay(500); + client.Put( + "1", + DateTime.Now.ToString()); + } + + }); + + void Rsp(WatchResponse response) + { + logger.ForContext( + "watchEvent", + response) + .Information("new watch event"); + } + + client.WatchRange( + "", + method: (Action)Rsp); + + + logger.Information("endddd"); + } + } +}; \ No newline at end of file diff --git a/dotnet-etcd.sln b/dotnet-etcd.sln index d9eb707..223fb8a 100644 --- a/dotnet-etcd.sln +++ b/dotnet-etcd.sln @@ -9,6 +9,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{8645A28E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Integration", "tests\Integration\Integration.csproj", "{617202A0-4D3A-4FA7-978E-B5A01A3152BD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DevelopmentSandbox", "DevelopmentSandbox\DevelopmentSandbox.csproj", "{41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -23,6 +25,10 @@ Global {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Debug|Any CPU.Build.0 = Debug|Any CPU {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.ActiveCfg = Release|Any CPU {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.Build.0 = Release|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/dotnet-etcd/etcdClient.cs b/dotnet-etcd/etcdClient.cs index de14ea7..9871a90 100644 --- a/dotnet-etcd/etcdClient.cs +++ b/dotnet-etcd/etcdClient.cs @@ -31,7 +31,7 @@ public partial class EtcdClient : IDisposable, IEtcdClient #region Initializers public EtcdClient(string connectionString, int port = 2379, - HttpClientHandler handler = null, bool ssl = false, + HttpMessageHandler handler = null, bool ssl = false, bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors) { if (string.IsNullOrWhiteSpace(connectionString)) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 7e7a169..98eb115 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -204,6 +204,7 @@ await Task.WhenAny( } catch (RpcException ex) { + retryCount++; if (retryCount >= _balancer._numNodes) { @@ -379,4 +380,4 @@ public async Task LeaseTimeToLiveAsync(LeaseTimeToLiveR CancellationToken cancellationToken = default) => await CallEtcdAsync(async (connection) => await connection._leaseClient .LeaseTimeToLiveAsync(request, headers, deadline, cancellationToken)).ConfigureAwait(false); } -} \ No newline at end of file +} diff --git a/dotnet-etcd/multiplexer/Balancer.cs b/dotnet-etcd/multiplexer/Balancer.cs index 7ac998d..7ded4ed 100644 --- a/dotnet-etcd/multiplexer/Balancer.cs +++ b/dotnet-etcd/multiplexer/Balancer.cs @@ -36,7 +36,7 @@ internal class Balancer private static readonly Random s_random = new Random(); - internal Balancer(List nodes, HttpClientHandler handler = null, bool ssl = false, + internal Balancer(List nodes, HttpMessageHandler handler = null, bool ssl = false, bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors) { _numNodes = nodes.Count; diff --git a/tests/Integration/DevelopmentProcessTests.cs b/tests/Integration/DevelopmentProcessTests.cs index b9fa69f..8f2db1a 100644 --- a/tests/Integration/DevelopmentProcessTests.cs +++ b/tests/Integration/DevelopmentProcessTests.cs @@ -1,32 +1,77 @@ +using System; +using System.Diagnostics; +using System.Net; +using System.Net.Http; +using System.Security.Authentication; +using System.Threading; using System.Threading.Tasks; using dotnet_etcd; using DotnetNiceGrpcLogs; using Etcdserverpb; using NUnit.Framework; -using Serilog; -using Serilog.Sinks.SystemConsole.Themes; namespace Integration; public class DevelopmentProcessTests { - + private EtcdClient Client { get; } = new EtcdClient( + "http://127.0.0.1:2379", //todo: вытащить в конфигурацию + useLegacyRpcExceptionForCancellation: false, + interceptors: new GrpcLogsInterceptor( + Framework.Logger, + new LogsInterceptorOptions + { + //LoggerName = null, + IncludeLogData = true + })); [SetUp] public async Task Setup() { - await Framework.CleanEtcdTestsKeys(); + await Framework.CleanEtcdTestsKeys(Client); } [TearDown] public async Task TearDownAsync() { - await Framework.CleanEtcdTestsKeys(); + await Framework.CleanEtcdTestsKeys(Client); } //debug space here [Test] public async Task Test1() { + Trace.Listeners.Add(new ConsoleTraceListener()); + // var rsp = await Client.LeaseGrantAsync(new LeaseGrantRequest() { TTL = 8 }); + // var ct = new CancellationTokenSource(); + // await Client.LeaseKeepAlive( + // rsp.ID, + // 30, + // ct.Token); + + Task.Run( + () => + { + while (true) + { + Task.Delay(500).GetAwaiter().GetResult(); + Client.Put( + "1", + DateTime.Now.ToString()); + } + + }); + + void Rsp(WatchResponse response) + { + Framework.Logger.ForContext( + "watchEvent", + response) + .Information("new watch event"); + } + + Client.WatchRange( + "", + method: (Action)Rsp); } -} \ No newline at end of file +} diff --git a/tests/Integration/Framework.cs b/tests/Integration/Framework.cs index d54a6cb..bc45281 100644 --- a/tests/Integration/Framework.cs +++ b/tests/Integration/Framework.cs @@ -17,19 +17,20 @@ public static class Framework .Enrich.FromLogContext() .CreateLogger(); - public static EtcdClient Client { get; } = new EtcdClient( - "http://localhost:23790,http://localhost:23791,http://localhost:23792", //todo: вытащить в конфигурацию - useLegacyRpcExceptionForCancellation: false, - interceptors: new GrpcLogsInterceptor( - Logger, - new LogsInterceptorOptions - { - //LoggerName = null, - IncludeLogData = true - })); + // public static EtcdClient Client { get; } = new EtcdClient( + // "http://localhost:23790,http://localhost:23791,http://localhost:23792", //todo: вытащить в конфигурацию + // useLegacyRpcExceptionForCancellation: false, + // interceptors: new GrpcLogsInterceptor( + // Logger, + // new LogsInterceptorOptions + // { + // //LoggerName = null, + // IncludeLogData = true + // })); - public static async Task CleanEtcdTestsKeys() + public static async Task CleanEtcdTestsKeys(EtcdClient client) { - await Client.DeleteRangeAsync(TestPrefix); + await client.DeleteRangeAsync(TestPrefix); } + } \ No newline at end of file From ced0097520ce09a9ebe0ae12327f83f2f32fac7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Sun, 7 Aug 2022 09:56:29 +0600 Subject: [PATCH 03/17] new version based on streams --- DevelopmentSandbox/Program.cs | 91 ++++---- .../Properties/launchSettings.json | 11 + docker-compose.yml | 54 +++++ dotnet-etcd/LeaseExpiredException.cs | 34 --- .../LeaseExpiredOrNotFoundException.cs | 39 ++++ dotnet-etcd/leaseClient.cs | 212 +++++++++++------- dotnet-etcd/multiplexer/Balancer.cs | 3 +- 7 files changed, 280 insertions(+), 164 deletions(-) create mode 100644 DevelopmentSandbox/Properties/launchSettings.json create mode 100644 docker-compose.yml delete mode 100644 dotnet-etcd/LeaseExpiredException.cs create mode 100644 dotnet-etcd/LeaseExpiredOrNotFoundException.cs diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index 98f0574..be0029c 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -9,7 +9,7 @@ namespace DevelopmentSandbox // Note: actual namespace depends on the project na { internal class Program { - static void Main(string[] args) + static async Task Main(string[] args) { ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( theme: SystemConsoleTheme.Literate, @@ -18,57 +18,52 @@ static void Main(string[] args) .Enrich.FromLogContext() .CreateLogger(); - HttpMessageHandler handler = new SocketsHttpHandler - { + // HttpMessageHandler handler = new SocketsHttpHandler + // { + // + // // ConnectTimeout = default, + // KeepAlivePingDelay = TimeSpan.FromMilliseconds(1000), + // KeepAlivePingTimeout = TimeSpan.FromMilliseconds(1000), + // KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests, + // // + // // PooledConnectionIdleTimeout = default, + // // PooledConnectionLifetime = default, + // // ResponseDrainTimeout = default + // }; + string connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); + EtcdClient client = new EtcdClient(connection_string, + // handler: handler, + useLegacyRpcExceptionForCancellation: false//, + // interceptors: new GrpcLogsInterceptor( + // logger, + // new LogsInterceptorOptions + // { + // //LoggerName = null, + // IncludeLogData = true + // }) + ); - // ConnectTimeout = default, - KeepAlivePingDelay = TimeSpan.FromMilliseconds(1000), - KeepAlivePingTimeout = TimeSpan.FromMilliseconds(1000), - KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests, - // - // PooledConnectionIdleTimeout = default, - // PooledConnectionLifetime = default, - // ResponseDrainTimeout = default + Func doJob = async () => + { + var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 1 }).ID; + await client.HighlyReliableLeaseKeepAlive( + leaseId, + 3, + CancellationToken.None); + // await client.LeaseKeepAlive( + // leaseId, + // CancellationToken.None); }; - EtcdClient client = new EtcdClient( - "http://127.0.0.1:2379", //todo: вытащить в конфигурацию - handler: handler, - useLegacyRpcExceptionForCancellation: false, - interceptors: new GrpcLogsInterceptor( - logger, - new LogsInterceptorOptions - { - //LoggerName = null, - IncludeLogData = true - })); - - Task.Run( - async () => - { - while (true) - { - await Task.Delay(500); - client.Put( - "1", - DateTime.Now.ToString()); - } - - }); - - void Rsp(WatchResponse response) + var jobs = Enumerable.Range( + 0, + 100000).Select(i => { - logger.ForContext( - "watchEvent", - response) - .Information("new watch event"); - } - - client.WatchRange( - "", - method: (Action)Rsp); - - + Console.WriteLine(i); + return doJob(); + }); + await await Task.WhenAny(jobs); + logger.Information("endddd"); } } diff --git a/DevelopmentSandbox/Properties/launchSettings.json b/DevelopmentSandbox/Properties/launchSettings.json new file mode 100644 index 0000000..dcf8593 --- /dev/null +++ b/DevelopmentSandbox/Properties/launchSettings.json @@ -0,0 +1,11 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "DevelopmentSandbox": { + "commandName": "Project", + "environmentVariables": { + "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" + } + } + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..1a23c2c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.5' + +services: + etcd0: + image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + ports: + - "23790:2379" + - "23792379" + command: + [ + "etcd", + "--name=etcd0", + "--advertise-client-urls=http://etcd0:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd0:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] + + etcd1: + image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + ports: + - "23791:2379" + command: + [ + "etcd", + "--name=etcd1", + "--advertise-client-urls=http://etcd1:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd1:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] + + etcd2: + image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + ports: + - "23792:2379" + command: + [ + "etcd", + "--name=etcd2", + "--advertise-client-urls=http://etcd2:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd2:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] diff --git a/dotnet-etcd/LeaseExpiredException.cs b/dotnet-etcd/LeaseExpiredException.cs deleted file mode 100644 index 962deec..0000000 --- a/dotnet-etcd/LeaseExpiredException.cs +++ /dev/null @@ -1,34 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System; -using System.Runtime.Serialization; - -namespace dotnet_etcd -{ - [Serializable] - public class LeaseExpiredException : Exception - { - public LeaseExpiredException() - { - } - - public LeaseExpiredException(string message) : base(message) - { - } - - public LeaseExpiredException(string message, Exception inner) : base( - message, - inner) - { - } - - protected LeaseExpiredException( - SerializationInfo info, - StreamingContext context) : base( - info, - context) - { - } - } -} diff --git a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs new file mode 100644 index 0000000..8249ad9 --- /dev/null +++ b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Runtime.Serialization; + +namespace dotnet_etcd +{ + [Serializable] + public class LeaseExpiredOrNotFoundException : Exception + { + private readonly long _leaseId; + + public LeaseExpiredOrNotFoundException(long leaseId) + { + _leaseId = leaseId; + } + + public LeaseExpiredOrNotFoundException(long leaseId, string message) : base(message) + { + _leaseId = leaseId; + } + + public LeaseExpiredOrNotFoundException(long leaseId, string message, Exception inner) : base( + message, + inner) + { + _leaseId = leaseId; + } + + protected LeaseExpiredOrNotFoundException( + SerializationInfo info, + StreamingContext context) : base( + info, + context) + { + } + } +} diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 98eb115..c499b6d 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -2,11 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT license. using System; +using System.Collections.Generic; using System.IO; +using System.Linq; +using System.Net.Http.Headers; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; - +using dotnet_etcd.multiplexer; using Etcdserverpb; using Grpc.Core; @@ -116,106 +119,153 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok /// to the server and streaming keep alive responses from the server to the client. /// /// + /// /// - /// throws an exception if no response + /// throws an exception if no response /// is received within the lease TTL or - public async Task LeaseKeepAlive(long leaseId, long leaseRemainigTTL, CancellationToken cancellationToken) + public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigTTL, CancellationToken cancellationToken) { - int retryCount = 0; + int startNodeIndex = (new Random()).Next(_balancer._numNodes); while (true) { - var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - var attemptCancellationToken = cts.Token; - try + cancellationToken.ThrowIfCancellationRequested(); + int retryCount = 0; + startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; + DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); + var attemptPeriodMs = (int)(leaseRemainigTTL * 0.8 * 1000 / _balancer._numNodes); + IEnumerable> calls = new List>(); + bool hasSuccessAttempt = false; + while (retryCount < _balancer._numNodes) { - var connection = _balancer.GetConnection(); - using (AsyncDuplexStreamingCall leaser = - connection._leaseClient.LeaseKeepAlive(cancellationToken: attemptCancellationToken)) + retryCount++; + cancellationToken.ThrowIfCancellationRequested(); + int currentNodeIndex = startNodeIndex + retryCount; + currentNodeIndex = currentNodeIndex >= _balancer._numNodes ? currentNodeIndex - _balancer._numNodes : currentNodeIndex; + var connection = _balancer._healthyNode.ElementAt(currentNodeIndex); + calls = await NewLeaseTimeToLiveAttempt( + calls, + leaseId, + connection, + leaseExpiredAt, + attemptPeriodMs, + cancellationToken) + .ConfigureAwait(false); + + if (IsAnyCallCompletedSuccessfully(calls, out var response)) { - var requestsChannel = Channel.CreateUnbounded(); - await requestsChannel.Writer.WriteAsync( - new LeaseKeepAliveRequest() { ID = leaseId }, - attemptCancellationToken); - var requestsTask = Task.Run( - async - () => - { - await foreach (LeaseKeepAliveRequest lkReq in requestsChannel.Reader.ReadAllAsync( - attemptCancellationToken)) - { - await leaser.RequestStream.WriteAsync(lkReq); - } - }, - attemptCancellationToken); - var responsesTask = Task.Run( - async - () => + if (response.TTL < 1) + { + throw new LeaseExpiredOrNotFoundException(leaseId); + } + hasSuccessAttempt = true; + leaseRemainigTTL = response.TTL; + break; + } + } + + if (!hasSuccessAttempt) + { + var leaseExpiredDuration = + leaseExpiredAt.Subtract(DateTime.Now.ToUniversalTime()); + var waitLeaseExpired = Task.Delay(leaseExpiredDuration, + cancellationToken); //todo если лиза уже истекла??? + Func>> getRemainigCalls = () => calls.Where( + c => c.IsCompleted == false + || c.IsCompletedSuccessfully); + var remainingCalls = getRemainigCalls(); + LeaseKeepAliveResponse response; + while (!IsAnyCallCompletedSuccessfully(remainingCalls, out response)) + { + cancellationToken.ThrowIfCancellationRequested(); + if (waitLeaseExpired.IsCompleted || !remainingCalls.Any()) + { + var exceptions = calls + .Where(c => c.IsFaulted) + .SelectMany(c => c.Exception!.InnerExceptions); + if (waitLeaseExpired.IsCompleted) { - var leaseEndOfLiveAlert = Task.Delay( - (int)leaseRemainigTTL * 1000, - attemptCancellationToken); - while (true) - { - var nextResponsePromise = leaser.ResponseStream.MoveNext(attemptCancellationToken); - - await Task.WhenAny( - leaseEndOfLiveAlert, - nextResponsePromise).Unwrap(); - attemptCancellationToken.ThrowIfCancellationRequested(); - if (leaseEndOfLiveAlert.IsCompleted) - { - throw new LeaseExpiredException(); - } - - if (nextResponsePromise.IsCompleted) - { - if (nextResponsePromise.Result == false) - { - break; - } - - retryCount = 0; - - var rsp = leaser.ResponseStream.Current; - leaseEndOfLiveAlert = Task.Delay( - (int)rsp.TTL * 1000, - attemptCancellationToken); - - var never = Task.Delay( - (int)rsp.TTL * 1000 / 3, - cancellationToken).ContinueWith( - async t => - { - await requestsChannel.Writer.WriteAsync( - new LeaseKeepAliveRequest() { ID = rsp.ID }, - cancellationToken); - }, - cancellationToken); - } - } - }, - attemptCancellationToken); + exceptions = exceptions.Append(new LeaseExpiredOrNotFoundException(leaseId)); + } + + throw new AggregateException(exceptions); + } await Task.WhenAny( - requestsTask, - responsesTask).Unwrap().ConfigureAwait(false); - await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); + remainingCalls + .Append(waitLeaseExpired)).ConfigureAwait(false); + remainingCalls = getRemainigCalls(); + } + hasSuccessAttempt = true; + leaseRemainigTTL = response.TTL; } - catch (RpcException ex) + var sleepDelay = (int)(leaseRemainigTTL * 1000 / 3); + await Task.Delay(sleepDelay, cancellationToken); + leaseRemainigTTL -= sleepDelay/1000; + } + + async Task OneTimeKeepAlive( long leaseId, Connection connection, + DateTime deadline, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + using (AsyncDuplexStreamingCall leaser = + connection._leaseClient + .LeaseKeepAlive(deadline: deadline, + cancellationToken: cancellationToken)) { + cancellationToken.ThrowIfCancellationRequested(); + await leaser.RequestStream.WriteAsync(new LeaseKeepAliveRequest() + { + ID = leaseId + }, cancellationToken: cancellationToken).ConfigureAwait(false); + var result = await leaser.ResponseStream.MoveNext(cancellationToken); - retryCount++; - if (retryCount >= _balancer._numNodes) + await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); + return leaser.ResponseStream.Current; + } + } + + async Task>> NewLeaseTimeToLiveAttempt( + IEnumerable> calls, + long leaseId, Connection connection, + DateTime deadline, + int attemptPeriodMs, CancellationToken cancellationToken) + { + var callResponse = OneTimeKeepAlive( + leaseId, + connection, + deadline, + cancellationToken); + calls = calls.Append(callResponse); + Task attemptDelay = Task.Delay( + attemptPeriodMs, + cancellationToken); + await Task.WhenAny( + calls.Where(c => c.IsCompletedSuccessfully) + .Append(attemptDelay)).ConfigureAwait(false); + return calls; + + } + + bool IsAnyCallCompletedSuccessfully(IEnumerable> calls, + out LeaseKeepAliveResponse response) + { + foreach (Task call in calls) + { + if (call.IsCompletedSuccessfully) { - throw; + response = call.Result; + return true; } - - cts.Cancel(); } + response = null; + return false; } } + + + /// /// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client /// to the server and streaming keep alive responses from the server to the client. diff --git a/dotnet-etcd/multiplexer/Balancer.cs b/dotnet-etcd/multiplexer/Balancer.cs index 7ded4ed..aec4303 100644 --- a/dotnet-etcd/multiplexer/Balancer.cs +++ b/dotnet-etcd/multiplexer/Balancer.cs @@ -18,7 +18,7 @@ namespace dotnet_etcd.multiplexer internal class Balancer { - private readonly HashSet _healthyNode; + internal readonly HashSet _healthyNode; /// /// No of etcd nodes @@ -57,6 +57,7 @@ internal Balancer(List nodes, HttpMessageHandler handler = null, bool ssl = ThrowOperationCanceledOnCancellation = !useLegacyRpcExceptionForCancellation }); } + else { #if NETCOREAPP3_1 || NETCOREAPP3_0 From bf6c47177ef24a0feba3fb27117d031190457de6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Sun, 7 Aug 2022 10:22:43 +0600 Subject: [PATCH 04/17] =?UTF-8?q?=D1=83=D0=BA=D1=80=D0=B0=D1=88=D0=B0?= =?UTF-8?q?=D1=82=D0=B5=D0=BB=D1=8C=D1=81=D1=82=D0=B2=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotnet-etcd/leaseClient.cs | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index c499b6d..4cf9be5 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -132,7 +132,8 @@ public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigT int retryCount = 0; startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); - var attemptPeriodMs = (int)(leaseRemainigTTL * 0.8 * 1000 / _balancer._numNodes); + double attemptPeriodCoefficient = 0.8; + int attemptPeriodMs = (int)(leaseRemainigTTL * attemptPeriodCoefficient * 1000 / _balancer._numNodes); IEnumerable> calls = new List>(); bool hasSuccessAttempt = false; while (retryCount < _balancer._numNodes) @@ -140,8 +141,10 @@ public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigT retryCount++; cancellationToken.ThrowIfCancellationRequested(); int currentNodeIndex = startNodeIndex + retryCount; - currentNodeIndex = currentNodeIndex >= _balancer._numNodes ? currentNodeIndex - _balancer._numNodes : currentNodeIndex; - var connection = _balancer._healthyNode.ElementAt(currentNodeIndex); + currentNodeIndex = currentNodeIndex >= _balancer._numNodes + ? currentNodeIndex - _balancer._numNodes + : currentNodeIndex; + Connection connection = _balancer._healthyNode.ElementAt(currentNodeIndex); calls = await NewLeaseTimeToLiveAttempt( calls, leaseId, @@ -151,7 +154,7 @@ public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigT cancellationToken) .ConfigureAwait(false); - if (IsAnyCallCompletedSuccessfully(calls, out var response)) + if (IsAnyCallCompletedSuccessfully(calls, out LeaseKeepAliveResponse response)) { if (response.TTL < 1) { @@ -165,10 +168,13 @@ public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigT if (!hasSuccessAttempt) { - var leaseExpiredDuration = + TimeSpan leaseExpiredDuration = leaseExpiredAt.Subtract(DateTime.Now.ToUniversalTime()); - var waitLeaseExpired = Task.Delay(leaseExpiredDuration, - cancellationToken); //todo если лиза уже истекла??? + Task waitLeaseExpired = leaseExpiredDuration.TotalMilliseconds <= 0 + ? Task.CompletedTask + : Task.Delay( + leaseExpiredDuration, + cancellationToken); Func>> getRemainigCalls = () => calls.Where( c => c.IsCompleted == false || c.IsCompletedSuccessfully); @@ -199,7 +205,9 @@ await Task.WhenAny( hasSuccessAttempt = true; leaseRemainigTTL = response.TTL; } - var sleepDelay = (int)(leaseRemainigTTL * 1000 / 3); + + double sleepСoefficient = 1.0/3; + int sleepDelay = (int)(leaseRemainigTTL * sleepСoefficient * 1000); await Task.Delay(sleepDelay, cancellationToken); leaseRemainigTTL -= sleepDelay/1000; } @@ -218,8 +226,14 @@ await leaser.RequestStream.WriteAsync(new LeaseKeepAliveRequest() { ID = leaseId }, cancellationToken: cancellationToken).ConfigureAwait(false); - var result = await leaser.ResponseStream.MoveNext(cancellationToken); - + bool result = await leaser.ResponseStream.MoveNext(cancellationToken); + if (!result) + { + throw new RpcException( + new Status( + StatusCode.Aborted, + "didnt receive keepAlive response")); + } await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); return leaser.ResponseStream.Current; } From 11fea6559bb92c49fc9bebb96a09c5f9bfc765dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Sun, 7 Aug 2022 10:44:46 +0600 Subject: [PATCH 05/17] update summary --- dotnet-etcd/leaseClient.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 4cf9be5..0375421 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -115,11 +115,11 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok /// - /// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client - /// to the server and streaming keep alive responses from the server to the client. + /// HighlyReliableLeaseKeepAlive keeps lease alive by sending keep alive requests and receiving keep alive responses. + /// Reliability is achieved by sequentially sending keep alive requests at short intervals to all etcd nodes /// - /// - /// + /// lease identifier + /// the remaining TTL at the time the method was called. used to determine initial deadlines /// /// throws an exception if no response /// is received within the lease TTL or From b673ec1e2af9538a55746f4e3340baa748ebcaa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Mon, 8 Aug 2022 10:49:36 +0600 Subject: [PATCH 06/17] =?UTF-8?q?=D0=BC=D0=B8=D0=BB=D0=BA=D0=B8=D0=B5=20?= =?UTF-8?q?=D0=BF=D1=80=D0=B0=D0=B2=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotnet-etcd/leaseClient.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 0375421..1e3f5a8 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -208,7 +208,7 @@ await Task.WhenAny( double sleepСoefficient = 1.0/3; int sleepDelay = (int)(leaseRemainigTTL * sleepСoefficient * 1000); - await Task.Delay(sleepDelay, cancellationToken); + await Task.Delay(sleepDelay, cancellationToken).ConfigureAwait(false); leaseRemainigTTL -= sleepDelay/1000; } @@ -226,7 +226,7 @@ await leaser.RequestStream.WriteAsync(new LeaseKeepAliveRequest() { ID = leaseId }, cancellationToken: cancellationToken).ConfigureAwait(false); - bool result = await leaser.ResponseStream.MoveNext(cancellationToken); + bool result = await leaser.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false); if (!result) { throw new RpcException( @@ -258,7 +258,6 @@ await Task.WhenAny( calls.Where(c => c.IsCompletedSuccessfully) .Append(attemptDelay)).ConfigureAwait(false); return calls; - } bool IsAnyCallCompletedSuccessfully(IEnumerable> calls, From 7e674a04faed2aaf17e5dbf86eb961fd7375bb7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Fri, 12 Aug 2022 13:37:49 +0600 Subject: [PATCH 07/17] =?UTF-8?q?=D0=BD=D0=BE=D0=B2=D1=8B=D0=B9=20=D0=BC?= =?UTF-8?q?=D0=B5=D1=82=D0=BE=D0=B4=20=D1=81=20=D0=BF=D0=BE=D0=BB=D0=BB?= =?UTF-8?q?=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/Program.cs | 39 ++++-- docker-compose.yml | 2 +- dotnet-etcd/dotnet-etcd.csproj | 2 + dotnet-etcd/leaseClient.cs | 241 ++++++++++++++++++--------------- 4 files changed, 158 insertions(+), 126 deletions(-) diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index be0029c..d7794d7 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -2,6 +2,10 @@ using dotnet_etcd; using DotnetNiceGrpcLogs; using Etcdserverpb; +using Grpc.Core; +using Polly; +using Polly.Contrib.WaitAndRetry; +using Polly.Timeout; using Serilog; using Serilog.Sinks.SystemConsole.Themes; @@ -14,7 +18,7 @@ static async Task Main(string[] args) ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( theme: SystemConsoleTheme.Literate, outputTemplate: - "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") + "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") .Enrich.FromLogContext() .CreateLogger(); @@ -33,38 +37,43 @@ static async Task Main(string[] args) string connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); EtcdClient client = new EtcdClient(connection_string, // handler: handler, - useLegacyRpcExceptionForCancellation: false//, - // interceptors: new GrpcLogsInterceptor( - // logger, - // new LogsInterceptorOptions - // { - // //LoggerName = null, - // IncludeLogData = true - // }) + useLegacyRpcExceptionForCancellation: false, + interceptors: new GrpcLogsInterceptor( + logger, + new LogsInterceptorOptions + { + //LoggerName = null, + IncludeLogData = true + }) ); - + Func doJob = async () => { - var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 1 }).ID; - await client.HighlyReliableLeaseKeepAlive( + var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; + await client.HighlyReliableLeaseKeepAliveAsync( leaseId, 3, + tryDurationMs: 1000, + maxRetryBackoffMs: 2000, + sleepAfterSuccessMs: 5000/3, CancellationToken.None); // await client.LeaseKeepAlive( // leaseId, // CancellationToken.None); }; - + var jobs = Enumerable.Range( 0, - 100000).Select(i => + 1).Select(i => { Console.WriteLine(i); return doJob(); }); await await Task.WhenAny(jobs); - logger.Information("endddd"); + logger.Information("endddd"); + + } } }; \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 1a23c2c..753d2ff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" ports: - "23790:2379" - - "23792379" + - "2379:2379" command: [ "etcd", diff --git a/dotnet-etcd/dotnet-etcd.csproj b/dotnet-etcd/dotnet-etcd.csproj index 3f1a0c6..601bc7f 100644 --- a/dotnet-etcd/dotnet-etcd.csproj +++ b/dotnet-etcd/dotnet-etcd.csproj @@ -66,6 +66,8 @@ Advanced uses take advantage of the consistency guarantees to implement database all runtime; build; native; contentfiles; analyzers + + diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 1e3f5a8..5b62224 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -6,13 +6,17 @@ using System.IO; using System.Linq; using System.Net.Http.Headers; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using dotnet_etcd.multiplexer; using Etcdserverpb; +using Polly; using Grpc.Core; +using Polly.Contrib.WaitAndRetry; +using Polly.Timeout; namespace dotnet_etcd { @@ -120,150 +124,165 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok /// /// lease identifier /// the remaining TTL at the time the method was called. used to determine initial deadlines + /// + /// + /// /// /// throws an exception if no response /// is received within the lease TTL or - public async Task HighlyReliableLeaseKeepAlive(long leaseId, long leaseRemainigTTL, CancellationToken cancellationToken) + public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRemainigTTL, + int tryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken) { int startNodeIndex = (new Random()).Next(_balancer._numNodes); - while (true) + while (true) // keepAlive rounds { cancellationToken.ThrowIfCancellationRequested(); - int retryCount = 0; + var roundCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + int usedKeepAliveJobs = 0; + int delayBetweenUseNewKeepAliveJob = tryDurationMs / _balancer._numNodes; startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); - double attemptPeriodCoefficient = 0.8; - int attemptPeriodMs = (int)(leaseRemainigTTL * attemptPeriodCoefficient * 1000 / _balancer._numNodes); - IEnumerable> calls = new List>(); - bool hasSuccessAttempt = false; - while (retryCount < _balancer._numNodes) + List> keepAliveJobs = new List>(); + while (usedKeepAliveJobs == 0) //< _balancer._numNodes) { - retryCount++; - cancellationToken.ThrowIfCancellationRequested(); - int currentNodeIndex = startNodeIndex + retryCount; + usedKeepAliveJobs++; + roundCancellationTokenSource.Token.ThrowIfCancellationRequested(); + int currentNodeIndex = startNodeIndex + usedKeepAliveJobs; currentNodeIndex = currentNodeIndex >= _balancer._numNodes ? currentNodeIndex - _balancer._numNodes : currentNodeIndex; Connection connection = _balancer._healthyNode.ElementAt(currentNodeIndex); - calls = await NewLeaseTimeToLiveAttempt( - calls, + + Task keepAliveJob = await InvokeWithWaitLimitAsync( + () => OneTimeKeepAliveWithRetryAsync( leaseId, connection, + tryDurationMs, + maxRetryBackoffMs, leaseExpiredAt, - attemptPeriodMs, - cancellationToken) - .ConfigureAwait(false); - - if (IsAnyCallCompletedSuccessfully(calls, out LeaseKeepAliveResponse response)) + roundCancellationTokenSource.Token), + waitLimitMs: delayBetweenUseNewKeepAliveJob, + cancellationToken: roundCancellationTokenSource.Token); + keepAliveJobs.Add(keepAliveJob); + if (keepAliveJob.IsCompletedSuccessfully) { - if (response.TTL < 1) - { - throw new LeaseExpiredOrNotFoundException(leaseId); - } - hasSuccessAttempt = true; - leaseRemainigTTL = response.TTL; + roundCancellationTokenSource.Cancel(); break; } } - if (!hasSuccessAttempt) + await Task.WhenAll(keepAliveJobs); + if (IsAnyTaskCompletedSuccessfully( + keepAliveJobs, + out var keepAliveResponse) + && keepAliveResponse.TTL > 0) { - TimeSpan leaseExpiredDuration = - leaseExpiredAt.Subtract(DateTime.Now.ToUniversalTime()); - Task waitLeaseExpired = leaseExpiredDuration.TotalMilliseconds <= 0 - ? Task.CompletedTask - : Task.Delay( - leaseExpiredDuration, - cancellationToken); - Func>> getRemainigCalls = () => calls.Where( - c => c.IsCompleted == false - || c.IsCompletedSuccessfully); - var remainingCalls = getRemainigCalls(); - LeaseKeepAliveResponse response; - while (!IsAnyCallCompletedSuccessfully(remainingCalls, out response)) - { - cancellationToken.ThrowIfCancellationRequested(); - if (waitLeaseExpired.IsCompleted || !remainingCalls.Any()) - { - var exceptions = calls - .Where(c => c.IsFaulted) - .SelectMany(c => c.Exception!.InnerExceptions); - if (waitLeaseExpired.IsCompleted) - { - exceptions = exceptions.Append(new LeaseExpiredOrNotFoundException(leaseId)); - } + //lease not found, expired or revoked + await Task.Delay( + sleepAfterSuccessMs, + cancellationToken); + leaseRemainigTTL = Math.Max(0,keepAliveResponse.TTL - sleepAfterSuccessMs / 1000); + continue; //go to next round + } - throw new AggregateException(exceptions); - } + List exceptions = new List() + { + new LeaseExpiredOrNotFoundException(leaseId), + }; + exceptions.AddRange( + keepAliveJobs + .Where(job => !job.IsCompletedSuccessfully) + .Select(job => job.Exception)); // collect all exceptions + throw new AggregateException(exceptions); + } - await Task.WhenAny( - remainingCalls - .Append(waitLeaseExpired)).ConfigureAwait(false); - remainingCalls = getRemainigCalls(); + async Task> InvokeWithWaitLimitAsync(Func> func, int waitLimitMs, CancellationToken cancellationToken) + { + Task waitLimit = Task.Delay( + waitLimitMs, + cancellationToken); + var responseTask = func(); + await Task.WhenAny(responseTask,waitLimit).ConfigureAwait(false); + return responseTask; + } - } - hasSuccessAttempt = true; - leaseRemainigTTL = response.TTL; - } - double sleepСoefficient = 1.0/3; - int sleepDelay = (int)(leaseRemainigTTL * sleepСoefficient * 1000); - await Task.Delay(sleepDelay, cancellationToken).ConfigureAwait(false); - leaseRemainigTTL -= sleepDelay/1000; - } - async Task OneTimeKeepAlive( long leaseId, Connection connection, - DateTime deadline, CancellationToken cancellationToken) + async Task OneTimeKeepAliveWithRetryAsync(long leaseId, Connection connection, + int tryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken) { + Console.WriteLine("OneTimeKeepAliveWithRetryAsync"); cancellationToken.ThrowIfCancellationRequested(); - using (AsyncDuplexStreamingCall leaser = - connection._leaseClient - .LeaseKeepAlive(deadline: deadline, - cancellationToken: cancellationToken)) + // timeoutPolicy thrown own exception, that overlap retry exceptions, + // so this list used for catch the retry exceptions. + List retryExceptions = new List(); + var timeout = deadline.ToUniversalTime() - DateTime.Now.ToUniversalTime(); + var timeoutPolicy = Policy.TimeoutAsync(timeout); + TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs); + var delay = Backoff.DecorrelatedJitterBackoffV2( + fastFirst: true, + medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), + retryCount: int.MaxValue) + .Select( + s => TimeSpan.FromTicks( + Math.Min( + s.Ticks, + maxRetryBackoff.Ticks))); + var retryPolicy = Policy + .Handle(e => e is LeaseExpiredOrNotFoundException == false) + .WaitAndRetryAsync( + delay, + onRetry: (exception, _) => retryExceptions.Add(exception)); + var retryTimeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(tryDurationMs)); + var policy = + Policy.WrapAsync( + timeoutPolicy, + retryPolicy, + retryTimeoutPolicy); + try { - cancellationToken.ThrowIfCancellationRequested(); - await leaser.RequestStream.WriteAsync(new LeaseKeepAliveRequest() - { - ID = leaseId - }, cancellationToken: cancellationToken).ConfigureAwait(false); - bool result = await leaser.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false); - if (!result) - { - throw new RpcException( - new Status( - StatusCode.Aborted, - "didnt receive keepAlive response")); - } - await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); - return leaser.ResponseStream.Current; + var response = await policy.ExecuteAsync( + continueOnCapturedContext: false, + cancellationToken: cancellationToken, + action: async retryCancellationToken => + { + retryCancellationToken.ThrowIfCancellationRequested(); + using (AsyncDuplexStreamingCall leaser = + connection._leaseClient + .LeaseKeepAlive(cancellationToken: retryCancellationToken)) + { + await leaser.RequestStream.WriteAsync( + new LeaseKeepAliveRequest() { ID = leaseId }, + cancellationToken).ConfigureAwait(false); + bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken) + .ConfigureAwait(false); + if (!result) + { + throw new RpcException( + new Status( + StatusCode.Aborted, + "didnt receive keepAlive response")); + } + + await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); + return leaser.ResponseStream.Current; + } + }); + return response; + } + catch (TimeoutRejectedException e) + { + throw new AggregateException( + retryExceptions + .Append(e) + .Reverse()); } } - async Task>> NewLeaseTimeToLiveAttempt( - IEnumerable> calls, - long leaseId, Connection connection, - DateTime deadline, - int attemptPeriodMs, CancellationToken cancellationToken) - { - var callResponse = OneTimeKeepAlive( - leaseId, - connection, - deadline, - cancellationToken); - calls = calls.Append(callResponse); - Task attemptDelay = Task.Delay( - attemptPeriodMs, - cancellationToken); - await Task.WhenAny( - calls.Where(c => c.IsCompletedSuccessfully) - .Append(attemptDelay)).ConfigureAwait(false); - return calls; - } - - bool IsAnyCallCompletedSuccessfully(IEnumerable> calls, - out LeaseKeepAliveResponse response) + bool IsAnyTaskCompletedSuccessfully(IEnumerable> tasks, + out T response) { - foreach (Task call in calls) + foreach (Task call in tasks) { if (call.IsCompletedSuccessfully) { @@ -271,9 +290,11 @@ bool IsAnyCallCompletedSuccessfully(IEnumerable> ca return true; } } - response = null; + + response = default; return false; } + } From 4d87069d1752bb89ff968cc72079b79dff0e66b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Mon, 15 Aug 2022 10:43:48 +0600 Subject: [PATCH 08/17] =?UTF-8?q?=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82?= =?UTF-8?q?=D0=BE=D1=80=D0=B8=D0=BD=D0=B3=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotnet-etcd/leaseClient.cs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 5b62224..1bcbeaa 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -128,7 +128,7 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok /// /// /// - /// throws an exception if no response + /// throws the exception if lease not found, expired, revoked or keep alive unsuccessfully /// is received within the lease TTL or public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRemainigTTL, int tryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken) @@ -143,7 +143,7 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); List> keepAliveJobs = new List>(); - while (usedKeepAliveJobs == 0) //< _balancer._numNodes) + while (usedKeepAliveJobs < _balancer._numNodes) { usedKeepAliveJobs++; roundCancellationTokenSource.Token.ThrowIfCancellationRequested(); @@ -164,27 +164,35 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema waitLimitMs: delayBetweenUseNewKeepAliveJob, cancellationToken: roundCancellationTokenSource.Token); keepAliveJobs.Add(keepAliveJob); - if (keepAliveJob.IsCompletedSuccessfully) + if (IsAnyTaskCompletedSuccessfully(keepAliveJobs, out var _)) { roundCancellationTokenSource.Cancel(); break; } } - await Task.WhenAll(keepAliveJobs); + try + { + await Task.WhenAll(keepAliveJobs.ToArray()); + } + catch (Exception e) + { + // ignored exceptions will handled later + } + if (IsAnyTaskCompletedSuccessfully( keepAliveJobs, out var keepAliveResponse) && keepAliveResponse.TTL > 0) { - //lease not found, expired or revoked + await Task.Delay( sleepAfterSuccessMs, cancellationToken); leaseRemainigTTL = Math.Max(0,keepAliveResponse.TTL - sleepAfterSuccessMs / 1000); continue; //go to next round } - + //lease not found, expired or revoked or keep alive unsuccessfully List exceptions = new List() { new LeaseExpiredOrNotFoundException(leaseId), @@ -211,7 +219,6 @@ async Task> InvokeWithWaitLimitAsync(Func> func, int waitLimi async Task OneTimeKeepAliveWithRetryAsync(long leaseId, Connection connection, int tryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken) { - Console.WriteLine("OneTimeKeepAliveWithRetryAsync"); cancellationToken.ThrowIfCancellationRequested(); // timeoutPolicy thrown own exception, that overlap retry exceptions, // so this list used for catch the retry exceptions. @@ -251,9 +258,9 @@ async Task OneTimeKeepAliveWithRetryAsync(long leaseId, connection._leaseClient .LeaseKeepAlive(cancellationToken: retryCancellationToken)) { + // ReSharper disable once MethodSupportsCancellation //method doesn't support cancellation await leaser.RequestStream.WriteAsync( - new LeaseKeepAliveRequest() { ID = leaseId }, - cancellationToken).ConfigureAwait(false); + new LeaseKeepAliveRequest() { ID = leaseId }).ConfigureAwait(false); bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken) .ConfigureAwait(false); if (!result) From 433527ee757d0b90a224604a05ad71db5776915e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Mon, 15 Aug 2022 21:57:03 +0600 Subject: [PATCH 09/17] =?UTF-8?q?=D1=82=D0=B5=D1=81=D1=82=D1=8B=20=D0=B8?= =?UTF-8?q?=20=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE=D1=80=D0=B8=D0=BD?= =?UTF-8?q?=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/Program.cs | 101 +++++++++------- .../LeaseExpiredOrNotFoundException.cs | 5 +- dotnet-etcd/leaseClient.cs | 111 ++++++++++-------- 3 files changed, 124 insertions(+), 93 deletions(-) diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index d7794d7..f2c42b9 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -1,4 +1,5 @@ using System.Net; +using System.Runtime.CompilerServices; using dotnet_etcd; using DotnetNiceGrpcLogs; using Etcdserverpb; @@ -6,6 +7,7 @@ using Polly; using Polly.Contrib.WaitAndRetry; using Polly.Timeout; +using Polly.Utilities; using Serilog; using Serilog.Sinks.SystemConsole.Themes; @@ -15,65 +17,80 @@ internal class Program { static async Task Main(string[] args) { + var cts = new CancellationTokenSource(); + AppDomain.CurrentDomain.ProcessExit += (_, _) => { cts.Cancel(); }; + Console.CancelKeyPress += (_, ea) => { cts.Cancel(); }; ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( theme: SystemConsoleTheme.Literate, outputTemplate: "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") .Enrich.FromLogContext() .CreateLogger(); - - // HttpMessageHandler handler = new SocketsHttpHandler - // { - // - // // ConnectTimeout = default, - // KeepAlivePingDelay = TimeSpan.FromMilliseconds(1000), - // KeepAlivePingTimeout = TimeSpan.FromMilliseconds(1000), - // KeepAlivePingPolicy = HttpKeepAlivePingPolicy.WithActiveRequests, - // // - // // PooledConnectionIdleTimeout = default, - // // PooledConnectionLifetime = default, - // // ResponseDrainTimeout = default - // }; + string connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); - EtcdClient client = new EtcdClient(connection_string, - // handler: handler, - useLegacyRpcExceptionForCancellation: false, - interceptors: new GrpcLogsInterceptor( - logger, - new LogsInterceptorOptions - { - //LoggerName = null, - IncludeLogData = true - }) - ); - + EtcdClient client = new EtcdClient( + connection_string, + // handler: handler, + useLegacyRpcExceptionForCancellation: false //, + // interceptors: new GrpcLogsInterceptor( + // logger, + // new LogsInterceptorOptions + // { + // //LoggerName = null, + // IncludeLogData = true + // }) + ); + Func doJob = async () => { var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; await client.HighlyReliableLeaseKeepAliveAsync( leaseId, - 3, - tryDurationMs: 1000, - maxRetryBackoffMs: 2000, - sleepAfterSuccessMs: 5000/3, - CancellationToken.None); + 5, + retryDurationMs: 5000, + maxRetryBackoffMs: 5000, + sleepAfterSuccessMs: 5000 / 3, + CancellationToken.None).ConfigureAwait(false); + // await client.LeaseKeepAlive( // leaseId, - // CancellationToken.None); + // CancellationToken.None).ConfigureAwait(false); }; - - var jobs = Enumerable.Range( - 0, - 1).Select(i => + + List jobs = new List(1000); + + foreach (var i in Enumerable.Range( + 0, + 20000)) { - Console.WriteLine(i); - return doJob(); - }); + + await Task.Delay(5); //что бы кипалайвы были в приоритете создания новых тасков + if (cts.Token.IsCancellationRequested) + { + break; + } + + var t = Task.Run( + async () => + { + cts.Token.ThrowIfCancellationRequested(); + Console.WriteLine(i); + try + { + await doJob().ConfigureAwait(false); + } + finally + { + cts.Cancel(); + } + }, + cts.Token); + jobs.Add(t); + } + await await Task.WhenAny(jobs); - - logger.Information("endddd"); - - + logger.Information("endddd"); + } } }; \ No newline at end of file diff --git a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs index 8249ad9..ab0ddb4 100644 --- a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs +++ b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs @@ -11,7 +11,7 @@ public class LeaseExpiredOrNotFoundException : Exception { private readonly long _leaseId; - public LeaseExpiredOrNotFoundException(long leaseId) + public LeaseExpiredOrNotFoundException(long leaseId) : base("leaseId=" + leaseId) { _leaseId = leaseId; } @@ -34,6 +34,9 @@ protected LeaseExpiredOrNotFoundException( info, context) { + info.AddValue( + name: "leaseId", + value: _leaseId); } } } diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 1bcbeaa..ce23526 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -3,6 +3,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; using System.Net.Http.Headers; @@ -124,14 +126,14 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok /// /// lease identifier /// the remaining TTL at the time the method was called. used to determine initial deadlines - /// + /// /// /// /// /// throws the exception if lease not found, expired, revoked or keep alive unsuccessfully /// is received within the lease TTL or public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRemainigTTL, - int tryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken) + int retryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken) { int startNodeIndex = (new Random()).Next(_balancer._numNodes); while (true) // keepAlive rounds @@ -139,7 +141,7 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema cancellationToken.ThrowIfCancellationRequested(); var roundCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); int usedKeepAliveJobs = 0; - int delayBetweenUseNewKeepAliveJob = tryDurationMs / _balancer._numNodes; + int delayBetweenUseNewKeepAliveJob = retryDurationMs / _balancer._numNodes; startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); List> keepAliveJobs = new List>(); @@ -152,19 +154,21 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema ? currentNodeIndex - _balancer._numNodes : currentNodeIndex; Connection connection = _balancer._healthyNode.ElementAt(currentNodeIndex); + Task keepAliveJob = RetryUntilKeepAliveResponseAsync( + leaseId, + connection, + retryDurationMs, + maxRetryBackoffMs, + leaseExpiredAt, + roundCancellationTokenSource.Token); + keepAliveJobs.Add(keepAliveJob); - Task keepAliveJob = await InvokeWithWaitLimitAsync( - () => OneTimeKeepAliveWithRetryAsync( - leaseId, - connection, - tryDurationMs, - maxRetryBackoffMs, - leaseExpiredAt, - roundCancellationTokenSource.Token), + await WhenAnySuccessLimitedAsync( + keepAliveJobs, waitLimitMs: delayBetweenUseNewKeepAliveJob, - cancellationToken: roundCancellationTokenSource.Token); - keepAliveJobs.Add(keepAliveJob); - if (IsAnyTaskCompletedSuccessfully(keepAliveJobs, out var _)) + cancellationToken: roundCancellationTokenSource.Token).ConfigureAwait(false); + + if (IsAnyCompletedSuccessfully(keepAliveJobs, out var _)) { roundCancellationTokenSource.Cancel(); break; @@ -173,14 +177,14 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema try { - await Task.WhenAll(keepAliveJobs.ToArray()); + await Task.WhenAll(keepAliveJobs.ToArray()).ConfigureAwait(false); } catch (Exception e) { // ignored exceptions will handled later } - if (IsAnyTaskCompletedSuccessfully( + if (IsAnyCompletedSuccessfully( keepAliveJobs, out var keepAliveResponse) && keepAliveResponse.TTL > 0) @@ -188,7 +192,7 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema await Task.Delay( sleepAfterSuccessMs, - cancellationToken); + cancellationToken).ConfigureAwait(false); leaseRemainigTTL = Math.Max(0,keepAliveResponse.TTL - sleepAfterSuccessMs / 1000); continue; //go to next round } @@ -197,27 +201,34 @@ await Task.Delay( { new LeaseExpiredOrNotFoundException(leaseId), }; - exceptions.AddRange( - keepAliveJobs - .Where(job => !job.IsCompletedSuccessfully) - .Select(job => job.Exception)); // collect all exceptions + exceptions.AddRange(keepAliveJobs + .Select(job => job.Exception) + .Where(exception=>exception != null)); // collect all exceptions throw new AggregateException(exceptions); } - async Task> InvokeWithWaitLimitAsync(Func> func, int waitLimitMs, CancellationToken cancellationToken) + async Task WhenAnySuccessLimitedAsync(IEnumerable tasks,int waitLimitMs, CancellationToken cancellationToken) { + List runningTasks = new List(tasks ?? Array.Empty()); Task waitLimit = Task.Delay( waitLimitMs, cancellationToken); - var responseTask = func(); - await Task.WhenAny(responseTask,waitLimit).ConfigureAwait(false); - return responseTask; + while (runningTasks.Count > 0) + { + cancellationToken.ThrowIfCancellationRequested(); + Task completed = await Task.WhenAny(runningTasks.Append(waitLimit)).ConfigureAwait(false); + if (completed.IsCompletedSuccessfully || completed == waitLimit) + { + return; + } + runningTasks.Remove(completed); + } } - async Task OneTimeKeepAliveWithRetryAsync(long leaseId, Connection connection, - int tryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken) + async Task RetryUntilKeepAliveResponseAsync(long leaseId, Connection connection, + int retryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); // timeoutPolicy thrown own exception, that overlap retry exceptions, @@ -226,7 +237,8 @@ async Task OneTimeKeepAliveWithRetryAsync(long leaseId, var timeout = deadline.ToUniversalTime() - DateTime.Now.ToUniversalTime(); var timeoutPolicy = Policy.TimeoutAsync(timeout); TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs); - var delay = Backoff.DecorrelatedJitterBackoffV2( + var retryDelay = + Backoff.DecorrelatedJitterBackoffV2( fastFirst: true, medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), retryCount: int.MaxValue) @@ -238,9 +250,9 @@ async Task OneTimeKeepAliveWithRetryAsync(long leaseId, var retryPolicy = Policy .Handle(e => e is LeaseExpiredOrNotFoundException == false) .WaitAndRetryAsync( - delay, + retryDelay, onRetry: (exception, _) => retryExceptions.Add(exception)); - var retryTimeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(tryDurationMs)); + var retryTimeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(retryDurationMs)); var policy = Policy.WrapAsync( timeoutPolicy, @@ -253,28 +265,27 @@ async Task OneTimeKeepAliveWithRetryAsync(long leaseId, cancellationToken: cancellationToken, action: async retryCancellationToken => { + // var retryCancellationToken = cancellationToken; retryCancellationToken.ThrowIfCancellationRequested(); - using (AsyncDuplexStreamingCall leaser = - connection._leaseClient - .LeaseKeepAlive(cancellationToken: retryCancellationToken)) + using AsyncDuplexStreamingCall leaser = + connection._leaseClient + .LeaseKeepAlive(cancellationToken: retryCancellationToken); + // ReSharper disable once MethodSupportsCancellation //method doesn't support cancellation + await leaser.RequestStream.WriteAsync( + new LeaseKeepAliveRequest() { ID = leaseId }); + bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken) + .ConfigureAwait(false); + if (!result) { - // ReSharper disable once MethodSupportsCancellation //method doesn't support cancellation - await leaser.RequestStream.WriteAsync( - new LeaseKeepAliveRequest() { ID = leaseId }).ConfigureAwait(false); - bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken) - .ConfigureAwait(false); - if (!result) - { - throw new RpcException( - new Status( - StatusCode.Aborted, - "didnt receive keepAlive response")); - } - - await leaser.RequestStream.CompleteAsync().ConfigureAwait(false); - return leaser.ResponseStream.Current; + throw new RpcException( + new Status( + StatusCode.Aborted, + "didnt receive keepAlive response")); } - }); + + await leaser.RequestStream.CompleteAsync(); + return leaser.ResponseStream.Current; + }).ConfigureAwait(false); return response; } catch (TimeoutRejectedException e) @@ -286,7 +297,7 @@ await leaser.RequestStream.WriteAsync( } } - bool IsAnyTaskCompletedSuccessfully(IEnumerable> tasks, + bool IsAnyCompletedSuccessfully(IEnumerable> tasks, out T response) { foreach (Task call in tasks) From 5b46d31cc190cb53ae23a00675d2f8609a6ba227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Tue, 16 Aug 2022 10:44:28 +0600 Subject: [PATCH 10/17] =?UTF-8?q?=D0=B5=D1=89=D0=B5=20=D0=BD=D0=B5=D0=BC?= =?UTF-8?q?=D0=BD=D0=BE=D0=B3=D0=BE=20=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82?= =?UTF-8?q?=D0=BE=D1=80=D0=B8=D0=BD=D0=B3=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/Program.cs | 4 +-- .../LeaseExpiredOrNotFoundException.cs | 2 +- dotnet-etcd/leaseClient.cs | 28 ++++++++----------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index f2c42b9..54d371d 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -47,8 +47,8 @@ static async Task Main(string[] args) await client.HighlyReliableLeaseKeepAliveAsync( leaseId, 5, - retryDurationMs: 5000, - maxRetryBackoffMs: 5000, + retryDurationMs: 1000, + maxRetryBackoffMs: 400, sleepAfterSuccessMs: 5000 / 3, CancellationToken.None).ConfigureAwait(false); diff --git a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs index ab0ddb4..cf61ff1 100644 --- a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs +++ b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs @@ -11,7 +11,7 @@ public class LeaseExpiredOrNotFoundException : Exception { private readonly long _leaseId; - public LeaseExpiredOrNotFoundException(long leaseId) : base("leaseId=" + leaseId) + public LeaseExpiredOrNotFoundException(long leaseId) : base("LeaseExpiredOrNotFoundException: leaseId=" + leaseId) { _leaseId = leaseId; } diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index ce23526..7d6e322 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -202,26 +202,26 @@ await Task.Delay( new LeaseExpiredOrNotFoundException(leaseId), }; exceptions.AddRange(keepAliveJobs - .Select(job => job.Exception) + .SelectMany(job => job.Exception?.InnerExceptions) .Where(exception=>exception != null)); // collect all exceptions throw new AggregateException(exceptions); } async Task WhenAnySuccessLimitedAsync(IEnumerable tasks,int waitLimitMs, CancellationToken cancellationToken) { - List runningTasks = new List(tasks ?? Array.Empty()); + List runningTasks = tasks?.ToList() ?? new List(); Task waitLimit = Task.Delay( waitLimitMs, cancellationToken); while (runningTasks.Count > 0) { cancellationToken.ThrowIfCancellationRequested(); - Task completed = await Task.WhenAny(runningTasks.Append(waitLimit)).ConfigureAwait(false); - if (completed.IsCompletedSuccessfully || completed == waitLimit) + Task completedTask = await Task.WhenAny(runningTasks.Append(waitLimit)).ConfigureAwait(false); + if (completedTask.IsCompletedSuccessfully || completedTask == waitLimit) { return; } - runningTasks.Remove(completed); + runningTasks.Remove(completedTask); } } @@ -234,20 +234,17 @@ async Task RetryUntilKeepAliveResponseAsync(long leaseId // timeoutPolicy thrown own exception, that overlap retry exceptions, // so this list used for catch the retry exceptions. List retryExceptions = new List(); - var timeout = deadline.ToUniversalTime() - DateTime.Now.ToUniversalTime(); - var timeoutPolicy = Policy.TimeoutAsync(timeout); + var timeoutPolicy = Policy.TimeoutAsync(deadline.ToUniversalTime() - DateTime.Now.ToUniversalTime()); + TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs); var retryDelay = Backoff.DecorrelatedJitterBackoffV2( - fastFirst: true, - medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), - retryCount: int.MaxValue) - .Select( - s => TimeSpan.FromTicks( - Math.Min( - s.Ticks, - maxRetryBackoff.Ticks))); + fastFirst: true, + medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), + retryCount: int.MaxValue) + .Select(s => s < maxRetryBackoff ? s : maxRetryBackoff); var retryPolicy = Policy + //retry on all exceptions except LeaseExpiredOrNotFoundException .Handle(e => e is LeaseExpiredOrNotFoundException == false) .WaitAndRetryAsync( retryDelay, @@ -265,7 +262,6 @@ async Task RetryUntilKeepAliveResponseAsync(long leaseId cancellationToken: cancellationToken, action: async retryCancellationToken => { - // var retryCancellationToken = cancellationToken; retryCancellationToken.ThrowIfCancellationRequested(); using AsyncDuplexStreamingCall leaser = connection._leaseClient From 1f726e6ab637b32ccbe02ad83fb5065c955ee28c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Tue, 16 Aug 2022 10:49:37 +0600 Subject: [PATCH 11/17] =?UTF-8?q?=D0=BA=D0=B0=D0=BD=D1=86=D0=B5=D0=BB?= =?UTF-8?q?=D0=B5=D0=B9=D1=88=D0=B5=D0=BD=20=D1=82=D0=BE=D0=BA=D0=B5=D0=BD?= =?UTF-8?q?=20=D0=B4=D0=BB=D1=8F=20=D0=B1=D1=8B=D1=81=D1=82=D1=80=D0=BE?= =?UTF-8?q?=D0=B9=20=D0=BE=D1=81=D1=82=D0=B0=D0=BD=D0=BE=D0=B2=D0=BA=D0=B8?= =?UTF-8?q?=20=D0=BE=D1=82=D0=BB=D0=B0=D0=B4=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index 54d371d..c2944b0 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -50,7 +50,7 @@ await client.HighlyReliableLeaseKeepAliveAsync( retryDurationMs: 1000, maxRetryBackoffMs: 400, sleepAfterSuccessMs: 5000 / 3, - CancellationToken.None).ConfigureAwait(false); + cts.Token).ConfigureAwait(false); // await client.LeaseKeepAlive( // leaseId, From 6a945282f369d29080eaa0ff3e2f21072245db88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Thu, 18 Aug 2022 17:26:25 +0600 Subject: [PATCH 12/17] Integrations tests added --- tests/Integration/DelegateInterceptor.cs | 136 +++++++++++++++++ tests/Integration/DevelopmentProcessTests.cs | 77 ---------- tests/Integration/Framework.cs | 36 ----- tests/Integration/LeaseTests.cs | 146 +++++++++++++++++++ tests/Integration/MessageChannel.cs | 72 +++++++++ 5 files changed, 354 insertions(+), 113 deletions(-) create mode 100644 tests/Integration/DelegateInterceptor.cs delete mode 100644 tests/Integration/DevelopmentProcessTests.cs delete mode 100644 tests/Integration/Framework.cs create mode 100644 tests/Integration/LeaseTests.cs create mode 100644 tests/Integration/MessageChannel.cs diff --git a/tests/Integration/DelegateInterceptor.cs b/tests/Integration/DelegateInterceptor.cs new file mode 100644 index 0000000..60fa1a4 --- /dev/null +++ b/tests/Integration/DelegateInterceptor.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; + +namespace Integration; + +public class DelegateInterceptor : Interceptor + where TReq : class + where TRsp : class +{ + private readonly MessageChannel _requests = new(); + + private readonly MessageChannel _responses = new(); + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + ValidateCall(); + + Task.Run(() => _requests.CastMessageType().WriteAsync(request)).Wait(); + var responseReceived = Task.Run(() => _responses.MoveNext()).Result; + if (!responseReceived) throw new Exception("response required"); + var response = _responses.CastMessageType().Current; + var call = new AsyncUnaryCall( + responseAsync: Task.FromResult(response), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + public override TResponse BlockingUnaryCall(TRequest request, + ClientInterceptorContext context, + BlockingUnaryCallContinuation continuation) + { + ValidateCall(); + + Task.Run(() => _requests.WriteAsync((request as TReq)!)).Wait(); + var responseReceived = Task.Run(() => _responses.MoveNext()).Result; + if (!responseReceived) throw new Exception("response required"); + return _requests.CastMessageType().Current; + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall( + ClientInterceptorContext context, + AsyncClientStreamingCallContinuation continuation) + { + ValidateCall(); + + var call = new AsyncClientStreamingCall( + requestStream: _requests.CastMessageType(), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }, + responseAsync: _responses.MoveNext().ContinueWith(moveNextask => + { + var responseReceived = moveNextask.Result; + if (!responseReceived) throw new Exception("response required"); + return _responses.CastMessageType().Current; + })); + return call; + } + + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, + ClientInterceptorContext context, + AsyncServerStreamingCallContinuation continuation) + { + ValidateCall(); + + Task.Run(() => _requests.WriteAsync((request as TReq)!)).Wait(); + var call = new AsyncServerStreamingCall( + responseStream: _responses.CastMessageType(), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall( + ClientInterceptorContext context, + AsyncDuplexStreamingCallContinuation continuation) + { + AsyncDuplexStreamingCall call = new( + requestStream: _requests.CastMessageType(), + responseStream: _responses.CastMessageType(), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + + public async Task WriteResponseAsync(TRsp rsp) + { + await _responses.WriteAsync(rsp); + } + + public async Task WriteResponseAsync(Exception exception) + { + await _responses.WriteExceptionAsync(exception); + } + + public async Task CloseResponseStreamAsync() + { + await _responses.CompleteAsync(); + } + + public IAsyncEnumerable ReadAllRequests(CancellationToken cancellationToken) + { + return _requests.ReadAllAsync(cancellationToken); + } + + private static void ValidateCall() + { + if (typeof(TReq) != typeof(TRequest) || typeof(TRsp) != typeof(TResponse)) + throw new Exception("Interceptor not applicable to these call"); + } +} \ No newline at end of file diff --git a/tests/Integration/DevelopmentProcessTests.cs b/tests/Integration/DevelopmentProcessTests.cs deleted file mode 100644 index 8f2db1a..0000000 --- a/tests/Integration/DevelopmentProcessTests.cs +++ /dev/null @@ -1,77 +0,0 @@ -using System; -using System.Diagnostics; -using System.Net; -using System.Net.Http; -using System.Security.Authentication; -using System.Threading; -using System.Threading.Tasks; -using dotnet_etcd; -using DotnetNiceGrpcLogs; -using Etcdserverpb; -using NUnit.Framework; - -namespace Integration; - -public class DevelopmentProcessTests -{ - private EtcdClient Client { get; } = new EtcdClient( - "http://127.0.0.1:2379", //todo: вытащить в конфигурацию - useLegacyRpcExceptionForCancellation: false, - interceptors: new GrpcLogsInterceptor( - Framework.Logger, - new LogsInterceptorOptions - { - //LoggerName = null, - IncludeLogData = true - })); - - [SetUp] - public async Task Setup() - { - await Framework.CleanEtcdTestsKeys(Client); - } - - [TearDown] - public async Task TearDownAsync() - { - await Framework.CleanEtcdTestsKeys(Client); - } - - //debug space here - [Test] - public async Task Test1() - { - Trace.Listeners.Add(new ConsoleTraceListener()); - // var rsp = await Client.LeaseGrantAsync(new LeaseGrantRequest() { TTL = 8 }); - // var ct = new CancellationTokenSource(); - // await Client.LeaseKeepAlive( - // rsp.ID, - // 30, - // ct.Token); - - Task.Run( - () => - { - while (true) - { - Task.Delay(500).GetAwaiter().GetResult(); - Client.Put( - "1", - DateTime.Now.ToString()); - } - - }); - - void Rsp(WatchResponse response) - { - Framework.Logger.ForContext( - "watchEvent", - response) - .Information("new watch event"); - } - - Client.WatchRange( - "", - method: (Action)Rsp); - } -} diff --git a/tests/Integration/Framework.cs b/tests/Integration/Framework.cs deleted file mode 100644 index bc45281..0000000 --- a/tests/Integration/Framework.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System.Threading.Tasks; -using dotnet_etcd; -using DotnetNiceGrpcLogs; -using Serilog; -using Serilog.Sinks.SystemConsole.Themes; - -namespace Integration; - -public static class Framework -{ - public const string TestPrefix = "/Tests/"; - - public static ILogger Logger { get; } = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( - theme: SystemConsoleTheme.Literate, - outputTemplate: - "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") - .Enrich.FromLogContext() - .CreateLogger(); - - // public static EtcdClient Client { get; } = new EtcdClient( - // "http://localhost:23790,http://localhost:23791,http://localhost:23792", //todo: вытащить в конфигурацию - // useLegacyRpcExceptionForCancellation: false, - // interceptors: new GrpcLogsInterceptor( - // Logger, - // new LogsInterceptorOptions - // { - // //LoggerName = null, - // IncludeLogData = true - // })); - - public static async Task CleanEtcdTestsKeys(EtcdClient client) - { - await client.DeleteRangeAsync(TestPrefix); - } - -} \ No newline at end of file diff --git a/tests/Integration/LeaseTests.cs b/tests/Integration/LeaseTests.cs new file mode 100644 index 0000000..50ca857 --- /dev/null +++ b/tests/Integration/LeaseTests.cs @@ -0,0 +1,146 @@ +using System; +using System.Data.Common; +using System.Diagnostics; +using System.Net; +using System.Net.Http; +using System.Security.Authentication; +using System.Threading; +using System.Threading.Tasks; +using dotnet_etcd; +using DotnetNiceGrpcLogs; +using Etcdserverpb; +using Grpc.Core; +using NUnit.Framework; +using Polly; + +namespace Integration; + +public class LeaseTests +{ + private const string ConnectionString = "http://127.0.0.1:23790,http://127.0.0.1:23790,http://127.0.0.1:23790"; + + [SetUp] + public async Task Setup() + { + } + + [TearDown] + public async Task TearDownAsync() + { + } + + [Test] + public async Task LeaseGranded() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrantAsync(request); + }); + await foreach (LeaseGrantRequest req in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + Assert.AreEqual(req, request); + await delegateInterceptor.WriteResponseAsync(response); + break; + } + + var rsp = responseTask.Result; + Assert.AreEqual(rsp, response); + } + + [Test] + public async Task AfterExceptionsLeaseGranded() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrantAsync(request); + }); + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); + + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + var req = iterator.Current; + await delegateInterceptor.WriteResponseAsync(unavailableException); + await delegateInterceptor.WriteResponseAsync(unavailableException); + await delegateInterceptor.WriteResponseAsync(response); + var rsp = responseTask.Result; + + Assert.AreEqual(rsp, response); + } + + [Test] + public async Task AfterThreeExceptionsLeaseGrandedFail() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + var responseTask = Task.Run(() => { return client.LeaseGrantAsync(request); }); + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); + await delegateInterceptor.WriteResponseAsync(unavailableException); + await delegateInterceptor.WriteResponseAsync(unavailableException); + await delegateInterceptor.WriteResponseAsync(unavailableException); + var ex = Assert.Throws( + () => + { + try + { + responseTask.Wait(); + } + catch (AggregateException e) + { + + throw e.InnerException; + } + }); + Assert.That(ex.Status.StatusCode, Is.EqualTo(StatusCode.Unavailable)); + } + + [Test] + public async Task LeaseKeepAliveRequestSendedAfterDelay() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); + var responseTask = Task.Run( + () => + { + return client.LeaseKeepAlive( + 777, + CancellationToken.None); + }); + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + var req = iterator.Current; + Assert.AreEqual(req.ID, 777); + await delegateInterceptor.WriteResponseAsync(new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); + var nextKeepAliveTask = iterator.MoveNextAsync(); + var ex = Assert.Throws( + () => nextKeepAliveTask.AsTask() + .WaitAsync(TimeSpan.FromMilliseconds(100)) + .GetAwaiter().GetResult()); + await Task.Delay(300); + Assert.True(nextKeepAliveTask.Result); + } +} diff --git a/tests/Integration/MessageChannel.cs b/tests/Integration/MessageChannel.cs new file mode 100644 index 0000000..67d3c0b --- /dev/null +++ b/tests/Integration/MessageChannel.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Grpc.Core; + +namespace Integration; + +public class MessageChannel : IClientStreamWriter, IAsyncStreamReader +{ + private Channel<(object, Exception?)> _channel = Channel.CreateUnbounded<(object, Exception?)>(); + private (object message, Exception? exception) _current; + private Func GetCurrentFunc { get; set; } + private Action<(object message, Exception? exception)> SetCurrentFunc { get; set; } + public MessageChannel() + { + GetCurrentFunc = () => + { + if (_current.exception != null) throw _current.exception; + return (T)_current.message!; + }; + SetCurrentFunc = current => _current = current; + } + + public MessageChannel CastMessageType() + { + var ch = new MessageChannel(); + ch._channel = _channel; + ch.GetCurrentFunc = () => + { + object current = GetCurrentFunc()!; + return (TNew)current; + }; + ch.SetCurrentFunc = this.SetCurrentFunc; + return ch; + } + + #region IClientStreamWriter + + public async Task WriteAsync(T message) + { + await _channel.Writer.WriteAsync((message, null)!); + } + + public async Task WriteExceptionAsync(Exception exception) + { + await _channel.Writer.WriteAsync((null, exception)!); + } + + public WriteOptions? WriteOptions { get; set; } + + public async Task CompleteAsync() + { + _channel.Writer.Complete(); + } + + #endregion + + #region IAsyncStreamReader + + public async Task MoveNext(CancellationToken cancellationToken) + { + var haveNext = await _channel.Reader.WaitToReadAsync(cancellationToken); + if (!haveNext) return false; + var current = await _channel.Reader.ReadAsync(cancellationToken); + SetCurrentFunc(current); + return true; + } + public T Current => GetCurrentFunc(); + + #endregion +} \ No newline at end of file From bcf08e977a3c86888736436343d6c051cead2b4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Thu, 18 Aug 2022 17:26:54 +0600 Subject: [PATCH 13/17] =?UTF-8?q?=D0=BC=D0=B5=D0=BB=D0=BA=D0=B8=D0=B5=20?= =?UTF-8?q?=D0=BF=D1=80=D0=B0=D0=B2=D0=BA=D0=B8=20=D0=B8=20=D0=B7=D0=B0?= =?UTF-8?q?=D0=BC=D0=B5=D1=87=D0=B0=D0=BD=D0=B8=D1=8F=20=D0=BF=D0=BE=D1=81?= =?UTF-8?q?=D0=BB=D0=B5=20=D1=80=D0=B5=D0=B2=D1=8C=D1=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dotnet-etcd/leaseClient.cs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 7d6e322..136bf85 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -143,12 +143,13 @@ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRema int usedKeepAliveJobs = 0; int delayBetweenUseNewKeepAliveJob = retryDurationMs / _balancer._numNodes; startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; - DateTime leaseExpiredAt = DateTime.Now.ToUniversalTime().AddSeconds(leaseRemainigTTL); + DateTime leaseExpiredAt = DateTime.UtcNow.AddSeconds(leaseRemainigTTL); List> keepAliveJobs = new List>(); while (usedKeepAliveJobs < _balancer._numNodes) { usedKeepAliveJobs++; roundCancellationTokenSource.Token.ThrowIfCancellationRequested(); + //todo: вынести работу с выбором ноды в отдельный метод int currentNodeIndex = startNodeIndex + usedKeepAliveJobs; currentNodeIndex = currentNodeIndex >= _balancer._numNodes ? currentNodeIndex - _balancer._numNodes @@ -177,6 +178,7 @@ await WhenAnySuccessLimitedAsync( try { + //todo: не понятно что дедлайн лизы заложен внутри джоб, подумать как сделать проще await Task.WhenAll(keepAliveJobs.ToArray()).ConfigureAwait(false); } catch (Exception e) @@ -234,18 +236,18 @@ async Task RetryUntilKeepAliveResponseAsync(long leaseId // timeoutPolicy thrown own exception, that overlap retry exceptions, // so this list used for catch the retry exceptions. List retryExceptions = new List(); - var timeoutPolicy = Policy.TimeoutAsync(deadline.ToUniversalTime() - DateTime.Now.ToUniversalTime()); + var timeoutPolicy = Policy.TimeoutAsync(deadline - DateTime.UtcNow); TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs); var retryDelay = Backoff.DecorrelatedJitterBackoffV2( fastFirst: true, - medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), + medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), //todo: вынести в параметры retryCount: int.MaxValue) .Select(s => s < maxRetryBackoff ? s : maxRetryBackoff); var retryPolicy = Policy //retry on all exceptions except LeaseExpiredOrNotFoundException - .Handle(e => e is LeaseExpiredOrNotFoundException == false) + .Handle() .WaitAndRetryAsync( retryDelay, onRetry: (exception, _) => retryExceptions.Add(exception)); @@ -257,7 +259,7 @@ async Task RetryUntilKeepAliveResponseAsync(long leaseId retryTimeoutPolicy); try { - var response = await policy.ExecuteAsync( + LeaseKeepAliveResponse response = await policy.ExecuteAsync( continueOnCapturedContext: false, cancellationToken: cancellationToken, action: async retryCancellationToken => From 0d99bd21023439daba4d5afe1cd31f982455fad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Thu, 18 Aug 2022 17:30:07 +0600 Subject: [PATCH 14/17] =?UTF-8?q?=D1=81=D0=BE=D1=85=D1=80=D0=B0=D0=BD?= =?UTF-8?q?=D0=B8=D0=BB=20=20=D0=BF=D1=80=D0=BE=D0=B5=D0=BA=D1=82=20=D0=BF?= =?UTF-8?q?=D0=B5=D1=81=D0=BE=D1=87=D0=BD=D0=B8=D1=86=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DevelopmentSandbox/DevelopmentSandbox.csproj | 1 + DevelopmentSandbox/Program.cs | 151 ++++++++---------- .../Properties/launchSettings.json | 2 +- 3 files changed, 72 insertions(+), 82 deletions(-) diff --git a/DevelopmentSandbox/DevelopmentSandbox.csproj b/DevelopmentSandbox/DevelopmentSandbox.csproj index 2f45200..961e278 100644 --- a/DevelopmentSandbox/DevelopmentSandbox.csproj +++ b/DevelopmentSandbox/DevelopmentSandbox.csproj @@ -15,6 +15,7 @@ + diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index c2944b0..f298bef 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -1,96 +1,85 @@ -using System.Net; -using System.Runtime.CompilerServices; -using dotnet_etcd; -using DotnetNiceGrpcLogs; +using dotnet_etcd; using Etcdserverpb; +using Google.Protobuf; using Grpc.Core; -using Polly; -using Polly.Contrib.WaitAndRetry; -using Polly.Timeout; -using Polly.Utilities; +using Integration; using Serilog; using Serilog.Sinks.SystemConsole.Themes; -namespace DevelopmentSandbox // Note: actual namespace depends on the project name. +namespace DevelopmentSandbox; // Note: actual namespace depends on the project name. + + + +internal class Program { - internal class Program + private static async Task Main(string[] args) { - static async Task Main(string[] args) - { - var cts = new CancellationTokenSource(); - AppDomain.CurrentDomain.ProcessExit += (_, _) => { cts.Cancel(); }; - Console.CancelKeyPress += (_, ea) => { cts.Cancel(); }; - ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( - theme: SystemConsoleTheme.Literate, - outputTemplate: - "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") - .Enrich.FromLogContext() - .CreateLogger(); - - string connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); - EtcdClient client = new EtcdClient( - connection_string, - // handler: handler, - useLegacyRpcExceptionForCancellation: false //, - // interceptors: new GrpcLogsInterceptor( - // logger, - // new LogsInterceptorOptions - // { - // //LoggerName = null, - // IncludeLogData = true - // }) - ); + var cts = new CancellationTokenSource(); + AppDomain.CurrentDomain.ProcessExit += (_, _) => { cts.Cancel(); }; + Console.CancelKeyPress += (_, ea) => { cts.Cancel(); }; + ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( + theme: SystemConsoleTheme.Literate, + outputTemplate: + "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") + .Enrich.FromLogContext() + .CreateLogger(); - Func doJob = async () => - { - var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; - await client.HighlyReliableLeaseKeepAliveAsync( - leaseId, - 5, - retryDurationMs: 1000, - maxRetryBackoffMs: 400, - sleepAfterSuccessMs: 5000 / 3, - cts.Token).ConfigureAwait(false); - - // await client.LeaseKeepAlive( - // leaseId, - // CancellationToken.None).ConfigureAwait(false); - }; + var connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); - List jobs = new List(1000); + + var client = new EtcdClient( + connectionString: connection_string, + // handler: handler, + useLegacyRpcExceptionForCancellation: false); + - foreach (var i in Enumerable.Range( - 0, - 20000)) + Func doJob = async () => + { + var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; + await client.HighlyReliableLeaseKeepAliveAsync( + leaseId, + 5, + retryDurationMs: 1000, + maxRetryBackoffMs: 400, + sleepAfterSuccessMs: 5000 / 3, + cts.Token).ConfigureAwait(false); + + // await client.LeaseKeepAlive( + // leaseId, + // CancellationToken.None).ConfigureAwait(false); + }; + + List jobs = new List(1000); + + foreach (var i in Enumerable.Range( + 0, + 20000)) + { + + await Task.Delay(5); //что бы кипалайвы были в приоритете создания новых тасков + if (cts.Token.IsCancellationRequested) { - - await Task.Delay(5); //что бы кипалайвы были в приоритете создания новых тасков - if (cts.Token.IsCancellationRequested) + break; + } + + var t = Task.Run( + async () => { - break; - } - - var t = Task.Run( - async () => + cts.Token.ThrowIfCancellationRequested(); + Console.WriteLine(i); + try { - cts.Token.ThrowIfCancellationRequested(); - Console.WriteLine(i); - try - { - await doJob().ConfigureAwait(false); - } - finally - { - cts.Cancel(); - } - }, - cts.Token); - jobs.Add(t); - } - - await await Task.WhenAny(jobs); - logger.Information("endddd"); - + await doJob().ConfigureAwait(false); + } + finally + { + cts.Cancel(); + } + }, + cts.Token); + jobs.Add(t); } + + await await Task.WhenAny(jobs); } -}; \ No newline at end of file +} \ No newline at end of file diff --git a/DevelopmentSandbox/Properties/launchSettings.json b/DevelopmentSandbox/Properties/launchSettings.json index dcf8593..40973bc 100644 --- a/DevelopmentSandbox/Properties/launchSettings.json +++ b/DevelopmentSandbox/Properties/launchSettings.json @@ -4,7 +4,7 @@ "DevelopmentSandbox": { "commandName": "Project", "environmentVariables": { - "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" + "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23790,http://127.0.0.1:23790" } } } From 02142f106f49615191784484ee57548a204f1b41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Thu, 18 Aug 2022 23:14:49 +0600 Subject: [PATCH 15/17] =?UTF-8?q?=D0=BF=D0=B5=D1=80=D0=B5=D0=B4=D0=B5?= =?UTF-8?q?=D0=BB=D0=B0=D0=BB=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=83=20?= =?UTF-8?q?=D1=81=20=D1=82=D0=B5=D1=81=D1=82=D0=B0=D0=BC=D0=B8=20=D0=BD?= =?UTF-8?q?=D0=B0=20=D0=BF=D0=BE=D0=B4=D0=B4=D0=B5=D1=80=D0=B6=D0=BA=D1=83?= =?UTF-8?q?=20=D0=BD=D0=B5=D1=81=D0=BA=D0=BE=D0=BB=D1=8C=D0=BA=D0=B8=D1=85?= =?UTF-8?q?=20=D0=BA=D0=B0=D0=BD=D0=B0=D0=BB=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/Integration/DelegateInterceptor.cs | 116 ++++++++++----- tests/Integration/LeaseTests.cs | 181 +++++++++++++---------- tests/Integration/MessageChannel.cs | 72 --------- tests/Integration/MessageStore.cs | 142 ++++++++++++++++++ 4 files changed, 324 insertions(+), 187 deletions(-) delete mode 100644 tests/Integration/MessageChannel.cs create mode 100644 tests/Integration/MessageStore.cs diff --git a/tests/Integration/DelegateInterceptor.cs b/tests/Integration/DelegateInterceptor.cs index 60fa1a4..9774c32 100644 --- a/tests/Integration/DelegateInterceptor.cs +++ b/tests/Integration/DelegateInterceptor.cs @@ -1,9 +1,13 @@ using System; using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Interceptors; +using Grpc.Net.Client; namespace Integration; @@ -11,9 +15,8 @@ public class DelegateInterceptor : Interceptor where TReq : class where TRsp : class { - private readonly MessageChannel _requests = new(); - - private readonly MessageChannel _responses = new(); + private readonly MessageStore _requestsStore = new(); + private readonly MessageStore _responsesStore = new(); public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, @@ -21,10 +24,14 @@ public override AsyncUnaryCall AsyncUnaryCall(TR { ValidateCall(); - Task.Run(() => _requests.CastMessageType().WriteAsync(request)).Wait(); - var responseReceived = Task.Run(() => _responses.MoveNext()).Result; - if (!responseReceived) throw new Exception("response required"); - var response = _responses.CastMessageType().Current; + + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address, + request).Wait(); + var enumerator = _responsesStore.GetReader(address); + if (!enumerator.MoveNext().Result) throw new Exception("response required"); + var response = enumerator.Current; var call = new AsyncUnaryCall( responseAsync: Task.FromResult(response), responseHeadersAsync: Task.FromResult(new Metadata()), @@ -41,46 +48,52 @@ public override TResponse BlockingUnaryCall(TRequest reques BlockingUnaryCallContinuation continuation) { ValidateCall(); - - Task.Run(() => _requests.WriteAsync((request as TReq)!)).Wait(); - var responseReceived = Task.Run(() => _responses.MoveNext()).Result; - if (!responseReceived) throw new Exception("response required"); - return _requests.CastMessageType().Current; + + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address, + request).Wait(); + var enumerator = _responsesStore.GetReader(address); + if (!enumerator.MoveNext().Result) throw new Exception("response required"); + return enumerator.Current; } - + public override AsyncClientStreamingCall AsyncClientStreamingCall( ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) { ValidateCall(); - + string address = GetEtcdAdders(continuation); + var reader = _responsesStore.GetReader(address); var call = new AsyncClientStreamingCall( - requestStream: _requests.CastMessageType(), + requestStream: _requestsStore.GetWriter(address), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, detail: ""), getTrailersFunc: () => new Metadata(), disposeAction: () => { }, - responseAsync: _responses.MoveNext().ContinueWith(moveNextask => - { - var responseReceived = moveNextask.Result; - if (!responseReceived) throw new Exception("response required"); - return _responses.CastMessageType().Current; - })); + responseAsync: Task.Run( + async () => + { + await reader.MoveNext(); + return reader.Current; + })); return call; } - - + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) { ValidateCall(); - - Task.Run(() => _requests.WriteAsync((request as TReq)!)).Wait(); + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address, + request).Wait(); var call = new AsyncServerStreamingCall( - responseStream: _responses.CastMessageType(), + responseStream: _responsesStore.GetReader(address), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, @@ -89,15 +102,18 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall { }); return call; } - - + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall( ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) { + ValidateCall(); + string address = GetEtcdAdders(continuation); + AsyncDuplexStreamingCall call = new( - requestStream: _requests.CastMessageType(), - responseStream: _responses.CastMessageType(), + requestStream: _requestsStore.GetWriter(address), + responseStream: _responsesStore.GetReader(address), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, @@ -108,24 +124,36 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami } - public async Task WriteResponseAsync(TRsp rsp) + public async Task WriteResponseAsync(string address, TRsp rsp) { - await _responses.WriteAsync(rsp); + await _responsesStore.WriteAsync(address,rsp); } - public async Task WriteResponseAsync(Exception exception) + public async Task WriteResponseAsync(string address, Exception exception) { - await _responses.WriteExceptionAsync(exception); + await _responsesStore.WriteAsync( + address, + null, + exception); } - public async Task CloseResponseStreamAsync() + public async Task CloseResponseStreamAsync(string address) { - await _responses.CompleteAsync(); + throw new NotImplementedException(); } - public IAsyncEnumerable ReadAllRequests(CancellationToken cancellationToken) + public IAsyncEnumerable ReadAllRequests(string address, CancellationToken cancellationToken) + { + return _requestsStore.GetReader(address).ReadAllAsync(cancellationToken); + } + + public async IAsyncEnumerable<(string address, TReq message)> ReadAllRequests(CancellationToken cancellationToken) { - return _requests.ReadAllAsync(cancellationToken); + await foreach (var (address, message, exception) in _requestsStore.ReadMessages()) + { + if (exception != null) throw exception; + yield return (address, (TReq)message!); + } } private static void ValidateCall() @@ -133,4 +161,16 @@ private static void ValidateCall() if (typeof(TReq) != typeof(TRequest) || typeof(TRsp) != typeof(TResponse)) throw new Exception("Interceptor not applicable to these call"); } + + private static string GetEtcdAdders(Delegate continuation) + { + object target = continuation.Target!; + object invoker = target.GetType().GetField("invoker",BindingFlags.Instance|BindingFlags.NonPublic)! + .GetValue(target)!; + GrpcChannel channel = (GrpcChannel)invoker.GetType() + .GetProperty( + "Channel", + BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(invoker)!; + return channel.Target; + } } \ No newline at end of file diff --git a/tests/Integration/LeaseTests.cs b/tests/Integration/LeaseTests.cs index 50ca857..dc0b7f9 100644 --- a/tests/Integration/LeaseTests.cs +++ b/tests/Integration/LeaseTests.cs @@ -17,7 +17,10 @@ namespace Integration; public class LeaseTests { - private const string ConnectionString = "http://127.0.0.1:23790,http://127.0.0.1:23790,http://127.0.0.1:23790"; + private const string Etcd1 = "127.0.0.1:23790"; + private const string Etcd2 = "127.0.0.1:23791"; + private const string Etcd3 = "127.0.0.1:23792"; + private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}"; [SetUp] public async Task Setup() @@ -29,8 +32,9 @@ public async Task TearDownAsync() { } + [Test] - public async Task LeaseGranded() + public async Task LeaseGranted() { var delegateInterceptor = new DelegateInterceptor(); var client = new EtcdClient( @@ -40,12 +44,12 @@ public async Task LeaseGranded() LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; var responseTask = Task.Run(() => { - return client.LeaseGrantAsync(request); + return client.LeaseGrant(request); }); - await foreach (LeaseGrantRequest req in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + await foreach ((string address, LeaseGrantRequest message) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) { - Assert.AreEqual(req, request); - await delegateInterceptor.WriteResponseAsync(response); + Assert.AreEqual(message, request); + await delegateInterceptor.WriteResponseAsync(address,response); break; } @@ -54,7 +58,7 @@ public async Task LeaseGranded() } [Test] - public async Task AfterExceptionsLeaseGranded() + public async Task LeaseGrantedAsync() { var delegateInterceptor = new DelegateInterceptor(); var client = new EtcdClient( @@ -66,81 +70,104 @@ public async Task AfterExceptionsLeaseGranded() { return client.LeaseGrantAsync(request); }); - RpcException unavailableException = new RpcException( - new Status( - StatusCode.Unavailable, - "")); + await foreach ((string address, LeaseGrantRequest message) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + Assert.AreEqual(message, request); + await delegateInterceptor.WriteResponseAsync(address,response); + break; + } - var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); - await iterator.MoveNextAsync(); - var req = iterator.Current; - await delegateInterceptor.WriteResponseAsync(unavailableException); - await delegateInterceptor.WriteResponseAsync(unavailableException); - await delegateInterceptor.WriteResponseAsync(response); var rsp = responseTask.Result; - Assert.AreEqual(rsp, response); } + + [Test] + public async Task AfterExceptionsLeaseGranted() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrantAsync(request); + }); + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); - [Test] - public async Task AfterThreeExceptionsLeaseGrandedFail() - { - var delegateInterceptor = new DelegateInterceptor(); - var client = new EtcdClient( - connectionString: ConnectionString, - interceptors: - delegateInterceptor); - LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; - var responseTask = Task.Run(() => { return client.LeaseGrantAsync(request); }); - RpcException unavailableException = new RpcException( - new Status( - StatusCode.Unavailable, - "")); - await delegateInterceptor.WriteResponseAsync(unavailableException); - await delegateInterceptor.WriteResponseAsync(unavailableException); - await delegateInterceptor.WriteResponseAsync(unavailableException); - var ex = Assert.Throws( - () => - { - try - { - responseTask.Wait(); - } - catch (AggregateException e) - { + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address,unavailableException); + await iterator.MoveNextAsync(); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address,unavailableException); + await iterator.MoveNextAsync(); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address,response); + var rsp = responseTask.Result; + + Assert.AreEqual(rsp, response); + } - throw e.InnerException; - } - }); - Assert.That(ex.Status.StatusCode, Is.EqualTo(StatusCode.Unavailable)); - } + [Test] + public async Task AfterThreeExceptionsLeaseGrantedFail() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); - [Test] - public async Task LeaseKeepAliveRequestSendedAfterDelay() - { - var delegateInterceptor = new DelegateInterceptor(); - var client = new EtcdClient( - connectionString: ConnectionString, - interceptors: - delegateInterceptor); - var responseTask = Task.Run( - () => - { - return client.LeaseKeepAlive( - 777, - CancellationToken.None); - }); - var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); - await iterator.MoveNextAsync(); - var req = iterator.Current; - Assert.AreEqual(req.ID, 777); - await delegateInterceptor.WriteResponseAsync(new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); - var nextKeepAliveTask = iterator.MoveNextAsync(); - var ex = Assert.Throws( - () => nextKeepAliveTask.AsTask() - .WaitAsync(TimeSpan.FromMilliseconds(100)) - .GetAwaiter().GetResult()); - await Task.Delay(300); - Assert.True(nextKeepAliveTask.Result); - } + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); + await delegateInterceptor.WriteResponseAsync(Etcd1,unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd2,unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd3,unavailableException); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + + var ex = Assert.Throws( + () => + { + try + { + client.LeaseGrantAsync(request).Wait(); + } + catch (AggregateException e) + { + + throw e.InnerException; + } + }); + Assert.That(ex.Status.StatusCode, Is.EqualTo(StatusCode.Unavailable)); + } + + [Test] + public async Task LeaseKeepAliveRequestSendedAfterDelay() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); + var responseTask = Task.Run( + () => + { + return client.LeaseKeepAlive( + 777, + CancellationToken.None); + }); + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + Assert.AreEqual(iterator.Current.message.ID, 777); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address,new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); + var nextKeepAliveTask = iterator.MoveNextAsync(); + await Task.Delay(100); + Assert.True(nextKeepAliveTask.IsCompleted == false); + await Task.Delay(300); + Assert.True(nextKeepAliveTask.Result); + } } diff --git a/tests/Integration/MessageChannel.cs b/tests/Integration/MessageChannel.cs deleted file mode 100644 index 67d3c0b..0000000 --- a/tests/Integration/MessageChannel.cs +++ /dev/null @@ -1,72 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; -using Grpc.Core; - -namespace Integration; - -public class MessageChannel : IClientStreamWriter, IAsyncStreamReader -{ - private Channel<(object, Exception?)> _channel = Channel.CreateUnbounded<(object, Exception?)>(); - private (object message, Exception? exception) _current; - private Func GetCurrentFunc { get; set; } - private Action<(object message, Exception? exception)> SetCurrentFunc { get; set; } - public MessageChannel() - { - GetCurrentFunc = () => - { - if (_current.exception != null) throw _current.exception; - return (T)_current.message!; - }; - SetCurrentFunc = current => _current = current; - } - - public MessageChannel CastMessageType() - { - var ch = new MessageChannel(); - ch._channel = _channel; - ch.GetCurrentFunc = () => - { - object current = GetCurrentFunc()!; - return (TNew)current; - }; - ch.SetCurrentFunc = this.SetCurrentFunc; - return ch; - } - - #region IClientStreamWriter - - public async Task WriteAsync(T message) - { - await _channel.Writer.WriteAsync((message, null)!); - } - - public async Task WriteExceptionAsync(Exception exception) - { - await _channel.Writer.WriteAsync((null, exception)!); - } - - public WriteOptions? WriteOptions { get; set; } - - public async Task CompleteAsync() - { - _channel.Writer.Complete(); - } - - #endregion - - #region IAsyncStreamReader - - public async Task MoveNext(CancellationToken cancellationToken) - { - var haveNext = await _channel.Reader.WaitToReadAsync(cancellationToken); - if (!haveNext) return false; - var current = await _channel.Reader.ReadAsync(cancellationToken); - SetCurrentFunc(current); - return true; - } - public T Current => GetCurrentFunc(); - - #endregion -} \ No newline at end of file diff --git a/tests/Integration/MessageStore.cs b/tests/Integration/MessageStore.cs new file mode 100644 index 0000000..2753408 --- /dev/null +++ b/tests/Integration/MessageStore.cs @@ -0,0 +1,142 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Grpc.Core; +using NUnit.Framework; +using Polly; + +namespace Integration; + +public class MessageStore +{ + private readonly object _locker = new(); + private readonly List<(string address, object? message, Exception? exception)> _store = new(); + private readonly List> _readers = new(); + + public async Task WriteAsync(string address, object? message, Exception? exception = null) + { + await Task.Run( + () => + { + lock (_locker) + { + _store.Add((address, message, exception)); + foreach (Channel<(string addres, object? message, Exception? exception)> channel in _readers) + { + channel.Writer.TryWrite((address, message, exception)); + } + } + }); + } + + + public async IAsyncEnumerable<(string address, object? message, Exception? exception)> ReadMessages() + { + + Channel<(string addres,object? message, Exception? exception)>? channel = null; + try + { + lock (_locker) + { + channel = Channel.CreateUnbounded<(string addres,object? message, Exception? exception)>(); + _readers.Add(channel); + List<(string address, object? message, Exception? exception)> existingMessages = new(_store); + foreach ((string address, object? message, Exception? exception) in existingMessages) + { + channel.Writer.TryWrite((address, message, exception)); + } + } + + await foreach ((string addres, object? message, Exception? exception) in channel.Reader.ReadAllAsync()) + { + yield return (addres, message, exception); + } + } + finally + { + lock (_locker) + { + if (channel != null) + { + _readers.Remove(channel); + } + } + } + } + + public async IAsyncEnumerable<(object? message, Exception? exception)> ReadMessages(string address) + { + await foreach (var (addr, message, exception) in ReadMessages()) + { + if (addr == address) + { + yield return (message, exception); + } + } + } + + + + public IClientStreamWriter GetWriter(string address) + { + return new DelegateStreamWriter( + async message => await WriteAsync(address, message, null), + async () => + { + }); + } + + public IAsyncStreamReader GetReader(string address) + { + return new StreamReader(ReadMessages(address).GetAsyncEnumerator()); + } + + + + private class DelegateStreamWriter : IClientStreamWriter + { + private readonly Func _onWrite; + private readonly Func _onComplete; + + public DelegateStreamWriter(Func onWrite, Func onComplete) + { + _onWrite = onWrite; + _onComplete = onComplete; + } + + + public async Task WriteAsync(T message) + { + await _onWrite(message); + } + + + public WriteOptions? WriteOptions { get; set; } + public async Task CompleteAsync() + { + await _onComplete(); + } + } + + private class StreamReader : IAsyncStreamReader + { + private readonly IAsyncEnumerator<(object? message, Exception? exception)> _enumerator; + private readonly Func _getCurrentFunc; + + public StreamReader(IAsyncEnumerator<(object?, Exception?)> enumerator) + { + _enumerator = enumerator; + _getCurrentFunc = () => _enumerator.Current.exception == null + ? (T)_enumerator.Current.message! + : throw _enumerator.Current.exception; + } + public async Task MoveNext(CancellationToken cancellationToken) + { + return await _enumerator.MoveNextAsync(); + } + + public T Current => _getCurrentFunc(); + } +} From 7d6529c94adf145a05c1c182ec66dd2b8f225409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Fri, 19 Aug 2022 13:51:38 +0600 Subject: [PATCH 16/17] =?UTF-8?q?=D0=BF=D0=BE=D0=B4=D0=B4=D0=B5=D1=80?= =?UTF-8?q?=D0=B6=D0=BA=D0=B0=20=D0=BD=D0=B5=D1=81=D0=BA=D0=BE=D0=BB=D1=8C?= =?UTF-8?q?=D0=BA=D0=B8=D1=85=20=D0=B2=D1=8B=D0=B7=D0=BE=D0=B2=D0=BE=D0=B2?= =?UTF-8?q?=20=D0=B2=20=D0=BE=D0=B4=D0=B8=D0=BD=20=D0=BA=D0=B0=D0=BD=D0=B0?= =?UTF-8?q?=D0=BB=20+=20=D1=82=D0=B5=D1=81=D1=82=D1=8B=20=D0=B4=D0=BB?= =?UTF-8?q?=D1=8F=D1=8F=20=D1=81=D0=B5=D1=80=D0=B2=D0=B5=D1=80=D0=BD=D0=BE?= =?UTF-8?q?=D0=B3=D0=BE=20=D1=81=D1=82=D1=80=D0=B8=D0=BC=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/Integration/{LeaseTests.cs => Lease.cs} | 32 ++-- tests/Integration/Maitenance.cs | 130 ++++++++++++++ tests/Integration/MessageStore.cs | 142 --------------- .../{ => Utils}/DelegateInterceptor.cs | 68 ++++---- tests/Integration/Utils/MessageStore.cs | 162 ++++++++++++++++++ 5 files changed, 349 insertions(+), 185 deletions(-) rename tests/Integration/{LeaseTests.cs => Lease.cs} (80%) create mode 100644 tests/Integration/Maitenance.cs delete mode 100644 tests/Integration/MessageStore.cs rename tests/Integration/{ => Utils}/DelegateInterceptor.cs (74%) create mode 100644 tests/Integration/Utils/MessageStore.cs diff --git a/tests/Integration/LeaseTests.cs b/tests/Integration/Lease.cs similarity index 80% rename from tests/Integration/LeaseTests.cs rename to tests/Integration/Lease.cs index dc0b7f9..82eb293 100644 --- a/tests/Integration/LeaseTests.cs +++ b/tests/Integration/Lease.cs @@ -10,12 +10,13 @@ using DotnetNiceGrpcLogs; using Etcdserverpb; using Grpc.Core; +using Integration.Utils; using NUnit.Framework; using Polly; namespace Integration; -public class LeaseTests +public class Lease { private const string Etcd1 = "127.0.0.1:23790"; private const string Etcd2 = "127.0.0.1:23791"; @@ -46,10 +47,10 @@ public async Task LeaseGranted() { return client.LeaseGrant(request); }); - await foreach ((string address, LeaseGrantRequest message) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) { Assert.AreEqual(message, request); - await delegateInterceptor.WriteResponseAsync(address,response); + await delegateInterceptor.WriteResponseAsync(address, callId,response); break; } @@ -70,10 +71,10 @@ public async Task LeaseGrantedAsync() { return client.LeaseGrantAsync(request); }); - await foreach ((string address, LeaseGrantRequest message) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) { Assert.AreEqual(message, request); - await delegateInterceptor.WriteResponseAsync(address,response); + await delegateInterceptor.WriteResponseAsync(address, callId,response); break; } @@ -101,11 +102,14 @@ public async Task AfterExceptionsLeaseGranted() var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); await iterator.MoveNextAsync(); - await delegateInterceptor.WriteResponseAsync(iterator.Current.address,unavailableException); + var current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException); await iterator.MoveNextAsync(); - await delegateInterceptor.WriteResponseAsync(iterator.Current.address,unavailableException); + current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException); await iterator.MoveNextAsync(); - await delegateInterceptor.WriteResponseAsync(iterator.Current.address,response); + current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,response); var rsp = responseTask.Result; Assert.AreEqual(rsp, response); @@ -114,7 +118,7 @@ public async Task AfterExceptionsLeaseGranted() [Test] public async Task AfterThreeExceptionsLeaseGrantedFail() { - var delegateInterceptor = new DelegateInterceptor(); + var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true); var client = new EtcdClient( connectionString: ConnectionString, interceptors: @@ -124,9 +128,9 @@ public async Task AfterThreeExceptionsLeaseGrantedFail() new Status( StatusCode.Unavailable, "")); - await delegateInterceptor.WriteResponseAsync(Etcd1,unavailableException); - await delegateInterceptor.WriteResponseAsync(Etcd2,unavailableException); - await delegateInterceptor.WriteResponseAsync(Etcd3,unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty,unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty,unavailableException); LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; var ex = Assert.Throws( @@ -148,7 +152,7 @@ public async Task AfterThreeExceptionsLeaseGrantedFail() [Test] public async Task LeaseKeepAliveRequestSendedAfterDelay() { - var delegateInterceptor = new DelegateInterceptor(); + var delegateInterceptor = new DelegateInterceptor(true); var client = new EtcdClient( connectionString: ConnectionString, interceptors: @@ -163,7 +167,7 @@ public async Task LeaseKeepAliveRequestSendedAfterDelay() var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); await iterator.MoveNextAsync(); Assert.AreEqual(iterator.Current.message.ID, 777); - await delegateInterceptor.WriteResponseAsync(iterator.Current.address,new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address, Guid.Empty,new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); var nextKeepAliveTask = iterator.MoveNextAsync(); await Task.Delay(100); Assert.True(nextKeepAliveTask.IsCompleted == false); diff --git a/tests/Integration/Maitenance.cs b/tests/Integration/Maitenance.cs new file mode 100644 index 0000000..332eeb0 --- /dev/null +++ b/tests/Integration/Maitenance.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using dotnet_etcd; +using Etcdserverpb; +using Google.Protobuf; +using Grpc.Core; +using Integration.Utils; +using NUnit.Framework; + +namespace Integration; + +public class Maitenance +{ + private const string Etcd1 = "127.0.0.1:23790"; + private const string Etcd2 = "127.0.0.1:23791"; + private const string Etcd3 = "127.0.0.1:23792"; + private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}"; + + [Test] + public async Task SnapshotTransferredConsistently() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + List originalSnapshot = new() + { + new SnapshotResponse + { + RemainingBytes = 30, + Blob = ByteString.CopyFromUtf8("part1") + }, + new SnapshotResponse + { + RemainingBytes = 20, + Blob = ByteString.CopyFromUtf8("part2") + }, + new SnapshotResponse + { + RemainingBytes = 10, + Blob = ByteString.CopyFromUtf8("part3") + }, + }; + + SnapshotRequest request = new SnapshotRequest(); + List receivedSnapshot = new(); + var callTask = Task.Run( + () => client.Snapshot( + request, + rsp => + { + receivedSnapshot.Add(rsp); + }, + CancellationToken.None)); + await foreach (var (address, callId, message, _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + foreach (var snapshotPart in originalSnapshot) + { + await delegateInterceptor.WriteResponseAsync(address, callId, snapshotPart); + } + + await delegateInterceptor.CloseResponseStreamAsync( + address, + callId); + break; + } + await callTask; + CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot); + } + + [Test] + public async Task SnapshotTransferredConsistentlyThrowExceptions() + { + var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + + var snapshotPart1 = new SnapshotResponse + { + RemainingBytes = 30, + Blob = ByteString.CopyFromUtf8("part1") + }; + var snapshotPart2 = new SnapshotResponse + { + RemainingBytes = 20, + Blob = ByteString.CopyFromUtf8("part2") + }; + var snapshotPart3 = new SnapshotResponse + { + RemainingBytes = 10, + Blob = ByteString.CopyFromUtf8("part3") + }; + List originalSnapshot = new() + { + snapshotPart1, + snapshotPart2, + snapshotPart3, + }; + + RpcException unavailableException = new RpcException(new Status(StatusCode.Unavailable, "")); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException); + await delegateInterceptor.CloseResponseStreamAsync(Etcd1, Guid.Empty); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart2); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, unavailableException); + await delegateInterceptor.CloseResponseStreamAsync(Etcd2, Guid.Empty); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart2); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart3); + await delegateInterceptor.CloseResponseStreamAsync(Etcd3, Guid.Empty); + + + SnapshotRequest request = new SnapshotRequest(); + List receivedSnapshot = new(); + await client.Snapshot( + request, + rsp => + { + receivedSnapshot.Add(rsp); + }, + CancellationToken.None); + + CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot); + // test failed because current stream retry didnt correct + } +} \ No newline at end of file diff --git a/tests/Integration/MessageStore.cs b/tests/Integration/MessageStore.cs deleted file mode 100644 index 2753408..0000000 --- a/tests/Integration/MessageStore.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Channels; -using System.Threading.Tasks; -using Grpc.Core; -using NUnit.Framework; -using Polly; - -namespace Integration; - -public class MessageStore -{ - private readonly object _locker = new(); - private readonly List<(string address, object? message, Exception? exception)> _store = new(); - private readonly List> _readers = new(); - - public async Task WriteAsync(string address, object? message, Exception? exception = null) - { - await Task.Run( - () => - { - lock (_locker) - { - _store.Add((address, message, exception)); - foreach (Channel<(string addres, object? message, Exception? exception)> channel in _readers) - { - channel.Writer.TryWrite((address, message, exception)); - } - } - }); - } - - - public async IAsyncEnumerable<(string address, object? message, Exception? exception)> ReadMessages() - { - - Channel<(string addres,object? message, Exception? exception)>? channel = null; - try - { - lock (_locker) - { - channel = Channel.CreateUnbounded<(string addres,object? message, Exception? exception)>(); - _readers.Add(channel); - List<(string address, object? message, Exception? exception)> existingMessages = new(_store); - foreach ((string address, object? message, Exception? exception) in existingMessages) - { - channel.Writer.TryWrite((address, message, exception)); - } - } - - await foreach ((string addres, object? message, Exception? exception) in channel.Reader.ReadAllAsync()) - { - yield return (addres, message, exception); - } - } - finally - { - lock (_locker) - { - if (channel != null) - { - _readers.Remove(channel); - } - } - } - } - - public async IAsyncEnumerable<(object? message, Exception? exception)> ReadMessages(string address) - { - await foreach (var (addr, message, exception) in ReadMessages()) - { - if (addr == address) - { - yield return (message, exception); - } - } - } - - - - public IClientStreamWriter GetWriter(string address) - { - return new DelegateStreamWriter( - async message => await WriteAsync(address, message, null), - async () => - { - }); - } - - public IAsyncStreamReader GetReader(string address) - { - return new StreamReader(ReadMessages(address).GetAsyncEnumerator()); - } - - - - private class DelegateStreamWriter : IClientStreamWriter - { - private readonly Func _onWrite; - private readonly Func _onComplete; - - public DelegateStreamWriter(Func onWrite, Func onComplete) - { - _onWrite = onWrite; - _onComplete = onComplete; - } - - - public async Task WriteAsync(T message) - { - await _onWrite(message); - } - - - public WriteOptions? WriteOptions { get; set; } - public async Task CompleteAsync() - { - await _onComplete(); - } - } - - private class StreamReader : IAsyncStreamReader - { - private readonly IAsyncEnumerator<(object? message, Exception? exception)> _enumerator; - private readonly Func _getCurrentFunc; - - public StreamReader(IAsyncEnumerator<(object?, Exception?)> enumerator) - { - _enumerator = enumerator; - _getCurrentFunc = () => _enumerator.Current.exception == null - ? (T)_enumerator.Current.message! - : throw _enumerator.Current.exception; - } - public async Task MoveNext(CancellationToken cancellationToken) - { - return await _enumerator.MoveNextAsync(); - } - - public T Current => _getCurrentFunc(); - } -} diff --git a/tests/Integration/DelegateInterceptor.cs b/tests/Integration/Utils/DelegateInterceptor.cs similarity index 74% rename from tests/Integration/DelegateInterceptor.cs rename to tests/Integration/Utils/DelegateInterceptor.cs index 9774c32..a698013 100644 --- a/tests/Integration/DelegateInterceptor.cs +++ b/tests/Integration/Utils/DelegateInterceptor.cs @@ -1,7 +1,5 @@ using System; using System.Collections.Generic; -using System.IO; -using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -9,29 +7,35 @@ using Grpc.Core.Interceptors; using Grpc.Net.Client; -namespace Integration; +namespace Integration.Utils; -public class DelegateInterceptor : Interceptor +internal class DelegateInterceptor : Interceptor where TReq : class where TRsp : class { private readonly MessageStore _requestsStore = new(); private readonly MessageStore _responsesStore = new(); + private readonly bool _ignoreCallId = false; + public DelegateInterceptor(bool ignoreCallId = false) + { + _ignoreCallId = ignoreCallId; + } public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) { ValidateCall(); - - + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); string address = GetEtcdAdders(continuation); _requestsStore.WriteAsync( - address, + address, callId, request).Wait(); - var enumerator = _responsesStore.GetReader(address); + _requestsStore.Complete(address, callId); + var enumerator = _responsesStore.GetReader(address,callId); if (!enumerator.MoveNext().Result) throw new Exception("response required"); var response = enumerator.Current; + _responsesStore.Complete(address,callId); var call = new AsyncUnaryCall( responseAsync: Task.FromResult(response), responseHeadersAsync: Task.FromResult(new Metadata()), @@ -48,13 +52,15 @@ public override TResponse BlockingUnaryCall(TRequest reques BlockingUnaryCallContinuation continuation) { ValidateCall(); - + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); string address = GetEtcdAdders(continuation); _requestsStore.WriteAsync( - address, + address,callId, request).Wait(); - var enumerator = _responsesStore.GetReader(address); + _requestsStore.Complete(address, callId); + var enumerator = _responsesStore.GetReader(address,callId); if (!enumerator.MoveNext().Result) throw new Exception("response required"); + _responsesStore.Complete(address,callId); return enumerator.Current; } @@ -63,10 +69,11 @@ public override AsyncClientStreamingCall AsyncClientStreami AsyncClientStreamingCallContinuation continuation) { ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); string address = GetEtcdAdders(continuation); - var reader = _responsesStore.GetReader(address); + var reader = _responsesStore.GetReader(address,callId); var call = new AsyncClientStreamingCall( - requestStream: _requestsStore.GetWriter(address), + requestStream: _requestsStore.GetWriter(address, callId), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, @@ -77,6 +84,7 @@ public override AsyncClientStreamingCall AsyncClientStreami async () => { await reader.MoveNext(); + _responsesStore.Complete(address,callId); return reader.Current; })); return call; @@ -88,12 +96,14 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall continuation) { ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); string address = GetEtcdAdders(continuation); _requestsStore.WriteAsync( - address, + address,callId, request).Wait(); + _requestsStore.Complete(address, callId); var call = new AsyncServerStreamingCall( - responseStream: _responsesStore.GetReader(address), + responseStream: _responsesStore.GetReader(address,callId), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, @@ -109,11 +119,11 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami AsyncDuplexStreamingCallContinuation continuation) { ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); string address = GetEtcdAdders(continuation); - AsyncDuplexStreamingCall call = new( - requestStream: _requestsStore.GetWriter(address), - responseStream: _responsesStore.GetReader(address), + requestStream: _requestsStore.GetWriter(address,callId), + responseStream: _responsesStore.GetReader(address,callId), responseHeadersAsync: Task.FromResult(new Metadata()), getStatusFunc: () => new Status( statusCode: StatusCode.OK, @@ -124,35 +134,35 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami } - public async Task WriteResponseAsync(string address, TRsp rsp) + public async Task WriteResponseAsync(string address,Guid callId, TRsp rsp) { - await _responsesStore.WriteAsync(address,rsp); + await _responsesStore.WriteAsync(address, callId,rsp); } - public async Task WriteResponseAsync(string address, Exception exception) + public async Task WriteResponseAsync(string address, Guid callId, Exception exception) { await _responsesStore.WriteAsync( - address, + address, callId, null, exception); } - public async Task CloseResponseStreamAsync(string address) + public async Task CloseResponseStreamAsync(string address, Guid callId) { - throw new NotImplementedException(); + _responsesStore.Complete(address,callId); } - public IAsyncEnumerable ReadAllRequests(string address, CancellationToken cancellationToken) + public IAsyncEnumerable ReadAllRequests(string address, Guid callId, CancellationToken cancellationToken) { - return _requestsStore.GetReader(address).ReadAllAsync(cancellationToken); + return _requestsStore.GetReader(address, callId).ReadAllAsync(cancellationToken); } - public async IAsyncEnumerable<(string address, TReq message)> ReadAllRequests(CancellationToken cancellationToken) + public async IAsyncEnumerable<(string address, Guid callId, TReq message, bool closed)> ReadAllRequests(CancellationToken cancellationToken) { - await foreach (var (address, message, exception) in _requestsStore.ReadMessages()) + await foreach (var (address, callId, message, exception, closed) in _requestsStore.ReadMessages()) { if (exception != null) throw exception; - yield return (address, (TReq)message!); + yield return (address, callId, (TReq)message!, closed); } } diff --git a/tests/Integration/Utils/MessageStore.cs b/tests/Integration/Utils/MessageStore.cs new file mode 100644 index 0000000..e490945 --- /dev/null +++ b/tests/Integration/Utils/MessageStore.cs @@ -0,0 +1,162 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Grpc.Core; +using NUnit.Framework; +using Polly; + +namespace Integration.Utils; + +internal class MessageStore +{ + private readonly object _locker = new(); + private readonly List<(string address, Guid callId, object? message, Exception? exception, bool closed)> _store = new(); + private readonly List> _readers = new(); + + + public async Task WriteAsync(string address, Guid callId, object? message, Exception? exception = null) => + await WriteAsync( + address, + callId, + message, + exception, + false); + + private async Task WriteAsync(string address, Guid callId, object? message, Exception? exception, bool closed) + { + await Task.Run( + () => + { + lock (_locker) + { + if (_store.Any(item => item.address == address && item.callId == callId && item.closed)) + throw new InvalidOperationException("call closed already"); + _store.Add((address, callId, message, exception, closed)); + foreach (Channel<(string addres, Guid callId, object? message, Exception? exception, bool closed)> channel in _readers) + { + channel.Writer.TryWrite((address, callId, message, exception, closed)); + } + } + }); + } + + public void Complete(string address, Guid callId) => + WriteAsync( + address, + callId, + null, + null, + true).Wait(); + + public async IAsyncEnumerable<(string address, Guid callId, object? message, Exception? exception, bool closed)> ReadMessages() + { + Channel<(string addres, Guid callId,object? message, Exception? exception, bool closed)>? channel = null; + try + { + lock (_locker) + { + channel = Channel.CreateUnbounded<(string addres, Guid callId, object? message, Exception? exception, bool closed)>(); + _readers.Add(channel); + List<(string address, Guid callId, object? message, Exception? exception, bool closed)> existingMessages = new(_store); + foreach ((string address, Guid callId, object? message, Exception? exception, bool closed) in existingMessages) + { + channel.Writer.TryWrite((address, callId, message, exception, closed)); + } + } + + await foreach ((string addres, Guid callId, object? message, Exception? exception, bool closed) in channel.Reader.ReadAllAsync()) + { + yield return (addres, callId, message, exception, closed); + } + } + finally + { + lock (_locker) + { + if (channel != null) + { + _readers.Remove(channel); + } + } + } + } + + public async IAsyncEnumerable<(object? message, Exception? exception)> ReadMessages(string address, Guid callId) + { + await foreach (var (addr, _callId, message, exception, closed) in ReadMessages()) + { + if (addr == address && callId == _callId) + { + if(closed) yield break; + yield return (message, exception); + } + } + } + + + + public IClientStreamWriter GetWriter(string address, Guid callId) + { + return new DelegateStreamWriter( + async message => await WriteAsync(address, callId, message, null), + async () => + { + Complete(address, callId); + }); + } + + public IAsyncStreamReader GetReader(string address, Guid callId) + { + return new StreamReader(ReadMessages(address, callId).GetAsyncEnumerator()); + } + + + + private class DelegateStreamWriter : IClientStreamWriter + { + private readonly Func _onWrite; + private readonly Func _onComplete; + + public DelegateStreamWriter(Func onWrite, Func onComplete) + { + _onWrite = onWrite; + _onComplete = onComplete; + } + + + public async Task WriteAsync(T message) + { + await _onWrite(message); + } + + + public WriteOptions? WriteOptions { get; set; } + public async Task CompleteAsync() + { + await _onComplete(); + } + } + + private class StreamReader : IAsyncStreamReader + { + private readonly IAsyncEnumerator<(object? message, Exception? exception)> _enumerator; + private readonly Func _getCurrentFunc; + + public StreamReader(IAsyncEnumerator<(object?, Exception?)> enumerator) + { + _enumerator = enumerator; + _getCurrentFunc = () => _enumerator.Current.exception == null + ? (T)_enumerator.Current.message! + : throw _enumerator.Current.exception; + } + public async Task MoveNext(CancellationToken cancellationToken) + { + return await _enumerator.MoveNextAsync(); + } + + public T Current => _getCurrentFunc(); + } +} From d3ce0673aa75d8df8433a1699e4b38519129d332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D1=81=D0=B5=D0=B2=D0=BE=D0=BB=D0=BE=D0=B4?= Date: Mon, 24 Oct 2022 11:01:45 +0600 Subject: [PATCH 17/17] start fixes --- DevelopmentSandbox/Program.cs | 34 +++++++++++++++++-- .../Properties/launchSettings.json | 2 +- docker-compose.yml | 6 ++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs index f298bef..9266a37 100644 --- a/DevelopmentSandbox/Program.cs +++ b/DevelopmentSandbox/Program.cs @@ -31,8 +31,39 @@ private static async Task Main(string[] args) connectionString: connection_string, // handler: handler, useLegacyRpcExceptionForCancellation: false); - + + var txn = new TxnRequest(); + for (int i = 0; i < 120; i++) + { + txn.Success.Add(new RequestOp() + { + RequestPut = new PutRequest() + { + Key = ByteString.CopyFromUtf8( "asdfadsfasdfdsf"+i), + Value = ByteString.CopyFromUtf8("dfasdvasdfasdf") + + } + }); + } + + while (true) + { + try + { + await client.TransactionAsync( + txn, + deadline: DateTime.UtcNow.AddMilliseconds(10)); + + Console.WriteLine("ok"); + } + catch (Exception e) + { + Console.WriteLine("fail"); + } + } + + Func doJob = async () => { var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; @@ -43,7 +74,6 @@ await client.HighlyReliableLeaseKeepAliveAsync( maxRetryBackoffMs: 400, sleepAfterSuccessMs: 5000 / 3, cts.Token).ConfigureAwait(false); - // await client.LeaseKeepAlive( // leaseId, // CancellationToken.None).ConfigureAwait(false); diff --git a/DevelopmentSandbox/Properties/launchSettings.json b/DevelopmentSandbox/Properties/launchSettings.json index 40973bc..dcf8593 100644 --- a/DevelopmentSandbox/Properties/launchSettings.json +++ b/DevelopmentSandbox/Properties/launchSettings.json @@ -4,7 +4,7 @@ "DevelopmentSandbox": { "commandName": "Project", "environmentVariables": { - "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23790,http://127.0.0.1:23790" + "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" } } } diff --git a/docker-compose.yml b/docker-compose.yml index 753d2ff..fc92529 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '3.5' services: etcd0: - image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + image: "gcr.io/etcd-development/etcd:v3.5.4" ports: - "23790:2379" - "2379:2379" @@ -20,7 +20,7 @@ services: ] etcd1: - image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + image: "gcr.io/etcd-development/etcd:v3.5.4" ports: - "23791:2379" command: @@ -37,7 +37,7 @@ services: ] etcd2: - image: "gcr.io/etcd-development/etcd:v3.5.0-beta.4" + image: "gcr.io/etcd-development/etcd:v3.5.4" ports: - "23792:2379" command: