Skip to content

Commit

Permalink
Merge pull request #24 from fp-in-bo/modify-consumers
Browse files Browse the repository at this point in the history
Update handling in consumers
  • Loading branch information
AL333Z authored Apr 21, 2020
2 parents 185167f + eb4adf6 commit 2d28fa4
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 209 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ jobs:
script: sbt ++$TRAVIS_SCALA_VERSION scalafmtCheckAll
- name: test 2.13
scala: *scala_version_213
before_script: docker-compose up -d
before_script: docker-compose up --renew-anon-volumes --force-recreate -d
script: sbt ++$TRAVIS_SCALA_VERSION test
after_script: docker-compose down
- name: test 2.12
scala: *scala_version_212
before_script: docker-compose up -d
before_script: docker-compose up --renew-anon-volumes --force-recreate -d
script: sbt ++$TRAVIS_SCALA_VERSION test
after_script: docker-compose down
- name: site
Expand Down
72 changes: 31 additions & 41 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import jms4s.model.SessionType
import scala.concurrent.duration.FiniteDuration

trait JmsAcknowledgerConsumer[F[_]] {
def handle(f: JmsMessage => F[AckAction[F]]): F[Unit]
def handle(f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]): F[Unit]
}

object JmsAcknowledgerConsumer {
Expand All @@ -25,49 +25,44 @@ object JmsAcknowledgerConsumer {
concurrencyLevel: Int
): Resource[F, JmsAcknowledgerConsumer[F]] =
for {
pool <- Resource.liftF(Queue.bounded[F, (JmsContext[F], JmsMessageConsumer[F])](concurrencyLevel))
pool <- Resource.liftF(
Queue.bounded[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])](concurrencyLevel)
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
ctx <- context.createContext(SessionType.ClientAcknowledge)
consumer <- ctx.createJmsConsumer(inputDestinationName)
_ <- Resource.liftF(pool.enqueue1((ctx, consumer)))
_ <- Resource.liftF(pool.enqueue1((ctx, consumer, MessageFactory[F](ctx))))
} yield ()
}
} yield build(pool, concurrencyLevel, context.blocker, MessageFactory[F](context))
} yield build(pool, concurrencyLevel, context.blocker)

private def build[F[_]: ContextShift: Concurrent](
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F])],
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])],
concurrencyLevel: Int,
blocker: Blocker,
messageFactory: MessageFactory[F]
blocker: Blocker
): JmsAcknowledgerConsumer[F] =
(f: JmsMessage => F[AckAction[F]]) =>
(f: (JmsMessage, MessageFactory[F]) => F[AckAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(
for {
(context, consumer) <- pool.dequeue1
message <- consumer.receiveJmsMessage
res <- f(message)
(context, consumer, mFactory) <- pool.dequeue1
message <- consumer.receiveJmsMessage
res <- f(message, mFactory)
_ <- res.fold(
ifAck = blocker.delay(message.wrapped.acknowledge()),
ifNoAck = Sync[F].unit,
ifSend = send =>
send
.createMessages(messageFactory)
.flatMap(toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
} *> blocker.delay(message.wrapped.acknowledge())
)
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(ifEmpty = context.send(name, message))(
f = d => context.send(name, message, d)
)
} *> blocker.delay(message.wrapped.acknowledge())
)
_ <- pool.enqueue1((context, consumer))
_ <- pool.enqueue1((context, consumer, mFactory))
} yield ()
)
)
Expand All @@ -91,9 +86,7 @@ object JmsAcknowledgerConsumer {
override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifNoAck
}

case class Send[F[_]](
createMessages: MessageFactory[F] => F[ToSend[F]]
) extends AckAction[F] {
case class Send[F[_]](messages: ToSend[F]) extends AckAction[F] {

override def fold(ifAck: => F[Unit], ifNoAck: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
ifSend(this)
Expand All @@ -108,25 +101,22 @@ object JmsAcknowledgerConsumer {
def noAck[F[_]]: AckAction[F] = NoAck()

def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
messages: NonEmptyList[(JmsMessage, DestinationName)]
): Send[F] =
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)
Send[F](ToSend[F](messages.map { case (message, name) => (message, (name, None)) }))

def sendNWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): Send[F] =
Send[F](mf => messageFactory(mf).map(ToSend[F]))
messages: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] = Send[F](ToSend(messages))

def sendWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
def sendWithDelay[F[_]](
message: JmsMessage,
destination: DestinationName,
duration: Option[FiniteDuration]
): Send[F] =
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))
Send[F](ToSend[F](NonEmptyList.one((message, (destination, duration)))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): Send[F] =
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
def send[F[_]: Functor](message: JmsMessage, destination: DestinationName): Send[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))
}
}
76 changes: 35 additions & 41 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue
import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction
import jms4s.JmsAutoAcknowledgerConsumer.AutoAckAction.Send
import jms4s.config.DestinationName
import jms4s.jms._
import jms4s.model.SessionType

import scala.concurrent.duration.FiniteDuration

trait JmsAutoAcknowledgerConsumer[F[_]] {
def handle(f: JmsMessage => F[AutoAckAction[F]]): F[Unit]
def handle(f: (JmsMessage, MessageFactory[F]) => F[AutoAckAction[F]]): F[Unit]
}

object JmsAutoAcknowledgerConsumer {
Expand All @@ -25,47 +26,44 @@ object JmsAutoAcknowledgerConsumer {
concurrencyLevel: Int
): Resource[F, JmsAutoAcknowledgerConsumer[F]] =
for {
pool <- Resource.liftF(Queue.bounded[F, (JmsContext[F], JmsMessageConsumer[F])](concurrencyLevel))
pool <- Resource.liftF(
Queue.bounded[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])](concurrencyLevel)
)
_ <- (0 until concurrencyLevel).toList.traverse_ { _ =>
for {
ctx <- context.createContext(SessionType.AutoAcknowledge)
consumer <- ctx.createJmsConsumer(inputDestinationName)
_ <- Resource.liftF(pool.enqueue1((ctx, consumer)))
_ <- Resource.liftF(pool.enqueue1((ctx, consumer, MessageFactory[F](ctx))))
} yield ()
}
} yield build(pool, concurrencyLevel, MessageFactory[F](context))
} yield build(pool, concurrencyLevel)

private def build[F[_]: ContextShift: Concurrent](
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F])],
concurrencyLevel: Int,
messageFactory: MessageFactory[F]
pool: Queue[F, (JmsContext[F], JmsMessageConsumer[F], MessageFactory[F])],
concurrencyLevel: Int
): JmsAutoAcknowledgerConsumer[F] =
(f: JmsMessage => F[AutoAckAction[F]]) =>
(f: (JmsMessage, MessageFactory[F]) => F[AutoAckAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Stream.eval(
for {
(context, consumer) <- pool.dequeue1
message <- consumer.receiveJmsMessage
res <- f(message)
(context, consumer, mf) <- pool.dequeue1
message <- consumer.receiveJmsMessage
res: AutoAckAction[F] <- f(message, mf)
_ <- res.fold(
ifNoOp = Sync[F].unit,
ifSend = send =>
send
.createMessages(messageFactory)
.flatMap(toSend =>
toSend.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
}
)
ifSend = (send: Send[F]) =>
send.messages.messagesAndDestinations.traverse_ {
case (message, (name, delay)) =>
delay.fold(
ifEmpty = context.send(name, message)
)(
f = d => context.send(name, message, d)
)
}
)
_ <- pool.enqueue1((context, consumer))
_ <- pool.enqueue1((context, consumer, mf))
} yield ()
)
)
Expand All @@ -84,9 +82,7 @@ object JmsAutoAcknowledgerConsumer {
override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] = ifNoOp
}

case class Send[F[_]](
createMessages: MessageFactory[F] => F[ToSend[F]]
) extends AutoAckAction[F] {
case class Send[F[_]](messages: ToSend[F]) extends AutoAckAction[F] {

override def fold(ifNoOp: => F[Unit], ifSend: Send[F] => F[Unit]): F[Unit] =
ifSend(this)
Expand All @@ -99,25 +95,23 @@ object JmsAutoAcknowledgerConsumer {
def noOp[F[_]]: NoOp[F] = NoOp[F]()

def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
messages: NonEmptyList[(JmsMessage, DestinationName)]
): Send[F] =
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)
Send[F](ToSend[F](messages.map { case (message, name) => (message, (name, None)) }))

def sendNWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
messages: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
Send[F](mf => messageFactory(mf).map(ToSend[F]))
Send[F](ToSend[F](messages.map { case (message, (name, delay)) => (message, (name, delay)) }))

def sendWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
def sendWithDelay[F[_]](
message: JmsMessage,
destination: DestinationName,
duration: Option[FiniteDuration]
): Send[F] =
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))
Send[F](ToSend[F](NonEmptyList.one((message, (destination, duration)))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): Send[F] =
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
def send[F[_]](message: JmsMessage, destination: DestinationName): Send[F] =
Send[F](ToSend[F](NonEmptyList.one((message, (destination, None)))))
}
}
Loading

0 comments on commit 2d28fa4

Please sign in to comment.