Skip to content

Commit

Permalink
Support temporary destinations (#377)
Browse files Browse the repository at this point in the history
* Add actual JMS Queue name to log, helps to diagnose issues (especially with temporary queues/topics)

* Enhance the API so that the JmsDestination may be passed, this is important for Temporary Queues and Temporary Topics, otherwise these are created at least twice (once for the consumer, and at least once per message sent)

* rework temp queue implementation

* remove JmsDestination / DestinationName logic from consumers/producer

* add a more meaningful tests

* get DestinationName from replyTo

---------

Co-authored-by: Adam Retter <[email protected]>
  • Loading branch information
faustin0 and adamretter authored Mar 19, 2023
1 parent f3d6f73 commit 40c9327
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 60 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ object JmsAcknowledgerConsumer {
ifNoAck = Sync[F].unit,
ifSend = send =>
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(ifEmpty = context.send(name, message))(
f = d => context.send(name, message, d)
)
case (message, (name, Some(delay))) => context.send(name, message, delay)
case (message, (name, None)) => context.send(name, message)
} *> Sync[F].blocking(message.wrapped.acknowledge())
)
} yield ()
Expand Down Expand Up @@ -122,5 +120,6 @@ object JmsAcknowledgerConsumer {
destination: DestinationName
): AckAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))

}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ object JmsAutoAcknowledgerConsumer {
ifNoOp = Sync[F].unit,
ifSend = (send: Send[F]) =>
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(context.send(name, message))(delay => context.send(name, message, delay))
case (message, (name, Some(delay))) => context.send(name, message, delay)
case (message, (name, None)) => context.send(name, message)
}
)
} yield ()
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/scala/jms4s/JmsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
package jms4s

import cats.effect.{ Async, Resource }
import jms4s.config.DestinationName
import cats.syntax.all._
import jms4s.config.{ DestinationName, TemporaryQueueName, TemporaryTopicName }
import jms4s.jms._
import jms4s.model.SessionType

Expand Down Expand Up @@ -62,4 +63,9 @@ class JmsClient[F[_]: Async] private[jms4s] (private[jms4s] val context: JmsCont
): Resource[F, JmsProducer[F]] =
JmsProducer.make[F](context, concurrencyLevel)

def createTemporaryQueue: F[TemporaryQueueName] =
context.createTemporaryQueue.map(TemporaryQueueName)

def createTemporaryTopic: F[TemporaryTopicName] =
context.createTemporaryTopic.map(TemporaryTopicName)
}
1 change: 0 additions & 1 deletion core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ trait JmsProducer[F[_]] {
): F[Unit]

def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit]

}

private[jms4s] class ContextPool[F[_]: Sync](private val contextsPool: Queue[F, (JmsContext[F], MessageFactory[F])]) {
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ object JmsTransactedConsumer {
ifRollback = context.rollback,
ifSend = send =>
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
context.send(name, message)
)(d => context.send(name, message, d))
case (message, (name, Some(delay))) => context.send(name, message, delay)
case (message, (name, None)) => context.send(name, message)
} *> context.commit
)
} yield ()
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/scala/jms4s/config/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@
package jms4s.config

import cats.Order
import jms4s.jms.JmsDestination.{ JmsQueue, JmsTopic }

sealed trait DestinationName extends Product with Serializable

sealed trait DestinationName extends Product with Serializable
case class QueueName(value: String) extends DestinationName
case class TopicName(value: String) extends DestinationName

case class TemporaryQueueName(destination: JmsQueue) extends DestinationName {
def value: String = destination.name
}

case class TemporaryTopicName(destination: JmsTopic) extends DestinationName {
def value: String = destination.name
}

object DestinationName {

implicit val orderingDestinationName: Order[DestinationName] = Order.from[DestinationName] {
Expand Down
56 changes: 41 additions & 15 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package jms4s.jms

import cats.effect.{ Async, Resource, Sync }
import cats.syntax.all._
import jms4s.config.{ DestinationName, QueueName, TopicName }
import jms4s.config._
import jms4s.jms.JmsDestination.{ JmsQueue, JmsTopic }
import jms4s.jms.JmsMessage.JmsTextMessage
import jms4s.model.SessionType
Expand All @@ -49,31 +49,49 @@ class JmsContext[F[_]: Async: Logger](private val context: JMSContext) {
.map(context => new JmsContext(context))

def send(destinationName: DestinationName, message: JmsMessage): F[Unit] =
toJmsDestination(destinationName).flatMap(send(_, message))

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
toJmsDestination(destinationName).flatMap(send(_, message, delay))

def send(jmsDestination: JmsDestination, message: JmsMessage): F[Unit] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
_ <- Logger[F].trace(s"Sending message id=${message.getJMSMessageId} to $jmsDestination")
_ <- Sync[F].blocking {
context
.createProducer()
.send(jmsDestination.wrapped, message.wrapped)
}
_ <- Logger[F].trace(s"Sent message id=${message.getJMSMessageId} to $jmsDestination")
} yield ()

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
def send(jmsDestination: JmsDestination, message: JmsMessage, delay: FiniteDuration): F[Unit] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
_ <- Sync[F].delay(p.setDeliveryDelay(delay.toMillis))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
_ <- Logger[F].trace(
s"Sending message id=${message.getJMSMessageId} with delay=${delay.toMillis} to $jmsDestination"
)
_ <- Sync[F].blocking {
context
.createProducer()
.setDeliveryDelay(delay.toMillis)
.send(jmsDestination.wrapped, message.wrapped)
}
_ <- Logger[F].trace(
s"Sent message id=${message.getJMSMessageId} with delay=${delay.toMillis} to $jmsDestination"
)
} yield ()

def createJmsConsumer(
destinationName: DestinationName,
pollingInterval: FiniteDuration
): Resource[F, JmsMessageConsumer[F]] =
for {
destination <- Resource.eval(createDestination(destinationName))
destination <- Resource.eval(toJmsDestination(destinationName))
consumer <- Resource.make(
Logger[F].info(s"Creating consumer for destination $destinationName") *>
Logger[F].info(s"Creating consumer for destination $destination") *>
Sync[F].blocking(context.createConsumer(destination.wrapped))
)(consumer =>
Logger[F].info(s"Closing consumer for destination $destinationName") *>
Logger[F].info(s"Closing consumer for destination $destination") *>
Sync[F].blocking(consumer.close())
)
} yield new JmsMessageConsumer[F](consumer, pollingInterval)
Expand All @@ -91,9 +109,17 @@ class JmsContext[F[_]: Async: Logger](private val context: JMSContext) {
private def createTopic(topicName: TopicName): F[JmsTopic] =
Sync[F].blocking(context.createTopic(topicName.value)).map(new JmsTopic(_))

def createDestination(destination: DestinationName): F[JmsDestination] = destination match {
case q: QueueName => createQueue(q).widen[JmsDestination]
case t: TopicName => createTopic(t).widen[JmsDestination]
def createTemporaryTopic: F[JmsTopic] =
Sync[F].blocking(context.createTemporaryTopic()).map(new JmsTopic(_))

def createTemporaryQueue: F[JmsQueue] =
Sync[F].blocking(context.createTemporaryQueue()).map(new JmsQueue(_))

private def toJmsDestination(destination: DestinationName): F[JmsDestination] = destination match {
case qn: QueueName => createQueue(qn).widen[JmsDestination]
case tn: TopicName => createTopic(tn).widen[JmsDestination]
case TemporaryQueueName(destination) => destination.pure[F].widen[JmsDestination]
case TemporaryTopicName(destination) => destination.pure[F].widen[JmsDestination]
}

}
28 changes: 26 additions & 2 deletions core/src/main/scala/jms4s/jms/JmsDestination.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,37 @@

package jms4s.jms

import cats.Show

import javax.jms.{ Destination, Queue, Topic }

sealed abstract class JmsDestination {
private[jms4s] val wrapped: Destination
def name: String

override def toString: String = s"${getClass.getSimpleName}($name)"
}

object JmsDestination {
class JmsQueue private[jms4s] (private[jms4s] val wrapped: Queue) extends JmsDestination
class JmsTopic private[jms4s] (private[jms4s] val wrapped: Topic) extends JmsDestination

class JmsQueue private[jms4s] (private[jms4s] val wrapped: Queue) extends JmsDestination {
override def name: String = wrapped.getQueueName
}

class JmsTopic private[jms4s] (private[jms4s] val wrapped: Topic) extends JmsDestination {
override def name: String = wrapped.getTopicName
}

class Other private[jms4s] (private[jms4s] val wrapped: Destination) extends JmsDestination {
override def name: String = wrapped.toString
}

def fromDestination(destination: Destination): JmsDestination =
destination match {
case queue: Queue => new JmsQueue(queue)
case topic: Topic => new JmsTopic(topic)
case x => new Other(x)
}

implicit val showDestination: Show[JmsDestination] = Show.fromToString[JmsDestination]
}
29 changes: 20 additions & 9 deletions core/src/main/scala/jms4s/jms/JmsMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
package jms4s.jms

import cats.syntax.all._
import cats.{ ApplicativeError, Show }
import cats.{ ApplicativeError, MonadThrow, Show }
import jms4s.config.{ DestinationName, QueueName, TopicName }
import jms4s.jms.JmsMessage.{ JmsTextMessage, UnsupportedMessage }
import jms4s.jms.utils.TryUtils._

import javax.jms.{ Destination, Message, TextMessage }
import javax.jms._
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success, Try }

Expand Down Expand Up @@ -68,13 +69,23 @@ class JmsMessage private[jms4s] (private[jms4s] val wrapped: Message) {
def getJMSCorrelationId: Option[String] = Try(Option(wrapped.getJMSCorrelationID)).toOpt
def getJMSCorrelationIdAsBytes: Option[Array[Byte]] = Try(Option(wrapped.getJMSCorrelationIDAsBytes)).toOpt
def getJMSReplyTo: Option[Destination] = Try(Option(wrapped.getJMSReplyTo)).toOpt
def getJMSDestination: Option[Destination] = Try(Option(wrapped.getJMSDestination)).toOpt
def getJMSDeliveryMode: Option[Int] = Try(Option(wrapped.getJMSDeliveryMode)).toOpt
def getJMSRedelivered: Option[Boolean] = Try(Option(wrapped.getJMSRedelivered)).toOpt
def getJMSType: Option[String] = Try(Option(wrapped.getJMSType)).toOpt
def getJMSExpiration: Option[Long] = Try(Option(wrapped.getJMSExpiration)).toOpt
def getJMSPriority: Option[Int] = Try(Option(wrapped.getJMSPriority)).toOpt
def getJMSDeliveryTime: Option[Long] = Try(Option(wrapped.getJMSDeliveryTime)).toOpt

def getJMSReplyToNameF[F[_]: MonadThrow]: F[DestinationName] =
MonadThrow[F]
.catchNonFatal(wrapped.getJMSReplyTo)
.ensureOr(_ => new NoSuchElementException("ReplyTo is null"))(_ != null)
.flatMap {
case q: Queue => QueueName(q.getQueueName).pure.widen
case t: Topic => TopicName(t.getTopicName).pure.widen
case x => new Exception(s"Failure extracting JMSReplyTo, unsupported Destination: $x").raiseError
}
def getJMSDestination: Option[Destination] = Try(Option(wrapped.getJMSDestination)).toOpt
def getJMSDeliveryMode: Option[Int] = Try(Option(wrapped.getJMSDeliveryMode)).toOpt
def getJMSRedelivered: Option[Boolean] = Try(Option(wrapped.getJMSRedelivered)).toOpt
def getJMSType: Option[String] = Try(Option(wrapped.getJMSType)).toOpt
def getJMSExpiration: Option[Long] = Try(Option(wrapped.getJMSExpiration)).toOpt
def getJMSPriority: Option[Int] = Try(Option(wrapped.getJMSPriority)).toOpt
def getJMSDeliveryTime: Option[Long] = Try(Option(wrapped.getJMSDeliveryTime)).toOpt

def getBooleanProperty(name: String): Option[Boolean] =
Try(Option(wrapped.getBooleanProperty(name))).toOpt
Expand Down
16 changes: 3 additions & 13 deletions core/src/main/scala/jms4s/jms/MessageFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ package jms4s.jms

import cats.syntax.all._
import cats.{ Applicative, MonadThrow }
import jms4s.jms.JmsDestination.{ JmsQueue, JmsTopic }
import jms4s.jms.JmsMessage.JmsTextMessage

import javax.jms.{ Queue, Topic }
import scala.util.{ Failure, Try }
import scala.util.Try

class MessageFactory[F[_]](private val context: JmsContext[F]) extends AnyVal {
def makeTextMessage(value: String): F[JmsTextMessage] = context.createTextMessage(value)
Expand All @@ -48,16 +46,8 @@ class MessageFactory[F[_]](private val context: JmsContext[F]) extends AnyVal {
from.getJMSMessageId.traverse_(to.setJMSMessageID),
from.getJMSTimestamp.traverse_(to.setJMSTimestamp),
from.getJMSCorrelationId.traverse_(to.setJMSCorrelationId),
from.getJMSReplyTo.traverse_ {
case queue: Queue => to.setJMSReplyTo(new JmsQueue(queue))
case topic: Topic => to.setJMSReplyTo(new JmsTopic(topic))
case d => Failure(new RuntimeException(s"Unsupported destination: $d"))
},
from.getJMSDestination.traverse_ {
case queue: Queue => to.setJMSDestination(new JmsQueue(queue))
case topic: Topic => to.setJMSDestination(new JmsTopic(topic))
case d => Failure(new RuntimeException(s"Unsupported destination: $d"))
},
from.getJMSReplyTo.traverse_(d => to.setJMSReplyTo(JmsDestination.fromDestination(d))),
from.getJMSDestination.traverse_(d => to.setJMSDestination(JmsDestination.fromDestination(d))),
from.getJMSDeliveryMode.traverse_(to.setJMSDeliveryMode),
from.getJMSRedelivered.traverse_(to.setJMSRedelivered),
from.getJMSType.traverse_(to.setJMSType),
Expand Down
5 changes: 4 additions & 1 deletion scripts/definitions.mqsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ DEFINE QLOCAL(DEV.QUEUE.2) REPLACE;
DEFINE QLOCAL(DEV.QUEUE.3) REPLACE;

DEFINE TOPIC ('DEV.CUSTOM.TOPIC.1') TYPE(LOCAL) TOPICSTR('dev1/') REPLACE;
DEFINE TOPIC ('DEV.CUSTOM.TOPIC.2') TYPE(LOCAL) TOPICSTR('dev2/') REPLACE;
DEFINE TOPIC ('DEV.CUSTOM.TOPIC.2') TYPE(LOCAL) TOPICSTR('dev2/') REPLACE;

SET AUTHREC PROFILE('SYSTEM.DEFAULT.MODEL.QUEUE') OBJTYPE(QUEUE) PRINCIPAL('app') AUTHADD(BROWSE,INQ,GET,PUT,SET)
SET AUTHREC PROFILE('SYSTEM.BASE.TOPIC') OBJTYPE(TOPIC) PRINCIPAL('app') AUTHADD(SUB,RESUME,PUB)
Loading

0 comments on commit 40c9327

Please sign in to comment.