Adds out-of-the-box support for SQS message broker #197

Open
wants to merge 3 commits into
base: main
from


thesujai (Author):

Fixes #106

This is still very incomplete. For now, it only supports a simple enqueue via perform-async and executing the job.

Remaining work:

  1. Failure handling when executing a job
  2. Batch jobs

thesujai (Author):

@olttwa, could you do a quick review of this incomplete work before I move forward?

olttwa (Member) left a comment:

Thanks for your interest in contributing to Goose, @thesujai. I have left some comments.

Adding a message broker involves a lot of complex work, which is difficult to do without adequate context on Goose. If you are keen on contributing to Goose, I suggest starting with some beginner-friendly issues such as #179, #144, #75, #46, #37 or #10.

(close [_]
(println "SQS client does not require explicit closure.")))

(def default-opts
olttwa (Member):

default-opts could point to a locally hosted SQS instance for development purposes, for instance the localstack, vsouza or roribio16 SQS Docker images.

thesujai (Author):

Thank you for letting me know that this existed! 😄
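A minimal sketch of such development defaults, assuming LocalStack is running on its default edge port 4566 and a queue named goose-default has already been created. The queue name, region and the :client-opts key are hypothetical and not part of Goose; the :endpoint-override map follows the shape described in the cognitect-labs/aws-api README.

```clojure
;; Hypothetical development defaults: LocalStack on its default edge port (4566),
;; its default account id (000000000000) and a pre-created "goose-default" queue.
(def default-opts
  {:queue-url   "http://localhost:4566/000000000000/goose-default"
   ;; Intended to be passed as-is to cognitect.aws.client.api/client.
   :client-opts {:api               :sqs
                 :region            "us-east-1"
                 :endpoint-override {:protocol :http
                                     :hostname "localhost"
                                     :port     4566}}})
```

The client would then be built with (aws/client (:client-opts default-opts)). LocalStack accepts dummy credentials, which aws-api can be given through the :credentials-provider option, for example with cognitect.aws.credentials/basic-credentials-provider.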


(close [_]
(println "SQS client does not require explicit closure.")))
olttwa (Member):

close doesn't need to be implemented as part of the Broker protocol if the broker doesn't need it.

thesujai (Author):

Okay!

Just a follow-up question: RMQ doesn't implement some methods of the Broker protocol, for instance the cron-job function, yet we can still call it through the client's perform-every. That ends up throwing a runtime error. Is there a way to let developers know they cannot do this, so that their IDE/LSP can warn them? (I know it is already mentioned in the docs.)
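Clojure protocols dispatch at runtime, so the compiler cannot reject a call to a method a record does not implement. One option, sketched here with a hypothetical trimmed-down protocol rather than Goose's actual Broker, is to make the failure descriptive: leave close out entirely (a record need not implement every protocol method) and have unsupported operations throw ex-info carrying a machine-readable :type.

```clojure
;; Hypothetical, trimmed-down protocol; NOT the actual goose.broker/Broker.
(defprotocol Broker
  (enqueue [this job])
  (register-cron [this cron-opts job-description])
  (close [this]))

(defn- unsupported!
  "Throws a descriptive error instead of a generic runtime failure."
  [op broker]
  (throw (ex-info (str op " is not supported by the " broker " broker")
                  {:type ::unsupported-operation :op op :broker broker})))

;; close is deliberately not implemented: it is not needed for this broker.
(defrecord SqsBroker [queue-url]
  Broker
  (enqueue [_ job] {:queue-url queue-url :job job})
  (register-cron [_ _ _] (unsupported! "register-cron" "SQS")))
```

This is still only a runtime signal; static tooling would at best surface the docstrings of the client functions, which is why documenting unsupported operations there remains useful.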

@@ -10,7 +10,10 @@
ring/ring-core {:mvn/version "1.11.0"}
hiccup/hiccup {:mvn/version "2.0.0-RC2"}
bidi/bidi {:mvn/version "2.1.6"}
-it.burning/cron-expression-descriptor {:mvn/version "1.2.10"}}
+it.burning/cron-expression-descriptor {:mvn/version "1.2.10"}
+com.cognitect.aws/api {:mvn/version "0.8.710-beta01"}
olttwa (Member):

Did you explore the trade-offs between cognitect-labs/aws-api and mcohen01/amazonica? Why did you choose the former over the latter?

thesujai (Author):

The core reason for choosing aws-api is its Clojure-first, lightweight approach, whereas amazonica is a wrapper around the AWS Java SDK.

Both are about equally easy to use.

(defn enqueue
"Enqueues a job for immediate execution."
[client queue-url job]
(aws/invoke client {:op :SendMessage
olttwa (Member):

Is there an underlying connection pool used by aws/invoke? Could we get access to this pool and pass it in explicitly as a dependency?

thesujai (Author):

No, as of now it does not expose the connection pool directly. We can, however, override the HTTP client: https://github.com/cognitect-labs/aws-api?tab=readme-ov-file#overriding-the-http-client

(let [{:keys [client]} this
{:keys [queue-url]} opts]

(sqs-request/enqueue client queue-url (u/encode-to-str job))))
olttwa (Member):

Why encode-to-str instead of encode?

thesujai (Author):

The reason for encoding in the first place was that json/write-str, used in sqs_requests.clj, dropped namespaces, so a function symbol like goose.core/process-job turned into just process-job. (It took me hours to figure this out while implementing it.)

encode-to-str converts the job into a plain string that can be serialized to and deserialized from JSON, which is more readable, in my opinion.
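A small sketch of the namespace problem and one readable alternative. It assumes json/write-str is clojure.data.json's, which writes keywords and symbols using clojure.core/name; the job map below is hypothetical, and EDN via pr-str is shown as an option, not as what Goose's u/encode-to-str actually does.

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical job map; a real Goose job carries more fields.
(def job {:id             "0b1c"
          :execute-fn-sym 'goose.core/process-job
          :args           [42 "hello"]})

;; What a name-based JSON writer keeps of the symbol: the namespace is gone.
(name (:execute-fn-sym job))

(defn encode-edn
  "Serializes a job to a readable string; pr-str keeps namespaced symbols."
  [job]
  (pr-str job))

(defn decode-edn
  "Reads the job back; clojure.edn/read-string does not evaluate code."
  [s]
  (edn/read-string s))
```

Because EDN round-trips symbols and keywords with their namespaces, the message body stays human-readable without losing goose.core/process-job.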



(start-worker
olttwa (Member):

For background processing, clients might want to write to different queues, and a worker could process jobs off different queues. The enqueue and worker APIs need to be designed with this in mind.

thesujai (Author):

By different queues, do you mean different SQS services (in the same account), each with its own queues? Or do you mean we allow only one AWS account with a single SQS service, from which clients can connect to multiple queues?

Or do you mean supporting different AWS accounts as well?
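Whichever scope is chosen, the worker side can be shaped around a list of queue URLs. A minimal sketch, assuming one client can reach every queue (SQS queue URLs already encode region and account, but a different account would still need its own credentials and client). receive-fn and handle-fn are hypothetical; in practice receive-fn would wrap aws/invoke with :ReceiveMessage.

```clojure
(defn poll-queues-once
  "Polls every queue URL once and passes each received message, together with
  the queue it came from, to handle-fn. Returns the number of messages handled.
  receive-fn (hypothetical) takes a queue URL and returns a seq of messages."
  [receive-fn handle-fn queue-urls]
  (reduce (fn [handled queue-url]
            (let [messages (receive-fn queue-url)]
              (doseq [message messages]
                (handle-fn queue-url message))
              (+ handled (count messages))))
          0
          queue-urls))
```

Passing the queue URL to handle-fn keeps retries and deletions pointed at the queue a message actually came from.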

`opts` : Additional options required for job execution."
[client queue-url opts]
(let [continue? (atom true)]
(async/go-loop []
olttwa (Member):

Goose uses claypoole instead of async for concurrent execution.

thesujai (Author):

Gotcha!

(catch Exception e
(println "Error executing job:" e))))))
(recur))))
(fn stop-polling []
olttwa (Member):

The worker needs to return an implementation of goose.worker/stop so that the client can stop it.

thesujai (Author):

Okay!
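The last two review points, a thread pool instead of core.async and a stoppable worker, can be sketched together. Goose uses claypoole for this; to keep the sketch dependency-free it swaps in a plain java.util.concurrent fixed thread pool. Shutdown is a hypothetical stand-in for whatever protocol backs goose.worker/stop, and poll-once is a hypothetical function that receives and processes one batch.

```clojure
(import '(java.util.concurrent ExecutorService Executors TimeUnit))

;; Hypothetical stand-in for the protocol behind goose.worker/stop.
(defprotocol Shutdown
  (stop [this] "Stops polling and waits for in-flight jobs to finish."))

(defrecord Worker [^ExecutorService pool running?]
  Shutdown
  (stop [_]
    (reset! running? false)                        ; loops exit after the current poll
    (.shutdown pool)                               ; accept no new tasks
    (.awaitTermination pool 30 TimeUnit/SECONDS))) ; true if all threads finished

(defn start-worker
  "Runs poll-once in a loop on each of `threads` pool threads and returns a
  Worker the caller can stop."
  [threads poll-once]
  (let [pool     (Executors/newFixedThreadPool (int threads))
        running? (atom true)]
    (dotimes [_ threads]
      (.execute pool (fn []
                       (while @running?
                         (try
                           (poll-once)
                           (catch Exception e
                             ;; Keep polling even if one iteration fails.
                             (binding [*out* *err*]
                               (println "Error while polling:" (ex-message e)))))))))
    (->Worker pool running?)))
```

With SQS long polling (for example WaitTimeSeconds 10), stop can take up to one wait period to return, since each loop only checks running? between polls.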

:request {:QueueUrl queue-url
:MaxNumberOfMessages 1
:WaitTimeSeconds 10}})]
(when-let [messages (:Messages response)]
olttwa (Member):

Dequeuing a job, holding a lock over the job to avoid double executions, executing a job, enqueuing for retry/death upon failure, and deletion from queue post processing are very complex operations. This section needs a lot of re-work keeping all these things in mind.

Check out redis.worker/start and rmq.worker/start for reference.

thesujai (Author):

Alright, I will look into it.
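For SQS, the "lock" maps naturally onto the visibility timeout: a received message stays hidden from other consumers until the timeout expires, and reappears if it is not deleted (so, on standard queues, duplicates remain possible and this is a lease rather than a strict lock). A minimal sketch of the per-message flow under that assumption; the execute, delete!, retry! and bury! hooks and the :retry-count field are hypothetical, injected so the logic can be tested without AWS, and this is not how redis.worker or rmq.worker are structured.

```clojure
(defn process-message
  "Runs one received message and decides its fate: success, retry or death.
  All side effects come from the (hypothetical) ops map."
  [{:keys [execute delete! retry! bury!]} max-retries {:keys [job] :as message}]
  (let [outcome (try
                  (execute job)
                  :succeeded
                  (catch Exception e
                    (let [n      (inc (:retry-count job 0))
                          failed (assoc job :retry-count n :last-error (ex-message e))]
                      (if (<= n max-retries)
                        (do (retry! failed) :retried)
                        (do (bury! failed) :dead)))))]
    ;; Delete only once the outcome is recorded. If retry!/bury! throws, we
    ;; never get here and the message becomes visible again after the timeout.
    (delete! message)
    outcome))
```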

Development

Successfully merging this pull request may close these issues.

Add message-broker for AWS SQS
2 participants