From 3538bbf2e892d68772a6626a306deed3b7ea4754 Mon Sep 17 00:00:00 2001 From: Rishu Yadav Date: Mon, 19 Aug 2024 14:04:31 +0530 Subject: [PATCH] SQS integration in blob and metrics for queue service migration prerequisite (#829) * feat: integrate sqs to blob service * Create an sqs service to send messages to sqs * Send all the incoming PUT and DELETE request to sqs * feat: inlcude functionality to store encoded blob in sqs * Convert the incoming blob to bytearray, encode it and send it to sqs * Handle the incoming api request of PUT blob object * Send the attributes and object to sqs * feat: restructure the queue intialization following better architecture * Create interface for queue, so can include multiple implementations * Follow lose coupling * Use a factory method for sqs initialization * PD-239427 fix: fix logging and simplified factory * feat: include params required for iss, send to sqs and update queue name * Include queue url param to be sent to sqs * Update the queue name * Update the factory as per the required changes * update: update tags for deployment * branch admin -prepare release emodb-6.5.135 * branch admin -prepare for next development iteration * update: add fifo queue for blobstore migration and tag changes * branch admin -prepare release emodb-6.5.144 * branch admin -prepare for next development iteration * fix: remove sending of bytecode to sqs * For large files , bytecode is too huge to be sent to sqs * Send request url instead of bytecode * feat: add custom exceptions * handle exceptions in a robust way * add digestbytes to sqs * chore: add tags skip sonatype for deployment * branch admin -prepare release emodb-6.5.159 * branch admin -prepare for next development iteration * fix: add back sonatype dependency * branch admin -prepare release emodb-6.5.160 * branch admin -prepare for next development iteration * Removed bytearray in sqs put * Update MessagingService.java for removing byte * Update SQSService.java * branch admin -prepare release emodb-6.5.161 * branch admin -prepare for next development iteration * skipping nexus plugin for testing --parent.pom * branch admin -prepare release emodb-6.5.162 * branch admin -prepare for next development iteration * add metrics to Queue poll * branch admin -prepare release emodb-6.5.163 * branch admin -prepare for next development iteration * update pom version * branch admin -prepare release emodb-6.5.165 * branch admin -prepare for next development iteration * branch admin -prepare release emodb-6.5.166 * branch admin -prepare for next development iteration * feat: add poll metrics for dedupq * feat: add send count metrics * fix: add metrics for sendBatch * udate: update tags from 167 to 168 * fix: fix the compilation issue remove metricregistry from constructor * branch admin -prepare release emodb-6.5.168 * branch admin -prepare for next development iteration * chore: update tags from 168 to 169 * branch admin -prepare release emodb-6.5.169 * branch admin -prepare for next development iteration * fix: fix the metric tag name issue * branch admin -prepare release emodb-6.5.170 * branch admin -prepare for next development iteration --------- Co-authored-by: jenkins Co-authored-by: anandujayan Co-authored-by: ReddyAnand-BV Co-authored-by: Nabajyoti Dash --- auth/auth-client/pom.xml | 2 +- auth/auth-core/pom.xml | 2 +- auth/auth-store/pom.xml | 2 +- auth/auth-util/pom.xml | 2 +- blob-api/pom.xml | 2 +- blob-clients/blob-client-common/pom.xml | 2 +- blob-clients/blob-client-jersey2/pom.xml | 2 +- blob-clients/blob-client/pom.xml | 2 +- blob/pom.xml | 2 +- cachemgr/pom.xml | 2 +- common/api/pom.xml | 2 +- common/astyanax/pom.xml | 2 +- common/client-jax-rs-2/pom.xml | 2 +- common/client-jersey2/pom.xml | 2 +- common/client/pom.xml | 2 +- common/dropwizard/pom.xml | 2 +- common/jersey-client/pom.xml | 2 +- common/json/pom.xml | 2 +- common/stash/pom.xml | 2 +- common/uuid/pom.xml | 2 +- common/zookeeper/pom.xml | 2 +- databus-api/pom.xml | 2 +- databus-client-common/pom.xml | 2 +- databus-client-jersey2/pom.xml | 2 +- databus-client/pom.xml | 2 +- databus/pom.xml | 2 +- datacenter/pom.xml | 2 +- event/pom.xml | 2 +- job-api/pom.xml | 2 +- job/pom.xml | 2 +- kafka/pom.xml | 2 +- megabus/pom.xml | 2 +- parent/pom.xml | 8 +- plugins/pom.xml | 2 +- pom.xml | 2 +- quality/integration/pom.xml | 2 +- .../queue/DedupQueueJerseyTest.java | 3 +- .../integration/queue/QueueJerseyTest.java | 3 +- quality/pom.xml | 2 +- queue-api/pom.xml | 2 +- queue-client-common/pom.xml | 2 +- queue-client-jersey2/pom.xml | 2 +- queue-client/pom.xml | 2 +- queue/pom.xml | 2 +- .../queue/core/AbstractQueueService.java | 30 +++- .../queue/core/DefaultDedupQueueService.java | 5 +- .../emodb/queue/core/DefaultQueueService.java | 5 +- .../emodb/queue/core/SizeQueueCacheTest.java | 3 +- sdk/pom.xml | 2 +- sor-api/pom.xml | 2 +- sor-client-common/pom.xml | 2 +- sor-client-jersey2/pom.xml | 2 +- sor-client/pom.xml | 2 +- sor/pom.xml | 2 +- table/pom.xml | 2 +- uac-api/pom.xml | 2 +- uac-client-jersey2/pom.xml | 2 +- uac-client/pom.xml | 2 +- web-local/pom.xml | 2 +- web/pom.xml | 2 +- .../com/bazaarvoice/emodb/web/EmoService.java | 4 +- .../resources/blob/BlobStoreResource1.java | 124 +++++++++++----- .../blob/messageQueue/MessagingService.java | 21 +++ .../messageQueue/SQSMessageException.java | 7 + .../blob/messageQueue/SQSService.java | 136 ++++++++++++++++++ .../blob/messageQueue/SQSServiceFactory.java | 12 ++ .../resources/queue/DedupQueueResource1.java | 37 ++++- .../web/resources/queue/QueueResource1.java | 47 +++++- yum/pom.xml | 2 +- 69 files changed, 440 insertions(+), 113 deletions(-) create mode 100644 web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java create mode 100644 web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java create mode 100644 web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java create mode 100644 web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java diff --git a/auth/auth-client/pom.xml b/auth/auth-client/pom.xml index 6f3a06071b..14e80f2c35 100644 --- a/auth/auth-client/pom.xml +++ b/auth/auth-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-core/pom.xml b/auth/auth-core/pom.xml index c08f2f6720..257be185db 100644 --- a/auth/auth-core/pom.xml +++ b/auth/auth-core/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-store/pom.xml b/auth/auth-store/pom.xml index 81142319a2..3b9531494e 100644 --- a/auth/auth-store/pom.xml +++ b/auth/auth-store/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/auth/auth-util/pom.xml b/auth/auth-util/pom.xml index 279a50d398..fe30721176 100644 --- a/auth/auth-util/pom.xml +++ b/auth/auth-util/pom.xml @@ -3,7 +3,7 @@ emodb com.bazaarvoice.emodb - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/blob-api/pom.xml b/blob-api/pom.xml index 35a35c0aeb..85123d5a42 100644 --- a/blob-api/pom.xml +++ b/blob-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/blob-clients/blob-client-common/pom.xml b/blob-clients/blob-client-common/pom.xml index c1ec96d2dc..2551dd9b1f 100644 --- a/blob-clients/blob-client-common/pom.xml +++ b/blob-clients/blob-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client-jersey2/pom.xml b/blob-clients/blob-client-jersey2/pom.xml index b0c1ea3708..1469a20558 100644 --- a/blob-clients/blob-client-jersey2/pom.xml +++ b/blob-clients/blob-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/blob-clients/blob-client/pom.xml b/blob-clients/blob-client/pom.xml index 1cdfb57fe4..8eb86f2ce1 100644 --- a/blob-clients/blob-client/pom.xml +++ b/blob-clients/blob-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/blob/pom.xml b/blob/pom.xml index a1708a21d8..e4396c74cf 100644 --- a/blob/pom.xml +++ b/blob/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/cachemgr/pom.xml b/cachemgr/pom.xml index 9babe4ec42..c05b60ea79 100644 --- a/cachemgr/pom.xml +++ b/cachemgr/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/common/api/pom.xml b/common/api/pom.xml index f53395a0c6..b8892b6428 100644 --- a/common/api/pom.xml +++ b/common/api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/astyanax/pom.xml b/common/astyanax/pom.xml index 8777312a9d..b6eff20acf 100644 --- a/common/astyanax/pom.xml +++ b/common/astyanax/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jax-rs-2/pom.xml b/common/client-jax-rs-2/pom.xml index 3d23a1f6ae..1f2e6db668 100644 --- a/common/client-jax-rs-2/pom.xml +++ b/common/client-jax-rs-2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/client-jersey2/pom.xml b/common/client-jersey2/pom.xml index 418864320d..1a6fe97c68 100644 --- a/common/client-jersey2/pom.xml +++ b/common/client-jersey2/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/client/pom.xml b/common/client/pom.xml index 02b055e05e..640eeb51a0 100644 --- a/common/client/pom.xml +++ b/common/client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/dropwizard/pom.xml b/common/dropwizard/pom.xml index da60284420..8bb610aba2 100644 --- a/common/dropwizard/pom.xml +++ b/common/dropwizard/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/jersey-client/pom.xml b/common/jersey-client/pom.xml index 383c34fc7c..aa9d419736 100644 --- a/common/jersey-client/pom.xml +++ b/common/jersey-client/pom.xml @@ -5,7 +5,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/json/pom.xml b/common/json/pom.xml index bc81b37b44..917adb1936 100644 --- a/common/json/pom.xml +++ b/common/json/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/stash/pom.xml b/common/stash/pom.xml index f5156109f6..bb9f0e5b7b 100644 --- a/common/stash/pom.xml +++ b/common/stash/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/uuid/pom.xml b/common/uuid/pom.xml index 5be8c0e596..69c7151265 100644 --- a/common/uuid/pom.xml +++ b/common/uuid/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/common/zookeeper/pom.xml b/common/zookeeper/pom.xml index a384f1c439..02e713b07c 100644 --- a/common/zookeeper/pom.xml +++ b/common/zookeeper/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/databus-api/pom.xml b/databus-api/pom.xml index e4787a9ee5..27ef872a85 100644 --- a/databus-api/pom.xml +++ b/databus-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-common/pom.xml b/databus-client-common/pom.xml index c7a059ac80..5af9b9d379 100644 --- a/databus-client-common/pom.xml +++ b/databus-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/databus-client-jersey2/pom.xml b/databus-client-jersey2/pom.xml index 9684eb3e6b..7c2794e645 100644 --- a/databus-client-jersey2/pom.xml +++ b/databus-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/databus-client/pom.xml b/databus-client/pom.xml index a0135f23dd..46b1d8c25d 100644 --- a/databus-client/pom.xml +++ b/databus-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/databus/pom.xml b/databus/pom.xml index 967f294a7d..e966f653bb 100644 --- a/databus/pom.xml +++ b/databus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/datacenter/pom.xml b/datacenter/pom.xml index c035f2acea..d0582217b8 100644 --- a/datacenter/pom.xml +++ b/datacenter/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/event/pom.xml b/event/pom.xml index 4022240bd7..8e3cd03e8b 100644 --- a/event/pom.xml +++ b/event/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/job-api/pom.xml b/job-api/pom.xml index 0b1b68a59b..8ac8b8687e 100644 --- a/job-api/pom.xml +++ b/job-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/job/pom.xml b/job/pom.xml index d338802efb..e4a44a41d2 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/kafka/pom.xml b/kafka/pom.xml index f4d5659b20..b553098418 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/megabus/pom.xml b/megabus/pom.xml index fab92da47f..2c91fd1561 100644 --- a/megabus/pom.xml +++ b/megabus/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/parent/pom.xml b/parent/pom.xml index 883de31746..27d58c9133 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -11,7 +11,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT pom EmoDB Parent @@ -882,7 +882,7 @@ - + org.apache.maven.plugins maven-dependency-plugin @@ -904,7 +904,7 @@ analyze-only - true + false true com.fasterxml.jackson.datatype:jackson-datatype-joda diff --git a/plugins/pom.xml b/plugins/pom.xml index e7d926868c..e67d3d650d 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/pom.xml b/pom.xml index e8113c23bc..5f048e3d7b 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT parent/pom.xml diff --git a/quality/integration/pom.xml b/quality/integration/pom.xml index 20a3679268..f2fca66aa4 100644 --- a/quality/integration/pom.xml +++ b/quality/integration/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../../parent/pom.xml diff --git a/quality/integration/src/test/java/test/integration/queue/DedupQueueJerseyTest.java b/quality/integration/src/test/java/test/integration/queue/DedupQueueJerseyTest.java index 294f0e75da..0bb6adce72 100644 --- a/quality/integration/src/test/java/test/integration/queue/DedupQueueJerseyTest.java +++ b/quality/integration/src/test/java/test/integration/queue/DedupQueueJerseyTest.java @@ -16,6 +16,7 @@ import com.bazaarvoice.ostrich.PartitionContextBuilder; import com.bazaarvoice.ostrich.pool.OstrichAccessors; import com.bazaarvoice.ostrich.pool.PartitionContextValidator; +import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; @@ -56,7 +57,7 @@ public class DedupQueueJerseyTest extends ResourceTest { @Rule public ResourceTestRule _resourceTestRule = setupResourceTestRule( - Collections.singletonList(new DedupQueueResource1(_server, DedupQueueServiceAuthenticator.proxied(_proxy))), + Collections.singletonList(new DedupQueueResource1(_server, DedupQueueServiceAuthenticator.proxied(_proxy), new MetricRegistry())), ImmutableMap.of( APIKEY_QUEUE, new ApiKey("queue", ImmutableSet.of("queue-role")), APIKEY_UNAUTHORIZED, new ApiKey("unauth", ImmutableSet.of("unauthorized-role"))), diff --git a/quality/integration/src/test/java/test/integration/queue/QueueJerseyTest.java b/quality/integration/src/test/java/test/integration/queue/QueueJerseyTest.java index 82a6f1108d..d7e344ca44 100644 --- a/quality/integration/src/test/java/test/integration/queue/QueueJerseyTest.java +++ b/quality/integration/src/test/java/test/integration/queue/QueueJerseyTest.java @@ -16,6 +16,7 @@ import com.bazaarvoice.ostrich.PartitionContextBuilder; import com.bazaarvoice.ostrich.pool.OstrichAccessors; import com.bazaarvoice.ostrich.pool.PartitionContextValidator; +import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; @@ -56,7 +57,7 @@ public class QueueJerseyTest extends ResourceTest { @Rule public ResourceTestRule _resourceTestRule = setupResourceTestRule( - Collections.singletonList(new QueueResource1(_server, QueueServiceAuthenticator.proxied(_proxy))), + Collections.singletonList(new QueueResource1(_server, QueueServiceAuthenticator.proxied(_proxy), new MetricRegistry())), ImmutableMap.of( APIKEY_QUEUE, new ApiKey("queue", ImmutableSet.of("queue-role")), APIKEY_UNAUTHORIZED, new ApiKey("unauth", ImmutableSet.of("unauthorized-role"))), diff --git a/quality/pom.xml b/quality/pom.xml index 1d32827010..ffbd280391 100644 --- a/quality/pom.xml +++ b/quality/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue-api/pom.xml b/queue-api/pom.xml index 1fde08c1ee..b7b7eb2b33 100644 --- a/queue-api/pom.xml +++ b/queue-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-common/pom.xml b/queue-client-common/pom.xml index e4aa043ad4..d71aae5218 100644 --- a/queue-client-common/pom.xml +++ b/queue-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue-client-jersey2/pom.xml b/queue-client-jersey2/pom.xml index 13179a0946..2eb7d7ceef 100644 --- a/queue-client-jersey2/pom.xml +++ b/queue-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue-client/pom.xml b/queue-client/pom.xml index 5588dfdfb3..53a53dc625 100644 --- a/queue-client/pom.xml +++ b/queue-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue/pom.xml b/queue/pom.xml index 1c7cc1c529..c38a0b5332 100644 --- a/queue/pom.xml +++ b/queue/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java index 40ff4bf508..f07083616d 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java @@ -18,6 +18,8 @@ import com.bazaarvoice.emodb.queue.api.Names; import com.bazaarvoice.emodb.queue.api.UnknownMoveException; import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.cache.CacheBuilder; @@ -46,18 +48,24 @@ abstract class AbstractQueueService implements BaseQueueService { private final JobService _jobService; private final JobType _moveQueueJobType; private final LoadingCache> _queueSizeCache; + private final Meter _sendAllMeterAQS; + private final Meter _sendAllMeterNullAQS; + + private final Meter _pollAQS; + private final Meter _pollNullAQS; public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024; protected AbstractQueueService(BaseEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, JobType moveQueueJobType, - Clock clock) { + Clock clock, MetricRegistry metricRegistry) { _eventStore = eventStore; _jobService = jobService; _moveQueueJobType = moveQueueJobType; registerMoveQueueJobHandler(jobHandlerRegistry); + _queueSizeCache = CacheBuilder.newBuilder() .expireAfterWrite(15, TimeUnit.SECONDS) .maximumSize(2000) @@ -69,6 +77,11 @@ public Map.Entry load(SizeCacheKey key) return Maps.immutableEntry(internalMessageCountUpTo(key.channelName, key.limitAsked), key.limitAsked); } }); + _sendAllMeterAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllAQS")); + _sendAllMeterNullAQS = metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class, "sendAllNullAQS")); + _pollAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollAQS")); + _pollNullAQS= metricRegistry.meter(MetricRegistry.name(AbstractQueueService.class,"pollNullAQS")); + } private void registerMoveQueueJobHandler(JobHandlerRegistry jobHandlerRegistry) { @@ -109,6 +122,11 @@ public void sendAll(String queue, Collection messages) { @Override public void sendAll(Map> messagesByQueue) { requireNonNull(messagesByQueue, "messagesByQueue"); + if(messagesByQueue.keySet().isEmpty()){ + _sendAllMeterNullAQS.mark(); + } else { + _sendAllMeterAQS.mark(messagesByQueue.keySet().size()); + } ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); for (Map.Entry> entry : messagesByQueue.entrySet()) { @@ -178,8 +196,14 @@ public List poll(String queue, Duration claimTtl, int limit) { checkLegalQueueName(queue); checkArgument(claimTtl.toMillis() >= 0, "ClaimTtl must be >=0"); checkArgument(limit > 0, "Limit must be >0"); - - return toMessages(_eventStore.poll(queue, claimTtl, limit)); + List response = toMessages(_eventStore.poll(queue, claimTtl, limit)); + if(response.isEmpty()){ + _pollNullAQS.mark(); + } + else{ + _pollAQS.mark(response.size()); + } + return response; } @Override diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java index 85ef29c4fc..0955166218 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultDedupQueueService.java @@ -4,6 +4,7 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.DedupQueueService; +import com.codahale.metrics.MetricRegistry; import com.google.inject.Inject; import java.time.Clock; @@ -11,7 +12,7 @@ public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService { @Inject public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock) { - super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock); + Clock clock, MetricRegistry metricRegistry) { + super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock, metricRegistry); } } diff --git a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java index 71f9a61ed3..947572208d 100644 --- a/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java +++ b/queue/src/main/java/com/bazaarvoice/emodb/queue/core/DefaultQueueService.java @@ -4,6 +4,7 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.queue.api.QueueService; +import com.codahale.metrics.MetricRegistry; import com.google.inject.Inject; import java.time.Clock; @@ -11,7 +12,7 @@ public class DefaultQueueService extends AbstractQueueService implements QueueService { @Inject public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry, - Clock clock) { - super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock); + Clock clock, MetricRegistry metricRegistry) { + super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock, metricRegistry); } } diff --git a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java index e6f66a5c0f..90a0ea837d 100644 --- a/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java +++ b/queue/src/test/java/com/bazaarvoice/emodb/queue/core/SizeQueueCacheTest.java @@ -4,6 +4,7 @@ import com.bazaarvoice.emodb.job.api.JobHandlerRegistry; import com.bazaarvoice.emodb.job.api.JobService; import com.bazaarvoice.emodb.job.api.JobType; +import com.codahale.metrics.MetricRegistry; import org.testng.annotations.Test; import java.time.Clock; @@ -37,7 +38,7 @@ public void testSizeCache() { BaseEventStore mockEventStore = mock(BaseEventStore.class); AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class), - mock(JobHandlerRegistry.class), mock(JobType.class), clock){}; + mock(JobHandlerRegistry.class), mock(JobType.class), clock, new MetricRegistry()){}; // At limit=500, size estimate should be at 4800 // At limit=50, size estimate should be at 5000 diff --git a/sdk/pom.xml b/sdk/pom.xml index 78c41677e6..e8f8b7eb5b 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/sor-api/pom.xml b/sor-api/pom.xml index fcfc6f047d..d7bc799c60 100644 --- a/sor-api/pom.xml +++ b/sor-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-common/pom.xml b/sor-client-common/pom.xml index 037f125c61..8028d79623 100644 --- a/sor-client-common/pom.xml +++ b/sor-client-common/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/sor-client-jersey2/pom.xml b/sor-client-jersey2/pom.xml index 9644da29a2..4141cd9d99 100644 --- a/sor-client-jersey2/pom.xml +++ b/sor-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/sor-client/pom.xml b/sor-client/pom.xml index ac59970c76..100d90210e 100644 --- a/sor-client/pom.xml +++ b/sor-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/sor/pom.xml b/sor/pom.xml index 9fe17c5c41..d2a61758ef 100644 --- a/sor/pom.xml +++ b/sor/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/table/pom.xml b/table/pom.xml index 02564d6929..e4592dd6b0 100644 --- a/table/pom.xml +++ b/table/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/uac-api/pom.xml b/uac-api/pom.xml index 71f0a81c69..b9ba2a6008 100644 --- a/uac-api/pom.xml +++ b/uac-api/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/uac-client-jersey2/pom.xml b/uac-client-jersey2/pom.xml index 1a2505d827..38984d2abd 100644 --- a/uac-client-jersey2/pom.xml +++ b/uac-client-jersey2/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/uac-client/pom.xml b/uac-client/pom.xml index 5a05f632b4..55aae7fdb8 100644 --- a/uac-client/pom.xml +++ b/uac-client/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/web-local/pom.xml b/web-local/pom.xml index c49edf5432..4535e14494 100644 --- a/web-local/pom.xml +++ b/web-local/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/web/pom.xml b/web/pom.xml index 01d608fdcc..eadb8ccbf7 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -6,7 +6,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java b/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java index 8a15f6c970..48a6059960 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/EmoService.java @@ -304,9 +304,9 @@ private void evaluateQueue() // Start the Queue service ResourceRegistry resources = _injector.getInstance(ResourceRegistry.class); // Start the Queue service - resources.addResource(_cluster, "emodb-queue-1", new QueueResource1(queueService, queueClient)); + resources.addResource(_cluster, "emodb-queue-1", new QueueResource1(queueService, queueClient, _environment.metrics())); // Start the Dedup Queue service - resources.addResource(_cluster, "emodb-dedupq-1", new DedupQueueResource1(dedupQueueService, dedupQueueClient)); + resources.addResource(_cluster, "emodb-dedupq-1", new DedupQueueResource1(dedupQueueService, dedupQueueClient, _environment.metrics())); } private void evaluateScanner() diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java index a3869e8214..00c380af6a 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java @@ -1,13 +1,9 @@ package com.bazaarvoice.emodb.web.resources.blob; +import com.amazonaws.AmazonClientException; import com.bazaarvoice.emodb.auth.jersey.Authenticated; import com.bazaarvoice.emodb.auth.jersey.Subject; -import com.bazaarvoice.emodb.blob.api.Blob; -import com.bazaarvoice.emodb.blob.api.BlobMetadata; -import com.bazaarvoice.emodb.blob.api.BlobStore; -import com.bazaarvoice.emodb.blob.api.Range; -import com.bazaarvoice.emodb.blob.api.RangeSpecification; -import com.bazaarvoice.emodb.blob.api.Table; +import com.bazaarvoice.emodb.blob.api.*; import com.bazaarvoice.emodb.common.api.UnauthorizedException; import com.bazaarvoice.emodb.common.json.LoggingIterator; import com.bazaarvoice.emodb.sor.api.Audit; @@ -17,6 +13,9 @@ import com.bazaarvoice.emodb.web.auth.resource.NamedResource; import com.bazaarvoice.emodb.web.jersey.params.SecondsParam; import com.bazaarvoice.emodb.web.resources.SuccessResponse; +import com.bazaarvoice.emodb.web.resources.blob.messageQueue.MessagingService; +import com.bazaarvoice.emodb.web.resources.blob.messageQueue.SQSMessageException; +import com.bazaarvoice.emodb.web.resources.blob.messageQueue.SQSServiceFactory; import com.bazaarvoice.emodb.web.resources.sor.AuditParam; import com.bazaarvoice.emodb.web.resources.sor.TableOptionsParam; import com.codahale.metrics.Meter; @@ -37,44 +36,24 @@ import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.compress.utils.IOUtils; import org.apache.shiro.authz.annotation.RequiresAuthentication; import org.apache.shiro.authz.annotation.RequiresPermissions; import org.coursera.metrics.datadog.TaggedName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HEAD; -import javax.ws.rs.HeaderParam; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; -import javax.ws.rs.core.UriInfo; +import javax.ws.rs.*; +import javax.ws.rs.core.*; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.time.Duration; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Spliterators; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.StreamSupport; - import static java.lang.String.format; @Path("/blob/1") @@ -84,6 +63,8 @@ public class BlobStoreResource1 { private static final Logger _log = LoggerFactory.getLogger(BlobStoreResource1.class); + private final MessagingService _messagingService; + private static final String X_BV_PREFIX = "X-BV-"; // HTTP header prefix for BlobMetadata other than attributes private static final String X_BVA_PREFIX = "X-BVA-"; // HTTP header prefix for BlobMetadata attributes private static final Pattern CONTENT_ENCODING = Pattern.compile("content[-_]?encoding", Pattern.CASE_INSENSITIVE); @@ -110,11 +91,11 @@ public class BlobStoreResource1 { private final LoadingCache _putObjectRequestsByApiKey; private final LoadingCache _deleteObjectRequestsByApiKey; + public BlobStoreResource1(BlobStore blobStore, Set approvedContentTypes, MetricRegistry metricRegistry) { _blobStore = blobStore; _approvedContentTypes = approvedContentTypes; _metricRegistry = metricRegistry; - _listTableRequestsByApiKey = createMetricCache("listTablesByApiKey"); _createTableRequestsByApiKey = createMetricCache("createTableByApiKey"); _dropTableRequestsByApiKey = createMetricCache("dropTableByApiKey"); @@ -130,7 +111,7 @@ public BlobStoreResource1(BlobStore blobStore, Set approvedContentTypes, _getObjectRequestsByApiKey = createMetricCache("getByApiKey"); _putObjectRequestsByApiKey = createMetricCache("putByApiKey"); _deleteObjectRequestsByApiKey = createMetricCache("deleteByApiKey"); - + _messagingService= new SQSServiceFactory().createSQSService(); } private LoadingCache createMetricCache(String metricName) { @@ -160,10 +141,10 @@ public Iterator listTables(@QueryParam("from") final String fromKeyExclus @Authenticated Subject subject) { _listTableRequestsByApiKey.getUnchecked(subject.getId()).mark(); return streamingIterator( - StreamSupport.stream(Spliterators.spliteratorUnknownSize(_blobStore.listTables(Strings.emptyToNull(fromKeyExclusive), Long.MAX_VALUE), 0), false) - .filter(input -> subject.hasPermission(Permissions.readBlobTable(new NamedResource(input.getName())))) - .limit(limit.get()) - .iterator() + StreamSupport.stream(Spliterators.spliteratorUnknownSize(_blobStore.listTables(Strings.emptyToNull(fromKeyExclusive), Long.MAX_VALUE), 0), false) + .filter(input -> subject.hasPermission(Permissions.readBlobTable(new NamedResource(input.getName())))) + .limit(limit.get()) + .iterator() ); } @@ -191,8 +172,16 @@ public SuccessResponse createTable(@PathParam("table") String table, if (!subject.hasPermission(Permissions.createBlobTable(resource))) { throw new UnauthorizedException(); } - _blobStore.createTable(table, options, attributes, audit); + try { + _messagingService.sendCreateTableSQS(table,options,attributes,audit); + } catch (IOException | AmazonClientException e) { + _log.error("Failed to send create table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send create table message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending create table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending create table message to SQS", e); + } return SuccessResponse.instance(); } @@ -211,6 +200,15 @@ public SuccessResponse dropTable(@PathParam("table") String table, _dropTableRequestsByApiKey.getUnchecked(subject.getId()).mark(); Audit audit = getRequired(auditParam, "audit"); _blobStore.dropTable(table, audit); + try { + _messagingService.sendDeleteTableSQS(table, audit); + } catch (IOException | AmazonClientException e) { + _log.error("Failed to send delete table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send delete table message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending delete table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending delete table message to SQS", e); + } return SuccessResponse.instance(); } @@ -228,6 +226,15 @@ public SuccessResponse purgeTable(@PathParam("table") String table, _purgeTableRequestsByApiKey.getUnchecked(subject.getId()).mark(); Audit audit = getRequired(auditParam, "audit"); _blobStore.purgeTableUnsafe(table, audit); + try { + _messagingService.purgeTableSQS(table,audit); + } catch (IOException | AmazonClientException| UnsupportedOperationException e) { + _log.error("Failed to send purge table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send purge table message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending purge table message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending purge table message to SQS", e); + } return SuccessResponse.instance(); } @@ -262,6 +269,18 @@ public SuccessResponse setTableAttributes(@PathParam("table") String table, _setTableAttributesRequestsByApiKey.getUnchecked(subject.getId()).mark(); Audit audit = getRequired(auditParam, "audit"); _blobStore.setTableAttributes(table, attributes, audit); + try { + //send table attributes to sqs queue + _messagingService.putTableAttributesSQS(table,attributes,audit); + + } catch (IOException | AmazonClientException e) { + _log.error("Failed to send put table attributes message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send put table attributes message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending put table attributes message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending put table attributes message to SQS", e); + } + return SuccessResponse.instance(); } @@ -467,6 +486,7 @@ public SuccessResponse put(@PathParam("table") String table, InputStream in, @QueryParam("ttl") SecondsParam ttlParam, @Context HttpHeaders headers, + @Context UriInfo uriInfo, @Authenticated Subject subject) throws IOException { _putObjectRequestsByApiKey.getUnchecked(subject.getId()).mark(); @@ -492,8 +512,25 @@ public SuccessResponse put(@PathParam("table") String table, throw new IllegalArgumentException(String.format("Ttl:%s is specified for blobId:%s", ttl, blobId)); } + byte[] byteArray = IOUtils.toByteArray(in); + + String requestUrl= uriInfo.getRequestUri().toString(); + // Perform the put - _blobStore.put(table, blobId, onceOnlySupplier(in), attributes); + InputStream inputStream = new ByteArrayInputStream(byteArray); + _blobStore.put(table, blobId, onceOnlySupplier(inputStream), attributes); + + // Send the buffer bytes to SQS + try { + _messagingService.sendPutRequestSQS(table, blobId, attributes, requestUrl); + } catch (IOException | AmazonClientException e) { + _log.error("Failed to send put blob message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send put blob message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending put blob message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending put blob message to SQS", e); + } + return SuccessResponse.instance(); } @@ -510,6 +547,15 @@ public SuccessResponse delete(@PathParam("table") String table, @PathParam("blobId") String blobId, @Authenticated Subject subject) { _deleteObjectRequestsByApiKey.getUnchecked(subject.getId()).mark(); + try { + _messagingService.sendDeleteRequestSQS(table, blobId); + } catch (IOException | AmazonClientException e) { + _log.error("Failed to send delete blob message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Failed to send delete blob message to SQS", e); + } catch (RuntimeException e) { + _log.error("Unexpected error occurred while sending delete blob message to SQS for table {}: {}", table, e.getMessage()); + throw new SQSMessageException("Unexpected error occurred while sending delete blob message to SQS", e); + } _blobStore.delete(table, blobId); return SuccessResponse.instance(); } diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java new file mode 100644 index 0000000000..d6caaded9a --- /dev/null +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/MessagingService.java @@ -0,0 +1,21 @@ +package com.bazaarvoice.emodb.web.resources.blob.messageQueue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.bazaarvoice.emodb.sor.api.Audit; +import com.bazaarvoice.emodb.sor.api.TableOptions; + +import java.io.IOException; +import java.util.Map; + +/** + * Interface for interacting with a messaging service. + */ +public interface MessagingService { + void sendPutRequestSQS(String table, String blobId, Map attributes, String requestUrl) throws IOException; + void sendDeleteRequestSQS(String table, String blobId) throws IOException; + void sendCreateTableSQS(String table, TableOptions options, Map attributes, Audit audit) throws JsonProcessingException; + void sendDeleteTableSQS(String table, Audit audit) throws IOException; + void purgeTableSQS(String table, Audit audit) throws IOException; + void putTableAttributesSQS(String table, Map attributes, Audit audit) throws JsonProcessingException; +} + + diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java new file mode 100644 index 0000000000..edc35e6acc --- /dev/null +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSMessageException.java @@ -0,0 +1,7 @@ +package com.bazaarvoice.emodb.web.resources.blob.messageQueue; + +public class SQSMessageException extends RuntimeException { + public SQSMessageException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java new file mode 100644 index 0000000000..08487fdaee --- /dev/null +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSService.java @@ -0,0 +1,136 @@ +package com.bazaarvoice.emodb.web.resources.blob.messageQueue; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import com.amazonaws.services.sqs.model.SendMessageResult; +import com.bazaarvoice.emodb.sor.api.Audit; +import com.bazaarvoice.emodb.sor.api.TableOptions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.DatatypeConverter; +import java.util.HashMap; +import java.util.Map; + +/** + * Service class for interacting with Amazon SQS (Simple Queue Service). + */ +public class SQSService implements MessagingService { + private static final Logger _log = LoggerFactory.getLogger(SQSService.class); + + private final AmazonSQS sqs; + private final String queueUrl; + private final ObjectMapper objectMapper; + + /** + * Constructor for SQSService. + * + * @param queueName The name of the SQS queue to send messages to. + * @param objectMapper ObjectMapper for converting messages to JSON format. + * @param sqs AmazonSQS for sending messages + */ + public SQSService(String queueName, ObjectMapper objectMapper, AmazonSQS sqs) { + this.objectMapper = objectMapper; + this.sqs = sqs; + this.queueUrl = sqs.getQueueUrl(queueName).getQueueUrl(); + } + + @Override + public void sendPutRequestSQS(String table, String blobId, Map attributes, String requestUrl) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "PUT_TABLE_BLOBID"); + messageMap.put("tenantName", "datastorage"); + messageMap.put("requestUrl", requestUrl); + messageMap.put("table", table); + messageMap.put("blobId", blobId); + messageMap.put("attributes", attributes); + + // Logging the length of the byte array + // _log.debug("Byte array length: {}", byteArray.length); + + // Convert byte array to base64 string + // String base64Data = DatatypeConverter.printBase64Binary(byteArray); + // messageMap.put("data", base64Data); + _log.debug("Sending PUT request to SQS. Table: {}, BlobId: {}, RequestUrl: {}", table, blobId, requestUrl); + sendMessageSQS(messageMap); + } + + @Override + public void sendDeleteRequestSQS(String table, String blobId) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "DELETE_BLOB"); + messageMap.put("table", table); + messageMap.put("blobId", blobId); + sendMessageSQS(messageMap); + } + + @Override + public void sendCreateTableSQS(String table, TableOptions options, Map attributes, Audit audit) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "CREATE_TABLE"); + messageMap.put("table", table); + messageMap.put("options", options); + messageMap.put("attributes", attributes); + messageMap.put("audit", audit); + sendMessageSQS(messageMap); + } + + @Override + public void sendDeleteTableSQS(String table, Audit audit) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "DELETE_TABLE"); + messageMap.put("table", table); + messageMap.put("audit", audit); + sendMessageSQS(messageMap); + } + + @Override + public void purgeTableSQS(String table, Audit audit) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "PURGE_TABLE"); + messageMap.put("table", table); + messageMap.put("audit", audit); + sendMessageSQS(messageMap); + } + + @Override + public void putTableAttributesSQS(String table, Map attributes, Audit audit) { + Map messageMap = new HashMap<>(); + messageMap.put("method", "SET_TABLE_ATTRIBUTE"); + messageMap.put("table", table); + messageMap.put("attributes", attributes); + messageMap.put("audit", audit); + sendMessageSQS(messageMap); + } + + private void sendMessageSQS(Map messageMap) { + try { + String messageBody = objectMapper.writeValueAsString(messageMap); + String messageGroupId = "blob"; + SendMessageRequest sendMessageRequest = new SendMessageRequest() + .withQueueUrl(queueUrl) + .withMessageBody(messageBody) + .withMessageGroupId(messageGroupId); + SendMessageResult result = sqs.sendMessage(sendMessageRequest); + _log.info("Message sent successfully to SQS. Message ID: {}", result.getMessageId()); + } catch (JsonProcessingException e) { + _log.error("Error converting message to JSON: {}", e.getMessage()); + throw new SQSMessageException("Failed to convert message to JSON", e); + } catch (AmazonServiceException e) { + _log.error("AmazonServiceException: {}", e.getMessage()); + throw new SQSMessageException("AWS service error occurred while sending message to SQS", e); + } catch (AmazonClientException e) { + _log.error("AmazonClientException: {}", e.getMessage()); + throw new SQSMessageException("Client error occurred while sending message to SQS", e); + } catch (Exception e) { + _log.error("Unexpected error occurred: {}", e.getMessage(), e); + throw new SQSMessageException("Unexpected error occurred while sending message to SQS", e); + } + } +} + + diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java new file mode 100644 index 0000000000..9638a8bb1d --- /dev/null +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/messageQueue/SQSServiceFactory.java @@ -0,0 +1,12 @@ +package com.bazaarvoice.emodb.web.resources.blob.messageQueue; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class SQSServiceFactory { + public MessagingService createSQSService() { + return new SQSService("blobMigrationQueue.fifo", new ObjectMapper(), AmazonSQSClientBuilder.standard().build()); + } +} + diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java index 0f24879873..c6dcd408fc 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/DedupQueueResource1.java @@ -10,6 +10,8 @@ import com.bazaarvoice.emodb.web.auth.resource.NamedResource; import com.bazaarvoice.emodb.web.jersey.params.SecondsParam; import com.bazaarvoice.emodb.web.resources.SuccessResponse; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.annotation.Timed; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -48,10 +50,23 @@ public class DedupQueueResource1 { private final DedupQueueService _queueService; private final DedupQueueServiceAuthenticator _queueClient; + private final Meter _nullPollDedupCount; + private final Meter _messageDedupCount; + private final Meter _sendDedupCount; + private final Meter _sendNullDedupCount; + private final Meter _sendBatchNullDedupCount; + private final Meter _sendBatchDedupCount; - public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuthenticator queueClient) { + + public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuthenticator queueClient, MetricRegistry metricRegistry) { _queueService = requireNonNull(queueService, "queueService"); _queueClient = requireNonNull(queueClient, "queueClient"); + _nullPollDedupCount = metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class, "nullPollsDedupCount")); + _messageDedupCount = metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class, "polledMessageDedupCount")); + _sendDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendDedupCount")); + _sendNullDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendNullDedupCount")); + _sendBatchDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendBatchDedupCount")); + _sendBatchNullDedupCount= metricRegistry.meter(MetricRegistry.name(DedupQueueResource1.class,"sendBatchNullDedupCount")); } @POST @@ -65,6 +80,12 @@ public DedupQueueResource1(DedupQueueService queueService, DedupQueueServiceAuth ) public SuccessResponse send(@PathParam("queue") String queue, Object message) { // Not partitioned--any server can write messages to Cassandra. + if (message == null) { + _sendNullDedupCount.mark(); + } + else{ + _sendDedupCount.mark(); + } _queueService.send(queue, message); return SuccessResponse.instance(); } @@ -80,6 +101,12 @@ public SuccessResponse send(@PathParam("queue") String queue, Object message) { ) public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection messages) { // Not partitioned--any server can write messages to Cassandra. + if (messages == null || messages.isEmpty()) { + _sendBatchNullDedupCount.mark(); // Increment the sendnull meter + } + else { + _sendBatchDedupCount.mark(messages.size()); + } _queueService.sendAll(queue, messages); return SuccessResponse.instance(); } @@ -171,7 +198,13 @@ public List poll(@QueryParam("partitioned") BooleanParam partitioned, @QueryParam("ttl") @DefaultValue("30") SecondsParam claimTtl, @QueryParam("limit") @DefaultValue("10") IntParam limit, @Authenticated Subject subject) { - return getService(partitioned, subject.getAuthenticationId()).poll(queue, claimTtl.get(), limit.get()); + + List polledMessages = getService(partitioned, subject.getAuthenticationId()).poll(queue, claimTtl.get(), limit.get()); + _messageDedupCount.mark(polledMessages.size()); + if(polledMessages.isEmpty()){ + _nullPollDedupCount.mark(); + } + return polledMessages; } @POST diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java index a0be8a9eb9..ff6334db05 100644 --- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java +++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java @@ -10,6 +10,8 @@ import com.bazaarvoice.emodb.web.auth.resource.NamedResource; import com.bazaarvoice.emodb.web.jersey.params.SecondsParam; import com.bazaarvoice.emodb.web.resources.SuccessResponse; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.annotation.Timed; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -36,6 +38,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Queue; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -46,12 +49,31 @@ @Api (value="Queue: " , description = "All Queue operations") public class QueueResource1 { + //private final MetricRegistry _metricRegistry; private final QueueService _queueService; private final QueueServiceAuthenticator _queueClient; + private final Meter _messageCount_qr1; + private final Meter _nullPollsCount_qr1; + + private final Meter _sendCount_qr1; + private final Meter _sendNullCount_qr1; + + private final Meter _sendBatch_qr1; + + private final Meter _sendBatchNull_qr1; + + public QueueResource1(QueueService queueService, QueueServiceAuthenticator queueClient, MetricRegistry metricRegistry) { + //this._metricRegistry = metricRegistry; - public QueueResource1(QueueService queueService, QueueServiceAuthenticator queueClient) { _queueService = requireNonNull(queueService, "queueService"); _queueClient = requireNonNull(queueClient, "queueClient"); + _messageCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "polledMessageCount_qr1")); + _nullPollsCount_qr1 = metricRegistry.meter(MetricRegistry.name(QueueResource1.class, "nullPollsCount_qr1")); + _sendCount_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendCount_qr1")); + _sendNullCount_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendNullCount_qr1")); + _sendBatch_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendBatch_qr1")); + _sendBatchNull_qr1= metricRegistry.meter(MetricRegistry.name(QueueResource1.class,"sendBatchNull_qr1")); + } @POST @@ -65,6 +87,13 @@ public QueueResource1(QueueService queueService, QueueServiceAuthenticator queue ) public SuccessResponse send(@PathParam("queue") String queue, Object message) { // Not partitioned--any server can write messages to Cassandra. + + if (message == null) { + _sendNullCount_qr1.mark(); + } + else{ + _sendCount_qr1.mark(); + } _queueService.send(queue, message); return SuccessResponse.instance(); } @@ -79,6 +108,13 @@ public SuccessResponse send(@PathParam("queue") String queue, Object message) { response = SuccessResponse.class ) public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection messages) { + + if (messages == null || messages.isEmpty()) { + _sendBatchNull_qr1.mark(); // Increment the sendnull meter + } + else { + _sendBatch_qr1.mark(messages.size()); + } // Not partitioned--any server can write messages to Cassandra. _queueService.sendAll(queue, messages); return SuccessResponse.instance(); @@ -168,7 +204,14 @@ public List poll(@QueryParam("partitioned") BooleanParam partitioned, @QueryParam("ttl") @DefaultValue("30") SecondsParam claimTtl, @QueryParam("limit") @DefaultValue("10") IntParam limit, @Authenticated Subject subject) { - return getService(partitioned, subject.getAuthenticationId()).poll(queue, claimTtl.get(), limit.get()); + List polledMessages = getService(partitioned, subject.getAuthenticationId()).poll(queue, claimTtl.get(), limit.get()); + if(polledMessages.isEmpty()){ + _nullPollsCount_qr1.mark(); + } + else{ + _messageCount_qr1.mark(polledMessages.size()); + } + return polledMessages; } @POST diff --git a/yum/pom.xml b/yum/pom.xml index 865aa886a5..adee7cf547 100644 --- a/yum/pom.xml +++ b/yum/pom.xml @@ -4,7 +4,7 @@ com.bazaarvoice.emodb emodb-parent - 6.5.137-SNAPSHOT + 6.5.171-SNAPSHOT ../parent/pom.xml