PublishWithConfirmation infinite loop #15

Closed
dfelici87 opened this issue Apr 9, 2021 · 11 comments
@dfelici87

Hi,

I use the PublishWithConfirmation function, but if RabbitMQ returns an error (confirmation.Ack = false) the function enters an infinite loop, because there is a "goto Publish" statement that resets the timeout and re-publishes the message every time.

Is this correct, or is it a bug?
Is it possible to limit the number of attempts?

thanks

@houseofcat
Owner

houseofcat commented Apr 11, 2021

It's correct. confirmation.Ack == false means that the server is telling us it did not get the message and that we need to resend it.

This could in theory be an infinite loop if the server kept replying false, but that would be a server-side issue that needs solving. PublishWithConfirmation is working as intended: its goal is to get the message to the server, and reliability is the most important feature. There is nothing to manage in this flow other than a timeout. The timeout is implicitly applied to waiting for the response after publishing, which should be sufficient to exit on a standard communication problem.

If you want simpler publishing, use a different Publish call/mechanism, or write your own version while still using ChannelPools.

houseofcat added the wontfix label Apr 13, 2021
@ppiccolo

I can understand your point, but how can I catch this error?

@houseofcat
Owner

houseofcat commented Apr 15, 2021

The infinite loop? Well, you could write your own, or switch to this manual Publish method:

func (pub *Publisher) PublishWithConfirmationContext(ctx context.Context, letter *Letter) {

It will stop trying after the context you provide expires.

@ppiccolo

ppiccolo commented Apr 16, 2021

OK, thank you, but how can I get the channel error back? Is there no way?

I think it would be useful for the client to be able to see that there is a server-side error.

@houseofcat
Owner

houseofcat commented Apr 16, 2021

Why would you need to do that? In Go you can step into the package and set a breakpoint.

If you mean after deployment, and you want to log such a message, you have to get creative in Go without dependency injection. Here you would need to write a logging interface or return the error. This function is designed to keep retrying on error, meaning we can't return the error, because reliable transmission is the highest priority here.

If you want Publish with errors try one of the simpler publish functions. Here, for example, we output err via a chan that you can subscribe to and log all messages.

pub.publishReceipt(letter, err)

This gives you access to the original message (presumably for retry) and the original error.

Also you can write your own version using components like the ChannelPool. You don't have to use the helpers I provided.

@houseofcat
Owner

PublishWithConfirmation is just a loop on Publish.

Switch to Publish.

func (pub *Publisher) Publish(letter *Letter, skipReceipt bool) {

@ppiccolo

ppiccolo commented Apr 16, 2021

If you want Publish with errors try one of the simpler publish functions. Here, for example, we output err via a chan that you can subscribe to and log all messages.

This is exactly what I'm trying to do, but if I try to publish a message to a non-existent exchange, I don't get any error back on the chan that you provide.

But maybe I'm missing something...

package main

import (
	"time"

	turbo "github.com/houseofcat/turbocookedrabbit/v2/pkg/tcr"
	log "github.com/sirupsen/logrus"
)

// ErrorHandler logs errors reported by the RabbitService.
func ErrorHandler(err error) {
	log.Errorf("Error: %v", err)
}

func main() {

	cfg := &turbo.RabbitSeasoning{
		EncryptionConfig:  &turbo.EncryptionConfig{
			Enabled:           false,
			Type:              "",
			Hashkey:           nil,
			TimeConsideration: 0,
			MemoryMultiplier:  0,
			Threads:           0,
		},
		CompressionConfig: &turbo.CompressionConfig{
			Enabled: false,
			Type:    "",
		},
		PoolConfig:        &turbo.PoolConfig{
			ApplicationName:      "test_error",
			URI:                  "amqp://guest:[email protected]:5672/",
			Heartbeat:            1,
			ConnectionTimeout:    10,
			SleepOnErrorInterval: 1,
			MaxConnectionCount:   100,
			MaxCacheChannelCount: 100,
			TLSConfig:            nil,
		},
		ConsumerConfigs:   nil,
		PublisherConfig:   &turbo.PublisherConfig{
			AutoAck:                false,
			SleepOnIdleInterval:    1,
			SleepOnErrorInterval:   1,
			PublishTimeOutInterval: 1,
			MaxRetryCount:          2,
		},
	}
	svc, err := turbo.NewRabbitService(cfg, "", "", nil, ErrorHandler)

	if err != nil {
		log.Fatalf("%v", err)
	}

	// Log central errors and publish receipts as they arrive.
	go func() {
		for {
			select {
			case err := <-svc.CentralErr():
				log.Errorf("Error %v", err)
			case receipt := <-svc.Publisher.PublishReceipts():
				if receipt.Error != nil {
					log.Errorf("Error %v", receipt.ToString())
					continue
				}
				log.Infof("Info %v", receipt.ToString())
			}
		}
	}()

	log.Infof("%v", "publishing...")

	for i := 0; i < 100; i++ {
		err = svc.Publish("Hi", "nothing", "nothing", "", true, nil)
		if err != nil {
			log.Fatalf("%v", err)
		}
	}
	
	time.Sleep(10 * time.Second)

	log.Infof("%v", "Done")
}

However, I agree with you that the behavior of PublishWithConfirmation is correct, and the suggestion you gave fits the requirements.

Maybe this should be a separate issue.

Thanks.

@houseofcat
Owner

houseofcat commented Apr 16, 2021

I believe what you are experiencing is dead letter queueing.
Edit: I am starting to have my doubts though. I would normally expect this to error like it does in C#.

This test publishes to an exchange that doesn't exist and uses none of my library for connectivity.

// TestBasicPublishToNonExistentExchange shows that publishing to an exchange
// that doesn't exist does not return an error from Publish.
func TestBasicPublishToNonExistentExchange(t *testing.T) {
	defer leaktest.Check(t)()

	letter := tcr.CreateMockLetter("DoesNotExist", "TcrTestQueue", nil)
	amqpConn, err := amqp.Dial(Seasoning.PoolConfig.URI)
	if err != nil {
		t.Error(err)
		return
	}
	// Deferred closes run before the leak check above, on every exit path.
	defer amqpConn.Close()

	amqpChan, err := amqpConn.Channel()
	if err != nil {
		t.Error(err)
		return
	}
	defer amqpChan.Close()

	err = amqpChan.Publish(
		letter.Envelope.Exchange,
		letter.Envelope.RoutingKey,
		letter.Envelope.Mandatory,
		letter.Envelope.Immediate,
		amqp.Publishing{
			ContentType: letter.Envelope.ContentType,
			Body:        letter.Body,
			MessageId:   letter.LetterID.String(),
			Timestamp:   time.Now().UTC(),
			AppId:       "TCR-Test",
		})

	if err != nil {
		t.Error(err)
	}
}

As you can see, the test doesn't exit early.
[screenshot]

And that exchange does not exist.
[screenshot]

And you would expect no error in a DLQ scenario, which happens to be what we see here.

The reason it loops in a PublishWithConfirmation call is that I get a second message (the confirmation message). This tells me the message was not properly received server side at its destination, which is correct.

As good practice goes, a common pattern is for devs to declare the queue and exchange (and bind them) on every app startup. That's why I added full topology support via a JSON file too. You get a simple way to keep a source-controlled version of your topology and can easily hop between environments.

Assuming it really is dead lettering, an alternative (more of a devops / server administration solution) is to configure your dead letter policy server side and route messages in this situation somewhere so you don't lose them.
https://www.rabbitmq.com/dlx.html

@ppiccolo

Edit: I am starting to have my doubts though. I would normally expect this to error like it does in C#.

This is definitely my expectation too. It needs further investigation.

houseofcat added the bug label and removed the wontfix label Apr 21, 2021
houseofcat self-assigned this Apr 21, 2021
@houseofcat
Owner

houseofcat commented Apr 21, 2021

@ppiccolo Hey Paolo, a simple temporary workaround is to use Publish or PublishWithConfirmationContext for now. I believe the issue is that somewhere, somehow, streadway/amqp is not following the AMQP v0.9.1 spec. I have opened an issue with them and will keep this one open to monitor the situation.

streadway/amqp#504

Also, you should always try to build your queues, exchanges, and bindings on startup, just to be safe that they are not missing.

@ppiccolo

OK, thanks a lot.
