Skip to content

Commit

Permalink
investigate DotNetty clean shutdown (#2817)
Browse files Browse the repository at this point in the history
* added dotnetty shutdown specs

* added specs to verify that DotNetty transport instances are shutdown correctly

* #2575 - added DotNetty shutdown specs

* added copyright headers
  • Loading branch information
Aaronontheweb authored and Arkatufus committed Jul 8, 2017
1 parent 127b7d5 commit 1e5fa50
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 55 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Remote.Tests/Akka.Remote.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@
<Compile Include="Transport\AkkaProtocolStressTest.cs" />
<Compile Include="Transport\DotNettySslSupportSpec.cs" />
<Compile Include="Transport\DotNettyTransportDnsResolutionSpec.cs" />
<Compile Include="Transport\DotNettyTransportShutdownSpec.cs" />
<Compile Include="Transport\GenericTransportSpec.cs" />
<Compile Include="Transport\TestTransportSpec.cs" />
<Compile Include="Transport\ThrottleModeSpec.cs" />
Expand Down
283 changes: 283 additions & 0 deletions src/core/Akka.Remote.Tests/Transport/DotNettyTransportShutdownSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
//-----------------------------------------------------------------------
// <copyright file="DotNettyTransportShutdownSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Remote.Transport;
using Akka.Remote.Transport.DotNetty;
using Akka.TestKit;
using Xunit;
using FluentAssertions;

namespace Akka.Remote.Tests.Transport
{
/// <summary>
/// Verify that the <see cref="DotNettyTransport"/> can cleanly shut itself down.
/// </summary>
public class DotNettyTransportShutdownSpec : AkkaSpec
{
private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka {
loglevel = DEBUG
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""
remote {
dot-netty.tcp {
port = 0
hostname = ""127.0.0.1""
log-transport = true
}
}
}");

public DotNettyTransportShutdownSpec() : base(Config)
{

}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_unused_inbound_endpoint()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
// bind
await t1.Listen();

// verify that ServerChannel is active and open
var sc = t1.ServerChannel;
sc.Should().NotBeNull();
sc.Active.Should().BeTrue();
sc.Open.Should().BeTrue();

// shutdown
await t1.Shutdown();
sc.Open.Should().BeFalse();
sc.CloseCompletion.IsCompleted.Should().BeTrue();
}
finally
{
await t1.Shutdown();
}
}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_active_endpoints_upon_outbound_disassociate()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
var t2 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();

// bind
var c1 = await t1.Listen();
c1.Item2.SetResult(new ActorAssociationEventListener(p1));
var c2 = await t2.Listen();
c2.Item2.SetResult(new ActorAssociationEventListener(p2));

// t1 --> t2 association
var handle = await t1.Associate(c2.Item1);
handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p1));
var inboundHandle = p2.ExpectMsg<InboundAssociation>().Association; // wait for the inbound association handle to show up
inboundHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p2));

t1.ConnectionGroup.Count.Should().Be(2);
t2.ConnectionGroup.Count.Should().Be(2);

var chan1 = t1.ConnectionGroup.Single(x => !x.Id.Equals(t1.ServerChannel.Id));
var chan2 = t2.ConnectionGroup.Single(x => !x.Id.Equals(t2.ServerChannel.Id));

// force a disassociation
handle.Disassociate();

// verify that the connections are terminated
p1.ExpectMsg<Disassociated>();
AwaitCondition(() => t1.ConnectionGroup.Count == 1);
AwaitCondition(() => t2.ConnectionGroup.Count == 1);

// verify that the connection channels were terminated on both ends
chan1.CloseCompletion.IsCompleted.Should().BeTrue();
chan2.CloseCompletion.IsCompleted.Should().BeTrue();
}
finally
{
await t1.Shutdown();
await t2.Shutdown();
}
}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_active_endpoints_upon_outbound_shutdown()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
var t2 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();

// bind
var c1 = await t1.Listen();
c1.Item2.SetResult(new ActorAssociationEventListener(p1));
var c2 = await t2.Listen();
c2.Item2.SetResult(new ActorAssociationEventListener(p2));

// t1 --> t2 association
var handle = await t1.Associate(c2.Item1);
handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p1));
var inboundHandle = p2.ExpectMsg<InboundAssociation>().Association; // wait for the inbound association handle to show up
inboundHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p2));

t1.ConnectionGroup.Count.Should().Be(2);
t2.ConnectionGroup.Count.Should().Be(2);

var chan1 = t1.ConnectionGroup.Single(x => !x.Id.Equals(t1.ServerChannel.Id));
var chan2 = t2.ConnectionGroup.Single(x => !x.Id.Equals(t2.ServerChannel.Id));

// shutdown remoting on t1
await t1.Shutdown();

p2.ExpectMsg<Disassociated>();
// verify that the connections are terminated
AwaitCondition(() => t1.ConnectionGroup.Count == 0, null, message: $"Expected 0 open connection but found {t1.ConnectionGroup.Count}");
AwaitCondition(() => t2.ConnectionGroup.Count == 1, null,message: $"Expected 1 open connection but found {t2.ConnectionGroup.Count}");

// verify that the connection channels were terminated on both ends
chan1.CloseCompletion.IsCompleted.Should().BeTrue();
chan2.CloseCompletion.IsCompleted.Should().BeTrue();
}
finally
{
await t1.Shutdown();
await t2.Shutdown();
}
}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_active_endpoints_upon_inbound_disassociate()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
var t2 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();

// bind
var c1 = await t1.Listen();
c1.Item2.SetResult(new ActorAssociationEventListener(p1));
var c2 = await t2.Listen();
c2.Item2.SetResult(new ActorAssociationEventListener(p2));

// t1 --> t2 association
var handle = await t1.Associate(c2.Item1);
handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p1));
var inboundHandle = p2.ExpectMsg<InboundAssociation>().Association; // wait for the inbound association handle to show up
inboundHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p2));

t1.ConnectionGroup.Count.Should().Be(2);
t2.ConnectionGroup.Count.Should().Be(2);

var chan1 = t1.ConnectionGroup.Single(x => !x.Id.Equals(t1.ServerChannel.Id));
var chan2 = t2.ConnectionGroup.Single(x => !x.Id.Equals(t2.ServerChannel.Id));

// force a disassociation
inboundHandle.Disassociate();

// verify that the connections are terminated
AwaitCondition(() => t1.ConnectionGroup.Count == 1, null, message: $"Expected 1 open connection but found {t1.ConnectionGroup.Count}");
AwaitCondition(() => t2.ConnectionGroup.Count == 1, null, message: $"Expected 1 open connection but found {t2.ConnectionGroup.Count}");

// verify that the connection channels were terminated on both ends
chan1.CloseCompletion.IsCompleted.Should().BeTrue();
chan2.CloseCompletion.IsCompleted.Should().BeTrue();
}
finally
{
await t1.Shutdown();
await t2.Shutdown();
}
}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_active_endpoints_upon_inbound_shutdown()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
var t2 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
var p1 = CreateTestProbe();
var p2 = CreateTestProbe();

// bind
var c1 = await t1.Listen();
c1.Item2.SetResult(new ActorAssociationEventListener(p1));
var c2 = await t2.Listen();
c2.Item2.SetResult(new ActorAssociationEventListener(p2));

// t1 --> t2 association
var handle = await t1.Associate(c2.Item1);
handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p1));
var inboundHandle = p2.ExpectMsg<InboundAssociation>().Association; // wait for the inbound association handle to show up
inboundHandle.ReadHandlerSource.SetResult(new ActorHandleEventListener(p2));

t1.ConnectionGroup.Count.Should().Be(2);
t2.ConnectionGroup.Count.Should().Be(2);

var chan1 = t1.ConnectionGroup.Single(x => !x.Id.Equals(t1.ServerChannel.Id));
var chan2 = t2.ConnectionGroup.Single(x => !x.Id.Equals(t2.ServerChannel.Id));

// shutdown inbound side
await t2.Shutdown();

// verify that the connections are terminated
AwaitCondition(() => t1.ConnectionGroup.Count == 1, null, message: $"Expected 1 open connection but found {t1.ConnectionGroup.Count}");
AwaitCondition(() => t2.ConnectionGroup.Count == 0, null, message: $"Expected 0 open connection but found {t2.ConnectionGroup.Count}");

// verify that the connection channels were terminated on both ends
chan1.CloseCompletion.IsCompleted.Should().BeTrue();
chan2.CloseCompletion.IsCompleted.Should().BeTrue();
}
finally
{
await t1.Shutdown();
await t2.Shutdown();
}
}

[Fact]
public async Task DotNettyTcpTransport_should_cleanly_terminate_endpoints_upon_failed_outbound_connection()
{
var t1 = new TcpTransport(Sys, Sys.Settings.Config.GetConfig("akka.remote.dot-netty.tcp"));
try
{
var p1 = CreateTestProbe();

// bind
var c1 = await t1.Listen();
c1.Item2.SetResult(new ActorAssociationEventListener(p1));

// t1 --> t2 association
await Assert.ThrowsAsync<InvalidAssociationException>(async () =>
{
var a = await t1.Associate(c1.Item1.WithPort(c1.Item1.Port + 100));
});


t1.ConnectionGroup.Count.Should().Be(1);
}
finally
{
await t1.Shutdown();
}
}
}
}
10 changes: 5 additions & 5 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public class RemoteActorRefProvider : IActorRefProvider
private readonly ILoggingAdapter _log;

/// <summary>
/// TBD
/// Creates a new remote actor ref provider instance.
/// </summary>
/// <param name="systemName">TBD</param>
/// <param name="settings">TBD</param>
/// <param name="eventStream">TBD</param>
/// <param name="systemName">Name of the actor system.</param>
/// <param name="settings">The actor system settings.</param>
/// <param name="eventStream">The <see cref="EventStream"/> instance used by this system.</param>
public RemoteActorRefProvider(string systemName, Settings settings, EventStream eventStream)
{
settings.InjectTopLevelFallback(RemoteConfigFactory.Default());
Expand Down Expand Up @@ -67,6 +67,7 @@ private Internals CreateInternals()
/// Remoting system daemon responsible for powering remote deployment capabilities.
/// </summary>
public IInternalActorRef RemoteDaemon { get { return RemoteInternals.RemoteDaemon; } }

/// <summary>
/// The remote transport. Wraps all of the underlying physical network transports.
/// </summary>
Expand All @@ -85,7 +86,6 @@ public ActorPath RootPath
get { return _local.RootPath; }
}


/// <inheritdoc/>
public IInternalActorRef RootGuardian { get { return _local.RootGuardian; } }

Expand Down
Loading

0 comments on commit 1e5fa50

Please sign in to comment.