Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

producer.send now returns the JMS Message ID #363

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/jms4s/JmsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ class JmsClient[F[_]: Async] private[jms4s] (private[jms4s] val context: JmsCont
.map(JmsAcknowledgerConsumer.make(_))

def createProducer(
concurrencyLevel: Int
concurrencyLevel: Int,
disableMessageId: Boolean = false
): Resource[F, JmsProducer[F]] =
JmsProducer.make[F](context, concurrencyLevel)
JmsProducer.make[F](context, concurrencyLevel, disableMessageId)

}
53 changes: 28 additions & 25 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ trait JmsProducer[F[_]] {

def sendN(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit]
): F[NonEmptyList[Option[String]]]

def sendNWithDelay(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit]
): F[NonEmptyList[Option[String]]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can simplify this(and the others) return type with a simple
List[String]
returnig a NonEmptyList with Options inside(that can be None) does not make a lot of sense to me :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@faustin0 Do you want me to do that to be able to get this PR merged, or should this be done in a future PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adamretter can also be in a future PR but, being a change in a public api, maybe is better to "fix" now the return type of these methods. What do you think?


def sendWithDelay(
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit]
): F[Option[String]]

def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit]
def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Option[String]]

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe adding some "general" docs to the JmsProducer to tell what is returned from these methods can be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I can add something...

Expand Down Expand Up @@ -78,60 +78,63 @@ object JmsProducer {

private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
concurrencyLevel: Int
concurrencyLevel: Int,
disableMessageId: Boolean = false
): Resource[F, JmsProducer[F]] =
for {
pool <- ContextPool.create(context, concurrencyLevel)
} yield new JmsProducer[F] {

override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit] =
): F[NonEmptyList[Option[String]]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message)
}
} yield ()
messageIds <- messagesWithDestinations.traverse {
case (message, destinationName) => ctx.send(destinationName, message, disableMessageId)
}
} yield messageIds
}

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
): F[NonEmptyList[Option[String]]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message))(delay =>
ctx.send(destinatioName, message, delay)
)
}

} yield ()
messageIds <- messagesWithDestinationsAndDelayes.traverse {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message, disableMessageId))(delay =>
ctx.send(destinatioName, message, delay, disableMessageId)
)
}

} yield messageIds
}

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
): F[Option[String]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message))(delay => ctx.send(destinationName, message, delay))
} yield ()
messageId <- delay.fold(ctx.send(destinationName, message, disableMessageId))(delay =>
ctx.send(destinationName, message, delay, disableMessageId)
)
} yield messageId
}

override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit] =
override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Option[String]] =
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, destination) <- f(mf)
_ <- ctx.send(destination, message)
} yield ()
messageId <- ctx.send(destination, message, disableMessageId)
} yield messageId
}

}
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,33 @@ class JmsContext[F[_]: Async: Logger](private val context: JMSContext) {
)
.map(context => new JmsContext(context))

def send(destinationName: DestinationName, message: JmsMessage): F[Unit] =
def send(destinationName: DestinationName, message: JmsMessage): F[Option[String]] =
send(destinationName, message, false)

def send(destinationName: DestinationName, message: JmsMessage, disableMessageId: Boolean): F[Option[String]] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
p <- Sync[F].blocking(context.createProducer().setDisableMessageID(disableMessageId))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()
messageId <- Sync[F].pure(Option(message.wrapped.getJMSMessageID))
} yield messageId

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Option[String]] =
send(destinationName, message, delay, false)

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
def send(
destinationName: DestinationName,
message: JmsMessage,
delay: FiniteDuration,
disableMessageId: Boolean
): F[Option[String]] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
p <- Sync[F].blocking(context.createProducer().setDisableMessageID(disableMessageId))
_ <- Sync[F].delay(p.setDeliveryDelay(delay.toMillis))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()
messageId <- Sync[F].pure(Option(message.wrapped.getJMSMessageID))
} yield messageId

def createJmsConsumer(
destinationName: DestinationName,
Expand Down
8 changes: 5 additions & 3 deletions tests/src/test/scala/jms4s/JmsClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,14 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
res.use {
case (producer, consumer, bodies, messages) =>
for {
_ <- messages.toNel.fold(IO.unit)(ms => producer.sendN(messageFactory(ms, topicName1)))
messageIds <- messages.toNel.fold(IO.pure(List.empty[Option[String]]))(ms =>
producer.sendN(messageFactory(ms, topicName1)).map(_.toList)
)
_ <- logger.info(s"Pushed ${messages.size} messages.")
_ <- logger.info(s"Consumer to Producer started.\nCollecting messages from output queue...")
received <- Ref.of[IO, Set[String]](Set())
receivedMessages <- receiveUntil(consumer, received, nMessages).timeout(timeout) >> received.get
} yield assert(receivedMessages == bodies)
} yield assert(messages.size == messageIds.size && receivedMessages == bodies)
Copy link
Contributor

@faustin0 faustin0 Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now the messageIds can be a List of Nones so i would also check that inside the list there is something, like:
messageIds.forAll(_.nonEmpty) .
On the other hand, if the sendN will return a flattened List without the Option this check is not needed anymore.

}
}

Expand Down Expand Up @@ -459,7 +461,7 @@ trait JmsClientSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
_ <- (0 until poolSize).toList.traverse_ { _ =>
producer
.send(_ => IO.raiseError(new RuntimeException("failed producing")))
.handleErrorWith(logger.error(_)(""))
.handleErrorWith(logger.error(_)("").map(_ => None))
}
_ <- producer
.send(messageFactory(message, outputQueueName1))
Expand Down
6 changes: 3 additions & 3 deletions tests/src/test/scala/jms4s/jms/JmsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ trait JmsSpec extends AsyncFreeSpec with AsyncIOSpec with Jms4sBaseSpec {
contexts(inputQueueName).use {
case (receiveConsumer, sendContext, msg) =>
for {
_ <- sendContext.send(inputQueueName, msg)
text <- receiveBodyAsTextOrFail(receiveConsumer)
} yield assert(text == body)
messageId <- sendContext.send(inputQueueName, msg)
text <- receiveBodyAsTextOrFail(receiveConsumer)
} yield assert(text == body && messageId.nonEmpty)
}
}
"publish and then receive with a delay" in {
Expand Down