From 653651f3c5f7127688e52bae0fd86a72408a3929 Mon Sep 17 00:00:00 2001 From: Unknown Date: Tue, 17 Jul 2018 15:54:33 +0200 Subject: [PATCH 1/7] Added support for Ampq v1.0 --- .../Akka.Streams.Amqp.V1.Tests.csproj | 23 ++++++ .../AmqpConnectorsTest.cs | 55 ++++++++++++++ .../Akka.Streams.Amqp.V1.csproj | 13 ++++ Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs | 71 +++++++++++++++++++ .../Akka.Streams.Amqp.V1/AmqpSourceStage.cs | 68 ++++++++++++++++++ Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs | 13 ++++ .../Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs | 12 ++++ .../Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs | 11 +++ .../IAmqpSourceSettings.cs | 11 +++ .../NamedQueueSinkSettings.cs | 34 +++++++++ .../NamedQueueSourceSettings.cs | 35 +++++++++ .../Util/AttributesExtension.cs | 13 ++++ Amqp.V1/Amqp.V1.sln | 51 +++++++++++++ 13 files changed, 410 insertions(+) create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs create mode 100644 Amqp.V1/Amqp.V1.sln diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj new file mode 100644 index 00000000..765b68b6 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp2.0 + + false + + + + + + + + + + + + + + + + + diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs new file mode 100644 index 00000000..c6b738a3 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs @@ -0,0 +1,55 @@ +using Akka.Serialization; +using Akka.Streams.Amqp.V1.Dsl; +using Akka.Streams.Dsl; +using Amqp; +using System; +using Xunit; + +namespace Akka.Streams.Amqp.V1.Tests +{ + public class AmqpConnectorsTest : Akka.TestKit.Xunit2.TestKit + { + private readonly Serializer serializer; + private readonly Address address; + private readonly ActorMaterializer materializer; + + public AmqpConnectorsTest() + { + materializer = ActorMaterializer.Create(Sys); + var serialization = Sys.Serialization; + + serializer = serialization.FindSerializerForType(typeof(string)); + address = new Address("amqp://guest:guest@localhost:5673"); + } + + [Fact] + public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process() + { + Connection connection = new Connection(address); + Session session = new Session(connection); + + var queueName = "q1"; + var senderlinkName = "amqp-conn-test-sender"; + var receiverlinkName = "amqp-conn-test-sender"; + + //create sink and source + var amqpSink = AmpqSink.Create(new NamedQueueSinkSettings(session, senderlinkName, queueName, serializer)); + var amqpSource = AmpqSource.Create(new NamedQueueSourceSettings(session, receiverlinkName, queueName, 200, serializer)); + + //run sink + var input = new[] { "one", "two", "three", "four", "five" }; + Source.From(input).RunWith(amqpSink, materializer).Wait(); + + //run source + var result = amqpSource + .Take(input.Length) + .RunWith(Sink.Seq(), materializer); + + result.Wait(TimeSpan.FromSeconds(3)); + Assert.Equal(input, result.Result); + + session.Close(); + connection.Close(); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj new file mode 100644 index 00000000..e65c5f1e --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj @@ -0,0 +1,13 @@ + + + + netcoreapp2.0 + + + + + + + + + diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs new file mode 100644 index 00000000..04eabbae --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs @@ -0,0 +1,71 @@ +using Akka.Streams.Stage; +using Amqp; +using System; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace Akka.Streams.Amqp.V1 +{ + public sealed class AmqpSinkStage : GraphStageWithMaterializedValue, Task> + { + public readonly Inlet In = new Inlet("AmqpSink.in"); + public override SinkShape Shape => new SinkShape(In); + + public IAmpqSinkSettings AmpqSourceSettings { get; } + + public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var promise = new TaskCompletionSource(); + var logic = new AmqpSinkStageLogic(this, promise, Shape); + return new LogicAndMaterializedValue(logic, promise.Task); + } + + public AmqpSinkStage(IAmpqSinkSettings ampqSourceSettings) + { + AmpqSourceSettings = ampqSourceSettings; + } + + private class AmqpSinkStageLogic : GraphStageLogic + { + private readonly AmqpSinkStage stage; + private readonly TaskCompletionSource promise; + private readonly SenderLink sender; + + public AmqpSinkStageLogic(AmqpSinkStage amqpSinkStage, TaskCompletionSource promise, SinkShape shape) : base(shape) + { + stage = amqpSinkStage; + this.promise = promise; + sender = amqpSinkStage.AmpqSourceSettings.GetSenderLink(); + + SetHandler(stage.In, () => + { + try + { + var elem = Grab(stage.In); + sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem))); + Pull(stage.In); + } catch (Exception e) + { + Debugger.Break(); + } + }, + onUpstreamFinish: () => promise.SetResult(Done.Instance), + onUpstreamFailure: ex => promise.SetException(ex) + ); + } + + public override void PreStart() + { + base.PreStart(); + Pull(stage.In); + } + + public override void PostStop() + { + promise.TrySetException(new ApplicationException("stage stopped unexpectedly")); + sender.Close(); + base.PostStop(); + } + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs new file mode 100644 index 00000000..b55acab5 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs @@ -0,0 +1,68 @@ +using Akka.Streams.Amqp.V1.Util; +using Akka.Streams.Stage; +using Akka.Streams.Supervision; +using Amqp; +using Amqp.Framing; +using Amqp.Types; +using System; + +namespace Akka.Streams.Amqp.V1 +{ + public sealed class AmqpSourceStage : GraphStage> + { + public AmqpSourceStage(IAmqpSourceSettings amqpSourceSettings) + { + AmqpSourceSettings = amqpSourceSettings; + } + + public override SourceShape Shape => new SourceShape(Out); + + public Outlet Out { get; } = new Outlet("AmqpSource.Out"); + public IAmqpSourceSettings AmqpSourceSettings { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new AmqpSourceStageLogic(this, inheritedAttributes); + + private class AmqpSourceStageLogic : GraphStageLogic + { + private readonly Outlet outlet; + private readonly IAmqpSourceSettings ampqSourceSettings; + private readonly ReceiverLink receiver; + private readonly Decider decider; + + public AmqpSourceStageLogic(AmqpSourceStage stage, Attributes attributes) : base(stage.Shape) + { + outlet = stage.Out; + ampqSourceSettings = stage.AmqpSourceSettings; + receiver = stage.AmqpSourceSettings.GetReceiverLink(); + decider = attributes.GetDeciderOrDefault(); + + SetHandler(outlet, () => { + var message = receiver.Receive(); + if (message != null) + { + try + { + Push(outlet, ampqSourceSettings.Convert(message)); + receiver.Accept(message); + } + catch (Exception e) + { + receiver.Reject(message, new Error(new Symbol(e.Message))); + if (decider(e) == Directive.Stop) + { + FailStage(e); + } + } + + } + }, onDownstreamFinish: CompleteStage); + } + + public override void PostStop() + { + base.PostStop(); + receiver.Close(); + } + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs new file mode 100644 index 00000000..89d45055 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs @@ -0,0 +1,13 @@ +using Akka.Streams.Dsl; +using System.Threading.Tasks; + +namespace Akka.Streams.Amqp.V1.Dsl +{ + public static class AmpqSink + { + public static Sink Create(IAmpqSinkSettings sourceSettings) + { + return Sink.FromGraph(new AmqpSinkStage(sourceSettings)); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs new file mode 100644 index 00000000..31d4da0f --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs @@ -0,0 +1,12 @@ +using Akka.Streams.Dsl; + +namespace Akka.Streams.Amqp.V1.Dsl +{ + public static class AmpqSource + { + public static Source Create(IAmqpSourceSettings sourceSettings) + { + return Source.FromGraph(new AmqpSourceStage(sourceSettings)); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs new file mode 100644 index 00000000..1933c300 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs @@ -0,0 +1,11 @@ +using Akka.IO; +using Amqp; + +namespace Akka.Streams.Amqp.V1 +{ + public interface IAmpqSinkSettings + { + SenderLink GetSenderLink(); + byte[] GetBytes(T obj); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs new file mode 100644 index 00000000..65b33dce --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs @@ -0,0 +1,11 @@ +using Amqp; + +namespace Akka.Streams.Amqp.V1 +{ + public interface IAmqpSourceSettings + { + ReceiverLink GetReceiverLink(); + int Credit { get; } + T Convert(Message message); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs new file mode 100644 index 00000000..cb869fe3 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs @@ -0,0 +1,34 @@ +using Akka.IO; +using Amqp; +using Akka.Serialization; + +namespace Akka.Streams.Amqp.V1 +{ + + public class NamedQueueSinkSettings : IAmpqSinkSettings + { + private readonly Session session; + private readonly string linkName; + private readonly string queueName; + private readonly Serializer serializer; + + public NamedQueueSinkSettings( + Session session, + string linkName, + string queueName, + Serializer serializer) + { + this.session = session; + this.linkName = linkName; + this.queueName = queueName; + this.serializer = serializer; + } + + public byte[] GetBytes(T obj) + { + return serializer.ToBinary(obj); + } + + public SenderLink GetSenderLink() => new SenderLink(session, linkName, queueName); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs new file mode 100644 index 00000000..f422ee92 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs @@ -0,0 +1,35 @@ +using Akka.Serialization; +using Amqp; + +namespace Akka.Streams.Amqp.V1 +{ + public class NamedQueueSourceSettings : IAmqpSourceSettings + { + private readonly Session session; + private readonly string linkName; + private readonly string queueName; + private readonly Serializer serializer; + + public NamedQueueSourceSettings( + Session session, + string linkName, + string queueName, + int credit, + Serializer serializer) + { + this.session = session; + this.linkName = linkName; + this.queueName = queueName; + Credit = credit; + this.serializer = serializer; + } + + public T Convert(Message message) { + var bString = message.GetBody(); + return serializer.FromBinary(bString); + } + + public int Credit { get; } + public ReceiverLink GetReceiverLink() => new ReceiverLink(session, linkName, queueName); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs new file mode 100644 index 00000000..a8eebf24 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs @@ -0,0 +1,13 @@ +using Akka.Streams.Supervision; + +namespace Akka.Streams.Amqp.V1.Util +{ + public static class AttributesExtensions + { + public static Decider GetDeciderOrDefault(this Attributes attributes) + { + var attr = attributes.GetAttribute(null); + return attr != null ? attr.Decider : Deciders.StoppingDecider; + } + } +} diff --git a/Amqp.V1/Amqp.V1.sln b/Amqp.V1/Amqp.V1.sln new file mode 100644 index 00000000..88398e25 --- /dev/null +++ b/Amqp.V1/Amqp.V1.sln @@ -0,0 +1,51 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.V1", "Akka.Streams.Amqp.V1\Akka.Streams.Amqp.V1.csproj", "{25B384BF-908C-45B2-B709-DD3D5C22A549}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.V1.Tests", "Akka.Streams.Amqp.V1.Tests\Akka.Streams.Amqp.V1.Tests.csproj", "{2C9B879B-F28E-41B0-A742-D57A043926A8}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|Any CPU.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x64.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x64.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x86.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x86.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|Any CPU.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|Any CPU.Build.0 = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x64.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x64.Build.0 = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x86.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x86.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x64.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x64.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x86.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x86.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|Any CPU.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x64.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x64.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x86.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {E6426005-29FF-454F-AE5E-0ABB3DCFEF9A} + EndGlobalSection +EndGlobal From a595b720efdabeca4bb95fa8e050bf65f573d8b0 Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 18 Jul 2018 11:04:26 +0200 Subject: [PATCH 2/7] Changes made after review --- .../AmqpConnectorsTest.cs | 15 ++-- .../SimpleAmqpTest.cs | 35 +++++++++ Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs | 22 ++---- .../Akka.Streams.Amqp.V1/AmqpSourceStage.cs | 78 +++++++++++++------ Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs | 4 +- .../Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs | 4 +- .../NamedQueueSourceSettings.cs | 3 +- .../Util/AttributesExtension.cs | 2 +- 8 files changed, 109 insertions(+), 54 deletions(-) create mode 100644 Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs index c6b738a3..5eb6d98d 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs @@ -16,10 +16,8 @@ public class AmqpConnectorsTest : Akka.TestKit.Xunit2.TestKit public AmqpConnectorsTest() { materializer = ActorMaterializer.Create(Sys); - var serialization = Sys.Serialization; - - serializer = serialization.FindSerializerForType(typeof(string)); - address = new Address("amqp://guest:guest@localhost:5673"); + serializer = Sys.Serialization.FindSerializerForType(typeof(string)); + address = new Address("amqp://guest:guest@localhost:5672"); } [Fact] @@ -30,11 +28,11 @@ public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_sam var queueName = "q1"; var senderlinkName = "amqp-conn-test-sender"; - var receiverlinkName = "amqp-conn-test-sender"; + var receiverlinkName = "amqp-conn-test-receiver"; //create sink and source - var amqpSink = AmpqSink.Create(new NamedQueueSinkSettings(session, senderlinkName, queueName, serializer)); - var amqpSource = AmpqSource.Create(new NamedQueueSourceSettings(session, receiverlinkName, queueName, 200, serializer)); + var amqpSink = AmpqSink.Create(new NamedQueueSinkSettings(session, senderlinkName, queueName, serializer)); + var amqpSource = AmpqSource.Create(new NamedQueueSourceSettings(session, receiverlinkName, queueName, 200, serializer)); //run sink var input = new[] { "one", "two", "three", "four", "five" }; @@ -45,7 +43,8 @@ public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_sam .Take(input.Length) .RunWith(Sink.Seq(), materializer); - result.Wait(TimeSpan.FromSeconds(3)); + result.Wait(TimeSpan.FromSeconds(30)); + Assert.True(result.IsCompleted); Assert.Equal(input, result.Result); session.Close(); diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs new file mode 100644 index 00000000..dc036576 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs @@ -0,0 +1,35 @@ +using Amqp; +using Amqp.Framing; +using Amqp.Types; +using System.Threading.Tasks; +using Xunit; + +namespace Akka.Streams.Amqp.V1.Tests +{ + public class SimpleAmqpTest + { + [Fact] + public async Task TestHelloWorld() + { + //strange, works using regular activeMQ and the amqp test broker from here: http://azure.github.io/amqpnetlite/articles/hello_amqp.html + //but this does not work in ActiveMQ Artemis + Address address = new Address("amqp://guest:guest@localhost:5672"); + Connection connection = await Connection.Factory.CreateAsync(address); + Session session = new Session(connection); + + Message message = new Message("Hello AMQP"); + + SenderLink sender = new SenderLink(session, "sender-link", "q1"); + await sender.SendAsync(message); + + ReceiverLink receiver = new ReceiverLink(session, "receiver-link", "q1"); + message = await receiver.ReceiveAsync(); + receiver.Accept(message); + + await sender.CloseAsync(); + await receiver.CloseAsync(); + await session.CloseAsync(); + await connection.CloseAsync(); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs index 04eabbae..49c44fd2 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs @@ -1,16 +1,13 @@ using Akka.Streams.Stage; using Amqp; -using System; -using System.Diagnostics; using System.Threading.Tasks; namespace Akka.Streams.Amqp.V1 { public sealed class AmqpSinkStage : GraphStageWithMaterializedValue, Task> { - public readonly Inlet In = new Inlet("AmqpSink.in"); - public override SinkShape Shape => new SinkShape(In); - + public Inlet In { get; } + public override SinkShape Shape { get; } public IAmpqSinkSettings AmpqSourceSettings { get; } public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) @@ -22,6 +19,8 @@ public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue public AmqpSinkStage(IAmpqSinkSettings ampqSourceSettings) { + In = new Inlet("AmqpSink.in"); + Shape = new SinkShape(In); AmpqSourceSettings = ampqSourceSettings; } @@ -39,15 +38,9 @@ public AmqpSinkStageLogic(AmqpSinkStage amqpSinkStage, TaskCompletionSource { - try - { - var elem = Grab(stage.In); - sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem))); - Pull(stage.In); - } catch (Exception e) - { - Debugger.Break(); - } + var elem = Grab(stage.In); + sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem))); + Pull(stage.In); }, onUpstreamFinish: () => promise.SetResult(Done.Instance), onUpstreamFailure: ex => promise.SetException(ex) @@ -62,7 +55,6 @@ public override void PreStart() public override void PostStop() { - promise.TrySetException(new ApplicationException("stage stopped unexpectedly")); sender.Close(); base.PostStop(); } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs index b55acab5..a204c74e 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs @@ -5,29 +5,31 @@ using Amqp.Framing; using Amqp.Types; using System; +using System.Collections.Generic; namespace Akka.Streams.Amqp.V1 { public sealed class AmqpSourceStage : GraphStage> { + public override SourceShape Shape { get; } + public Outlet Out { get; } + public IAmqpSourceSettings AmqpSourceSettings { get; } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new AmqpSourceStageLogic(this, inheritedAttributes); + public AmqpSourceStage(IAmqpSourceSettings amqpSourceSettings) { + Out = new Outlet("AmqpSource.Out"); + Shape = new SourceShape(Out); AmqpSourceSettings = amqpSourceSettings; } - public override SourceShape Shape => new SourceShape(Out); - - public Outlet Out { get; } = new Outlet("AmqpSource.Out"); - public IAmqpSourceSettings AmqpSourceSettings { get; } - - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new AmqpSourceStageLogic(this, inheritedAttributes); - private class AmqpSourceStageLogic : GraphStageLogic { private readonly Outlet outlet; private readonly IAmqpSourceSettings ampqSourceSettings; private readonly ReceiverLink receiver; private readonly Decider decider; + private readonly Queue queue = new Queue(); public AmqpSourceStageLogic(AmqpSourceStage stage, Attributes attributes) : base(stage.Shape) { @@ -36,32 +38,58 @@ public AmqpSourceStageLogic(AmqpSourceStage stage, Attributes attributes) : b receiver = stage.AmqpSourceSettings.GetReceiverLink(); decider = attributes.GetDeciderOrDefault(); - SetHandler(outlet, () => { - var message = receiver.Receive(); - if (message != null) + SetHandler(outlet, () => + { + if (queue.TryDequeue(out Message msg)) { - try - { - Push(outlet, ampqSourceSettings.Convert(message)); - receiver.Accept(message); - } - catch (Exception e) - { - receiver.Reject(message, new Error(new Symbol(e.Message))); - if (decider(e) == Directive.Stop) - { - FailStage(e); - } - } - + PushMessage(msg); } }, onDownstreamFinish: CompleteStage); } + public override void PreStart() + { + base.PreStart(); + + var consumerCallback = GetAsyncCallback(HandleDelivery); + receiver.Start(ampqSourceSettings.Credit, (_, m) => consumerCallback.Invoke(m)); + } + + private void HandleDelivery(Message message) + { + queue.Enqueue(message); + //as callback could be called concurrently try to dequeue + //a pull can be waiting for the message + if (IsAvailable(outlet) && queue.TryDequeue(out Message msg)) + { + PushMessage(msg); + } + } + + private void PushMessage(Message message) + { + T obj = default(T); + try + { + obj = ampqSourceSettings.Convert(message); + receiver.Accept(message); + } + catch (Exception e) + { + if (decider(e) == Directive.Stop) + { + receiver.Reject(message, new Error(new Symbol(e.Message))); + FailStage(e); + } + return; + } + Push(outlet, obj); + } + public override void PostStop() { - base.PostStop(); receiver.Close(); + base.PostStop(); } } } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs index 89d45055..b946f020 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs @@ -3,9 +3,9 @@ namespace Akka.Streams.Amqp.V1.Dsl { - public static class AmpqSink + public static class AmpqSink { - public static Sink Create(IAmpqSinkSettings sourceSettings) + public static Sink Create(IAmpqSinkSettings sourceSettings) { return Sink.FromGraph(new AmqpSinkStage(sourceSettings)); } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs index 31d4da0f..d3480f5d 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs @@ -2,9 +2,9 @@ namespace Akka.Streams.Amqp.V1.Dsl { - public static class AmpqSource + public static class AmpqSource { - public static Source Create(IAmqpSourceSettings sourceSettings) + public static Source Create(IAmqpSourceSettings sourceSettings) { return Source.FromGraph(new AmqpSourceStage(sourceSettings)); } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs index f422ee92..c0b48e50 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs @@ -24,7 +24,8 @@ public NamedQueueSourceSettings( this.serializer = serializer; } - public T Convert(Message message) { + public T Convert(Message message) + { var bString = message.GetBody(); return serializer.FromBinary(bString); } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs index a8eebf24..1b47ddee 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs @@ -7,7 +7,7 @@ public static class AttributesExtensions public static Decider GetDeciderOrDefault(this Attributes attributes) { var attr = attributes.GetAttribute(null); - return attr != null ? attr.Decider : Deciders.StoppingDecider; + return attr?.Decider ?? Deciders.StoppingDecider; } } } From 4f0d74340d86f4a6bc0008093252664837a17790 Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 18 Jul 2018 13:56:42 +0200 Subject: [PATCH 3/7] Added support for ActiveMQ Artemis queues. --- .../AmqpConnectorsTest.cs | 2 +- .../SimpleAmqpTest.cs | 22 ++++++++++++++----- .../NamedQueueSinkSettings.cs | 9 ++++++-- .../NamedQueueSourceSettings.cs | 8 ++++++- 4 files changed, 32 insertions(+), 9 deletions(-) diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs index 5eb6d98d..5f19445f 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs @@ -26,7 +26,7 @@ public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_sam Connection connection = new Connection(address); Session session = new Session(connection); - var queueName = "q1"; + var queueName = "simple-queue-test" + Guid.NewGuid(); var senderlinkName = "amqp-conn-test-sender"; var receiverlinkName = "amqp-conn-test-receiver"; diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs index dc036576..36534b94 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs @@ -16,13 +16,25 @@ public async Task TestHelloWorld() Address address = new Address("amqp://guest:guest@localhost:5672"); Connection connection = await Connection.Factory.CreateAsync(address); Session session = new Session(connection); - + Message message = new Message("Hello AMQP"); - - SenderLink sender = new SenderLink(session, "sender-link", "q1"); - await sender.SendAsync(message); - ReceiverLink receiver = new ReceiverLink(session, "receiver-link", "q1"); + Target target = new Target + { + Address = "q1", + Capabilities = new Symbol[] { new Symbol("queue") } + }; + + SenderLink sender = new SenderLink(session, "sender-link", target, null); + sender.Send(message); + + Source source = new Source + { + Address = "q1", + Capabilities = new Symbol[] { new Symbol("queue") } + }; + + ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null); message = await receiver.ReceiveAsync(); receiver.Accept(message); diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs index cb869fe3..d4ba1d95 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs @@ -1,10 +1,11 @@ using Akka.IO; using Amqp; using Akka.Serialization; +using Amqp.Framing; +using Amqp.Types; namespace Akka.Streams.Amqp.V1 { - public class NamedQueueSinkSettings : IAmpqSinkSettings { private readonly Session session; @@ -29,6 +30,10 @@ public byte[] GetBytes(T obj) return serializer.ToBinary(obj); } - public SenderLink GetSenderLink() => new SenderLink(session, linkName, queueName); + public SenderLink GetSenderLink() => new SenderLink(session, linkName, new Target + { + Address = queueName, + Capabilities = new Symbol[] { new Symbol("queue") } + }, null); } } diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs index c0b48e50..3a772dd6 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs @@ -1,5 +1,7 @@ using Akka.Serialization; using Amqp; +using Amqp.Framing; +using Amqp.Types; namespace Akka.Streams.Amqp.V1 { @@ -31,6 +33,10 @@ public T Convert(Message message) } public int Credit { get; } - public ReceiverLink GetReceiverLink() => new ReceiverLink(session, linkName, queueName); + public ReceiverLink GetReceiverLink() => new ReceiverLink(session, linkName, new Source + { + Address = queueName, + Capabilities = new Symbol[] { new Symbol("queue") } + }, null); } } From e8dadff2af65a64191c03cbe2d2f3a818c25cf6d Mon Sep 17 00:00:00 2001 From: Unknown Date: Tue, 24 Jul 2018 19:39:41 +0200 Subject: [PATCH 4/7] Made caal sync again in test. --- Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs index 36534b94..d3ce2e5b 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs @@ -26,7 +26,7 @@ public async Task TestHelloWorld() }; SenderLink sender = new SenderLink(session, "sender-link", target, null); - sender.Send(message); + await sender.SendAsync(message); Source source = new Source { From 44e63db4f1bf9760bfe2ac3da852c21a6a2289a3 Mon Sep 17 00:00:00 2001 From: Unknown Date: Mon, 13 Aug 2018 22:26:00 +0200 Subject: [PATCH 5/7] Updates interface, so no switch statement is needed to perform actions Added an event handler to the subscription to raise event when topic eof reached --- .../Akka.Streams.Kafka.csproj | 6 +-- .../Settings/Subscriptions.cs | 52 +++++++++++++++---- .../Stages/ConsumerStage.cs | 29 ++++++----- 3 files changed, 63 insertions(+), 24 deletions(-) diff --git a/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj b/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj index 34ee8883..00b287fb 100644 --- a/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj +++ b/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj @@ -1,4 +1,4 @@ - + Akka.Streams.Kafka @@ -22,9 +22,9 @@ - + - + diff --git a/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs b/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs index 4dfe9b21..4c9aa3f1 100644 --- a/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs +++ b/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs @@ -3,11 +3,28 @@ namespace Akka.Streams.Kafka.Settings { - public interface ISubscription { } - public interface IManualSubscription : ISubscription { } - public interface IAutoSubscription : ISubscription { } + public delegate void TopicPartitionEofReachedEventHandler(TopicPartitionOffset topicPartitionOffset); - internal sealed class TopicSubscription : IAutoSubscription + public interface ISubscription { + void AssignConsumer(IConsumer consumer); + + void InvokeTopicPartitionEofReached(TopicPartitionOffset topicPartitionOffset); + event TopicPartitionEofReachedEventHandler TopicPartitionEofReached; + } + + public abstract class Subscription : ISubscription + { + public event TopicPartitionEofReachedEventHandler TopicPartitionEofReached; + + public abstract void AssignConsumer(IConsumer consumer); + + public void InvokeTopicPartitionEofReached(TopicPartitionOffset topicPartitionOffset) + { + TopicPartitionEofReached?.Invoke(topicPartitionOffset); + } + } + + internal sealed class TopicSubscription : Subscription { public TopicSubscription(IImmutableSet topics) { @@ -15,9 +32,16 @@ public TopicSubscription(IImmutableSet topics) } public IImmutableSet Topics { get; } + + + + public override void AssignConsumer(IConsumer consumer) + { + consumer.Subscribe(Topics); + } } - internal sealed class Assignment : IManualSubscription + internal sealed class Assignment : Subscription { public Assignment(IImmutableSet topicPartitions) { @@ -25,9 +49,14 @@ public Assignment(IImmutableSet topicPartitions) } public IImmutableSet TopicPartitions { get; } + + public override void AssignConsumer(IConsumer consumer) + { + consumer.Assign(TopicPartitions); + } } - internal sealed class AssignmentWithOffset : IManualSubscription + internal sealed class AssignmentWithOffset : Subscription { public AssignmentWithOffset(IImmutableSet topicPartitions) { @@ -35,17 +64,22 @@ public AssignmentWithOffset(IImmutableSet topicPartitions) } public IImmutableSet TopicPartitions { get; } + + public override void AssignConsumer(IConsumer consumer) + { + consumer.Assign(TopicPartitions); + } } public static class Subscriptions { - public static IAutoSubscription Topics(params string[] topics) => + public static ISubscription Topics(params string[] topics) => new TopicSubscription(topics.ToImmutableHashSet()); - public static IManualSubscription Assignment(params TopicPartition[] topicPartitions) => + public static ISubscription Assignment(params TopicPartition[] topicPartitions) => new Assignment(topicPartitions.ToImmutableHashSet()); - public static IManualSubscription AssignmentWithOffset(params TopicPartitionOffset[] topicPartitions) => + public static ISubscription AssignmentWithOffset(params TopicPartitionOffset[] topicPartitions) => new AssignmentWithOffset(topicPartitions.ToImmutableHashSet()); } } \ No newline at end of file diff --git a/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs b/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs index 67980610..0ca3bf1c 100644 --- a/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs +++ b/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs @@ -43,6 +43,8 @@ internal class KafkaSourceStageLogic : TimerGraphStageLogic private Action> _messagesReceived; private Action> _partitionsAssigned; private Action> _partitionsRevoked; + private Action _partitionEof; + private readonly Decider _decider; private const string TimerKey = "PollTimer"; @@ -94,23 +96,14 @@ public override void PreStart() _consumer.OnError += HandleOnError; _consumer.OnPartitionsAssigned += HandleOnPartitionsAssigned; _consumer.OnPartitionsRevoked += HandleOnPartitionsRevoked; + _consumer.OnPartitionEOF += HandleOnPartitionEOF; - switch (_subscription) - { - case TopicSubscription ts: - _consumer.Subscribe(ts.Topics); - break; - case Assignment a: - _consumer.Assign(a.TopicPartitions); - break; - case AssignmentWithOffset awo: - _consumer.Assign(awo.TopicPartitions); - break; - } + _subscription.AssignConsumer(_consumer); _messagesReceived = GetAsyncCallback>(MessagesReceived); _partitionsAssigned = GetAsyncCallback>(PartitionsAssigned); _partitionsRevoked = GetAsyncCallback>(PartitionsRevoked); + _partitionEof = GetAsyncCallback(PartitionEOF); ScheduleRepeatedly(TimerKey, _settings.PollInterval); } @@ -121,6 +114,7 @@ public override void PostStop() _consumer.OnError -= HandleOnError; _consumer.OnPartitionsAssigned -= HandleOnPartitionsAssigned; _consumer.OnPartitionsRevoked -= HandleOnPartitionsRevoked; + _consumer.OnPartitionEOF -= HandleOnPartitionEOF; Log.Debug($"Consumer stopped: {_consumer.Name}"); _consumer.Dispose(); @@ -175,6 +169,11 @@ private void HandleOnPartitionsRevoked(object sender, List list) _partitionsRevoked(list); } + private void HandleOnPartitionEOF(object sender, TopicPartitionOffset tpo) + { + _partitionEof(tpo); + } + // // Async callbacks // @@ -202,6 +201,12 @@ private void PartitionsRevoked(IEnumerable partitions) _assignedPartitions = null; } + private void PartitionEOF(TopicPartitionOffset obj) + { + Log.Debug($"Partition EOF triggered: {_consumer.Name}"); + _subscription.InvokeTopicPartitionEofReached(obj); + } + private void PullQueue() { _consumer.Poll(_settings.PollTimeout); From 73c359d1a2d4e42a8f757d379db2cf6562c4e658 Mon Sep 17 00:00:00 2001 From: Unknown Date: Mon, 13 Aug 2018 22:26:00 +0200 Subject: [PATCH 6/7] Revert "Updates interface, so no switch statement is needed to perform actions" This reverts commit 44e63db4f1bf9760bfe2ac3da852c21a6a2289a3. --- .../Akka.Streams.Kafka.csproj | 6 +-- .../Settings/Subscriptions.cs | 52 ++++--------------- .../Stages/ConsumerStage.cs | 29 +++++------ 3 files changed, 24 insertions(+), 63 deletions(-) diff --git a/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj b/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj index 00b287fb..34ee8883 100644 --- a/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj +++ b/Kafka/src/Akka.Streams.Kafka/Akka.Streams.Kafka.csproj @@ -1,4 +1,4 @@ - + Akka.Streams.Kafka @@ -22,9 +22,9 @@ - + - + diff --git a/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs b/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs index 4c9aa3f1..4dfe9b21 100644 --- a/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs +++ b/Kafka/src/Akka.Streams.Kafka/Settings/Subscriptions.cs @@ -3,28 +3,11 @@ namespace Akka.Streams.Kafka.Settings { - public delegate void TopicPartitionEofReachedEventHandler(TopicPartitionOffset topicPartitionOffset); + public interface ISubscription { } + public interface IManualSubscription : ISubscription { } + public interface IAutoSubscription : ISubscription { } - public interface ISubscription { - void AssignConsumer(IConsumer consumer); - - void InvokeTopicPartitionEofReached(TopicPartitionOffset topicPartitionOffset); - event TopicPartitionEofReachedEventHandler TopicPartitionEofReached; - } - - public abstract class Subscription : ISubscription - { - public event TopicPartitionEofReachedEventHandler TopicPartitionEofReached; - - public abstract void AssignConsumer(IConsumer consumer); - - public void InvokeTopicPartitionEofReached(TopicPartitionOffset topicPartitionOffset) - { - TopicPartitionEofReached?.Invoke(topicPartitionOffset); - } - } - - internal sealed class TopicSubscription : Subscription + internal sealed class TopicSubscription : IAutoSubscription { public TopicSubscription(IImmutableSet topics) { @@ -32,16 +15,9 @@ public TopicSubscription(IImmutableSet topics) } public IImmutableSet Topics { get; } - - - - public override void AssignConsumer(IConsumer consumer) - { - consumer.Subscribe(Topics); - } } - internal sealed class Assignment : Subscription + internal sealed class Assignment : IManualSubscription { public Assignment(IImmutableSet topicPartitions) { @@ -49,14 +25,9 @@ public Assignment(IImmutableSet topicPartitions) } public IImmutableSet TopicPartitions { get; } - - public override void AssignConsumer(IConsumer consumer) - { - consumer.Assign(TopicPartitions); - } } - internal sealed class AssignmentWithOffset : Subscription + internal sealed class AssignmentWithOffset : IManualSubscription { public AssignmentWithOffset(IImmutableSet topicPartitions) { @@ -64,22 +35,17 @@ public AssignmentWithOffset(IImmutableSet topicPartitions) } public IImmutableSet TopicPartitions { get; } - - public override void AssignConsumer(IConsumer consumer) - { - consumer.Assign(TopicPartitions); - } } public static class Subscriptions { - public static ISubscription Topics(params string[] topics) => + public static IAutoSubscription Topics(params string[] topics) => new TopicSubscription(topics.ToImmutableHashSet()); - public static ISubscription Assignment(params TopicPartition[] topicPartitions) => + public static IManualSubscription Assignment(params TopicPartition[] topicPartitions) => new Assignment(topicPartitions.ToImmutableHashSet()); - public static ISubscription AssignmentWithOffset(params TopicPartitionOffset[] topicPartitions) => + public static IManualSubscription AssignmentWithOffset(params TopicPartitionOffset[] topicPartitions) => new AssignmentWithOffset(topicPartitions.ToImmutableHashSet()); } } \ No newline at end of file diff --git a/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs b/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs index 0ca3bf1c..67980610 100644 --- a/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs +++ b/Kafka/src/Akka.Streams.Kafka/Stages/ConsumerStage.cs @@ -43,8 +43,6 @@ internal class KafkaSourceStageLogic : TimerGraphStageLogic private Action> _messagesReceived; private Action> _partitionsAssigned; private Action> _partitionsRevoked; - private Action _partitionEof; - private readonly Decider _decider; private const string TimerKey = "PollTimer"; @@ -96,14 +94,23 @@ public override void PreStart() _consumer.OnError += HandleOnError; _consumer.OnPartitionsAssigned += HandleOnPartitionsAssigned; _consumer.OnPartitionsRevoked += HandleOnPartitionsRevoked; - _consumer.OnPartitionEOF += HandleOnPartitionEOF; - _subscription.AssignConsumer(_consumer); + switch (_subscription) + { + case TopicSubscription ts: + _consumer.Subscribe(ts.Topics); + break; + case Assignment a: + _consumer.Assign(a.TopicPartitions); + break; + case AssignmentWithOffset awo: + _consumer.Assign(awo.TopicPartitions); + break; + } _messagesReceived = GetAsyncCallback>(MessagesReceived); _partitionsAssigned = GetAsyncCallback>(PartitionsAssigned); _partitionsRevoked = GetAsyncCallback>(PartitionsRevoked); - _partitionEof = GetAsyncCallback(PartitionEOF); ScheduleRepeatedly(TimerKey, _settings.PollInterval); } @@ -114,7 +121,6 @@ public override void PostStop() _consumer.OnError -= HandleOnError; _consumer.OnPartitionsAssigned -= HandleOnPartitionsAssigned; _consumer.OnPartitionsRevoked -= HandleOnPartitionsRevoked; - _consumer.OnPartitionEOF -= HandleOnPartitionEOF; Log.Debug($"Consumer stopped: {_consumer.Name}"); _consumer.Dispose(); @@ -169,11 +175,6 @@ private void HandleOnPartitionsRevoked(object sender, List list) _partitionsRevoked(list); } - private void HandleOnPartitionEOF(object sender, TopicPartitionOffset tpo) - { - _partitionEof(tpo); - } - // // Async callbacks // @@ -201,12 +202,6 @@ private void PartitionsRevoked(IEnumerable partitions) _assignedPartitions = null; } - private void PartitionEOF(TopicPartitionOffset obj) - { - Log.Debug($"Partition EOF triggered: {_consumer.Name}"); - _subscription.InvokeTopicPartitionEofReached(obj); - } - private void PullQueue() { _consumer.Poll(_settings.PollTimeout); From 4eaaad217d0c2c40eb4979a2df678b313e41988e Mon Sep 17 00:00:00 2001 From: Unknown Date: Wed, 19 Sep 2018 19:26:27 +0200 Subject: [PATCH 7/7] Updated NuGet packget --- .../Akka.Streams.Amqp.V1.Tests.csproj | 6 +++--- Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj index 765b68b6..19c87b67 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj @@ -7,9 +7,9 @@ - - - + + + diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj index e65c5f1e..d80410e1 100644 --- a/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj @@ -5,7 +5,7 @@ - +