Skip to content

Commit

Permalink
Also return messageIds from JmsProducer#send methods
Browse files Browse the repository at this point in the history
  • Loading branch information
adamretter committed Jan 13, 2023
1 parent 27757a3 commit cada77f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
52 changes: 26 additions & 26 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ trait JmsProducer[F[_]] {

def sendN(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit]
): F[NonEmptyList[Option[String]]]

def sendNWithDelay(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit]
): F[NonEmptyList[Option[String]]]

def sendWithDelay(
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit]
): F[Option[String]]

def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit]
def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Option[String]]

}

Expand Down Expand Up @@ -87,54 +87,54 @@ object JmsProducer {

override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit] =
): F[NonEmptyList[Option[String]]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message, disableMessageId)
}
} yield ()
messageIds <- messagesWithDestinations.traverse {
case (message, destinationName) => ctx.send(destinationName, message, disableMessageId)
}
} yield messageIds
}

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
): F[NonEmptyList[Option[String]]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message, disableMessageId))(delay =>
ctx.send(destinatioName, message, delay, disableMessageId)
)
}

} yield ()
messageIds <- messagesWithDestinationsAndDelayes.traverse {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message, disableMessageId))(delay =>
ctx.send(destinatioName, message, delay, disableMessageId)
)
}

} yield messageIds
}

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
): F[Option[String]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message, disableMessageId))(delay =>
ctx.send(destinationName, message, delay, disableMessageId)
)
} yield ()
messageId <- delay.fold(ctx.send(destinationName, message, disableMessageId))(delay =>
ctx.send(destinationName, message, delay, disableMessageId)
)
} yield messageId
}

override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit] =
override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Option[String]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, destination) <- f(mf)
_ <- ctx.send(destination, message, disableMessageId)
} yield ()
messageId <- ctx.send(destination, message, disableMessageId)
} yield messageId
}

}
Expand Down
8 changes: 5 additions & 3 deletions tests/src/test/scala/jms4s/JmsClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,14 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
res.use {
case (producer, consumer, bodies, messages) =>
for {
_ <- messages.toNel.fold(IO.unit)(ms => producer.sendN(messageFactory(ms, topicName1)))
messageIds <- messages.toNel.fold(IO.pure(List.empty[Option[String]]))(ms =>
producer.sendN(messageFactory(ms, topicName1)).map(_.toList)
)
_ <- logger.info(s"Pushed ${messages.size} messages.")
_ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...")
received <- Ref.of[IO, Set[String]](Set())
receivedMessages <- receiveUntil(consumer, received, nMessages).timeout(timeout) >> received.get
} yield assert(receivedMessages == bodies)
} yield assert(messages.size == messageIds.size && receivedMessages == bodies)
}
}

Expand Down Expand Up @@ -459,7 +461,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
_ <- (0 until poolSize).toList.traverse_ { _ =>
producer
.send(_ => IO.raiseError(new RuntimeException("failed producing")))
.handleErrorWith(logger.error(_)(""))
.handleErrorWith(logger.error(_)("").map(_ => None))
}
_ <- producer
.send(messageFactory(message, outputQueueName1))
Expand Down

0 comments on commit cada77f

Please sign in to comment.