Skip to content

Commit

Permalink
Merge pull request #23 from fp-in-bo/usability-improvements
Browse files Browse the repository at this point in the history
Usability improvements on JmsMessage
  • Loading branch information
AL333Z authored Apr 19, 2020
2 parents 1b195cd + baacc23 commit 059a126
Show file tree
Hide file tree
Showing 19 changed files with 161 additions and 177 deletions.
14 changes: 7 additions & 7 deletions core/src/main/scala/jms4s/JmsAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import jms4s.model.SessionType
import scala.concurrent.duration.FiniteDuration

trait JmsAcknowledgerConsumer[F[_]] {
def handle(f: JmsMessage[F] => F[AckAction[F]]): F[Unit]
def handle(f: JmsMessage => F[AckAction[F]]): F[Unit]
}

object JmsAcknowledgerConsumer {
Expand All @@ -41,7 +41,7 @@ object JmsAcknowledgerConsumer {
blocker: Blocker,
messageFactory: MessageFactory[F]
): JmsAcknowledgerConsumer[F] =
(f: JmsMessage[F] => F[AckAction[F]]) =>
(f: JmsMessage => F[AckAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Expand Down Expand Up @@ -100,31 +100,31 @@ object JmsAcknowledgerConsumer {
}

private[jms4s] case class ToSend[F[_]](
messagesAndDestinations: NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messagesAndDestinations: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
)

def ack[F[_]]: AckAction[F] = Ack()

def noAck[F[_]]: AckAction[F] = NoAck()

def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): Send[F] =
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): Send[F] =
Send[F](mf => messageFactory(mf).map(ToSend[F]))

def sendWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): Send[F] =
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/scala/jms4s/JmsAutoAcknowledgerConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import jms4s.model.SessionType
import scala.concurrent.duration.FiniteDuration

trait JmsAutoAcknowledgerConsumer[F[_]] {
def handle(f: JmsMessage[F] => F[AutoAckAction[F]]): F[Unit]
def handle(f: JmsMessage => F[AutoAckAction[F]]): F[Unit]
}

object JmsAutoAcknowledgerConsumer {
Expand All @@ -40,7 +40,7 @@ object JmsAutoAcknowledgerConsumer {
concurrencyLevel: Int,
messageFactory: MessageFactory[F]
): JmsAutoAcknowledgerConsumer[F] =
(f: JmsMessage[F] => F[AutoAckAction[F]]) =>
(f: JmsMessage => F[AutoAckAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Expand Down Expand Up @@ -93,29 +93,29 @@ object JmsAutoAcknowledgerConsumer {
}

private[jms4s] case class ToSend[F[_]](
messagesAndDestinations: NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messagesAndDestinations: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
)

def noOp[F[_]]: NoOp[F] = NoOp[F]()

def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): Send[F] =
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): Send[F] =
Send[F](mf => messageFactory(mf).map(ToSend[F]))

def sendWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): Send[F] =
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/jms4s/JmsProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import scala.concurrent.duration.FiniteDuration
trait JmsProducer[F[_]] {

def sendN(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit]

def sendNWithDelay(
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit]

def sendWithDelay(
messageFactory: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit]

def send(messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): F[Unit]
def send(messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit]

}

Expand All @@ -48,7 +48,7 @@ object JmsProducer {
} yield new JmsProducer[F] {

override def sendN(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): F[Unit] =
for {
ctx <- pool.dequeue1
Expand All @@ -60,7 +60,7 @@ object JmsProducer {
} yield ()

override def sendNWithDelay(
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
f: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): F[Unit] =
for {
ctx <- pool.dequeue1
Expand All @@ -73,7 +73,7 @@ object JmsProducer {
} yield ()

override def sendWithDelay(
f: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
f: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): F[Unit] =
for {
ctx <- pool.dequeue1
Expand All @@ -82,7 +82,7 @@ object JmsProducer {
_ <- pool.enqueue1(ctx)
} yield ()

override def send(f: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): F[Unit] =
override def send(f: MessageFactory[F] => F[(JmsMessage, DestinationName)]): F[Unit] =
for {
ctx <- pool.dequeue1
(message, destination) <- f(mf)
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/jms4s/JmsTransactedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import jms4s.model.SessionType
import scala.concurrent.duration.FiniteDuration

trait JmsTransactedConsumer[F[_]] {
def handle(f: JmsMessage[F] => F[TransactionAction[F]]): F[Unit]
def handle(f: JmsMessage => F[TransactionAction[F]]): F[Unit]
}

object JmsTransactedConsumer {
Expand All @@ -42,7 +42,7 @@ object JmsTransactedConsumer {
concurrencyLevel: Int,
messageFactory: MessageFactory[F]
): JmsTransactedConsumer[F] =
(f: JmsMessage[F] => F[TransactionAction[F]]) =>
(f: JmsMessage => F[TransactionAction[F]]) =>
Stream
.emits(0 until concurrencyLevel)
.as(
Expand Down Expand Up @@ -99,7 +99,7 @@ object JmsTransactedConsumer {
}

object JmsTransactedConsumerPool {
case class Received[F[_]](message: JmsMessage[F], context: JmsContext[F], consumer: JmsMessageConsumer[F])
case class Received[F[_]](message: JmsMessage, context: JmsContext[F], consumer: JmsMessageConsumer[F])
}

sealed abstract class TransactionAction[F[_]] extends Product with Serializable {
Expand All @@ -125,31 +125,31 @@ object JmsTransactedConsumer {
}

private[jms4s] case class ToSend[F[_]](
messagesAndDestinations: NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messagesAndDestinations: NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
)

def commit[F[_]]: TransactionAction[F] = Commit[F]()

def rollback[F[_]]: TransactionAction[F] = Rollback[F]()

def sendN[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], DestinationName)]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, DestinationName)]]
): Send[F] =
Send[F](mf =>
messageFactory(mf).map(nel => nel.map { case (message, name) => (message, (name, None)) }).map(ToSend[F])
)

def sendNWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]]
messageFactory: MessageFactory[F] => F[NonEmptyList[(JmsMessage, (DestinationName, Option[FiniteDuration]))]]
): Send[F] =
Send[F](mf => messageFactory(mf).map(ToSend[F]))

def sendWithDelay[F[_]: Functor](
messageFactory: MessageFactory[F] => F[(JmsMessage[F], (DestinationName, Option[FiniteDuration]))]
messageFactory: MessageFactory[F] => F[(JmsMessage, (DestinationName, Option[FiniteDuration]))]
): Send[F] =
Send[F](mf => messageFactory(mf).map(x => ToSend[F](NonEmptyList.one(x))))

def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage[F], DestinationName)]): Send[F] =
def send[F[_]: Functor](messageFactory: MessageFactory[F] => F[(JmsMessage, DestinationName)]): Send[F] =
Send[F](mf =>
messageFactory(mf).map { case (message, name) => ToSend[F](NonEmptyList.one((message, (name, None)))) }
)
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/jms4s/jms/JmsContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class JmsContext[F[_]: Sync: Logger: ContextShift: Concurrent](
)
.map(context => new JmsContext(context, blocker))

def send(destinationName: DestinationName, message: JmsMessage[F]): F[Unit] =
def send(destinationName: DestinationName, message: JmsMessage): F[Unit] =
createDestination(destinationName)
.flatMap(destination => blocker.delay(context.createProducer().send(destination.wrapped, message.wrapped)))
.map(_ => ())

def send(destinationName: DestinationName, message: JmsMessage[F], delay: FiniteDuration): F[Unit] =
def send(destinationName: DestinationName, message: JmsMessage, delay: FiniteDuration): F[Unit] =
for {
destination <- createDestination(destinationName)
p <- Sync[F].delay(context.createProducer())
Expand All @@ -57,7 +57,7 @@ class JmsContext[F[_]: Sync: Logger: ContextShift: Concurrent](
)
} yield new JmsMessageConsumer[F](consumer)

def createTextMessage(value: String): F[JmsTextMessage[F]] =
def createTextMessage(value: String): F[JmsTextMessage] =
Sync[F].delay(new JmsTextMessage(context.createTextMessage(value)))

def commit: F[Unit] = blocker.delay(context.commit())
Expand Down
Loading

0 comments on commit 059a126

Please sign in to comment.