autoscale: true
- Senior Software Engineer @ Moneyfarm
- Several years in the scala/typelevel ecosystem
- Member of @FPinBO
- a sample use case, a reference library to wrap
- designing library apis
- introduce a bunch of building blocks
- refine edges, evaluate alternatives, iterate
Our focus here is NOT on building the coolest library doing the coolest thing ever.
We'll just focus on designing, with Pure Functional Programming and the Typelevel stack, a set of APIs that wrap an existing lib written in the good old imperative way.
Java Message Service a.k.a. JMS
- can be used to facilitate the sending and receiving of messages between enterprise software systems, whatever enterprise means!
- a bunch of Java interfaces
- each provider offers an implementation (e.g. IBM MQ, ActiveMQ, RabbitMQ, etc...)
- ~~old~~ stable enough (born in 1998, latest revision in 2015)
- its APIs are a good testbed for sketching a purely functional wrapper
- found pretty much nothing about it (no FP-like bindings...)
public void receiveMessage(ConnectionFactory connectionFactory, String queueName){
try (
JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
){
Queue queue = context.createQueue(queueName);
JMSConsumer consumer = context.createConsumer(queue);
Message msg = consumer.receive();
// ... do something useful ...
context.commit();
} catch (JMSRuntimeException ex) {
// ...
}
}
^ - `JMSContext` is in charge of opening low level stuff (connections, sessions, ...) and implements `AutoCloseable` (see the try-with-resources block)
- `JMSConsumer` is in charge of receiving messages, via:
    - `Message receive()` will block indefinitely
    - `Message receive(long timeout)` will block up to a timeout
    - `Message receiveNoWait()` receives the next message if one is immediately available
    - other variants...
- `JMSConsumer` is `AutoCloseable` as well
- In this usage of JMS we're using transacted sessions in order to explicitly commit or rollback the context (all the pending messages)
- `JMSRuntimeException` is an unchecked exception
- evaluate which design best supports our intent
- prevent the developer using our lib from doing wrong things (e.g. unconfirmed messages, deadlocks, etc...) by design
- offer a high-level set of APIs
public void receiveMessage(ConnectionFactory connectionFactory, String queueName){
try (
JMSContext context = connectionFactory.createContext(Session.SESSION_TRANSACTED);
){
Queue queue = context.createQueue(queueName);
JMSConsumer consumer = context.createConsumer(queue);
Message msg = consumer.receive();
// ... do something useful ...
context.commit();
} catch (JMSRuntimeException ex) {
// ...
}
}
Let's start from the low level stuff...
- how to handle side-effects?
- how to handle the resource lifecycle?
- enable capturing and controlling actions - a.k.a effects - that your program wishes to perform within a resource-safe, typed context with seamless support for concurrency and coordination
- these effects may be asynchronous (callback-driven) or synchronous (directly returning values); they may return within microseconds or run infinitely.
[.column]
[.code-highlight: 1-8] [.code-highlight: all]
object IO {
def delay[A](a: => A): IO[A]
def raiseError[A](e: Throwable): IO[A]
def async[A](k: /* ... */): IO[A]
def blocking[A](/* ... */): IO[A]
...
}
class IO[A] {
def map[B](f: A => B): IO[B]
def flatMap[B](f: A => IO[B]): IO[B]
...
}
[.column]
[.code-highlight: none] [.code-highlight: 1-4] [.code-highlight: 7-8] [.code-highlight: 7-10] [.code-highlight: 7-11] [.code-highlight: all]
val ioInt: IO[Int] =
IO.delay { println("hello") }
.map(_ => 1)
val program: IO[Unit] =
for {
i1 <- ioInt
_ <- IO.sleep(i1.second)
_ <- IO.raiseError( // not throwing!
new RuntimeException("boom!"))
i2 <- ioInt // computation is short-circuited
} yield ()
[.column]
[.code-highlight: none] [.code-highlight: all]
> Output:
> hello
> <...1 second...>
> RuntimeException: boom!
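The behaviour in the output above can be reproduced without any library. What follows is a minimal sketch, assuming a toy `ToyIO` type (hypothetical, not cats-effect's `IO`: no async, no stack safety, errors are plain exceptions). It shows that building the program runs nothing, and that a raised error short-circuits the remaining steps.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Hypothetical toy effect type: a value describing a computation, run only on demand.
final class ToyIO[A](val unsafeRun: () => A) {
  def map[B](f: A => B): ToyIO[B] =
    new ToyIO(() => f(unsafeRun()))
  def flatMap[B](f: A => ToyIO[B]): ToyIO[B] =
    new ToyIO(() => f(unsafeRun()).unsafeRun())
}

object ToyIO {
  def delay[A](a: => A): ToyIO[A] = new ToyIO(() => a)
  def raiseError[A](e: Throwable): ToyIO[A] = new ToyIO(() => throw e)
}

object ToyIODemo {
  val log = ListBuffer.empty[String]

  val ioInt: ToyIO[Int] =
    ToyIO.delay { log += "hello" }.map(_ => 1)

  val program: ToyIO[Unit] =
    for {
      _ <- ioInt
      _ <- ToyIO.raiseError[Unit](new RuntimeException("boom!"))
      _ <- ioInt // never reached: the error short-circuits the rest
    } yield ()

  // Nothing has run so far: `program` is only a description.
  val logBeforeRun: List[String] = log.toList

  // Running happens once, at the edge of the program.
  val outcome: Try[Unit] = Try(program.unsafeRun())
  val logAfterRun: List[String] = log.toList
}
```

Here `logBeforeRun` is empty, while `logAfterRun` contains a single `"hello"` and `outcome` is a failure carrying the `boom!` exception.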
[.background-color: #FFFFFF]
[.code-highlight: 1, 11] [.code-highlight: 1-4, 11] [.code-highlight: 1-11] [.code-highlight: all]
sealed abstract class JmsContext(private[lib] val raw: javax.jms.JMSContext) {
def createQueue(queue: QueueName): IO[JmsQueue] =
IO.delay(new JmsQueue(raw.createQueue(queue.value)))
def makeJmsConsumer(queueName: QueueName): IO[JmsMessageConsumer] =
for {
destination <- createQueue(queueName)
consumer <- IO.delay(raw.createConsumer(destination.wrapped))
} yield new JmsMessageConsumer(consumer)
}
class JmsTransactedContext private[lib] (
override private[lib] val raw: javax.jms.JMSContext) extends JmsContext(raw)
- handle JMSRuntimeException ✅
- handle the resource lifecycle ❌
- doesn't leak
- properly handles termination signals (e.g. `SIGTERM`) by default (no need to register a shutdown hook)
- do the right thing™ by design
[.column]
[.code-highlight: 1-8] [.code-highlight: 10-16] [.code-highlight: all]
object Resource {
def make[A](
acquire: IO[A])(
release: A => IO[Unit]): Resource[A]
def fromAutoCloseable[A <: AutoCloseable](
acquire: IO[A]): Resource[A]
}
class Resource[A] {
def map[B](f: A => B): Resource[B]
def flatMap[B](f: A => Resource[B]): Resource[B]
def use[B](f: A => IO[B]): IO[B]
...
}
[.footer: NB: not actual code, Resource is polymorphic in the effect type]
^ A note on the simplification
^ Every time you need to use something which implements `AutoCloseable`, you should really be using `Resource`!
[.column]
[.code-highlight: none] [.code-highlight: 1-5] [.code-highlight: all]
val sessionPool: Resource[MySessionPool] =
for {
connection <- openConnection()
sessions <- openSessionPool(connection)
} yield sessions
sessionPool.use { sessions =>
// use sessions to do whatever things!
}
[.column]
[.code-highlight: none] [.code-highlight: all]
Output:
> Acquiring connection
> Acquiring sessions
> Using sessions
> Releasing sessions
> Releasing connection
^ Nested resources are released in reverse order of acquisition
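The release order shown above can be sketched with a toy resource type. `ToyResource` below is a hypothetical, synchronous simplification (no `IO`, no cancellation), not the cats-effect implementation; `openConnection` and `openSessionPool` are illustrative stand-ins that only record what happens.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy resource: allocation returns the value plus its finalizer.
final class ToyResource[A](val allocate: () => (A, () => Unit)) {
  def map[B](f: A => B): ToyResource[B] =
    new ToyResource(() => {
      val (a, release) = allocate()
      (f(a), release)
    })

  def flatMap[B](f: A => ToyResource[B]): ToyResource[B] =
    new ToyResource(() => {
      val (a, releaseA) = allocate()
      val (b, releaseB) =
        try f(a).allocate()
        catch { case e: Throwable => releaseA(); throw e }
      // The inner resource is released first, then the outer one.
      (b, () => { try releaseB() finally releaseA() })
    })

  def use[B](f: A => B): B = {
    val (a, release) = allocate()
    try f(a) finally release()
  }
}

object ToyResource {
  def make[A](acquire: => A)(release: A => Unit): ToyResource[A] =
    new ToyResource(() => { val a = acquire; (a, () => release(a)) })
}

object ToyResourceDemo {
  val log = ListBuffer.empty[String]

  def openConnection: ToyResource[String] =
    ToyResource.make { log += "Acquiring connection"; "connection" } { _ =>
      log += "Releasing connection"
      ()
    }

  def openSessionPool(connection: String): ToyResource[String] =
    ToyResource.make { log += "Acquiring sessions"; s"sessions on $connection" } { _ =>
      log += "Releasing sessions"
      ()
    }

  val sessionPool: ToyResource[String] =
    for {
      connection <- openConnection
      sessions   <- openSessionPool(connection)
    } yield sessions

  val result: Int = sessionPool.use { sessions =>
    log += "Using sessions"
    sessions.length
  }
}
```

The finalizer built in `flatMap` is what gives the reverse order: the inner release always runs before the outer one, even if the inner release throws.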
[.background-color: #FFFFFF]
[.code-highlight: 8-10] [.code-highlight: 6-11] [.code-highlight: all]
sealed abstract class JmsContext(private[lib] val raw: javax.jms.JMSContext) {
def createQueue(queue: QueueName): IO[JmsQueue] =
IO.delay(new JmsQueue(raw.createQueue(queue.value)))
def makeJmsConsumer(queueName: QueueName): Resource[IO, JmsMessageConsumer] =
for {
destination <- Resource.eval(createQueue(queueName))
consumer <- Resource.fromAutoCloseable(
IO.delay(raw.createConsumer(destination.wrapped)))
} yield new JmsMessageConsumer(consumer)
}
class JmsTransactedContext private[lib] (
override private[lib] val raw: javax.jms.JMSContext) extends JmsContext(raw)
- handle JMSRuntimeException ✅
- handle the resource lifecycle ✅
^ You can lift any `IO[A]` into a `Resource[A]` with a no-op release via `Resource.eval`
[.code-highlight: 1-3, 12] [.code-highlight: 4] [.code-highlight: 4-6] [.code-highlight: 4-10] [.code-highlight: all]
class JmsMessageConsumer private[lib] (
private[lib] val wrapped: javax.jms.JMSConsumer
) {
val receive: IO[JmsMessage] =
for {
recOpt <- IO.delay(Option(wrapped.receiveNoWait()))
rec <- recOpt match {
case Some(message) => IO.pure(new JmsMessage(message))
case None => receive
}
} yield rec
}
- only exposing `receive`, which is an `IO` value which:
    - repeats a check-and-receive operation (`receiveNoWait()`) till a message is ready
    - completes the `IO` with the message read
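The check-and-receive loop can be tried out in isolation. This is a minimal sketch in plain Scala, without `IO`, assuming a hypothetical `FakeConsumer` whose `receiveNoWait()` mimics the JMS contract of returning `null` when no message is immediately available.

```scala
import scala.annotation.tailrec

object PollingSketch {
  // Hypothetical stand-in for a JMS consumer: replays canned answers,
  // where null means "no message available right now".
  final class FakeConsumer(private var answers: List[String]) {
    var calls = 0
    def receiveNoWait(): String = {
      calls += 1
      answers match {
        case head :: tail => answers = tail; head
        case Nil          => null
      }
    }
  }

  // Option(null) is None, so the null check becomes a pattern match
  // and the "nothing yet" branch simply tries again.
  @tailrec
  def receive(consumer: FakeConsumer): String =
    Option(consumer.receiveNoWait()) match {
      case Some(message) => message
      case None          => receive(consumer)
    }

  val consumer = new FakeConsumer(List(null, null, "hello"))
  val message: String = receive(consumer)
}
```

With the canned answers above, the loop polls three times before returning `"hello"`. Polling without any pause keeps a thread busy, which is one reason to consider a variant based on a timeout.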
[.code-highlight: 8] [.code-highlight: all]
class JmsMessageConsumer private[lib] (
private[lib] val wrapped: javax.jms.JMSConsumer,
private[lib] val pollingInterval: FiniteDuration
) {
val receive: IO[JmsMessage] =
for {
recOpt <- IO.blocking(Option(wrapped.receive(pollingInterval.toMillis)))
rec <- recOpt match {
case Some(message) => IO.pure(new JmsMessage(message))
case None => receive
}
} yield rec
}
- pretty much the same as the former
- leveraging `receive(timeout)` and wrapping the blocking operation in `IO.blocking`
object SampleConsumer extends IOApp.Simple {
override def run: IO[Unit] = {
val jmsConsumerRes = for {
jmsContext <- ??? // A Resource[JmsContext] instance for a given provider
consumer <- jmsContext.makeJmsConsumer(queueName)
} yield consumer
jmsConsumerRes
.use(consumer =>
for {
msg <- consumer.receive
textMsg <- IO.fromTry(msg.tryAsJmsTextMessage)
_ <- logger.info(s"Got 1 message with text: $textMsg. Ending now.")
} yield ()
)
}
}
- `IOApp` describes a main which executes an `IO`
- It's the single entry point to a pure program (a.k.a. End of the world).
- It runs (interprets) the effects described in the `IO`!
object ibmMQ {
// ...
def makeJmsTransactedContext(config: Config): Resource[IO, JmsTransactedContext] =
for {
context <- Resource.fromAutoCloseable(IO.delay {
val connectionFactory: MQConnectionFactory = new MQConnectionFactory()
connectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
connectionFactory.setQueueManager(config.qm.value)
connectionFactory.setConnectionNameList(hosts(config.endpoints))
connectionFactory.setChannel(config.channel.value)
connectionFactory.createContext(
config.username.value,
config.password,
javax.jms.Session.SESSION_TRANSACTED // support for at-least-once
)
})
} yield new JmsTransactedContext(context)
}
That's it!
- effects handled respecting referential transparency
- resources get acquired and released in order, the user can't leak them
- the business logic is made of pure functions
- still low level
- how to specify message confirmation?
- what if the user needs to implement a never-ending concurrent message consumer?
- Let's evaluate how we can model an api for a never-ending message consumer!
[.column] [.code-highlight: 1, 27] [.code-highlight: 1, 3-7, 27] [.code-highlight: 1, 9, 27] [.code-highlight: 1, 10, 27] [.code-highlight: 1, 3-13, 27]
object AtLeastOnceConsumer {
sealed trait CommitAction
object CommitAction {
case object Commit extends CommitAction
case object Rollback extends CommitAction
}
type Committer = CommitAction => IO[Unit]
type Consumer = Stream[IO, JmsMessage]
def make(context: JmsTransactedContext,
queueName: QueueName): Resource[IO, (Consumer, Committer)] = {
val committer = (txRes: CommitAction) =>
txRes match {
case CommitAction.Commit => IO.blocking(context.raw.commit())
case CommitAction.Rollback => IO.blocking(context.raw.rollback())
}
val buildStreamingConsumer = (consumer: JmsMessageConsumer) =>
Stream.eval(consumer.receive).repeat
context
.makeJmsConsumer(queueName)
.map(buildStreamingConsumer)
.map(consumer => (consumer, committer))
}
}
[.column]
[.code-highlight: none]
object Demo extends IOApp.Simple {
override def run: IO[Unit] =
jmsTransactedContextRes.flatMap(ctx =>
AtLeastOnceConsumer.make(ctx, queueName))
.use {
case (consumer, committer) =>
consumer.evalMap { msg =>
// whatever business logic you need to perform
logger.info(msg.show) >>
committer(CommitAction.Commit)
}
.compile.drain
}
}
[.background-color: #FFFFFF]
[.code-highlight: 1-6] [.code-highlight: 8-14] [.code-highlight: all]
A stream producing output of type `O` and which may evaluate `IO` effects.
object Stream {
def emit[A](a: A): Stream[A]
def emits[A](as: List[A]): Stream[A]
def eval[A](f: IO[A]): Stream[A]
...
}
class Stream[O]{
def map[O2](f: O => O2): Stream[O2]
def flatMap[O2](f: O => Stream[O2]): Stream[O2]
...
def evalMap[O2](f: O => IO[O2]): Stream[O2]
def repeat: Stream[O]
}
[.footer: NB: not actual code, just a simplification sticking with the IO type]
[.column] [.code-highlight: 1, 3-13, 27]
object AtLeastOnceConsumer {
sealed trait CommitAction
object CommitAction {
case object Commit extends CommitAction
case object Rollback extends CommitAction
}
type Committer = CommitAction => IO[Unit]
type Consumer = Stream[IO, JmsMessage]
def make(context: JmsTransactedContext,
queueName: QueueName): Resource[IO, (Consumer, Committer)] = {
val committer = (txRes: CommitAction) =>
txRes match {
case CommitAction.Commit => IO.blocking(context.raw.commit())
case CommitAction.Rollback => IO.blocking(context.raw.rollback())
}
val buildStreamingConsumer = (consumer: JmsMessageConsumer) =>
Stream.eval(consumer.receive).repeat
context
.makeJmsConsumer(queueName)
.map(buildStreamingConsumer)
.map(consumer => (consumer, committer))
}
}
[.column]
[.code-highlight: none] [.code-highlight: 5-12]
object Demo extends IOApp.Simple {
override def run: IO[Unit] =
jmsTransactedContextRes.flatMap(ctx =>
AtLeastOnceConsumer.make(ctx, queueName))
.use {
case (consumer, committer) =>
consumer.evalMap { msg =>
// whatever business logic you need to perform
logger.info(msg.show) >>
committer(CommitAction.Commit)
}
.compile.drain
}
}
[.column] [.code-highlight: 9-13, 26] [.code-highlight: 9, 14-18, 26] [.code-highlight: 10, 19-20, 26] [.code-highlight: 9-25, 26] [.code-highlight: all]
object AtLeastOnceConsumer {
sealed trait CommitAction
object CommitAction {
case object Commit extends CommitAction
case object Rollback extends CommitAction
}
type Committer = CommitAction => IO[Unit]
type Consumer = Stream[IO, JmsMessage]
def make(context: JmsTransactedContext,
queueName: QueueName): Resource[IO, (Consumer, Committer)] = {
val committer = (txRes: CommitAction) =>
txRes match {
case CommitAction.Commit => IO.blocking(context.raw.commit())
case CommitAction.Rollback => IO.blocking(context.raw.rollback())
}
val buildStreamingConsumer = (consumer: JmsMessageConsumer) =>
Stream.eval(consumer.receive).repeat
context
.makeJmsConsumer(queueName)
.map(buildStreamingConsumer)
.map(consumer => (consumer, committer))
}
}
[.column]
[.code-highlight: none] [.code-highlight: all]
object Demo extends IOApp.Simple {
override def run: IO[Unit] =
jmsTransactedContextRes.flatMap(ctx =>
AtLeastOnceConsumer.make(ctx, queueName))
.use {
case (consumer, committer) =>
consumer.evalMap { msg =>
// whatever business logic you need to perform
logger.info(msg.show) >>
committer(CommitAction.Commit)
}
.compile.drain
}
}
- Inspired by fs2-rabbit
- all effects are expressed in the types (`IO`, etc...) ✅
- resource lifecycle handled via `Resource` ✅
- messages in the queue are exposed via a `Stream` ✅
[.column]
- what happens if the client forgets to `commit`/`rollback`?
  `consumer.evalMap { msg => logger.info(msg.show) }`
- what happens if the client calls `commit`/`rollback` multiple times for the same message?
  `consumer.evalMap { msg => committer(CommitAction.Commit) >> committer(CommitAction.Rollback) }`
[.column]
- what happens if the client evaluates the stream multiple times?
  `consumer.evalMap{ ... } ++ consumer.evalMap{ ... }`
- how to support concurrency?
- Let's think about which API we'd like to expose...
- And evaluate how to actually implement that!
Ideally...
consumer.handle { msg =>
for {
_ <- logger.info(msg.show)
_ <- ??? // ... actual business logic...
} yield TransactionResult.Commit
}
- `handle` should be provided with a function `JmsMessage => IO[TransactionResult]`
- lower chances for the client to do the wrong thing!
- if errors are raised in the handle function, this is a bug and the program will terminate without confirming the message
- errors regarding the business logic should be handled inside the program, reacting accordingly (ending with either a commit or a rollback)
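The idea behind this API shape can be sketched in plain Scala. The example below is a simplification under stated assumptions: messages are plain `String`s, the business logic returns a `TransactionResult` directly instead of an `IO[TransactionResult]`, and `FakeContext` is a hypothetical in-memory stand-in that only counts calls.

```scala
object HandleSketch {
  sealed trait TransactionResult
  object TransactionResult {
    case object Commit   extends TransactionResult
    case object Rollback extends TransactionResult
  }

  // Hypothetical in-memory stand-in for a transacted context: it only counts calls.
  final class FakeContext {
    var commits = 0
    var rollbacks = 0
    def commit(): Unit = commits += 1
    def rollback(): Unit = rollbacks += 1
  }

  // The library, not the caller, turns the returned value into exactly one
  // commit or rollback per message.
  def handle(messages: List[String], ctx: FakeContext)(
      runBusinessLogic: String => TransactionResult): Unit =
    messages.foreach { msg =>
      runBusinessLogic(msg) match {
        case TransactionResult.Commit   => ctx.commit()
        case TransactionResult.Rollback => ctx.rollback()
      }
    }

  val ctx = new FakeContext

  handle(List("ok", "broken", "ok"), ctx) { msg =>
    if (msg == "ok") TransactionResult.Commit else TransactionResult.Rollback
  }
}
```

Since the caller never sees the context, it can neither forget to confirm a message nor confirm it twice: the only way to finish handling a message is to return a `TransactionResult`.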
[.column] [.code-highlight: 1-4,15] [.code-highlight: 1-13,15] [.code-highlight: 1-15] [.code-highlight: all]
class AtLeastOnceConsumer private[lib] (
private[lib] val ctx: JmsContext,
private[lib] val consumer: JmsMessageConsumer
) {
def handle(
runBusinessLogic: JmsMessage => IO[TransactionResult]): IO[Nothing] =
consumer.receive
.flatMap(runBusinessLogic)
.flatMap {
case TransactionResult.Commit => IO.blocking(ctx.raw.commit())
case TransactionResult.Rollback => IO.blocking(ctx.raw.rollback())
}
.foreverM
}
object AtLeastOnceConsumer {
sealed trait TransactionResult
object TransactionResult {
case object Commit extends TransactionResult
case object Rollback extends TransactionResult
}
def make(
context: JmsTransactedContext,
queueName: QueueName): Resource[IO, AtLeastOnceConsumer] =
context.makeJmsConsumer(queueName).map(consumer =>
new AtLeastOnceConsumer(context, consumer))
}
[.column] [.code-highlight: none] [.code-highlight: 5-13]
object Demo extends IOApp.Simple {
override def run: IO[Unit] =
jmsTransactedContextRes.flatMap(ctx =>
AtLeastOnceConsumer.make(ctx, queueName))
.use(consumer =>
consumer.handle { msg =>
for {
_ <- logger.info(msg.show)
_ <- ??? // ... actual business logic...
} yield TransactionResult.Commit
}
)
}
[.column]
- all effects are expressed in the types (`IO`, etc...) ✅
- resource lifecycle handled via `Resource` ✅
- not exposing messages via a `Stream` anymore, it made things harder to get the design right!
- the client is ~~forced~~ guided to do the right thing™ ✅
Still, concurrency is not there yet...
- A `JMSContext` is the main interface in the simplified JMS API introduced for JMS 2.0.
- In terms of the JMS 1.1 API a `JMSContext` should be thought of as representing both a `Connection` and a `Session`
- A connection represents a physical link to the JMS server and a session represents a single-threaded context for sending and receiving messages.
- Applications which require multiple sessions to be created on the same connection should:
    - create a root context using the `createContext` methods on the `ConnectionFactory`
    - then use the `createContext` method on the root context to create additional context instances that use the same connection
    - all these `JMSContext` objects are application-managed and must be closed when no longer needed by calling their close method.
- JmsContext is not thread-safe!
Ref: https://docs.oracle.com/javaee/7/api/javax/jms/JMSContext.html
[.column] [.code-highlight: none] [.code-highlight: 8-15] [.code-highlight: 8-16] [.code-highlight: 1-17] [.code-highlight: 1-24] [.code-highlight: 1-25, 37] [.code-highlight: 1-26, 36-37] [.code-highlight: 1-37] [.code-highlight: all]
object AtLeastOnceConsumer {
def make(
rootContext: JmsTransactedContext,
queueName: QueueName,
concurrencyLevel: Int
): Resource[IO, AtLeastOnceConsumer] =
Pool.Builder(
for {
ctx <- rootContext.makeTransactedContext
consumer <- ctx.makeJmsConsumer(queueName)
} yield (ctx, consumer)
)
.withMaxTotal(concurrencyLevel)
.build
.map(pool => new AtLeastOnceConsumer(pool, concurrencyLevel))
}
class AtLeastOnceConsumer private[lib] (
private[lib] val pool: Pool[IO, (JmsContext, JmsMessageConsumer)],
private[lib] val concurrencyLevel: Int
) {
def handle(runBusinessLogic: JmsMessage => IO[TransactionResult]): IO[Nothing] =
IO.parSequenceN(concurrencyLevel) {
// parSequenceN expects a collection of IOs: one worker per pooled context
List.fill(concurrencyLevel) {
pool.take.use { res =>
val (ctx, consumer) = res.value
for {
message <- consumer.receive
txRes <- runBusinessLogic(message)
_ <- txRes match {
case TransactionResult.Commit => IO.blocking(ctx.raw.commit())
case TransactionResult.Rollback => IO.blocking(ctx.raw.rollback())
}
} yield ()
}
}
}
.foreverM
}
[.column]
[.code-highlight: none] [.code-highlight: 5-13]
object Demo extends IOApp.Simple {
override def run: IO[Unit] =
jmsTransactedContextRes.flatMap(ctx =>
AtLeastOnceConsumer.make(ctx, queueName, 5))
.use(consumer =>
consumer.handle { msg =>
for {
_ <- logger.info(msg.show)
_ <- ??? // ... actual business logic...
} yield TransactionResult.Commit
}
)
}
- Using a proper resource pool, from `typelevel/keypool`
- all effects are expressed in the types (`IO`, etc...) ✅
- resource lifecycle handled via `Resource` ✅
- the client is ~~forced~~ guided to do the right thing™ ✅
- concurrency ✅
- We used a bunch of data types (`IO`, `Resource`, `Stream`)
- We used a bunch of common operators (`map`, `flatMap`)
- We wrote a little code, iteratively improving the design
- We achieved what we needed: a fully functioning functional minimal lib
- https://github.com/AL333Z/fp-lib-design
- https://github.com/typelevel/cats-effect
- https://github.com/fp-in-bo/jms4s
- https://github.com/profunktor/fs2-rabbit
- Couldn't fit in 45 minutes :)
- The actual lib is written with Tagless Final.
- Not worth it, for very different reasons.
- Lightbend stack: not as composable as the FP counterpart, side-effects, missing referential transparent abstractions for effects.
- ZIO: I just don't like their rhetoric.