diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj new file mode 100644 index 00000000..19c87b67 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/Akka.Streams.Amqp.V1.Tests.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp2.0 + + false + + + + + + + + + + + + + + + + + diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs new file mode 100644 index 00000000..5f19445f --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs @@ -0,0 +1,54 @@ +using Akka.Serialization; +using Akka.Streams.Amqp.V1.Dsl; +using Akka.Streams.Dsl; +using Amqp; +using System; +using Xunit; + +namespace Akka.Streams.Amqp.V1.Tests +{ + public class AmqpConnectorsTest : Akka.TestKit.Xunit2.TestKit + { + private readonly Serializer serializer; + private readonly Address address; + private readonly ActorMaterializer materializer; + + public AmqpConnectorsTest() + { + materializer = ActorMaterializer.Create(Sys); + serializer = Sys.Serialization.FindSerializerForType(typeof(string)); + address = new Address("amqp://guest:guest@localhost:5672"); + } + + [Fact] + public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process() + { + Connection connection = new Connection(address); + Session session = new Session(connection); + + var queueName = "simple-queue-test" + Guid.NewGuid(); + var senderlinkName = "amqp-conn-test-sender"; + var receiverlinkName = "amqp-conn-test-receiver"; + + //create sink and source + var amqpSink = AmpqSink.Create(new NamedQueueSinkSettings(session, senderlinkName, queueName, serializer)); + var amqpSource = AmpqSource.Create(new NamedQueueSourceSettings(session, receiverlinkName, queueName, 200, serializer)); + + //run sink + var input = new[] { "one", "two", "three", "four", "five" }; + Source.From(input).RunWith(amqpSink, materializer).Wait(); + + //run source + var result = amqpSource + .Take(input.Length) + .RunWith(Sink.Seq(), materializer); + + result.Wait(TimeSpan.FromSeconds(30)); + Assert.True(result.IsCompleted); + Assert.Equal(input, result.Result); + + session.Close(); + connection.Close(); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs new file mode 100644 index 00000000..d3ce2e5b --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs @@ -0,0 +1,47 @@ +using Amqp; +using Amqp.Framing; +using Amqp.Types; +using System.Threading.Tasks; +using Xunit; + +namespace Akka.Streams.Amqp.V1.Tests +{ + public class SimpleAmqpTest + { + [Fact] + public async Task TestHelloWorld() + { + //strange, works using regular activeMQ and the amqp test broker from here: http://azure.github.io/amqpnetlite/articles/hello_amqp.html + //but this does not work in ActiveMQ Artemis + Address address = new Address("amqp://guest:guest@localhost:5672"); + Connection connection = await Connection.Factory.CreateAsync(address); + Session session = new Session(connection); + + Message message = new Message("Hello AMQP"); + + Target target = new Target + { + Address = "q1", + Capabilities = new Symbol[] { new Symbol("queue") } + }; + + SenderLink sender = new SenderLink(session, "sender-link", target, null); + await sender.SendAsync(message); + + Source source = new Source + { + Address = "q1", + Capabilities = new Symbol[] { new Symbol("queue") } + }; + + ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null); + message = await receiver.ReceiveAsync(); + receiver.Accept(message); + + await sender.CloseAsync(); + await receiver.CloseAsync(); + await session.CloseAsync(); + await connection.CloseAsync(); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj new file mode 100644 index 00000000..d80410e1 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj @@ -0,0 +1,13 @@ + + + + netcoreapp2.0 + + + + + + + + + diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs new file mode 100644 index 00000000..49c44fd2 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs @@ -0,0 +1,63 @@ +using Akka.Streams.Stage; +using Amqp; +using System.Threading.Tasks; + +namespace Akka.Streams.Amqp.V1 +{ + public sealed class AmqpSinkStage : GraphStageWithMaterializedValue, Task> + { + public Inlet In { get; } + public override SinkShape Shape { get; } + public IAmpqSinkSettings AmpqSourceSettings { get; } + + public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + { + var promise = new TaskCompletionSource(); + var logic = new AmqpSinkStageLogic(this, promise, Shape); + return new LogicAndMaterializedValue(logic, promise.Task); + } + + public AmqpSinkStage(IAmpqSinkSettings ampqSourceSettings) + { + In = new Inlet("AmqpSink.in"); + Shape = new SinkShape(In); + AmpqSourceSettings = ampqSourceSettings; + } + + private class AmqpSinkStageLogic : GraphStageLogic + { + private readonly AmqpSinkStage stage; + private readonly TaskCompletionSource promise; + private readonly SenderLink sender; + + public AmqpSinkStageLogic(AmqpSinkStage amqpSinkStage, TaskCompletionSource promise, SinkShape shape) : base(shape) + { + stage = amqpSinkStage; + this.promise = promise; + sender = amqpSinkStage.AmpqSourceSettings.GetSenderLink(); + + SetHandler(stage.In, () => + { + var elem = Grab(stage.In); + sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem))); + Pull(stage.In); + }, + onUpstreamFinish: () => promise.SetResult(Done.Instance), + onUpstreamFailure: ex => promise.SetException(ex) + ); + } + + public override void PreStart() + { + base.PreStart(); + Pull(stage.In); + } + + public override void PostStop() + { + sender.Close(); + base.PostStop(); + } + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs new file mode 100644 index 00000000..a204c74e --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs @@ -0,0 +1,96 @@ +using Akka.Streams.Amqp.V1.Util; +using Akka.Streams.Stage; +using Akka.Streams.Supervision; +using Amqp; +using Amqp.Framing; +using Amqp.Types; +using System; +using System.Collections.Generic; + +namespace Akka.Streams.Amqp.V1 +{ + public sealed class AmqpSourceStage : GraphStage> + { + public override SourceShape Shape { get; } + public Outlet Out { get; } + public IAmqpSourceSettings AmqpSourceSettings { get; } + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new AmqpSourceStageLogic(this, inheritedAttributes); + + public AmqpSourceStage(IAmqpSourceSettings amqpSourceSettings) + { + Out = new Outlet("AmqpSource.Out"); + Shape = new SourceShape(Out); + AmqpSourceSettings = amqpSourceSettings; + } + + private class AmqpSourceStageLogic : GraphStageLogic + { + private readonly Outlet outlet; + private readonly IAmqpSourceSettings ampqSourceSettings; + private readonly ReceiverLink receiver; + private readonly Decider decider; + private readonly Queue queue = new Queue(); + + public AmqpSourceStageLogic(AmqpSourceStage stage, Attributes attributes) : base(stage.Shape) + { + outlet = stage.Out; + ampqSourceSettings = stage.AmqpSourceSettings; + receiver = stage.AmqpSourceSettings.GetReceiverLink(); + decider = attributes.GetDeciderOrDefault(); + + SetHandler(outlet, () => + { + if (queue.TryDequeue(out Message msg)) + { + PushMessage(msg); + } + }, onDownstreamFinish: CompleteStage); + } + + public override void PreStart() + { + base.PreStart(); + + var consumerCallback = GetAsyncCallback(HandleDelivery); + receiver.Start(ampqSourceSettings.Credit, (_, m) => consumerCallback.Invoke(m)); + } + + private void HandleDelivery(Message message) + { + queue.Enqueue(message); + //as callback could be called concurrently try to dequeue + //a pull can be waiting for the message + if (IsAvailable(outlet) && queue.TryDequeue(out Message msg)) + { + PushMessage(msg); + } + } + + private void PushMessage(Message message) + { + T obj = default(T); + try + { + obj = ampqSourceSettings.Convert(message); + receiver.Accept(message); + } + catch (Exception e) + { + if (decider(e) == Directive.Stop) + { + receiver.Reject(message, new Error(new Symbol(e.Message))); + FailStage(e); + } + return; + } + Push(outlet, obj); + } + + public override void PostStop() + { + receiver.Close(); + base.PostStop(); + } + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs new file mode 100644 index 00000000..b946f020 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs @@ -0,0 +1,13 @@ +using Akka.Streams.Dsl; +using System.Threading.Tasks; + +namespace Akka.Streams.Amqp.V1.Dsl +{ + public static class AmpqSink + { + public static Sink Create(IAmpqSinkSettings sourceSettings) + { + return Sink.FromGraph(new AmqpSinkStage(sourceSettings)); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs new file mode 100644 index 00000000..d3480f5d --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs @@ -0,0 +1,12 @@ +using Akka.Streams.Dsl; + +namespace Akka.Streams.Amqp.V1.Dsl +{ + public static class AmpqSource + { + public static Source Create(IAmqpSourceSettings sourceSettings) + { + return Source.FromGraph(new AmqpSourceStage(sourceSettings)); + } + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs new file mode 100644 index 00000000..1933c300 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs @@ -0,0 +1,11 @@ +using Akka.IO; +using Amqp; + +namespace Akka.Streams.Amqp.V1 +{ + public interface IAmpqSinkSettings + { + SenderLink GetSenderLink(); + byte[] GetBytes(T obj); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs new file mode 100644 index 00000000..65b33dce --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs @@ -0,0 +1,11 @@ +using Amqp; + +namespace Akka.Streams.Amqp.V1 +{ + public interface IAmqpSourceSettings + { + ReceiverLink GetReceiverLink(); + int Credit { get; } + T Convert(Message message); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs new file mode 100644 index 00000000..d4ba1d95 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs @@ -0,0 +1,39 @@ +using Akka.IO; +using Amqp; +using Akka.Serialization; +using Amqp.Framing; +using Amqp.Types; + +namespace Akka.Streams.Amqp.V1 +{ + public class NamedQueueSinkSettings : IAmpqSinkSettings + { + private readonly Session session; + private readonly string linkName; + private readonly string queueName; + private readonly Serializer serializer; + + public NamedQueueSinkSettings( + Session session, + string linkName, + string queueName, + Serializer serializer) + { + this.session = session; + this.linkName = linkName; + this.queueName = queueName; + this.serializer = serializer; + } + + public byte[] GetBytes(T obj) + { + return serializer.ToBinary(obj); + } + + public SenderLink GetSenderLink() => new SenderLink(session, linkName, new Target + { + Address = queueName, + Capabilities = new Symbol[] { new Symbol("queue") } + }, null); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs new file mode 100644 index 00000000..3a772dd6 --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSourceSettings.cs @@ -0,0 +1,42 @@ +using Akka.Serialization; +using Amqp; +using Amqp.Framing; +using Amqp.Types; + +namespace Akka.Streams.Amqp.V1 +{ + public class NamedQueueSourceSettings : IAmqpSourceSettings + { + private readonly Session session; + private readonly string linkName; + private readonly string queueName; + private readonly Serializer serializer; + + public NamedQueueSourceSettings( + Session session, + string linkName, + string queueName, + int credit, + Serializer serializer) + { + this.session = session; + this.linkName = linkName; + this.queueName = queueName; + Credit = credit; + this.serializer = serializer; + } + + public T Convert(Message message) + { + var bString = message.GetBody(); + return serializer.FromBinary(bString); + } + + public int Credit { get; } + public ReceiverLink GetReceiverLink() => new ReceiverLink(session, linkName, new Source + { + Address = queueName, + Capabilities = new Symbol[] { new Symbol("queue") } + }, null); + } +} diff --git a/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs new file mode 100644 index 00000000..1b47ddee --- /dev/null +++ b/Amqp.V1/Akka.Streams.Amqp.V1/Util/AttributesExtension.cs @@ -0,0 +1,13 @@ +using Akka.Streams.Supervision; + +namespace Akka.Streams.Amqp.V1.Util +{ + public static class AttributesExtensions + { + public static Decider GetDeciderOrDefault(this Attributes attributes) + { + var attr = attributes.GetAttribute(null); + return attr?.Decider ?? Deciders.StoppingDecider; + } + } +} diff --git a/Amqp.V1/Amqp.V1.sln b/Amqp.V1/Amqp.V1.sln new file mode 100644 index 00000000..88398e25 --- /dev/null +++ b/Amqp.V1/Amqp.V1.sln @@ -0,0 +1,51 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.V1", "Akka.Streams.Amqp.V1\Akka.Streams.Amqp.V1.csproj", "{25B384BF-908C-45B2-B709-DD3D5C22A549}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.V1.Tests", "Akka.Streams.Amqp.V1.Tests\Akka.Streams.Amqp.V1.Tests.csproj", "{2C9B879B-F28E-41B0-A742-D57A043926A8}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|Any CPU.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x64.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x64.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x86.ActiveCfg = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Debug|x86.Build.0 = Debug|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|Any CPU.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|Any CPU.Build.0 = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x64.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x64.Build.0 = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x86.ActiveCfg = Release|Any CPU + {25B384BF-908C-45B2-B709-DD3D5C22A549}.Release|x86.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x64.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x64.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x86.ActiveCfg = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Debug|x86.Build.0 = Debug|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|Any CPU.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x64.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x64.Build.0 = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x86.ActiveCfg = Release|Any CPU + {2C9B879B-F28E-41B0-A742-D57A043926A8}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {E6426005-29FF-454F-AE5E-0ABB3DCFEF9A} + EndGlobalSection +EndGlobal