Skip to content

Commit

Permalink
Merge pull request #174 from fp-in-bo/ce3
Browse files Browse the repository at this point in the history
Migrate to Cats Effect 3
  • Loading branch information
faustin0 authored May 5, 2021
2 parents d903984 + 9e730a3 commit 9b2193b
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 196 deletions.
15 changes: 7 additions & 8 deletions active-mq-artemis/src/main/scala/jms4s/activemq/activeMQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
package jms4s.activemq

import cats.data.NonEmptyList
import cats.effect.{ Blocker, Concurrent, ContextShift, Resource }
import cats.effect.{ Async, Resource, Sync }
import cats.syntax.all._
import io.chrisdavenport.log4cats.Logger
import jms4s.JmsClient
import jms4s.jms.JmsContext
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.typelevel.log4cats.Logger

object activeMQ {

Expand All @@ -42,14 +42,13 @@ object activeMQ {
case class Endpoint(host: String, port: Int)
case class ClientId(value: String) extends AnyVal

def makeJmsClient[F[_]: ContextShift: Logger: Concurrent](
config: Config,
blocker: Blocker
def makeJmsClient[F[_]: Async: Logger](
config: Config
): Resource[F, JmsClient[F]] =
for {
context <- Resource.make(
Logger[F].info(s"Opening context to MQ at ${hosts(config.endpoints)}...") *>
blocker.delay {
Sync[F].blocking {
val factory = new ActiveMQConnectionFactory(hosts(config.endpoints))
factory.setClientID(config.clientId.value)

Expand All @@ -59,11 +58,11 @@ object activeMQ {
}
)(c =>
Logger[F].info(s"Closing context $c to MQ at ${hosts(config.endpoints)}...") *>
blocker.delay(c.close()) *>
Sync[F].blocking(c.close()) *>
Logger[F].info(s"Closed context $c to MQ at ${hosts(config.endpoints)}.")
)
_ <- Resource.eval(Logger[F].info(s"Opened context $context."))
} yield new JmsClient[F](new JmsContext[F](context, blocker))
} yield new JmsClient[F](new JmsContext[F](context))

private def hosts(endpoints: NonEmptyList[Endpoint]): String =
endpoints.map(e => s"tcp://${e.host}:${e.port}").toList.mkString(",")
Expand Down
21 changes: 10 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ val catsV = "2.6.0"
val jmsV = "2.0.1"
val ibmMQV = "9.2.2.0"
val activeMQV = "2.17.0"
val catsEffectV = "2.5.0"
val catsEffectScalaTestV = "0.5.1"
val fs2V = "2.5.5"
val log4catsV = "1.1.1"
val catsEffectV = "3.1.0"
val catsEffectScalaTestV = "1.1.0"
val fs2V = "3.0.2"
val log4catsV = "2.1.0"
val log4jSlf4jImplV = "2.14.1"

val kindProjectorV = "0.11.3"
Expand Down Expand Up @@ -214,13 +214,12 @@ lazy val commonSettings = Seq(
addCompilerPlugin("org.typelevel" %% "kind-projector" % kindProjectorV cross CrossVersion.full),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % betterMonadicForV),
libraryDependencies ++= Seq(
"javax.jms" % "javax.jms-api" % jmsV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
"co.fs2" %% "fs2-io" % fs2V,
"io.chrisdavenport" %% "log4cats-slf4j" % log4catsV,
"com.codecommit" %% "cats-effect-testing-scalatest" % catsEffectScalaTestV % Test
"javax.jms" % "javax.jms-api" % jmsV,
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"co.fs2" %% "fs2-core" % fs2V,
"org.typelevel" %% "log4cats-slf4j" % log4catsV,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectScalaTestV % Test
)
)

Expand Down
26 changes: 13 additions & 13 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Blocker, Concurrent, ContextShift, Resource, Sync }
import cats.effect.std.Queue
import cats.effect.{ Async, Resource, Sync }
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.Queue
import jms4s.JmsAcknowledgerConsumer.AckAction
import jms4s.config.DestinationName
import jms4s.jms._
Expand All @@ -39,7 +39,7 @@ trait JmsAcknowledgerConsumer[F[_]] {

object JmsAcknowledgerConsumer {

private[jms4s] def make[F[_]: ContextShift: Concurrent](
private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
inputDestinationName: DestinationName,
concurrencyLevel: Int
Expand All @@ -52,44 +52,44 @@ object JmsAcknowledgerConsumer {
for {
ctx <- context.createContext(SessionType.ClientAcknowledge)
consumer <- ctx.createJmsConsumer(inputDestinationName)
_ <- Resource.eval(pool.enqueue1((ctx, consumer, MessageFactory[F](ctx))))
_ <- Resource.eval(pool.offer((ctx, consumer, MessageFactory[F](ctx))))
} yield ()
}
} yield build(pool, concurrencyLevel, context.blocker)
} yield build(pool, concurrencyLevel)

private def build[F[_]: ContextShift: Concurrent](
private def build[F[_]: Async](
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])],
concurrencyLevel: Int,
blocker: Blocker
concurrencyLevel: Int
): JmsAcknowledgerConsumer[F] =
(f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]) =>
(f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]) => {
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(
for {
(context, consumer, mFactory) <- pool.dequeue1
(context, consumer, mFactory) <- pool.take
message <- consumer.receiveJmsMessage
res <- f(message, mFactory)
_ <- res.fold(
ifAck = blocker.delay(message.wrapped.acknowledge()),
ifAck = Sync[F].blocking(message.wrapped.acknowledge()),
ifNoAck = Sync[F].unit,
ifSend = send =>
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(ifEmpty = context.send(name, message))(
f = d => context.send(name, message, d)
)
} *> blocker.delay(message.wrapped.acknowledge())
} *> Sync[F].blocking(message.wrapped.acknowledge())
)
_ <- pool.enqueue1((context, consumer, mFactory))
_ <- pool.offer((context, consumer, mFactory))
} yield ()
)
)
.parJoin(concurrencyLevel)
.repeat
.compile
.drain
}

sealed abstract class AckAction[F[_]] extends Product with Serializable {
def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: AckAction.Send[F] => F[Unit]): F[Unit]
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Concurrent, Resource, Sync }
import cats.effect.std.Queue
import cats.effect.{ Async, Resource, Sync }
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.Queue
import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction
import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction.Send
import jms4s.config.DestinationName
Expand All @@ -40,7 +40,7 @@ trait JmsAutoAcknowledgerConsumer[F[_]] {

object JmsAutoAcknowledgerConsumer {

private[jms4s] def make[F[_]: Concurrent](
private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
inputDestinationName: DestinationName,
concurrencyLevel: Int
Expand All @@ -53,12 +53,12 @@ object JmsAutoAcknowledgerConsumer {
for {
ctx <- context.createContext(SessionType.AutoAcknowledge)
consumer <- ctx.createJmsConsumer(inputDestinationName)
_ <- Resource.eval(pool.enqueue1((ctx, consumer, MessageFactory[F](ctx))))
_ <- Resource.eval(pool.offer((ctx, consumer, MessageFactory[F](ctx))))
} yield ()
}
} yield build(pool, concurrencyLevel)

private def build[F[_]: Concurrent](
private def build[F[_]: Async](
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])],
concurrencyLevel: Int
): JmsAutoAcknowledgerConsumer[F] =
Expand All @@ -68,7 +68,7 @@ object JmsAutoAcknowledgerConsumer {
.as(
Stream.eval(
for {
(context, consumer, mf) <- pool.dequeue1
(context, consumer, mf) <- pool.take
message <- consumer.receiveJmsMessage
res: AutoAckAction[F] <- f(message, mf)
_ <- res.fold(
Expand All @@ -83,7 +83,7 @@ object JmsAutoAcknowledgerConsumer {
)
}
)
_ <- pool.enqueue1((context, consumer, mf))
_ <- pool.offer((context, consumer, mf))
} yield ()
)
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/jms4s/JmsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

package jms4s

import cats.effect.{ Concurrent, ContextShift, Resource }
import cats.effect.{ Async, Resource }
import jms4s.config.DestinationName
import jms4s.jms._

class JmsClient[F[_]: ContextShift: Concurrent] private[jms4s] (private[jms4s] val context: JmsContext[F]) {
class JmsClient[F[_]: Async] private[jms4s] (private[jms4s] val context: JmsContext[F]) {

def createTransactedConsumer(
inputDestinationName: DestinationName,
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Concurrent, Resource }
import cats.effect.std.Queue
import cats.effect.{ Async, Resource }
import cats.syntax.all._
import fs2.concurrent.Queue
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType
Expand All @@ -51,7 +51,7 @@ trait JmsProducer[F[_]] {

object JmsProducer {

private[jms4s] def make[F[_]: Concurrent](
private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
concurrencyLevel: Int
): Resource[F, JmsProducer[F]] =
Expand All @@ -62,7 +62,7 @@ object JmsProducer {
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
c <- context.createContext(SessionType.AutoAcknowledge)
_ <- Resource.eval(pool.enqueue1(c))
_ <- Resource.eval(pool.offer(c))
} yield ()
}
mf = MessageFactory[F](context)
Expand All @@ -72,43 +72,43 @@ object JmsProducer {
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit] =
for {
ctx <- pool.dequeue1
ctx <- pool.take
messagesWithDestinations <- f(mf)
_ <- messagesWithDestinations.traverse_ {
case (message, destinationName) => ctx.send(destinationName, message)
}
_ <- pool.enqueue1(ctx)
_ <- pool.offer(ctx)
} yield ()

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
for {
ctx <- pool.dequeue1
ctx <- pool.take
messagesWithDestinationsAndDelayes <- f(mf)
_ <- messagesWithDestinationsAndDelayes.traverse_ {
case (message, (destinatioName, duration)) =>
duration.fold(ctx.send(destinatioName, message))(delay => ctx.send(destinatioName, message, delay))
}
_ <- pool.enqueue1(ctx)
_ <- pool.offer(ctx)
} yield ()

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
for {
ctx <- pool.dequeue1
ctx <- pool.take
(message, (destinationName, delay)) <- f(mf)
_ <- delay.fold(ctx.send(destinationName, message))(delay => ctx.send(destinationName, message, delay))
_ <- pool.enqueue1(ctx)
_ <- pool.offer(ctx)
} yield ()

override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit] =
for {
ctx <- pool.dequeue1
ctx <- pool.take
(message, destination) <- f(mf)
_ <- ctx.send(destination, message)
_ <- pool.enqueue1(ctx)
_ <- pool.offer(ctx)
} yield ()
}
}
24 changes: 11 additions & 13 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package jms4s

import cats.data.NonEmptyList
import cats.effect.{ Concurrent, Resource }
import cats.effect.std.Queue
import cats.effect.{ Async, Resource }
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.Queue
import jms4s.JmsTransactedConsumer.JmsTransactedConsumerPool.Received
import jms4s.JmsTransactedConsumer.TransactionAction
import jms4s.config.DestinationName
Expand All @@ -40,7 +40,7 @@ trait JmsTransactedConsumer[F[_]] {

object JmsTransactedConsumer {

private[jms4s] def make[F[_]: Concurrent](
private[jms4s] def make[F[_]: Async](
context: JmsContext[F],
inputDestinationName: DestinationName,
concurrencyLevel: Int
Expand All @@ -54,17 +54,16 @@ object JmsTransactedConsumer {
for {
c <- context.createContext(SessionType.Transacted)
consumer <- c.createJmsConsumer(inputDestinationName)
_ <- Resource.eval(pool.enqueue1((c, consumer, MessageFactory[F](c))))
_ <- Resource.eval(pool.offer((c, consumer, MessageFactory[F](c))))
} yield ()
)
} yield build(new JmsTransactedConsumerPool[F](pool), concurrencyLevel)

private def build[F[_]: Concurrent](
private def build[F[_]: Async](
pool: JmsTransactedConsumerPool[F],
concurrencyLevel: Int
): JmsTransactedConsumer[F] = new JmsTransactedConsumer[F] {

override def handle(f: (JmsMessage, MessageFactory[F]) => F[TransactionAction[F]]): F[Unit] =
): JmsTransactedConsumer[F] =
(f: (JmsMessage, MessageFactory[F]) => F[TransactionAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Expand All @@ -90,28 +89,27 @@ object JmsTransactedConsumer {
.repeat
.compile
.drain
}

private[jms4s] class JmsTransactedConsumerPool[F[_]: Concurrent](
private[jms4s] class JmsTransactedConsumerPool[F[_]: Async](
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])]
) {

val receive: F[Received[F]] =
for {
(context, consumer, mf) <- pool.dequeue1
(context, consumer, mf) <- pool.take
message <- consumer.receiveJmsMessage
} yield Received(message, context, consumer, mf)

def commit(context: JmsContext[F], consumer: JmsMessageConsumer[F], mf: MessageFactory[F]): F[Unit] =
for {
_ <- context.commit
_ <- pool.enqueue1((context, consumer, mf))
_ <- pool.offer((context, consumer, mf))
} yield ()

def rollback(context: JmsContext[F], consumer: JmsMessageConsumer[F], mf: MessageFactory[F]): F[Unit] =
for {
_ <- context.rollback
_ <- pool.enqueue1((context, consumer, mf))
_ <- pool.offer((context, consumer, mf))
} yield ()
}

Expand Down
Loading

0 comments on commit 9b2193b

Please sign in to comment.