Skip to content

Commit

Permalink
softwaremill#124 - Allow for setting the redrive policy through SetQu…
Browse files Browse the repository at this point in the history
…eueAttributes
  • Loading branch information
simong committed Jan 12, 2019
1 parent 69ac38f commit 68edafa
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package org.elasticmq.rest.sqs.model
package org.elasticmq

case class RedrivePolicy(queueName: String, maxReceiveCount: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ trait QueueActorQueueOps extends Logging {
case UpdateQueueReceiveMessageWait(newReceiveMessageWait) =>
logger.info(s"${queueData.name}: Updating receive message wait to $newReceiveMessageWait")
queueData = queueData.copy(receiveMessageWait = newReceiveMessageWait)
case UpdateQueueRedrivePolicy(newPolicy) =>
val dlqData = DeadLettersQueueData(newPolicy.queueName, newPolicy.maxReceiveCount)
queueData = queueData.copy(deadLettersQueue = Some(dlqData))
case ClearQueue() =>
messageQueue.clear()
case GetQueueStatistics(deliveryTime) => getQueueStatistics(deliveryTime)
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/elasticmq/msg/QueueMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ case class UpdateQueueDefaultVisibilityTimeout(newDefaultVisibilityTimeout: Mill
extends QueueQueueMsg[Unit]
case class UpdateQueueDelay(newDelay: Duration) extends QueueQueueMsg[Unit]
case class UpdateQueueReceiveMessageWait(newReceiveMessageWait: Duration) extends QueueQueueMsg[Unit]
case class UpdateQueueRedrivePolicy(newRedrivePolicy: RedrivePolicy) extends QueueQueueMsg[Unit]
case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueStatistics]
case class ClearQueue() extends QueueQueueMsg[Unit]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import scala.util.control.Exception._
import com.amazonaws.AmazonServiceException
import org.elasticmq.util.Logging
import org.elasticmq._
import org.elasticmq.rest.sqs.model.RedrivePolicy

class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter with Logging {
val visibilityTimeoutAttribute = "VisibilityTimeout"
Expand Down Expand Up @@ -1456,7 +1455,7 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter
}
}

test("should validate redrive policy json") {
test("should validate the redrive policy json when creating a queue") {
// Then
a[AmazonSQSException] shouldBe thrownBy {
client.createQueue(
Expand All @@ -1470,6 +1469,38 @@ class AmazonJavaSdkTestSuite extends FunSuite with Matchers with BeforeAndAfter
}
}

test("should be able to set the redrive policy on an existing queue") {
import spray.json._
import org.elasticmq.rest.sqs.model.RedrivePolicyJson._
// Given two queues
client.createQueue(new CreateQueueRequest("dlq1")).getQueueUrl
val mainQueueUrl = client.createQueue(new CreateQueueRequest("q1")).getQueueUrl

// When setting the redrive policy
val policyStr = RedrivePolicy("dlq1", 1).toJson.toString()
client.setQueueAttributes(
new SetQueueAttributesRequest(mainQueueUrl,
Map(
redrivePolicyAttribute -> policyStr
)))

// The the redrive policy should be returned when getting the queue attributes
val queueAttributes = client.getQueueAttributes(mainQueueUrl, List("All")).getAttributes
queueAttributes.get(redrivePolicyAttribute) should be(policyStr)
}

test("should validate the redrive policy json when updating the attributes of a queue") {
client.createQueue(new CreateQueueRequest("dlq1")).getQueueUrl
val mainQueueUrl = client.createQueue(new CreateQueueRequest("q1")).getQueueUrl

a[AmazonSQSException] shouldBe thrownBy {
client.setQueueAttributes(mainQueueUrl, Map(redrivePolicyAttribute -> "not a proper json policy"))
}
a[AmazonSQSException] shouldBe thrownBy {
client.setQueueAttributes(mainQueueUrl, Map(redrivePolicyAttribute -> """{"wrong": "json"}"""))
}
}

def queueVisibilityTimeout(queueUrl: String) = getQueueLongAttribute(queueUrl, visibilityTimeoutAttribute)

def queueDelay(queueUrl: String) = getQueueLongAttribute(queueUrl, delaySecondsAttribute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import org.elasticmq.rest.sqs.Constants._
import org.elasticmq.rest.sqs.CreateQueueDirectives._
import org.elasticmq.rest.sqs.ParametersUtil._
import org.elasticmq.rest.sqs.directives.ElasticMQDirectives
import org.elasticmq.{DeadLettersQueueData, MillisVisibilityTimeout, QueueData}
import org.elasticmq.{DeadLettersQueueData, MillisVisibilityTimeout, QueueData, RedrivePolicy}
import org.joda.time.{DateTime, Duration}
import spray.json._
import scala.async.Async._
import scala.concurrent.Future

import org.elasticmq.rest.sqs.model.RedrivePolicy
import spray.json.JsonParser.ParsingException

trait CreateQueueDirectives {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package org.elasticmq.rest.sqs

import org.elasticmq.MillisVisibilityTimeout
import org.elasticmq.{MillisVisibilityTimeout, RedrivePolicy}
import Constants._
import org.joda.time.Duration
import org.elasticmq.msg.{
GetQueueStatistics,
LookupQueue,
UpdateQueueDefaultVisibilityTimeout,
UpdateQueueDelay,
UpdateQueueReceiveMessageWait
UpdateQueueReceiveMessageWait,
UpdateQueueRedrivePolicy
}
import org.elasticmq.actor.reply._
import scala.concurrent.Future
import scala.util.Try

import org.elasticmq.rest.sqs.directives.ElasticMQDirectives
import org.elasticmq.rest.sqs.model.RedrivePolicy
import org.elasticmq.rest.sqs.model.RedrivePolicyJson
import spray.json._

trait QueueAttributesDirectives {
Expand All @@ -30,7 +33,7 @@ trait QueueAttributesDirectives {
val RedrivePolicyAttribute = "RedrivePolicy"

val AllUnsupportedAttributeNames = PolicyAttribute :: MaximumMessageSizeAttribute ::
MessageRetentionPeriodAttribute :: RedrivePolicyAttribute :: Nil
MessageRetentionPeriodAttribute :: Nil
}

object QueueReadableAttributeNames {
Expand Down Expand Up @@ -132,6 +135,20 @@ trait QueueAttributesDirectives {
case ReceiveMessageWaitTimeSecondsAttribute => {
queueActor ? UpdateQueueReceiveMessageWait(Duration.standardSeconds(attributeValue.toLong))
}
case RedrivePolicyParameter =>
import RedrivePolicyJson._

for {
newRedrivePolicy <- Future.fromTry(Try(attributeValue.parseJson.convertTo[RedrivePolicy]))
_ = if (newRedrivePolicy.maxReceiveCount < 1 || newRedrivePolicy.maxReceiveCount > 1000) {
throw SQSException.invalidParameterValue
}
dlq <- queueManagerActor ? LookupQueue(newRedrivePolicy.queueName)
_ = if (dlq.isEmpty) {
throw SQSException.nonExistentQueue
}
_ <- queueActor ? UpdateQueueRedrivePolicy(newRedrivePolicy)
} yield ()
case attr
if UnsupportedAttributeNames.AllUnsupportedAttributeNames
.contains(attr) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.elasticmq.rest.sqs.model

import org.elasticmq.RedrivePolicy
import spray.json.{
DefaultJsonProtocol,
JsNumber,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.elasticmq.rest.sqs

import akka.http.scaladsl.testkit.ScalatestRouteTest
import org.elasticmq.rest.sqs.model.RedrivePolicy
import org.elasticmq.RedrivePolicy
import org.scalatest.{FlatSpec, Matchers}
import spray.json._

Expand Down

0 comments on commit 68edafa

Please sign in to comment.