Skip to content

Commit

Permalink
Port EdgeHub MessageAckSeconds option to 1.0.9 (#3010)
Browse files Browse the repository at this point in the history
Port of (#2652)

This timeout can now be configured with a new environment variable in the edge hub deployment:
```MessageAckTimeoutSecs```
  • Loading branch information
and-rewsmith authored May 26, 2020
1 parent 377e51b commit 288ed4b
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
Expand All @@ -10,16 +11,18 @@ public class ConnectionProvider : IConnectionProvider
{
readonly IConnectionManager connectionManager;
readonly IEdgeHub edgeHub;
readonly TimeSpan messageAckTimeout;

public ConnectionProvider(IConnectionManager connectionManager, IEdgeHub edgeHub)
public ConnectionProvider(IConnectionManager connectionManager, IEdgeHub edgeHub, TimeSpan messageAckTimeout)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.edgeHub = Preconditions.CheckNotNull(edgeHub, nameof(edgeHub));
this.messageAckTimeout = messageAckTimeout;
}

public Task<IDeviceListener> GetDeviceListenerAsync(IIdentity identity)
{
IDeviceListener deviceListener = new DeviceMessageHandler(Preconditions.CheckNotNull(identity, nameof(identity)), this.edgeHub, this.connectionManager);
IDeviceListener deviceListener = new DeviceMessageHandler(Preconditions.CheckNotNull(identity, nameof(identity)), this.edgeHub, this.connectionManager, this.messageAckTimeout);
return Task.FromResult(deviceListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Device
class DeviceMessageHandler : IDeviceListener, IDeviceProxy
{
const int GenericBadRequest = 400000;
static readonly TimeSpan MessageResponseTimeout = TimeSpan.FromSeconds(30);

readonly ConcurrentDictionary<string, TaskCompletionSource<DirectMethodResponse>> methodCallTaskCompletionSources = new ConcurrentDictionary<string, TaskCompletionSource<DirectMethodResponse>>();
readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> messageTaskCompletionSources = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
readonly ConcurrentDictionary<string, bool> c2dMessageTaskCompletionSources = new ConcurrentDictionary<string, bool>();

readonly IEdgeHub edgeHub;
readonly IConnectionManager connectionManager;
readonly TimeSpan messageAckTimeout;
readonly AsyncLock serializeMessagesLock = new AsyncLock();
IDeviceProxy underlyingProxy;

public DeviceMessageHandler(IIdentity identity, IEdgeHub edgeHub, IConnectionManager connectionManager)
public DeviceMessageHandler(IIdentity identity, IEdgeHub edgeHub, IConnectionManager connectionManager, TimeSpan messageAckTimeout)
{
this.Identity = Preconditions.CheckNotNull(identity, nameof(identity));
this.edgeHub = Preconditions.CheckNotNull(edgeHub, nameof(edgeHub));
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageAckTimeout = messageAckTimeout;
}

public IIdentity Identity { get; }
Expand Down Expand Up @@ -416,7 +417,7 @@ public async Task SendMessageAsync(IMessage message, string input)
Metrics.MessageProcessingLatency(this.Identity, message);
await this.underlyingProxy.SendMessageAsync(message, input);

Task completedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(MessageResponseTimeout));
Task completedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(this.messageAckTimeout));
if (completedTask != taskCompletionSource.Task)
{
Events.MessageFeedbackTimedout(this.Identity, lockToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ void RegisterRoutingModule(
TimeSpan connectivityCheckFrequency = connectivityCheckFrequencySecs < 0 ? TimeSpan.MaxValue : TimeSpan.FromSeconds(connectivityCheckFrequencySecs);
// n Clients + 1 Edgehub
int maxConnectedClients = this.configuration.GetValue("MaxConnectedClients", 100) + 1;
int messageAckTimeoutSecs = this.configuration.GetValue("MessageAckTimeoutSecs", 30);
TimeSpan messageAckTimeout = TimeSpan.FromSeconds(messageAckTimeoutSecs);
int cloudConnectionIdleTimeoutSecs = this.configuration.GetValue("CloudConnectionIdleTimeoutSecs", 3600);
TimeSpan cloudConnectionIdleTimeout = TimeSpan.FromSeconds(cloudConnectionIdleTimeoutSecs);
bool closeCloudConnectionOnIdleTimeout = this.configuration.GetValue("CloseCloudConnectionOnIdleTimeout", true);
Expand Down Expand Up @@ -165,6 +167,7 @@ void RegisterRoutingModule(
upstreamProtocolOption,
connectivityCheckFrequency,
maxConnectedClients,
messageAckTimeout,
cloudConnectionIdleTimeout,
closeCloudConnectionOnIdleTimeout,
cloudOperationTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
LogLogo(logger);
LogVersionInfo(logger);
logger.LogInformation($"OptimizeForPerformance={configuration.GetValue("OptimizeForPerformance", true)}");
logger.LogInformation($"MessageAckTimeoutSecs={configuration.GetValue("MessageAckTimeoutSecs", 30)}");
logger.LogInformation("Loaded server certificate with expiration date of {0}", certificates.ServerCertificate.NotAfter.ToString("o"));

var metricsListener = container.Resolve<IMetricsListener>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class RoutingModule : Module
readonly Option<UpstreamProtocol> upstreamProtocol;
readonly TimeSpan connectivityCheckFrequency;
readonly int maxConnectedClients;
readonly TimeSpan messageAckTimeout;
readonly TimeSpan cloudConnectionIdleTimeout;
readonly bool closeCloudConnectionOnIdleTimeout;
readonly TimeSpan operationTimeout;
Expand Down Expand Up @@ -67,6 +68,7 @@ public RoutingModule(
Option<UpstreamProtocol> upstreamProtocol,
TimeSpan connectivityCheckFrequency,
int maxConnectedClients,
TimeSpan messageAckTimeout,
TimeSpan cloudConnectionIdleTimeout,
bool closeCloudConnectionOnIdleTimeout,
TimeSpan operationTimeout,
Expand All @@ -93,6 +95,7 @@ public RoutingModule(
this.upstreamProtocol = upstreamProtocol;
this.connectivityCheckFrequency = connectivityCheckFrequency;
this.maxConnectedClients = Preconditions.CheckRange(maxConnectedClients, 1);
this.messageAckTimeout = messageAckTimeout;
this.cloudConnectionIdleTimeout = cloudConnectionIdleTimeout;
this.closeCloudConnectionOnIdleTimeout = closeCloudConnectionOnIdleTimeout;
this.operationTimeout = operationTimeout;
Expand Down Expand Up @@ -580,7 +583,7 @@ protected override void Load(ContainerBuilder builder)
var edgeHubTask = c.Resolve<Task<IEdgeHub>>();
IConnectionManager connectionManager = await connectionManagerTask;
IEdgeHub edgeHub = await edgeHubTask;
IConnectionProvider connectionProvider = new ConnectionProvider(connectionManager, edgeHub);
IConnectionProvider connectionProvider = new ConnectionProvider(connectionManager, edgeHub, this.messageAckTimeout);
return connectionProvider;
})
.As<Task<IConnectionProvider>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public async Task TestAddRemoveDeviceConnectionTest()
IConnectionManager connectionManager = new ConnectionManager(cloudConnectionProvider, credentialsManager, GetIdentityProvider(), deviceConnectivityManager);
Try<ICloudProxy> cloudProxyTry = await connectionManager.CreateCloudConnectionAsync(deviceCredentials);
Assert.True(cloudProxyTry.Success);
var deviceListener = new DeviceMessageHandler(deviceCredentials.Identity, edgeHub.Object, connectionManager);
TimeSpan defaultMessageAckTimeout = TimeSpan.FromSeconds(30);
var deviceListener = new DeviceMessageHandler(deviceCredentials.Identity, edgeHub.Object, connectionManager, defaultMessageAckTimeout);

Option<ICloudProxy> cloudProxy = await connectionManager.GetCloudConnection(deviceId);
Assert.True(cloudProxy.HasValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test

public class ConnectionProviderTest
{
static readonly TimeSpan DefaultMessageAckTimeout = TimeSpan.FromSeconds(30);

[Fact]
[Unit]
public void ConnectionProviderConstructorTest()
{
var connectionManager = Mock.Of<IConnectionManager>();
var edgeHub = Mock.Of<IEdgeHub>();

Assert.NotNull(new ConnectionProvider(connectionManager, edgeHub));
Assert.NotNull(new ConnectionProvider(connectionManager, edgeHub, DefaultMessageAckTimeout));
}

[Fact]
Expand All @@ -28,7 +30,7 @@ public void ConnectionProviderConstructor_NullConnectionManagerTest()
{
var edgeHub = Mock.Of<IEdgeHub>();

Assert.Throws<ArgumentNullException>(() => new ConnectionProvider(null, edgeHub));
Assert.Throws<ArgumentNullException>(() => new ConnectionProvider(null, edgeHub, DefaultMessageAckTimeout));
}

[Fact]
Expand All @@ -37,7 +39,7 @@ public void ConnectionProviderConstructor_NullEdgeHubTest()
{
var connectionManager = Mock.Of<IConnectionManager>();

Assert.Throws<ArgumentNullException>(() => new ConnectionProvider(connectionManager, null));
Assert.Throws<ArgumentNullException>(() => new ConnectionProvider(connectionManager, null, DefaultMessageAckTimeout));
}

[Fact]
Expand All @@ -48,7 +50,7 @@ public async Task GetDeviceListenerWithSasIdentityTest()
var edgeHub = Mock.Of<IEdgeHub>();
var moduleCredentials = new TokenCredentials(new ModuleIdentity("hub", "device", "module"), "token", "productInfo", false);

var connectionProvider = new ConnectionProvider(connectionManager, edgeHub);
var connectionProvider = new ConnectionProvider(connectionManager, edgeHub, DefaultMessageAckTimeout);
Assert.NotNull(await connectionProvider.GetDeviceListenerAsync(moduleCredentials.Identity));
}

Expand All @@ -62,7 +64,7 @@ public async Task GetDeviceListenerWithX509IdentityTest()
var clientCertChain = new List<X509Certificate2>();
var moduleCredentials = new X509CertCredentials(new ModuleIdentity("hub", "device", "module"), string.Empty, clientCertificate, clientCertChain);

var connectionProvider = new ConnectionProvider(connectionManager, edgeHub);
var connectionProvider = new ConnectionProvider(connectionManager, edgeHub, DefaultMessageAckTimeout);
Assert.NotNull(await connectionProvider.GetDeviceListenerAsync(moduleCredentials.Identity));
}

Expand All @@ -73,7 +75,7 @@ public async Task GetDeviceListener_NullIdentityTest()
var connectionManager = Mock.Of<IConnectionManager>();
var edgeHub = Mock.Of<IEdgeHub>();

var connectionProvider = new ConnectionProvider(connectionManager, edgeHub);
var connectionProvider = new ConnectionProvider(connectionManager, edgeHub, DefaultMessageAckTimeout);
await Assert.ThrowsAsync<ArgumentNullException>(() => connectionProvider.GetDeviceListenerAsync(null));
}
}
Expand Down
Loading

0 comments on commit 288ed4b

Please sign in to comment.