diff --git a/DevelopmentSandbox/DevelopmentSandbox.csproj b/DevelopmentSandbox/DevelopmentSandbox.csproj
new file mode 100644
index 0000000..961e278
--- /dev/null
+++ b/DevelopmentSandbox/DevelopmentSandbox.csproj
@@ -0,0 +1,21 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs
new file mode 100644
index 0000000..9266a37
--- /dev/null
+++ b/DevelopmentSandbox/Program.cs
@@ -0,0 +1,115 @@
+using dotnet_etcd;
+using Etcdserverpb;
+using Google.Protobuf;
+using Grpc.Core;
+using Integration;
+using Serilog;
+using Serilog.Sinks.SystemConsole.Themes;
+
+namespace DevelopmentSandbox; // Note: actual namespace depends on the project name.
+
+
+
+internal class Program
+{
+ private static async Task Main(string[] args)
+ {
+ var cts = new CancellationTokenSource();
+ AppDomain.CurrentDomain.ProcessExit += (_, _) => { cts.Cancel(); };
+ Console.CancelKeyPress += (_, ea) => { cts.Cancel(); };
+ ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console(
+ theme: SystemConsoleTheme.Literate,
+ outputTemplate:
+ "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}")
+ .Enrich.FromLogContext()
+ .CreateLogger();
+
+ var connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING");
+
+
+ var client = new EtcdClient(
+ connectionString: connection_string,
+ // handler: handler,
+ useLegacyRpcExceptionForCancellation: false);
+
+
+ var txn = new TxnRequest();
+ for (int i = 0; i < 120; i++)
+ {
+ txn.Success.Add(new RequestOp()
+ {
+ RequestPut = new PutRequest()
+ {
+ Key = ByteString.CopyFromUtf8( "asdfadsfasdfdsf"+i),
+ Value = ByteString.CopyFromUtf8("dfasdvasdfasdf")
+
+ }
+ });
+ }
+
+ while (true)
+ {
+ try
+ {
+ await client.TransactionAsync(
+ txn,
+ deadline: DateTime.UtcNow.AddMilliseconds(10));
+
+ Console.WriteLine("ok");
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("fail");
+ }
+ }
+
+
+ Func doJob = async () =>
+ {
+ var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID;
+ await client.HighlyReliableLeaseKeepAliveAsync(
+ leaseId,
+ 5,
+ retryDurationMs: 1000,
+ maxRetryBackoffMs: 400,
+ sleepAfterSuccessMs: 5000 / 3,
+ cts.Token).ConfigureAwait(false);
+ // await client.LeaseKeepAlive(
+ // leaseId,
+ // CancellationToken.None).ConfigureAwait(false);
+ };
+
+ List jobs = new List(1000);
+
+ foreach (var i in Enumerable.Range(
+ 0,
+ 20000))
+ {
+
+ await Task.Delay(5); //что бы кипалайвы были в приоритете создания новых тасков
+ if (cts.Token.IsCancellationRequested)
+ {
+ break;
+ }
+
+ var t = Task.Run(
+ async () =>
+ {
+ cts.Token.ThrowIfCancellationRequested();
+ Console.WriteLine(i);
+ try
+ {
+ await doJob().ConfigureAwait(false);
+ }
+ finally
+ {
+ cts.Cancel();
+ }
+ },
+ cts.Token);
+ jobs.Add(t);
+ }
+
+ await await Task.WhenAny(jobs);
+ }
+}
\ No newline at end of file
diff --git a/DevelopmentSandbox/Properties/launchSettings.json b/DevelopmentSandbox/Properties/launchSettings.json
new file mode 100644
index 0000000..dcf8593
--- /dev/null
+++ b/DevelopmentSandbox/Properties/launchSettings.json
@@ -0,0 +1,11 @@
+{
+ "$schema": "http://json.schemastore.org/launchsettings.json",
+ "profiles": {
+ "DevelopmentSandbox": {
+ "commandName": "Project",
+ "environmentVariables": {
+ "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792"
+ }
+ }
+ }
+}
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..fc92529
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,54 @@
+version: '3.5'
+
+services:
+ etcd0:
+ image: "gcr.io/etcd-development/etcd:v3.5.4"
+ ports:
+ - "23790:2379"
+ - "2379:2379"
+ command:
+ [
+ "etcd",
+ "--name=etcd0",
+ "--advertise-client-urls=http://etcd0:2379",
+ "--listen-client-urls=http://0.0.0.0:2379",
+ "--initial-advertise-peer-urls=http://etcd0:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380",
+ "--initial-cluster-state=new"
+ ]
+
+ etcd1:
+ image: "gcr.io/etcd-development/etcd:v3.5.4"
+ ports:
+ - "23791:2379"
+ command:
+ [
+ "etcd",
+ "--name=etcd1",
+ "--advertise-client-urls=http://etcd1:2379",
+ "--listen-client-urls=http://0.0.0.0:2379",
+ "--initial-advertise-peer-urls=http://etcd1:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380",
+ "--initial-cluster-state=new"
+ ]
+
+ etcd2:
+ image: "gcr.io/etcd-development/etcd:v3.5.4"
+ ports:
+ - "23792:2379"
+ command:
+ [
+ "etcd",
+ "--name=etcd2",
+ "--advertise-client-urls=http://etcd2:2379",
+ "--listen-client-urls=http://0.0.0.0:2379",
+ "--initial-advertise-peer-urls=http://etcd2:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380",
+ "--initial-cluster-state=new"
+ ]
diff --git a/dotnet-etcd.sln b/dotnet-etcd.sln
index d9eb707..223fb8a 100644
--- a/dotnet-etcd.sln
+++ b/dotnet-etcd.sln
@@ -9,6 +9,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{8645A28E
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Integration", "tests\Integration\Integration.csproj", "{617202A0-4D3A-4FA7-978E-B5A01A3152BD}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DevelopmentSandbox", "DevelopmentSandbox\DevelopmentSandbox.csproj", "{41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -23,6 +25,10 @@ Global
{617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs
new file mode 100644
index 0000000..cf61ff1
--- /dev/null
+++ b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs
@@ -0,0 +1,42 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Runtime.Serialization;
+
+namespace dotnet_etcd
+{
+ [Serializable]
+ public class LeaseExpiredOrNotFoundException : Exception
+ {
+ private readonly long _leaseId;
+
+ public LeaseExpiredOrNotFoundException(long leaseId) : base("LeaseExpiredOrNotFoundException: leaseId=" + leaseId)
+ {
+ _leaseId = leaseId;
+ }
+
+ public LeaseExpiredOrNotFoundException(long leaseId, string message) : base(message)
+ {
+ _leaseId = leaseId;
+ }
+
+ public LeaseExpiredOrNotFoundException(long leaseId, string message, Exception inner) : base(
+ message,
+ inner)
+ {
+ _leaseId = leaseId;
+ }
+
+ protected LeaseExpiredOrNotFoundException(
+ SerializationInfo info,
+ StreamingContext context) : base(
+ info,
+ context)
+ {
+ info.AddValue(
+ name: "leaseId",
+ value: _leaseId);
+ }
+ }
+}
diff --git a/dotnet-etcd/dotnet-etcd.csproj b/dotnet-etcd/dotnet-etcd.csproj
index ae50c36..601bc7f 100644
--- a/dotnet-etcd/dotnet-etcd.csproj
+++ b/dotnet-etcd/dotnet-etcd.csproj
@@ -66,6 +66,9 @@ Advanced uses take advantage of the consistency guarantees to implement database
all
runtime; build; native; contentfiles; analyzers
+
+
+
diff --git a/dotnet-etcd/etcdClient.cs b/dotnet-etcd/etcdClient.cs
index de14ea7..9871a90 100644
--- a/dotnet-etcd/etcdClient.cs
+++ b/dotnet-etcd/etcdClient.cs
@@ -31,7 +31,7 @@ public partial class EtcdClient : IDisposable, IEtcdClient
#region Initializers
public EtcdClient(string connectionString, int port = 2379,
- HttpClientHandler handler = null, bool ssl = false,
+ HttpMessageHandler handler = null, bool ssl = false,
bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors)
{
if (string.IsNullOrWhiteSpace(connectionString))
diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs
index 5e41452..136bf85 100644
--- a/dotnet-etcd/leaseClient.cs
+++ b/dotnet-etcd/leaseClient.cs
@@ -2,13 +2,23 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
using System.IO;
+using System.Linq;
+using System.Net.Http.Headers;
+using System.Runtime.InteropServices;
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
-
+using dotnet_etcd.multiplexer;
using Etcdserverpb;
+using Polly;
using Grpc.Core;
+using Polly.Contrib.WaitAndRetry;
+using Polly.Timeout;
namespace dotnet_etcd
{
@@ -109,6 +119,203 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok
}
}).ConfigureAwait(false);
+
+ ///
+ /// HighlyReliableLeaseKeepAlive keeps lease alive by sending keep alive requests and receiving keep alive responses.
+ /// Reliability is achieved by sequentially sending keep alive requests at short intervals to all etcd nodes
+ ///
+ /// lease identifier
+ /// the remaining TTL at the time the method was called. used to determine initial deadlines
+ ///
+ ///
+ ///
+ ///
+ /// throws the exception if lease not found, expired, revoked or keep alive unsuccessfully
+ /// is received within the lease TTL or
+ public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRemainigTTL,
+ int retryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken)
+ {
+ int startNodeIndex = (new Random()).Next(_balancer._numNodes);
+ while (true) // keepAlive rounds
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ var roundCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ int usedKeepAliveJobs = 0;
+ int delayBetweenUseNewKeepAliveJob = retryDurationMs / _balancer._numNodes;
+ startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex;
+ DateTime leaseExpiredAt = DateTime.UtcNow.AddSeconds(leaseRemainigTTL);
+ List> keepAliveJobs = new List>();
+ while (usedKeepAliveJobs < _balancer._numNodes)
+ {
+ usedKeepAliveJobs++;
+ roundCancellationTokenSource.Token.ThrowIfCancellationRequested();
+ //todo: вынести работу с выбором ноды в отдельный метод
+ int currentNodeIndex = startNodeIndex + usedKeepAliveJobs;
+ currentNodeIndex = currentNodeIndex >= _balancer._numNodes
+ ? currentNodeIndex - _balancer._numNodes
+ : currentNodeIndex;
+ Connection connection = _balancer._healthyNode.ElementAt(currentNodeIndex);
+ Task keepAliveJob = RetryUntilKeepAliveResponseAsync(
+ leaseId,
+ connection,
+ retryDurationMs,
+ maxRetryBackoffMs,
+ leaseExpiredAt,
+ roundCancellationTokenSource.Token);
+ keepAliveJobs.Add(keepAliveJob);
+
+ await WhenAnySuccessLimitedAsync(
+ keepAliveJobs,
+ waitLimitMs: delayBetweenUseNewKeepAliveJob,
+ cancellationToken: roundCancellationTokenSource.Token).ConfigureAwait(false);
+
+ if (IsAnyCompletedSuccessfully(keepAliveJobs, out var _))
+ {
+ roundCancellationTokenSource.Cancel();
+ break;
+ }
+ }
+
+ try
+ {
+ //todo: не понятно что дедлайн лизы заложен внутри джоб, подумать как сделать проще
+ await Task.WhenAll(keepAliveJobs.ToArray()).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ // ignored exceptions will handled later
+ }
+
+ if (IsAnyCompletedSuccessfully(
+ keepAliveJobs,
+ out var keepAliveResponse)
+ && keepAliveResponse.TTL > 0)
+ {
+
+ await Task.Delay(
+ sleepAfterSuccessMs,
+ cancellationToken).ConfigureAwait(false);
+ leaseRemainigTTL = Math.Max(0,keepAliveResponse.TTL - sleepAfterSuccessMs / 1000);
+ continue; //go to next round
+ }
+ //lease not found, expired or revoked or keep alive unsuccessfully
+ List exceptions = new List()
+ {
+ new LeaseExpiredOrNotFoundException(leaseId),
+ };
+ exceptions.AddRange(keepAliveJobs
+ .SelectMany(job => job.Exception?.InnerExceptions)
+ .Where(exception=>exception != null)); // collect all exceptions
+ throw new AggregateException(exceptions);
+ }
+
+ async Task WhenAnySuccessLimitedAsync(IEnumerable tasks,int waitLimitMs, CancellationToken cancellationToken)
+ {
+ List runningTasks = tasks?.ToList() ?? new List();
+ Task waitLimit = Task.Delay(
+ waitLimitMs,
+ cancellationToken);
+ while (runningTasks.Count > 0)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ Task completedTask = await Task.WhenAny(runningTasks.Append(waitLimit)).ConfigureAwait(false);
+ if (completedTask.IsCompletedSuccessfully || completedTask == waitLimit)
+ {
+ return;
+ }
+ runningTasks.Remove(completedTask);
+ }
+ }
+
+
+
+ async Task RetryUntilKeepAliveResponseAsync(long leaseId, Connection connection,
+ int retryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ // timeoutPolicy thrown own exception, that overlap retry exceptions,
+ // so this list used for catch the retry exceptions.
+ List retryExceptions = new List();
+ var timeoutPolicy = Policy.TimeoutAsync(deadline - DateTime.UtcNow);
+
+ TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs);
+ var retryDelay =
+ Backoff.DecorrelatedJitterBackoffV2(
+ fastFirst: true,
+ medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), //todo: вынести в параметры
+ retryCount: int.MaxValue)
+ .Select(s => s < maxRetryBackoff ? s : maxRetryBackoff);
+ var retryPolicy = Policy
+ //retry on all exceptions except LeaseExpiredOrNotFoundException
+ .Handle()
+ .WaitAndRetryAsync(
+ retryDelay,
+ onRetry: (exception, _) => retryExceptions.Add(exception));
+ var retryTimeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(retryDurationMs));
+ var policy =
+ Policy.WrapAsync(
+ timeoutPolicy,
+ retryPolicy,
+ retryTimeoutPolicy);
+ try
+ {
+ LeaseKeepAliveResponse response = await policy.ExecuteAsync(
+ continueOnCapturedContext: false,
+ cancellationToken: cancellationToken,
+ action: async retryCancellationToken =>
+ {
+ retryCancellationToken.ThrowIfCancellationRequested();
+ using AsyncDuplexStreamingCall leaser =
+ connection._leaseClient
+ .LeaseKeepAlive(cancellationToken: retryCancellationToken);
+ // ReSharper disable once MethodSupportsCancellation //method doesn't support cancellation
+ await leaser.RequestStream.WriteAsync(
+ new LeaseKeepAliveRequest() { ID = leaseId });
+ bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken)
+ .ConfigureAwait(false);
+ if (!result)
+ {
+ throw new RpcException(
+ new Status(
+ StatusCode.Aborted,
+ "didnt receive keepAlive response"));
+ }
+
+ await leaser.RequestStream.CompleteAsync();
+ return leaser.ResponseStream.Current;
+ }).ConfigureAwait(false);
+ return response;
+ }
+ catch (TimeoutRejectedException e)
+ {
+ throw new AggregateException(
+ retryExceptions
+ .Append(e)
+ .Reverse());
+ }
+ }
+
+ bool IsAnyCompletedSuccessfully(IEnumerable> tasks,
+ out T response)
+ {
+ foreach (Task call in tasks)
+ {
+ if (call.IsCompletedSuccessfully)
+ {
+ response = call.Result;
+ return true;
+ }
+ }
+
+ response = default;
+ return false;
+ }
+
+ }
+
+
+
+
///
/// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
/// to the server and streaming keep alive responses from the server to the client.
@@ -273,4 +480,4 @@ public async Task LeaseTimeToLiveAsync(LeaseTimeToLiveR
CancellationToken cancellationToken = default) => await CallEtcdAsync(async (connection) => await connection._leaseClient
.LeaseTimeToLiveAsync(request, headers, deadline, cancellationToken)).ConfigureAwait(false);
}
-}
\ No newline at end of file
+}
diff --git a/dotnet-etcd/multiplexer/Balancer.cs b/dotnet-etcd/multiplexer/Balancer.cs
index 7ac998d..aec4303 100644
--- a/dotnet-etcd/multiplexer/Balancer.cs
+++ b/dotnet-etcd/multiplexer/Balancer.cs
@@ -18,7 +18,7 @@ namespace dotnet_etcd.multiplexer
internal class Balancer
{
- private readonly HashSet _healthyNode;
+ internal readonly HashSet _healthyNode;
///
/// No of etcd nodes
@@ -36,7 +36,7 @@ internal class Balancer
private static readonly Random s_random = new Random();
- internal Balancer(List nodes, HttpClientHandler handler = null, bool ssl = false,
+ internal Balancer(List nodes, HttpMessageHandler handler = null, bool ssl = false,
bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors)
{
_numNodes = nodes.Count;
@@ -57,6 +57,7 @@ internal Balancer(List nodes, HttpClientHandler handler = null, bool ssl =
ThrowOperationCanceledOnCancellation = !useLegacyRpcExceptionForCancellation
});
}
+
else
{
#if NETCOREAPP3_1 || NETCOREAPP3_0
diff --git a/tests/Integration/DevelopmentProcessTests.cs b/tests/Integration/DevelopmentProcessTests.cs
deleted file mode 100644
index b9fa69f..0000000
--- a/tests/Integration/DevelopmentProcessTests.cs
+++ /dev/null
@@ -1,32 +0,0 @@
-using System.Threading.Tasks;
-using dotnet_etcd;
-using DotnetNiceGrpcLogs;
-using Etcdserverpb;
-using NUnit.Framework;
-using Serilog;
-using Serilog.Sinks.SystemConsole.Themes;
-
-namespace Integration;
-
-public class DevelopmentProcessTests
-{
-
-
- [SetUp]
- public async Task Setup()
- {
- await Framework.CleanEtcdTestsKeys();
- }
-
- [TearDown]
- public async Task TearDownAsync()
- {
- await Framework.CleanEtcdTestsKeys();
- }
-
- //debug space here
- [Test]
- public async Task Test1()
- {
- }
-}
\ No newline at end of file
diff --git a/tests/Integration/Framework.cs b/tests/Integration/Framework.cs
deleted file mode 100644
index d54a6cb..0000000
--- a/tests/Integration/Framework.cs
+++ /dev/null
@@ -1,35 +0,0 @@
-using System.Threading.Tasks;
-using dotnet_etcd;
-using DotnetNiceGrpcLogs;
-using Serilog;
-using Serilog.Sinks.SystemConsole.Themes;
-
-namespace Integration;
-
-public static class Framework
-{
- public const string TestPrefix = "/Tests/";
-
- public static ILogger Logger { get; } = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console(
- theme: SystemConsoleTheme.Literate,
- outputTemplate:
- "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}")
- .Enrich.FromLogContext()
- .CreateLogger();
-
- public static EtcdClient Client { get; } = new EtcdClient(
- "http://localhost:23790,http://localhost:23791,http://localhost:23792", //todo: вытащить в конфигурацию
- useLegacyRpcExceptionForCancellation: false,
- interceptors: new GrpcLogsInterceptor(
- Logger,
- new LogsInterceptorOptions
- {
- //LoggerName = null,
- IncludeLogData = true
- }));
-
- public static async Task CleanEtcdTestsKeys()
- {
- await Client.DeleteRangeAsync(TestPrefix);
- }
-}
\ No newline at end of file
diff --git a/tests/Integration/Lease.cs b/tests/Integration/Lease.cs
new file mode 100644
index 0000000..82eb293
--- /dev/null
+++ b/tests/Integration/Lease.cs
@@ -0,0 +1,177 @@
+using System;
+using System.Data.Common;
+using System.Diagnostics;
+using System.Net;
+using System.Net.Http;
+using System.Security.Authentication;
+using System.Threading;
+using System.Threading.Tasks;
+using dotnet_etcd;
+using DotnetNiceGrpcLogs;
+using Etcdserverpb;
+using Grpc.Core;
+using Integration.Utils;
+using NUnit.Framework;
+using Polly;
+
+namespace Integration;
+
+public class Lease
+{
+ private const string Etcd1 = "127.0.0.1:23790";
+ private const string Etcd2 = "127.0.0.1:23791";
+ private const string Etcd3 = "127.0.0.1:23792";
+ private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}";
+
+ [SetUp]
+ public async Task Setup()
+ {
+ }
+
+ [TearDown]
+ public async Task TearDownAsync()
+ {
+ }
+
+
+ [Test]
+ public async Task LeaseGranted()
+ {
+ var delegateInterceptor = new DelegateInterceptor();
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors: delegateInterceptor);
+ LeaseGrantRequest request = new() { ID = 777, TTL = 777 };
+ LeaseGrantResponse response = new() { ID = 888, TTL = 888 };
+ var responseTask = Task.Run(() =>
+ {
+ return client.LeaseGrant(request);
+ });
+ await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None))
+ {
+ Assert.AreEqual(message, request);
+ await delegateInterceptor.WriteResponseAsync(address, callId,response);
+ break;
+ }
+
+ var rsp = responseTask.Result;
+ Assert.AreEqual(rsp, response);
+ }
+
+ [Test]
+ public async Task LeaseGrantedAsync()
+ {
+ var delegateInterceptor = new DelegateInterceptor();
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors: delegateInterceptor);
+ LeaseGrantRequest request = new() { ID = 777, TTL = 777 };
+ LeaseGrantResponse response = new() { ID = 888, TTL = 888 };
+ var responseTask = Task.Run(() =>
+ {
+ return client.LeaseGrantAsync(request);
+ });
+ await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None))
+ {
+ Assert.AreEqual(message, request);
+ await delegateInterceptor.WriteResponseAsync(address, callId,response);
+ break;
+ }
+
+ var rsp = responseTask.Result;
+ Assert.AreEqual(rsp, response);
+ }
+
+ [Test]
+ public async Task AfterExceptionsLeaseGranted()
+ {
+ var delegateInterceptor = new DelegateInterceptor();
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors: delegateInterceptor);
+ LeaseGrantRequest request = new() { ID = 777, TTL = 777 };
+ LeaseGrantResponse response = new() { ID = 888, TTL = 888 };
+ var responseTask = Task.Run(() =>
+ {
+ return client.LeaseGrantAsync(request);
+ });
+ RpcException unavailableException = new RpcException(
+ new Status(
+ StatusCode.Unavailable,
+ ""));
+
+ var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator();
+ await iterator.MoveNextAsync();
+ var current = iterator.Current;
+ await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException);
+ await iterator.MoveNextAsync();
+ current = iterator.Current;
+ await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException);
+ await iterator.MoveNextAsync();
+ current = iterator.Current;
+ await delegateInterceptor.WriteResponseAsync(current.address, current.callId,response);
+ var rsp = responseTask.Result;
+
+ Assert.AreEqual(rsp, response);
+ }
+
+ [Test]
+ public async Task AfterThreeExceptionsLeaseGrantedFail()
+ {
+ var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true);
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors:
+ delegateInterceptor);
+
+ RpcException unavailableException = new RpcException(
+ new Status(
+ StatusCode.Unavailable,
+ ""));
+ await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException);
+ await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty,unavailableException);
+ await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty,unavailableException);
+ LeaseGrantRequest request = new() { ID = 777, TTL = 777 };
+
+ var ex = Assert.Throws(
+ () =>
+ {
+ try
+ {
+ client.LeaseGrantAsync(request).Wait();
+ }
+ catch (AggregateException e)
+ {
+
+ throw e.InnerException;
+ }
+ });
+ Assert.That(ex.Status.StatusCode, Is.EqualTo(StatusCode.Unavailable));
+ }
+
+ [Test]
+ public async Task LeaseKeepAliveRequestSendedAfterDelay()
+ {
+ var delegateInterceptor = new DelegateInterceptor(true);
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors:
+ delegateInterceptor);
+ var responseTask = Task.Run(
+ () =>
+ {
+ return client.LeaseKeepAlive(
+ 777,
+ CancellationToken.None);
+ });
+ var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator();
+ await iterator.MoveNextAsync();
+ Assert.AreEqual(iterator.Current.message.ID, 777);
+ await delegateInterceptor.WriteResponseAsync(iterator.Current.address, Guid.Empty,new LeaseKeepAliveResponse() { ID = 777, TTL = 1 });
+ var nextKeepAliveTask = iterator.MoveNextAsync();
+ await Task.Delay(100);
+ Assert.True(nextKeepAliveTask.IsCompleted == false);
+ await Task.Delay(300);
+ Assert.True(nextKeepAliveTask.Result);
+ }
+}
diff --git a/tests/Integration/Maitenance.cs b/tests/Integration/Maitenance.cs
new file mode 100644
index 0000000..332eeb0
--- /dev/null
+++ b/tests/Integration/Maitenance.cs
@@ -0,0 +1,130 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using dotnet_etcd;
+using Etcdserverpb;
+using Google.Protobuf;
+using Grpc.Core;
+using Integration.Utils;
+using NUnit.Framework;
+
+namespace Integration;
+
+public class Maitenance
+{
+ private const string Etcd1 = "127.0.0.1:23790";
+ private const string Etcd2 = "127.0.0.1:23791";
+ private const string Etcd3 = "127.0.0.1:23792";
+ private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}";
+
+ [Test]
+ public async Task SnapshotTransferredConsistently()
+ {
+ var delegateInterceptor = new DelegateInterceptor();
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors: delegateInterceptor);
+ List originalSnapshot = new()
+ {
+ new SnapshotResponse
+ {
+ RemainingBytes = 30,
+ Blob = ByteString.CopyFromUtf8("part1")
+ },
+ new SnapshotResponse
+ {
+ RemainingBytes = 20,
+ Blob = ByteString.CopyFromUtf8("part2")
+ },
+ new SnapshotResponse
+ {
+ RemainingBytes = 10,
+ Blob = ByteString.CopyFromUtf8("part3")
+ },
+ };
+
+ SnapshotRequest request = new SnapshotRequest();
+ List receivedSnapshot = new();
+ var callTask = Task.Run(
+ () => client.Snapshot(
+ request,
+ rsp =>
+ {
+ receivedSnapshot.Add(rsp);
+ },
+ CancellationToken.None));
+ await foreach (var (address, callId, message, _) in delegateInterceptor.ReadAllRequests(CancellationToken.None))
+ {
+ foreach (var snapshotPart in originalSnapshot)
+ {
+ await delegateInterceptor.WriteResponseAsync(address, callId, snapshotPart);
+ }
+
+ await delegateInterceptor.CloseResponseStreamAsync(
+ address,
+ callId);
+ break;
+ }
+ await callTask;
+ CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot);
+ }
+
+ [Test]
+ public async Task SnapshotTransferredConsistentlyThrowExceptions()
+ {
+ var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true);
+ var client = new EtcdClient(
+ connectionString: ConnectionString,
+ interceptors: delegateInterceptor);
+
+ var snapshotPart1 = new SnapshotResponse
+ {
+ RemainingBytes = 30,
+ Blob = ByteString.CopyFromUtf8("part1")
+ };
+ var snapshotPart2 = new SnapshotResponse
+ {
+ RemainingBytes = 20,
+ Blob = ByteString.CopyFromUtf8("part2")
+ };
+ var snapshotPart3 = new SnapshotResponse
+ {
+ RemainingBytes = 10,
+ Blob = ByteString.CopyFromUtf8("part3")
+ };
+ List originalSnapshot = new()
+ {
+ snapshotPart1,
+ snapshotPart2,
+ snapshotPart3,
+ };
+
+ RpcException unavailableException = new RpcException(new Status(StatusCode.Unavailable, ""));
+ await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, snapshotPart1);
+ await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException);
+ await delegateInterceptor.CloseResponseStreamAsync(Etcd1, Guid.Empty);
+ await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart1);
+ await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart2);
+ await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, unavailableException);
+ await delegateInterceptor.CloseResponseStreamAsync(Etcd2, Guid.Empty);
+ await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart1);
+ await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart2);
+ await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart3);
+ await delegateInterceptor.CloseResponseStreamAsync(Etcd3, Guid.Empty);
+
+
+ SnapshotRequest request = new SnapshotRequest();
+ List receivedSnapshot = new();
+ await client.Snapshot(
+ request,
+ rsp =>
+ {
+ receivedSnapshot.Add(rsp);
+ },
+ CancellationToken.None);
+
+ CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot);
+ // test failed because current stream retry didnt correct
+ }
+}
\ No newline at end of file
diff --git a/tests/Integration/Utils/DelegateInterceptor.cs b/tests/Integration/Utils/DelegateInterceptor.cs
new file mode 100644
index 0000000..a698013
--- /dev/null
+++ b/tests/Integration/Utils/DelegateInterceptor.cs
@@ -0,0 +1,186 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
+
+namespace Integration.Utils;
+
+internal class DelegateInterceptor : Interceptor
+ where TReq : class
+ where TRsp : class
+{
+ private readonly MessageStore _requestsStore = new();
+ private readonly MessageStore _responsesStore = new();
+ private readonly bool _ignoreCallId = false;
+
+ public DelegateInterceptor(bool ignoreCallId = false)
+ {
+ _ignoreCallId = ignoreCallId;
+ }
+ public override AsyncUnaryCall AsyncUnaryCall(TRequest request,
+ ClientInterceptorContext context,
+ AsyncUnaryCallContinuation continuation)
+ {
+ ValidateCall();
+ Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid();
+ string address = GetEtcdAdders(continuation);
+ _requestsStore.WriteAsync(
+ address, callId,
+ request).Wait();
+ _requestsStore.Complete(address, callId);
+ var enumerator = _responsesStore.GetReader(address,callId);
+ if (!enumerator.MoveNext().Result) throw new Exception("response required");
+ var response = enumerator.Current;
+ _responsesStore.Complete(address,callId);
+ var call = new AsyncUnaryCall(
+ responseAsync: Task.FromResult(response),
+ responseHeadersAsync: Task.FromResult(new Metadata()),
+ getStatusFunc: () => new Status(
+ statusCode: StatusCode.OK,
+ detail: ""),
+ getTrailersFunc: () => new Metadata(),
+ disposeAction: () => { });
+ return call;
+ }
+
+ public override TResponse BlockingUnaryCall(TRequest request,
+ ClientInterceptorContext context,
+ BlockingUnaryCallContinuation continuation)
+ {
+ ValidateCall();
+ Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid();
+ string address = GetEtcdAdders(continuation);
+ _requestsStore.WriteAsync(
+ address,callId,
+ request).Wait();
+ _requestsStore.Complete(address, callId);
+ var enumerator = _responsesStore.GetReader(address,callId);
+ if (!enumerator.MoveNext().Result) throw new Exception("response required");
+ _responsesStore.Complete(address,callId);
+ return enumerator.Current;
+ }
+
+ public override AsyncClientStreamingCall AsyncClientStreamingCall(
+ ClientInterceptorContext context,
+ AsyncClientStreamingCallContinuation continuation)
+ {
+ ValidateCall();
+ Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid();
+ string address = GetEtcdAdders(continuation);
+ var reader = _responsesStore.GetReader(address,callId);
+ var call = new AsyncClientStreamingCall(
+ requestStream: _requestsStore.GetWriter(address, callId),
+ responseHeadersAsync: Task.FromResult(new Metadata()),
+ getStatusFunc: () => new Status(
+ statusCode: StatusCode.OK,
+ detail: ""),
+ getTrailersFunc: () => new Metadata(),
+ disposeAction: () => { },
+ responseAsync: Task.Run(
+ async () =>
+ {
+ await reader.MoveNext();
+ _responsesStore.Complete(address,callId);
+ return reader.Current;
+ }));
+ return call;
+ }
+
+
+ public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request,
+ ClientInterceptorContext context,
+ AsyncServerStreamingCallContinuation continuation)
+ {
+ ValidateCall();
+ Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid();
+ string address = GetEtcdAdders(continuation);
+ _requestsStore.WriteAsync(
+ address,callId,
+ request).Wait();
+ _requestsStore.Complete(address, callId);
+ var call = new AsyncServerStreamingCall(
+ responseStream: _responsesStore.GetReader(address,callId),
+ responseHeadersAsync: Task.FromResult(new Metadata()),
+ getStatusFunc: () => new Status(
+ statusCode: StatusCode.OK,
+ detail: ""),
+ getTrailersFunc: () => new Metadata(),
+ disposeAction: () => { });
+ return call;
+ }
+
+
+ public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(
+ ClientInterceptorContext context,
+ AsyncDuplexStreamingCallContinuation continuation)
+ {
+ ValidateCall();
+ Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid();
+ string address = GetEtcdAdders(continuation);
+ AsyncDuplexStreamingCall call = new(
+ requestStream: _requestsStore.GetWriter(address,callId),
+ responseStream: _responsesStore.GetReader(address,callId),
+ responseHeadersAsync: Task.FromResult(new Metadata()),
+ getStatusFunc: () => new Status(
+ statusCode: StatusCode.OK,
+ detail: ""),
+ getTrailersFunc: () => new Metadata(),
+ disposeAction: () => { });
+ return call;
+ }
+
+
+ public async Task WriteResponseAsync(string address,Guid callId, TRsp rsp)
+ {
+ await _responsesStore.WriteAsync(address, callId,rsp);
+ }
+
+ public async Task WriteResponseAsync(string address, Guid callId, Exception exception)
+ {
+ await _responsesStore.WriteAsync(
+ address, callId,
+ null,
+ exception);
+ }
+
+ public async Task CloseResponseStreamAsync(string address, Guid callId)
+ {
+ _responsesStore.Complete(address,callId);
+ }
+
+ public IAsyncEnumerable ReadAllRequests(string address, Guid callId, CancellationToken cancellationToken)
+ {
+ return _requestsStore.GetReader(address, callId).ReadAllAsync(cancellationToken);
+ }
+
+ public async IAsyncEnumerable<(string address, Guid callId, TReq message, bool closed)> ReadAllRequests(CancellationToken cancellationToken)
+ {
+ await foreach (var (address, callId, message, exception, closed) in _requestsStore.ReadMessages())
+ {
+ if (exception != null) throw exception;
+ yield return (address, callId, (TReq)message!, closed);
+ }
+ }
+
+ private static void ValidateCall()
+ {
+ if (typeof(TReq) != typeof(TRequest) || typeof(TRsp) != typeof(TResponse))
+ throw new Exception("Interceptor not applicable to these call");
+ }
+
+ private static string GetEtcdAdders(Delegate continuation)
+ {
+ object target = continuation.Target!;
+ object invoker = target.GetType().GetField("invoker",BindingFlags.Instance|BindingFlags.NonPublic)!
+ .GetValue(target)!;
+ GrpcChannel channel = (GrpcChannel)invoker.GetType()
+ .GetProperty(
+ "Channel",
+ BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(invoker)!;
+ return channel.Target;
+ }
+}
\ No newline at end of file
diff --git a/tests/Integration/Utils/MessageStore.cs b/tests/Integration/Utils/MessageStore.cs
new file mode 100644
index 0000000..e490945
--- /dev/null
+++ b/tests/Integration/Utils/MessageStore.cs
@@ -0,0 +1,162 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using Grpc.Core;
+using NUnit.Framework;
+using Polly;
+
+namespace Integration.Utils;
+
+internal class MessageStore
+{
+ private readonly object _locker = new();
+ private readonly List<(string address, Guid callId, object? message, Exception? exception, bool closed)> _store = new();
+ private readonly List> _readers = new();
+
+
+ public async Task WriteAsync(string address, Guid callId, object? message, Exception? exception = null) =>
+ await WriteAsync(
+ address,
+ callId,
+ message,
+ exception,
+ false);
+
+ private async Task WriteAsync(string address, Guid callId, object? message, Exception? exception, bool closed)
+ {
+ await Task.Run(
+ () =>
+ {
+ lock (_locker)
+ {
+ if (_store.Any(item => item.address == address && item.callId == callId && item.closed))
+ throw new InvalidOperationException("call closed already");
+ _store.Add((address, callId, message, exception, closed));
+ foreach (Channel<(string addres, Guid callId, object? message, Exception? exception, bool closed)> channel in _readers)
+ {
+ channel.Writer.TryWrite((address, callId, message, exception, closed));
+ }
+ }
+ });
+ }
+
+ public void Complete(string address, Guid callId) =>
+ WriteAsync(
+ address,
+ callId,
+ null,
+ null,
+ true).Wait();
+
+ public async IAsyncEnumerable<(string address, Guid callId, object? message, Exception? exception, bool closed)> ReadMessages()
+ {
+ Channel<(string addres, Guid callId,object? message, Exception? exception, bool closed)>? channel = null;
+ try
+ {
+ lock (_locker)
+ {
+ channel = Channel.CreateUnbounded<(string addres, Guid callId, object? message, Exception? exception, bool closed)>();
+ _readers.Add(channel);
+ List<(string address, Guid callId, object? message, Exception? exception, bool closed)> existingMessages = new(_store);
+ foreach ((string address, Guid callId, object? message, Exception? exception, bool closed) in existingMessages)
+ {
+ channel.Writer.TryWrite((address, callId, message, exception, closed));
+ }
+ }
+
+ await foreach ((string addres, Guid callId, object? message, Exception? exception, bool closed) in channel.Reader.ReadAllAsync())
+ {
+ yield return (addres, callId, message, exception, closed);
+ }
+ }
+ finally
+ {
+ lock (_locker)
+ {
+ if (channel != null)
+ {
+ _readers.Remove(channel);
+ }
+ }
+ }
+ }
+
+ public async IAsyncEnumerable<(object? message, Exception? exception)> ReadMessages(string address, Guid callId)
+ {
+ await foreach (var (addr, _callId, message, exception, closed) in ReadMessages())
+ {
+ if (addr == address && callId == _callId)
+ {
+ if(closed) yield break;
+ yield return (message, exception);
+ }
+ }
+ }
+
+
+
+ public IClientStreamWriter GetWriter(string address, Guid callId)
+ {
+ return new DelegateStreamWriter(
+ async message => await WriteAsync(address, callId, message, null),
+ async () =>
+ {
+ Complete(address, callId);
+ });
+ }
+
+ public IAsyncStreamReader GetReader(string address, Guid callId)
+ {
+ return new StreamReader(ReadMessages(address, callId).GetAsyncEnumerator());
+ }
+
+
+
+ private class DelegateStreamWriter : IClientStreamWriter
+ {
+ private readonly Func _onWrite;
+ private readonly Func _onComplete;
+
+ public DelegateStreamWriter(Func onWrite, Func onComplete)
+ {
+ _onWrite = onWrite;
+ _onComplete = onComplete;
+ }
+
+
+ public async Task WriteAsync(T message)
+ {
+ await _onWrite(message);
+ }
+
+
+ public WriteOptions? WriteOptions { get; set; }
+ public async Task CompleteAsync()
+ {
+ await _onComplete();
+ }
+ }
+
+ private class StreamReader : IAsyncStreamReader
+ {
+ private readonly IAsyncEnumerator<(object? message, Exception? exception)> _enumerator;
+ private readonly Func _getCurrentFunc;
+
+ public StreamReader(IAsyncEnumerator<(object?, Exception?)> enumerator)
+ {
+ _enumerator = enumerator;
+ _getCurrentFunc = () => _enumerator.Current.exception == null
+ ? (T)_enumerator.Current.message!
+ : throw _enumerator.Current.exception;
+ }
+ public async Task MoveNext(CancellationToken cancellationToken)
+ {
+ return await _enumerator.MoveNextAsync();
+ }
+
+ public T Current => _getCurrentFunc();
+ }
+}