Proper way to Complete the message #3

Closed
mikhailshilkov opened this issue Aug 22, 2016 · 8 comments

Comments

@mikhailshilkov

This isn't an issue with the library; it's more a question about how to use it properly.

At what point should the Service Bus message be Completed? Ideally, I want to do that after all the processing is done, i.e. at the end of the stream graph. Here is my example:

var source = ServiceBusSource.Create(client);
var getBody = Flow.Create<BrokeredMessage>().Select(x => Tuple.Create(x, x.GetBody<string>()));
var work = Flow.Create<Tuple<BrokeredMessage, string>>().Select(i =>
{
    Console.WriteLine($"Doing work: {i.Item2}");
    return i;
});
var sink = Sink.ForEach<Tuple<BrokeredMessage, string>>(i =>
{
    Console.WriteLine($"Complete: {i.Item2}");
    i.Item1.Complete();
}).MapMaterializedValue(_ => NotUsed.Instance);

var g = RunnableGraph.FromGraph(GraphDsl.Create(builder =>
{
    builder.From(source).Via(getBody).Via(work).To(sink);
    return ClosedShape.Instance;
}));
g.Run(mat);

It has 4 parts:

  1. Source of SB messages.
  2. Flow step to extract message body.
  3. Flow step to simulate some real time-consuming error-prone work done on the message.
  4. Sink to Complete the message.

As you can see, I had to pass the original SB message in a tuple all the way through the pipeline, which doesn't look nice. Carrying tuples through every stage gets unwieldy in large pipelines and makes the code more complicated.

Are there other approaches? For example, could the Source or another independent Flow step be signaled when a message has been processed successfully? Or should I ignore this altogether, Complete the message immediately, and rely on Akka for error handling? That would assume Akka never fails, which I guess is wrong for machine-level failures.

It would be nice to get your opinion or a link to a relevant discussion.
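A toy model of the trade-off raised in the question: with peek-lock receive (which the `Complete()` call in the example implies), a message that is never completed becomes available again once its lock expires, while a completed message is gone for good. This sketch is hypothetical and does not use the Service Bus client; `AckTiming` and `StillQueued` are made-up names, and a thrown exception stands in for any failure, including a machine crash.

```csharp
using System;

// Toy model of peek-lock semantics (hypothetical, NOT the Service Bus API):
// a received message stays on the queue until it is completed, so an
// uncompleted message will be delivered again later.
public static class AckTiming
{
    // Simulates one delivery attempt and reports whether the message is
    // still on the queue afterwards (true = it will be redelivered).
    public static bool StillQueued(bool completeOnReceive, bool workFails)
    {
        bool onQueue = true;
        try
        {
            if (completeOnReceive) onQueue = false;  // Complete immediately on receive
            if (workFails) throw new InvalidOperationException("work failed");
            if (!completeOnReceive) onQueue = false; // Complete at the end of the pipeline
        }
        catch (InvalidOperationException)
        {
            // Failure mid-pipeline: nothing gets completed in this branch.
        }
        return onQueue;
    }

    public static void Main()
    {
        foreach (var completeOnReceive in new[] { true, false })
        {
            foreach (var workFails in new[] { false, true })
            {
                Console.WriteLine(
                    $"completeOnReceive={completeOnReceive}, workFails={workFails} -> " +
                    $"still queued: {StillQueued(completeOnReceive, workFails)}");
            }
        }
    }
}
```

Completing on receive gives at-most-once processing: if the work fails, the message is already gone. Completing at the end of the pipeline gives at-least-once processing: a failed message stays on the queue and is redelivered, so the processing steps should tolerate duplicates.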

@mikhailshilkov
Author

Found a post from the Scala/RabbitMQ world, which probably means there's no easy solution to my issue:
http://tim.theenchanter.com/2015/07/the-need-for-acknowledgement-in-streams.html

@marcpiechura
Owner

marcpiechura commented Aug 23, 2016

This topic is complicated and there isn't a general solution for Akka.Streams yet. You can read more about it in the context of the Kafka connector on the JVM here and here.

A possible solution to your problem could be to use Broadcast in combination with ZipWith:

var source = ServiceBusSource.Create(client);
var getBody = Flow.Create<BrokeredMessage>().Select(x => x.GetBody<string>());
var work = Flow.Create<string>().Select(i =>
{
    Console.WriteLine($"Doing work: {i}");
    return i;
});
// The sink only sees the original message and completes it.
var sink = Sink.ForEach<BrokeredMessage>(i =>
{
    Console.WriteLine($"Complete: {i}");
    i.Complete();
}).MapMaterializedValue(_ => NotUsed.Instance);

var g = RunnableGraph.FromGraph(GraphDsl.Create(builder =>
{
    // Fan each message out to two branches...
    var broadcast = builder.Add(new Broadcast<BrokeredMessage>(2));
    // ...and zip them back together, keeping the original message (Keep.Left)
    // once its processed body has arrived on the second input.
    var zip = builder.Add(new ZipWith<BrokeredMessage, string, BrokeredMessage>(Keep.Left));

    builder.From(source).To(broadcast);
    // Branch 1: pass the original message straight through.
    builder.From(broadcast).To(zip.In0);
    // Branch 2: extract the body and do the (possibly failing) work.
    builder.From(broadcast).Via(getBody).Via(work).To(zip.In1);
    builder.From(zip.Out).To(sink);

    return ClosedShape.Instance;
}));
g.Run(mat);

But you must make sure that you don't use a Restart or Resume supervision strategy in your getBody or work flow. Both strategies drop the failing element, and that would deadlock the stream: Broadcast only emits a new element when both outputs signal demand, and ZipWith only pulls from upstream when both inputs have an element.

Another approach would be to use an envelope type instead of a Tuple, but that's of course only syntactic sugar.
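A minimal sketch of the envelope idea, assuming plain C# with no Akka dependency so it runs on its own. `Envelope`, `Map`, and `FakeMessage` are hypothetical names; `FakeMessage` stands in for `BrokeredMessage`. The envelope keeps the original message next to the current payload, so each stage transforms only the payload while the last stage can still call `Complete()` on the original message.

```csharp
using System;

// Hypothetical envelope type: carries the original message (needed for the
// final Complete) next to the current payload, so stages use named members
// instead of Tuple.Item1/Item2.
public sealed class Envelope<TMessage, TPayload>
{
    public Envelope(TMessage message, TPayload payload)
    {
        Message = message;
        Payload = payload;
    }

    public TMessage Message { get; }
    public TPayload Payload { get; }

    // Transform only the payload; the original message travels along unchanged.
    public Envelope<TMessage, TResult> Map<TResult>(Func<TPayload, TResult> f) =>
        new Envelope<TMessage, TResult>(Message, f(Payload));
}

// Stand-in for BrokeredMessage so the sketch runs without Service Bus.
public sealed class FakeMessage
{
    public FakeMessage(string body) { Body = body; }
    public string Body { get; }
    public bool Completed { get; private set; }
    public void Complete() => Completed = true;
}

public static class Program
{
    public static void Main()
    {
        var msg = new FakeMessage("hello");

        var result = new Envelope<FakeMessage, string>(msg, msg.Body) // "getBody" step
            .Map(body => body.ToUpperInvariant())                     // "work" step
            .Map(body => body.Length);                                // another step

        result.Message.Complete();                                    // "sink" step
        Console.WriteLine($"{result.Payload} {result.Message.Completed}"); // prints "5 True"
    }
}
```

In the Akka pipeline, a stage could then look something like `Flow.Create<Envelope<BrokeredMessage, string>>().Select(e => e.Map(DoWork))`, where `DoWork` is a hypothetical placeholder for the real work. The message still travels through every stage; the envelope only gives it a readable name.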

@marcpiechura
Owner

@mikhailshilkov there is a new blog post out about writing Akka Streams connectors. The AMQP connector acks the messages as soon as they arrive, as you can see here.
I could also implement this behavior and make it configurable via a parameter. Would this fit your requirements?

@mikhailshilkov
Author

If you make it configurable, that's perfectly fine of course.
For some critical queues I still want the ability to complete messages only after they have passed through the whole pipeline, but I see that this runs a bit counter to the streams architecture. I guess auto-ACK is a good default for most cases.

@marcpiechura
Owner

@mikhailshilkov sorry for the delay, I was quite busy the last few weeks.
I'd appreciate some feedback on efa4f0b.
If you're happy with it, I'll release a new version.

@mikhailshilkov
Author

mikhailshilkov commented Sep 18, 2016

It looks good, except that I don't see a way to configure whether or not to auto-complete.
I will have a look at your Event Hubs source. ACK should be less of a problem there, as the messages are preserved for subsequent reads anyway.

@marcpiechura
Owner

I struggled a little with the API. I had an additional ack parameter, but then I ran into the question of brokeredMessage.Complete vs CompleteAsync: one might want to emit only completed messages downstream, or use Complete more like fire-and-forget and not await the async call...
In the end, there are now three options:

  1. Create<string>(client), the default behavior, completes the message before it is sent downstream.
  2. Create(client) emits the BrokeredMessage as before, so you can complete it later yourself.
  3. Create(client, msg => {...}) lets you decide, for example, whether to pass a different XML serializer to GetBody or to complete the message asynchronously without awaiting the result.

Does that make sense?

@mikhailshilkov
Author

Yes, nice
