Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make aws client selectable #16

Merged
merged 20 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:

- name: Test
env:
AWS_REGION: ap-northeast-1
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
run: make test
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
# Change Log

## [Unreleased]
### Breaking Changes
* Make the `client` argument mandatory.

#### Migration
Rewrite start-consumer like below.
```clojure
;; before (using default client)
(gluttony/start-consumer queue-url consume {:num-workers 1})

;; before (using custom client)
(gluttony/start-consumer queue-url consume {:client client :num-workers 1})

;; after (using default client)
(require '[gluttony.record.cognitect-sqs-client :as g.client])
(gluttony/start-consumer queue-url consume (g.client/make-client) {:num-workers 1})

;; after (using custom client)
(require '[gluttony.record.cognitect-sqs-client :as g.client])
(gluttony/start-consumer queue-url consume (g.client/make-client client) {:num-workers 1})
```
And add api packages `com.cognitect.aws/api`, `com.cognitect.aws/endpoints` and `com.cognitect.aws/sqs` to dependencies.

### Added
* AWS API clients other than [aws-api](https://github.com/cognitect-labs/aws-api) (`com.cognitect.aws/api`) are now available.
** [aws-api](https://github.com/cognitect-labs/aws-api) is still available as `gluttony.record.cognitect-sqs-client`.
** [AWS SDK for Java 2.0](https://github.com/aws/aws-sdk-java-v2) is available as `gluttony.record.aws-sdk-client`.
** If you want to use other clients, you can implement `gluttony.protocols/ISqsClient`.

## 0.5.106
### Changed
Expand Down
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# Gluttony
A consumer library using [core.async](https://github.com/clojure/core.async)
and [aws-api](https://github.com/cognitect-labs/aws-api) based on AWS SQS.
based on AWS SQS.

You can use this library with Standard queue but it is almost designed for FIFO queue.

[![Build and Test](https://github.com/toyokumo/gluttony/actions/workflows/build-and-test.yml/badge.svg)](https://github.com/toyokumo/gluttony/actions/workflows/build-and-test.yml)
[![cljdoc badge](https://cljdoc.org/badge/toyokumo/gluttony)](https://cljdoc.org/d/toyokumo/gluttony/CURRENT)
[![Clojars Project](https://img.shields.io/clojars/v/toyokumo/gluttony.svg)](https://clojars.org/toyokumo/gluttony)

## Installation
To install, add the following to your project `:dependencies`:

[![Clojars Project](https://clojars.org/toyokumo/gluttony/latest-version.svg)](https://clojars.org/toyokumo/gluttony)

and add [AWS SDK for Java 2.0](https://github.com/aws/aws-sdk-java-v2) to your dependencies.
(If you want to use [aws-api](https://github.com/cognitect-labs/aws-api) instead of AWS SDK, add the dependency on it.)

## Usage
### Basis
Gluttony mainly offer two APIs, `start-consumer` and `stop-consumer`.
Expand All @@ -27,7 +35,8 @@ To prevent that, call `stop-receivers` before calling `stop-consumer` and wait f

```clojure
(require '[clojure.core.async :as a]
'[gluttony.core :as gluttony])
'[gluttony.core :as gluttony]
'[gluttony.record.aws-sqs-client :as g.client])

(defn consume
"Your consume function takes three arguments.
Expand All @@ -37,7 +46,7 @@ To prevent that, call `stop-receivers` before calling `stop-consumer` and wait f
raise doesn't delete the message but change the limit of time that the message can be seen
from other receivers. raise takes zero or one argument, which control the limit of time.
default limit is zero, which means that retry will be executed as soon as possible."
[^gluttony.record.message.SQSMessage message respond raise]
[message respond raise]
(let [success? (do-my-computation-use-cpu message)]
(if success?
(do (respond)
Expand All @@ -50,7 +59,7 @@ To prevent that, call `stop-receivers` before calling `stop-consumer` and wait f
(defonce consumer (atom nil))

;; Start consumer connects to assigned queue
(reset! consumer (gluttony/start-consumer queue-url consume))
(reset! consumer (gluttony/start-consumer queue-url consume (g.client/make-client)))

;; Stop receiver and worker
(when @consumer
Expand Down Expand Up @@ -90,7 +99,7 @@ You may have to fix `dev-resources/test-config.edn`.

## License

Copyright 2019 TOYOKUMO,Inc.
Copyright 2024 TOYOKUMO,Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion build.edn
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add :licenses ?
https://github.com/liquidz/build.edn/blob/main/doc/format/licenses.adoc
Now clojars requires that pom.xml contains license information.
clojars/clojars-web#874

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{:lib toyokumo/gluttony
:version "0.5.{{git/commit-count}}"
:version "0.6.{{git/commit-count}}"
:documents [{:file "CHANGELOG.md"
:match "Unreleased"
:action :append-after
Expand Down
13 changes: 7 additions & 6 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
{:paths ["src"]
:deps {org.clojure/clojure {:mvn/version "1.11.1"}
org.clojure/core.async {:mvn/version "1.6.673"}
org.clojure/tools.logging {:mvn/version "1.2.4"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
com.cognitect.aws/api {:mvn/version "0.8.652"}
com.cognitect.aws/endpoints {:mvn/version "1.1.12.415"}
com.cognitect.aws/sqs {:mvn/version "822.2.1109.0"}}
org.clojure/tools.logging {:mvn/version "1.2.4"}}

:aliases
{:dev {:extra-paths ["test" "dev-resources"]
:extra-deps {aero/aero {:mvn/version "1.1.6"}
:extra-deps {com.cognitect.aws/api {:mvn/version "0.8.652"}
com.cognitect.aws/endpoints {:mvn/version "1.1.12.415"}
com.cognitect.aws/sqs {:mvn/version "822.2.1109.0"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
software.amazon.awssdk/sqs {:mvn/version "2.25.65"}
aero/aero {:mvn/version "1.1.6"}
spootnik/unilog {:mvn/version "0.7.31"}
lambdaisland/kaocha {:mvn/version "1.80.1274"}}}

Expand Down
2 changes: 1 addition & 1 deletion dev-docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'
services:
local-sqs:
container_name: gluttony-local-sqs
image: softwaremill/elasticmq-native
image: softwaremill/elasticmq-native:1.6.4
ports:
- "9324:9324"
- "9325:9325"
Expand Down
12 changes: 3 additions & 9 deletions src/gluttony/core.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns gluttony.core
(:require
[cognitect.aws.client.api :as aws]
[gluttony.protocols :as p]
[gluttony.record.consumer :as c])
(:import
Expand Down Expand Up @@ -37,10 +36,9 @@
(if success?
(respond)
(raise 10))))
client - An instance of gluttony.protocols/ISqsClient.

Optional arguments:
:client - the SQS client, which is the instance of cognitect.aws.client.Client.
if missing, cognitect.aws.client.api/client would be called.
:num-workers - the number of workers processing messages concurrently.
default: (Runtime/availableProcessors) - 1
:num-receivers - the number of receivers polling from sqs.
Expand Down Expand Up @@ -81,11 +79,8 @@
:visibility-timeout-in-heartbeat must be longer than :heartbeat.
Output:
a instance of gluttony.record.consumer.Consumer"
^Consumer [queue-url consume & [opts]]
(let [client (or (:client opts)
(aws/client {:api :sqs}))
given-client? (some? (:client opts))
num-workers (or (:num-workers opts)
^Consumer [queue-url consume client & [opts]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add an assertion for the client here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other type checks are done within gluttony.record.consumer/new-consumer, but should I do client here?

https://github.com/toyokumo/gluttony/pull/16/files#diff-2699320efc7348a4f9713eb2ec6eb23b9bb0de16775348543a0b88fadd84df5fR167

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice that. No action is needed then.

(let [num-workers (or (:num-workers opts)
(max 1 (dec (.availableProcessors (Runtime/getRuntime)))))
num-receivers (or (:num-receivers opts)
(max 1 (int (/ num-workers 10))))
Expand All @@ -105,7 +100,6 @@
consumer (c/new-consumer {:queue-url queue-url
:consume consume
:client client
:given-client? given-client?
:num-workers num-workers
:num-receivers num-receivers
:message-channel-size message-channel-size
Expand Down
63 changes: 63 additions & 0 deletions src/gluttony/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,66 @@
(-stop [this])
(-enable-receivers [this])
(-disable-receivers [this]))

(defprotocol ISqsClient
(receive-message [this {:keys [queue-url
max-number-of-messages
wait-time-seconds]}]
"Receive messages from AWS SQS.
Input:
queue-url - String.
The URL of the Amazon SQS queue from which messages are received.
max-number-of-messages - Number.
The maximum number of messages to return.
This value is the value of the consumer's \"receive-limit\" option.
wait-time-seconds - String.
The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
This value is the value of the consumer's \"long-polling-duration\" option.
Output:
a channel which will receive the result of the api when the operation is completed
The result is a map with the following
:messages - a coll of messages or nil (an error occurred).
Each message type is variable.
They are passed to the consumer's consume function.
:error - An error data or nil (no error occurred).
The error type is variable.")
(delete-message [this {:keys [queue-url receipt-handle]}]
"Delete the specified message from the specified queue.
Input:
queue-url - String.
The URL of the Amazon SQS queue from which messages are deleted.
receipt-handle - String.
The receipt handle associated with the message to delete.
Output:
a channel which will receive the result of the api when the operation is completed.
The result is a map with the following
:error - An error data or nil (no error occurred).
The error type is variable.")
(change-message-visibility [this {:keys [queue-url receipt-handle visibility-timeout]}]
"Changes the visibility timeout of a specified message in a queue to a new value.
The default visibility timeout for a message is 30 seconds.
The minimum is 0 seconds. The maximum is 12 hours.
Input:
queue-url - String.
The URL of the Amazon SQS queue from which messages are deleted.
receipt-handle - String.
The receipt handle associated with the message to delete.
visibility-timeout - Number.
The new value for the message's visibility timeout.
Output:
a channel which will receive the result of the api when the operation is completed.
The result is a map with the following
:error - An error data or nil (no error occurred).
The error type is variable.")
(get-message-id [this message]
"Get id of the message.
Input:
message - a message.
The element of the :messages of the result of the receive-message.")
(get-recipient-handle [this message]
"Get the receipt handle of the message.
Input:
message - a message.
The element of the :messages of the result of the receive-message.")
(stop [this]
"Stop the client. This should be called to stop the client when it is no longer needed."))
83 changes: 83 additions & 0 deletions src/gluttony/record/aws_sqs_client.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
(ns gluttony.record.aws-sqs-client
(:require
[clojure.core.async :as as]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT In another namespace, clojure.core.async is aliased as a.
We should use same alias name in a project.

[gluttony.protocols :as p])
(:import
(java.util.function
BiConsumer)
(software.amazon.awssdk.services.sqs
SqsAsyncClient)
(software.amazon.awssdk.services.sqs.model
ChangeMessageVisibilityRequest
DeleteMessageRequest
Message
MessageSystemAttributeName
ReceiveMessageRequest
ReceiveMessageResponse)))

(defrecord AwsSqsClient [^SqsAsyncClient client
given-client?]
p/ISqsClient
(receive-message [_ {:keys [queue-url
max-number-of-messages
wait-time-seconds]}]
(let [chan (as/chan 1)
request (-> (ReceiveMessageRequest/builder)
(.queueUrl queue-url)
(.messageAttributeNames ["All"])
(.messageSystemAttributeNames [MessageSystemAttributeName/ALL])
(.maxNumberOfMessages (int max-number-of-messages))
(.waitTimeSeconds (int wait-time-seconds))
(.build))]
(-> (.receiveMessage client request)
(.whenComplete (reify BiConsumer
(accept [_ message-resp error]
(as/>!! chan (if message-resp
{:messages (vec (.messages ^ReceiveMessageResponse message-resp))
:error nil}
{:message nil :error error}))
(as/close! chan)))))
chan))
(delete-message [_ {:keys [queue-url receipt-handle]}]
(let [chan (as/chan 1)
request (-> (DeleteMessageRequest/builder)
(.queueUrl queue-url)
(.receiptHandle receipt-handle)
(.build))]
(-> (.deleteMessage client request)
(.whenComplete (reify BiConsumer
(accept [_ _ error]
(as/>!! chan {:error error})
(as/close! chan)))))
chan))
(change-message-visibility [_ {:keys [queue-url receipt-handle visibility-timeout]}]
(let [chan (as/chan 1)
request (-> (ChangeMessageVisibilityRequest/builder)
(.queueUrl queue-url)
(.receiptHandle receipt-handle)
(.visibilityTimeout (int visibility-timeout))
(.build))]
(-> (.changeMessageVisibility client request)
(.whenComplete (reify BiConsumer
(accept [_ _ error]
(as/>!! chan {:error error})
(as/close! chan)))))
chan))
(get-message-id [_ message]
(.messageId ^Message message))
(get-recipient-handle [_ message]
(.receiptHandle ^Message message))
(stop [_]
(when-not given-client?
(.close client))))


(defn make-client
([]
(make-client nil))
([api-client]
{:pre [(or (instance? SqsAsyncClient api-client)
(nil? api-client))]}
(->AwsSqsClient (or api-client
(SqsAsyncClient/create))
(some? api-client))))
60 changes: 60 additions & 0 deletions src/gluttony/record/cognitect_sqs_client.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
(ns gluttony.record.cognitect-sqs-client
(:require
[camel-snake-kebab.core :as csk]
[camel-snake-kebab.extras :as cske]
[clojure.core.async :as a]
[cognitect.aws.client.api :as aws]
[gluttony.protocols :as p])
(:import
(cognitect.aws.client.impl
Client)))

(defn- invoke-async
[client op-map]
(let [op-map (update op-map :request #(cske/transform-keys csk/->PascalCaseKeyword %))]
(a/go
(when-let [res (a/<! (aws/invoke-async client op-map))]
(cske/transform-keys csk/->kebab-case-keyword res)))))

(defrecord CognitectSQSClient
;; AWS SQS client implementation using cognitect.aws.client.api
;; The structure of Output of functions are same as `(:response (:ReceiveMessage (aws/ops client)))`,
;; but the keys are kebab-case-keyword.
[client given-client?]
p/ISqsClient
(receive-message [_ request]
(a/go
(let [request (merge {:message-attribute-names ["All"]
:message-system-attribute-names ["All"]}
request)
response (a/<! (invoke-async client {:op :ReceiveMessage :request request}))]
(if (contains? response :category)
{:messages nil :error response}
{:messages (:messages response) :error nil}))))
(delete-message [_ request]
(a/go
(let [response (a/<! (invoke-async client {:op :DeleteMessage :request request}))]
{:error (when (contains? response :category)
response)})))
(change-message-visibility [_ request]
(a/go
(let [response (a/<! (invoke-async client {:op :ChangeMessageVisibility :request request}))]
{:error (when (contains? response :category)
response)})))
(get-message-id [_ message]
(:message-id message))
(get-recipient-handle [_ message]
(:receipt-handle message))
(stop [_]
(when-not given-client?
(aws/stop client))))

(defn make-client
([]
(make-client nil))
([api-client]
{:pre [(or (instance? Client api-client)
(nil? api-client))]}
(->CognitectSQSClient (or api-client
(aws/client {:api :sqs}))
(some? api-client))))
Loading
Loading