Skip to content

Commit

Permalink
The JMS Message ID is only available after the message has been sent.…
Browse files Browse the repository at this point in the history
… This commit allows the producer.send methods to return the JMS Message ID if it was set by the underlying implementation
  • Loading branch information
adamretter committed Jan 12, 2023
1 parent d0d1b97 commit 45f52cd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 17 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/jms4s/JmsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ class JmsClient[F[_]: Async] private[jms4s] (private[jms4s] val context: JmsCont
.map(JmsAcknowledgerConsumer.make(_))

def createProducer(
concurrencyLevel: Int
concurrencyLevel: Int,
disableMessageId: Boolean = false
): Resource[F, JmsProducer[F]] =
JmsProducer.make[F](context, concurrencyLevel)
JmsProducer.make[F](context, concurrencyLevel, disableMessageId)

}
15 changes: 9 additions & 6 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ object JmsProducer {

private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
concurrencyLevel: Int
concurrencyLevel: Int,
disableMessageId: Boolean = false
): Resource[F, JmsProducer[F]] =
for {
pool <- ContextPool.create(context, concurrencyLevel)
Expand All @@ -92,7 +93,7 @@ object JmsProducer {
for {
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message)
case (message, destinationName) => ctx.send(destinationName, message, disableMessageId)
}
} yield ()
}
Expand All @@ -106,8 +107,8 @@ object JmsProducer {
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message))(delay =>
ctx.send(destinatioName, message, delay)
duration.fold(ctx.send(destinatioName, message, disableMessageId))(delay =>
ctx.send(destinatioName, message, delay, disableMessageId)
)
}

Expand All @@ -121,7 +122,9 @@ object JmsProducer {
case (ctx, mf) =>
for {
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message))(delay => ctx.send(destinationName, message, delay))
_ <- delay.fold(ctx.send(destinationName, message, disableMessageId))(delay =>
ctx.send(destinationName, message, delay, disableMessageId)
)
} yield ()
}

Expand All @@ -130,7 +133,7 @@ object JmsProducer {
case (ctx, mf) =>
for {
(message, destination) <- f(mf)
_ <- ctx.send(destination, message)
_ <- ctx.send(destination, message, disableMessageId)
} yield ()
}

Expand Down
25 changes: 19 additions & 6 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,33 @@ class JmsContext[F[_]: Async: Logger](private val context: JMSContext) {
)
.map(context => new JmsContext(context))

def send(destinationName: DestinationName, message: JmsMessage): F[Unit] =
def send(destinationName: DestinationName, message: JmsMessage): F[Option[String]] =
send(destinationName, message, false)

def send(destinationName: DestinationName, message: JmsMessage, disableMessageId: Boolean): F[Option[String]] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
p <- Sync[F].blocking(context.createProducer().setDisableMessageID(disableMessageId))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()
messageId <- Sync[F].pure(Option(message.wrapped.getJMSMessageID))
} yield messageId

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Option[String]] =
send(destinationName, message, delay, false)

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
def send(
destinationName: DestinationName,
message: JmsMessage,
delay: FiniteDuration,
disableMessageId: Boolean
): F[Option[String]] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
p <- Sync[F].blocking(context.createProducer().setDisableMessageID(disableMessageId))
_ <- Sync[F].delay(p.setDeliveryDelay(delay.toMillis))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()
messageId <- Sync[F].pure(Option(message.wrapped.getJMSMessageID))
} yield messageId

def createJmsConsumer(
destinationName: DestinationName,
Expand Down
6 changes: 3 additions & 3 deletions tests/src/test/scala/jms4s/jms/JmsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
contexts(inputQueueName).use {
case (receiveConsumer, sendContext, msg) =>
for {
_ <- sendContext.send(inputQueueName, msg)
text <- receiveBodyAsTextOrFail(receiveConsumer)
} yield assert(text == body)
messageId <- sendContext.send(inputQueueName, msg)
text <- receiveBodyAsTextOrFail(receiveConsumer)
} yield assert(text == body && messageId.nonEmpty)
}
}
"publish and then receive with a delay" in {
Expand Down

0 comments on commit 45f52cd

Please sign in to comment.