Skip to content

Commit

Permalink
Blocking ops & MessageFactory in pool (#293)
Browse files Browse the repository at this point in the history
* create Destinations and TextMessage are blocking operation

* avoid eagerly evaluating body

* extension methods class AnyVal

* create a MessageFactory for each producer context

* resolve some intelliJ type-inference issues
  • Loading branch information
faustin0 authored Apr 12, 2022
1 parent a9f5906 commit 57b5e0b
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 77 deletions.
31 changes: 24 additions & 7 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,31 @@ object JmsAcknowledgerConsumer {
object AckAction {

private[jms4s] case class Ack[F[_]]() extends AckAction[F] {
override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifAck

override def fold(
ifAck: => F[Unit],
ifNoAck: => F[Unit],
ifSend: AckAction.Send[F] => F[Unit]
): F[Unit] = ifAck
}

// if the client wants to ack groups of messages, it'll pass a sequence of NoAck and then a cumulative Ack
private[jms4s] case class NoAck[F[_]]() extends AckAction[F] {
override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifNoAck

override def fold(
ifAck: => F[Unit],
ifNoAck: => F[Unit],
ifSend: AckAction.Send[F] => F[Unit]
): F[Unit] = ifNoAck
}

case class Send[F[_]](messages: ToSend[F]) extends AckAction[F] {

override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
override def fold(
ifAck: => F[Unit],
ifNoAck: => F[Unit],
ifSend: AckAction.Send[F] => F[Unit]
): F[Unit] =
ifSend(this)
}

Expand All @@ -128,21 +142,24 @@ object JmsAcknowledgerConsumer {

def sendN[F[_]](
messages: NonEmptyList[(JmsMessage, DestinationName)]
): Send[F] =
): AckAction[F] =
Send[F](ToSend[F](messages.map { case (message, name) => (message, (name, None)) }))

def sendNWithDelay[F[_]](
messages: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] = Send[F](ToSend(messages))
): AckAction[F] = Send[F](ToSend(messages))

def sendWithDelay[F[_]](
message: JmsMessage,
destination: DestinationName,
duration: Option[FiniteDuration]
): Send[F] =
): AckAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, duration)))))

def send[F[_]](message: JmsMessage, destination: DestinationName): Send[F] =
def send[F[_]](
message: JmsMessage,
destination: DestinationName
): AckAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))
}
}
24 changes: 17 additions & 7 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,39 +105,49 @@ object JmsAutoAcknowledgerConsumer {
object AutoAckAction {

private[jms4s] case class NoOp[F[_]]() extends AutoAckAction[F] {
override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifNoOp

override def fold(
ifNoOp: => F[Unit],
ifSend: AutoAckAction.Send[F] => F[Unit]
): F[Unit] = ifNoOp
}

case class Send[F[_]](messages: ToSend[F]) extends AutoAckAction[F] {

override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
override def fold(
ifNoOp: => F[Unit],
ifSend: AutoAckAction.Send[F] => F[Unit]
): F[Unit] =
ifSend(this)
}

private[jms4s] case class ToSend[F[_]](
messagesAndDestinations: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
)

def noOp[F[_]]: NoOp[F] = NoOp[F]()
def noOp[F[_]]: AutoAckAction[F] = NoOp[F]()

def sendN[F[_]](
messages: NonEmptyList[(JmsMessage, DestinationName)]
): Send[F] =
): AutoAckAction[F] =
Send[F](ToSend[F](messages.map { case (message, name) => (message, (name, None)) }))

def sendNWithDelay[F[_]](
messages: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
): AutoAckAction[F] =
Send[F](ToSend[F](messages.map { case (message, (name, delay)) => (message, (name, delay)) }))

def sendWithDelay[F[_]](
message: JmsMessage,
destination: DestinationName,
duration: Option[FiniteDuration]
): Send[F] =
): AutoAckAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, duration)))))

def send[F[_]](message: JmsMessage, destination: DestinationName): Send[F] =
def send[F[_]](
message: JmsMessage,
destination: DestinationName
): AutoAckAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))
}
}
78 changes: 44 additions & 34 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,27 @@ trait JmsProducer[F[_]] {

}

private[jms4s] class ContextPool[F[_]: Sync](private val contextsPool: Queue[F, JmsContext[F]]) {
private[jms4s] class ContextPool[F[_]: Sync](private val contextsPool: Queue[F, (JmsContext[F], MessageFactory[F])]) {

def acquireAndUseContext[A](f: JmsContext[F] => F[A]): F[A] =
MonadCancel[F].bracket(contextsPool.take)(ctx => f(ctx))(usedCtx => contextsPool.offer(usedCtx))
def acquireAndUseContext[A](f: (JmsContext[F], MessageFactory[F]) => F[A]): F[A] =
MonadCancel[F].bracket(contextsPool.take) {
case (ctx, mf) => f(ctx, mf)
}(usedCtx => contextsPool.offer(usedCtx))
}

object ContextPool {

def create[F[_]: Async](context: JmsContext[F], concurrencyLevel: Int): Resource[F, ContextPool[F]] =
for {
pool <- Resource.eval(
Queue.bounded[F, JmsContext[F]](concurrencyLevel)
Queue.bounded[F, (JmsContext[F], MessageFactory[F])](concurrencyLevel)
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
context
.createContext(SessionType.AutoAcknowledge)
.evalMap(pool.offer)
for {
ctx <- context.createContext(SessionType.AutoAcknowledge)
mf = MessageFactory[F](ctx)
_ <- Resource.eval(pool.offer((ctx, mf)))
} yield ()
}
} yield new ContextPool(pool)
}
Expand All @@ -78,51 +82,57 @@ object JmsProducer {
): Resource[F, JmsProducer[F]] =
for {
pool <- ContextPool.create(context, concurrencyLevel)
mf = MessageFactory[F](context)
} yield new JmsProducer[F] {

override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit] =
pool.acquireAndUseContext { ctx =>
for {
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message)
}
} yield ()
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message)
}
} yield ()
}

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
pool.acquireAndUseContext { ctx =>
for {
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message))(delay => ctx.send(destinatioName, message, delay))
}

} yield ()
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message))(delay =>
ctx.send(destinatioName, message, delay)
)
}

} yield ()
}

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
pool.acquireAndUseContext { ctx =>
for {
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message))(delay => ctx.send(destinationName, message, delay))
} yield ()
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message))(delay => ctx.send(destinationName, message, delay))
} yield ()
}

override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit] =
pool.acquireAndUseContext { ctx =>
for {
(message, destination) <- f(mf)
_ <- ctx.send(destination, message)
} yield ()
pool.acquireAndUseContext {
case (ctx, mf) =>
for {
(message, destination) <- f(mf)
_ <- ctx.send(destination, message)
} yield ()
}

}
}
33 changes: 26 additions & 7 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,30 @@ object JmsTransactedConsumer {
object TransactionAction {

private[jms4s] case class Commit[F[_]]() extends TransactionAction[F] {
override def fold(ifCommit: => F[Unit], ifRollback: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifCommit

override def fold(
ifCommit: => F[Unit],
ifRollback: => F[Unit],
ifSend: TransactionAction.Send[F] => F[Unit]
): F[Unit] = ifCommit
}

private[jms4s] case class Rollback[F[_]]() extends TransactionAction[F] {
override def fold(ifCommit: => F[Unit], ifRollback: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifRollback

override def fold(
ifCommit: => F[Unit],
ifRollback: => F[Unit],
ifSend: TransactionAction.Send[F] => F[Unit]
): F[Unit] = ifRollback
}

case class Send[F[_]](messages: ToSend[F]) extends TransactionAction[F] {

override def fold(ifCommit: => F[Unit], ifRollback: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
override def fold(
ifCommit: => F[Unit],
ifRollback: => F[Unit],
ifSend: TransactionAction.Send[F] => F[Unit]
): F[Unit] =
ifSend(this)
}

Expand All @@ -126,22 +140,27 @@ object JmsTransactedConsumer {

def rollback[F[_]]: TransactionAction[F] = Rollback[F]()

def sendN[F[_]](messages: NonEmptyList[(JmsMessage, DestinationName)]): Send[F] =
def sendN[F[_]](
messages: NonEmptyList[(JmsMessage, DestinationName)]
): TransactionAction[F] =
Send[F](ToSend[F](messages.map { case (message, name) => (message, (name, None)) }))

def sendNWithDelay[F[_]](
messages: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
): TransactionAction[F] =
Send[F](ToSend[F](messages.map { case (message, (name, delay)) => (message, (name, delay)) }))

def sendWithDelay[F[_]](
message: JmsMessage,
destination: DestinationName,
duration: Option[FiniteDuration]
): Send[F] =
): TransactionAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, duration)))))

def send[F[_]](message: JmsMessage, destination: DestinationName): Send[F] =
def send[F[_]](
message: JmsMessage,
destination: DestinationName
): TransactionAction[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))
}
}
36 changes: 17 additions & 19 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,35 @@ import org.typelevel.log4cats.Logger
import javax.jms.JMSContext
import scala.concurrent.duration.FiniteDuration

class JmsContext[F[_]: Async: Logger](
private val context: JMSContext
) {
class JmsContext[F[_]: Async: Logger](private val context: JMSContext) {

def createContext(sessionType: SessionType): Resource[F, JmsContext[F]] =
Resource
.make(
Logger[F].info("Creating context") *> {
for {
ctx <- Sync[F].blocking(context.createContext(sessionType.rawAcknowledgeMode))
_ <- Logger[F].info(s"Context $ctx successfully created")
} yield ctx
}
for {
_ <- Logger[F].info("Creating context")
ctx <- Sync[F].blocking(context.createContext(sessionType.rawAcknowledgeMode))
_ <- Logger[F].info(s"Context $ctx successfully created")
} yield ctx
)(context =>
Logger[F].info(s"Releasing context $context") *>
Sync[F].blocking(context.close())
)
.map(context => new JmsContext(context))

def send(destinationName: DestinationName, message: JmsMessage): F[Unit] =
createDestination(destinationName)
.flatMap(destination => Sync[F].blocking(context.createProducer().send(destination.wrapped, message.wrapped)))
.map(_ => ())
for {
destination <- createDestination(destinationName)
p <- Sync[F].blocking(context.createProducer())
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()

def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].delay(context.createProducer())
_ <- Sync[F].delay(p.setDeliveryDelay(delay.toMillis)) *> Sync[F].blocking(
p.send(destination.wrapped, message.wrapped)
)
p <- Sync[F].blocking(context.createProducer())
_ <- Sync[F].delay(p.setDeliveryDelay(delay.toMillis))
_ <- Sync[F].blocking(p.send(destination.wrapped, message.wrapped))
} yield ()

def createJmsConsumer(
Expand All @@ -81,17 +79,17 @@ class JmsContext[F[_]: Async: Logger](
} yield new JmsMessageConsumer[F](consumer, pollingInterval)

def createTextMessage(value: String): F[JmsTextMessage] =
Sync[F].delay(new JmsTextMessage(context.createTextMessage(value)))
Sync[F].blocking(context.createTextMessage(value)).map(new JmsTextMessage(_))

def commit: F[Unit] = Sync[F].blocking(context.commit())

def rollback: F[Unit] = Sync[F].blocking(context.rollback())

private def createQueue(queue: QueueName): F[JmsQueue] =
Sync[F].delay(new JmsQueue(context.createQueue(queue.value)))
Sync[F].blocking(context.createQueue(queue.value)).map(new JmsQueue(_))

private def createTopic(topicName: TopicName): F[JmsTopic] =
Sync[F].delay(new JmsTopic(context.createTopic(topicName.value)))
Sync[F].blocking(context.createTopic(topicName.value)).map(new JmsTopic(_))

def createDestination(destination: DestinationName): F[JmsDestination] = destination match {
case q: QueueName => createQueue(q).widen[JmsDestination]
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/jms4s/jms/JmsMessage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ object JmsMessage {
def setText(text: String): Try[Unit] =
Try(wrapped.setText(text))

val getText: Try[String] = Try(wrapped.getText)
def getText: Try[String] = Try(wrapped.getText)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/jms4s/jms/MessageFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package jms4s.jms

import jms4s.jms.JmsMessage.JmsTextMessage

class MessageFactory[F[_]](context: JmsContext[F]) {
class MessageFactory[F[_]](private val context: JmsContext[F]) extends AnyVal {
def makeTextMessage(value: String): F[JmsTextMessage] = context.createTextMessage(value)
}

Expand Down
Loading

0 comments on commit 57b5e0b

Please sign in to comment.