autoscale: true
- FP is too hard
- FP is not pragmatic
- FP is not suited to deliver value to the business
- A sample architecture
- Introduce a bunch of building blocks
- Design architecture components
[.background-color: #FFFFFF]
- Let's assume we are provided with domain events from an Order Management Platform (e.g. OrderCreated, OrderShipped, etc..), via a RabbitMQ broker
- We need to build an Order History Service
^ Talk ONLY about REQUIREMENTS!
[.background-color: #FFFFFF]
- a component which projects a (read) model, in a MongoDB collection
- so that an HTTP service can query the collection returning orders
^ Talk ONLY about COMPONENTS!
Our focus here is NOT on the System Architecture
We'll just put our attention on implementing an architecture component (the projector) adopting Functional Programming principles
We WON'T be using:
var
throw
- methods returning
Unit
- poorly typed definitions (
Any
,Object
, etc...) - low level concurrency mechanisms (
Thread
,Actor
, etc..)
[.background-color: #FFFFFF]
- Consume a stream of events from a RabbitMQ queue
- Persist a read model to a MongoDB collection
[.background-color: #FFFFFF]
- read a bunch of configs from the env
- interact with a RabbitMQ broker 2.1 open a connection 2.2 receive a Stream of events from the given queue
- interact with a MongoDB cluster 3.1 open a connection 3.2 store the model to the given collection
object Mongo {
case class Auth(username: String, password: String)
case class Config(auth: Auth, addresses: List[String], /*...*/)
// reading from env variables
lazy val config: Config = {
val user = System.getenv("MONGO_USERNAME")
val password = System.getenv("MONGO_PASSWORD")
//...reading other env vars ... //
Config(Auth(user, password), endpoints, port, db, collection)
}
}
In most cases:
wrap the impure types/operations, only expose a safer version of its operations
- enable capturing and controlling actions - a.k.a effects - that your program wishes to perform within a resource-safe, typed context with seamless support for concurrency and coordination
- these effects may be asynchronous (callback-driven) or synchronous (directly returning values); they may return within microseconds or run infinitely.
A value of type IO[A]
is a computation that, when evaluated, can perform effects before either
- yielding exactly one result: a value of type
A
- raising a failure (
Throwable
)
- are pure and immutable
- represents just a description of a side effectful computation
- are not evaluated (suspended) until the end of the world
- respects referential transparency
An expression may be replaced by its value (or anything having the same value) without changing the result of the program
[.column]
[.code-highlight: none] [.code-highlight: 1-3] [.code-highlight: 5-10] [.code-highlight: 11-13] [.code-highlight: all]
def askInt(): Future[Int] =
Future(println("Please, give me a number:"))
.flatMap(_ => Future(io.StdIn.readLine().toInt))
def askTwoInt(): Future[(Int, Int)] =
for {
x <- askInt()
y <- askInt()
} yield (x , y)
def program(): Future[Unit] =
askTwoInt()
.flatMap(pair => Future(println(s"Result: ${pair}")))
[.column] [.code-highlight: none] [.code-highlight: all]
> Output:
> Please, give me a number:
> 4
> Please, give me a number:
> 7
> Result: (4,7)
[.column]
[.code-highlight: none] [.code-highlight: 5-10] [.code-highlight: all]
def askInt(): Future[Int] =
Future(println("Please, give me a number:"))
.flatMap(_ => Future(io.StdIn.readLine().toInt))
def askTwoInt(): Future[(Int, Int)] =
val sameAsk = askInt()
for {
x <- sameAsk
y <- sameAsk
} yield (x , y)
def program(): Future[Unit] =
askTwoInt()
.flatMap(pair => Future(println(s"Result: ${pair}")))
[.column] [.code-highlight: none] [.code-highlight: all]
> Output:
> Please, give me a number:
> 4
> Result: (4,4)
We just wanted to reduce duplication through an extract var!1
- code easier to reason about
- code easier to refactor
- code easier to compose
- we're already used to referential transparency since our math lessons!
- we're already using a lot of data types in a referential transparent manner (e.g.
List
,Option
,Try
,Either
)!
In most cases:
wrap the impure types/operations, only expose a safer version of its operations which will need to be referential transparent
[.column]
[.code-highlight: none] [.code-highlight: all]
object IO {
def delay[A](a: => A): IO[A]
def pure[A](a: A): IO[A]
def raiseError[A](e: Throwable): IO[A]
def sleep(duration: FiniteDuration): IO[Unit]
def async[A](k: /* ... */): IO[A]
...
}
[.column]
[.code-highlight: none] [.code-highlight: all]
class IO[A] {
def map[B](f: A => B): IO[B]
def flatMap[B](f: A => IO[B]): IO[B]
def *>[B](fb: IO[B]): IO[B]
...
}
[.column] [.code-highlight: 1-4] [.code-highlight: 7-8] [.code-highlight: 7-9] [.code-highlight: 7-11] [.code-highlight: 7-12] [.code-highlight: all]
val ioInt: IO[Int] =
IO.delay { println("hello") }
.map(_ => 1)
val program: IO[Unit] =
for {
i1 <- ioInt
_ <- IO.sleep(i1.second)
_ <- IO.raiseError( // not throwing!
new RuntimeException("boom!"))
i2 <- ioInt //comps is short-circuted
} yield ()
[.column] [.code-highlight: none] [.code-highlight: all]
> Output:
> hello
> <...1 second...>
> RuntimeException: boom!
[.code-highlight: 1-3, 15] [.code-highlight: 1, 5-7,13-15] [.code-highlight: 8-12] [.code-highlight: all]
object Mongo {
case class Auth(username: String, password: String)
case class Config(auth: Auth, addresses: List[String], /*...*/)
object Config {
// a delayed computation which read from env variables
val load: IO[Config] =
for {
user <- IO.delay(System.getenv("MONGO_USERNAME"))
password <- IO.delay(System.getenv("MONGO_PASSWORD"))
//...reading other env vars ... //
} yield Config(Auth(user, password), endpoints, port, db, collection)
}
}
val ioOps =
for {
mongoConfig <- Mongo.Config.load
rabbitConfig <- Rabbit.Config.load
// TODO use configs to do something!
} yield ()
^ Let's take a detour to talk about how this composed computation gets executed..
If IO values are just a description of effectful computations which can be composed and so on...
Who's gonna run the suspended computation then?
[.background-color: #FFFFFF]
[.code-highlight: 1-2,8] [.code-highlight: all]
IOApp
describes a main which executes anIO
- as the single entry point to a pure program.
object OrderHistoryProjectorApp extends IOApp.Simple {
override def run: IO[Unit] =
for {
mongoConfig <- Mongo.Config.load
rabbitConfig <- Rabbit.Config.load
// TODO use configs to start the main logic!
} yield ()
}
^ IOApp
provides an interpreter which will evaluate the IO
value returned by the run
method, dealing with all the dirty details of the JVM runtime, so you don't have to!
read a bunch of configs from the env- interact with a RabbitMQ broker 2.1 open a connection 2.2 receive a Stream of events from the given queue
- interact with a MongoDB cluster 3.1 open a connection 3.2 store the model to the given collection
Using fs2-rabbit
lib which:
- provides a purely functional api
- let me introduce you a bunch of useful data types
[.background-color: #FFFFFF]
val channel: Resource[AMQPChannel] =
for {
rabbitClient <- RabbitClient.resource(config)
channel <- rabbitClient.createConnectionChannel
} yield channel
- doesn't leak
- handles properly terminal signals (e.g.
SIGTERM
) by default (no need to register a shutdown hook) - do the right thingTM by design
- avoid the need to reboot a container every once in a while :)
[.background-color: #FFFFFF]
[.code-highlight: 1-5] [.code-highlight: 7-13] [.code-highlight: all]
object Resource {
def make[A](
acquire: IO[A])(
release: A => IO[Unit]): Resource[A]
}
class Resource[A] {
def use[B](f: A => IO[B]): IO[B]
def map[B](f: A => B): Resource[B]
def flatMap[B](f: A => Resource[B]): Resource[B]
...
}
[.footer: NB: not actual code, just a simplification sticking with IO type] ^ A note on the simplification
[.code-highlight: 1,9] [.code-highlight: 2-3] [.code-highlight: 5-6] [.code-highlight: 8] [.code-highlight: all]
def mkResource(s: String): Resource[String] = {
val acquire =
IO.delay(println(s"Acquiring $s")) *> IO.pure(s)
def release(s: String) =
IO.delay(println(s"Releasing $s"))
Resource.make(acquire)(release)
}
[.code-highlight: 1-3] [.code-highlight: 1-4] [.code-highlight: all]
[.column]
val r: Resource[(String, String)] =
for {
outer <- mkResource("outer")
inner <- mkResource("inner")
} yield (outer, inner)
r.use { case (a, b) =>
IO.delay(println(s"Using $a and $b"))
} // IO[Unit]
[.column]
[.code-highlight: none] [.code-highlight: all]
Output:
> Acquiring outer
> Acquiring inner
> Using outer and inner
> Releasing inner
> Releasing outer
[.column]
val sessionPool: Resource[MySessionPool] =
for {
connection <- openConnection()
sessions <- openSessionPool(connection)
} yield sessions
sessionPool.use { sessions =>
// use sessions to do whatever things!
}
[.column]
[.code-highlight: none] [.code-highlight: all]
Output:
> Acquiring connection
> Acquiring sessions
> Using sessions
> Releasing sessions
> Releasing connection
- Nested resources are released in reverse order of acquisition
- Easy to lift an
AutoClosable
toResource
, viaResource.fromAutoclosable
- Every time you need to use something which implements
AutoClosable
, you should really be usingResource
! - You can lift any
IO[A]
into aResource[A]
with a no-op release viaResource.eval
- not composable (no
map
,flatMap
, etc...) - no support for properly handling effects
[.code-highlight: 8] [.code-highlight: 9] [.code-highlight: 11-15] [.code-highlight: 10-16] [.code-highlight: 17] [.code-highlight: all]
object Rabbit {
//...
def consumerFrom(
config: Fs2RabbitConfig,
decoder: EnvelopeDecoder[IO, Try[OrderCreatedEvent]]
): Resource[(Acker, Consumer)] =
for {
rabbitClient <- RabbitClient.resource(config)
channel <- rabbitClient.createConnectionChannel
(acker, consumer) <- Resource.eval(
rabbitClient.createAckerConsumer(
queueName = QueueName("EventsFromOms"),
channel = channel,
decoder = decoder
) // IO[(Acker, Consumer)]
)
} yield (acker, consumer)
type Acker = AckResult => IO[Unit]
type Consumer = Stream[AmqpEnvelope[Try[OrderCreatedEvent]]]
}
type Consumer =
Stream[AmqpEnvelope[Try[OrderCreatedEvent]]]
- Simplify the way we write concurrent streaming consumers
- Pull-based, a consumer pulls its values by repeatedly performing pull steps
[.background-color: #FFFFFF]
[.code-highlight: 1-6] [.code-highlight: 8-13] [.code-highlight: all]
A stream producing output of type O
and which may evaluate IO
effects.
object Stream {
def emit[A](a: A): Stream[A]
def emits[A](as: List[A]): Stream[A]
def eval[A](f: IO[A]): Stream[A]
...
}
class Stream[O]{
def evalMap[O2](f: O => IO[O2]): Stream[O2]
...
def map[O2](f: O => O2): Stream[O2]
def flatMap[O2](f: O => Stream[O2]): Stream[O2]
}
[.footer: NB: not actual code, just a simplification sticking with IO type]
A sequence of effects...
Stream(1,2,3)
.repeat
.evalMap(i => IO.delay(println(i))
.compile
.drain
[.code-highlight: 1,14] [.code-highlight: 2] [.code-highlight: 3,12] [.code-highlight: 4-11] [.code-highlight: all]
class OrderHistoryProjector(consumer: Consumer, acker: Acker, logger: Logger) {
val project: IO[Unit] =
consumer.evalMap { envelope =>
envelope.payload match {
case Success(event) =>
logger.info("Received: " + envelope) *>
acker(AckResult.Ack(envelope.deliveryTag))
case Failure(e) =>
logger.error(e)("Error while decoding") *>
acker(AckResult.NAck(envelope.deliveryTag))
}
}
.compile.drain
}
read a bunch of configs from the envinteract with a RabbitMQ broker2.1open a connection2.2receive a Stream of events from the given queue- interact with a MongoDB cluster 3.1 open a connection 3.2 store the model to the given collection
Using mongo4cats
, a thin wrapper over the official mongodb driver, which is exposes purely functional apis
[.code-highlight: 1,12] [.code-highlight: 3,11] [.code-highlight: 3-4,11] [.code-highlight: 3-7,11] [.code-highlight: 3-9,11] [.code-highlight: 3-10,11] [.code-highlight: all]
object Mongo {
...
def collectionFrom(conf: Config): Resource[MongoCollection[Order]] = {
val clientSettings = ??? // conf to mongo-scala-driver settings
for {
client <- MongoClient.create(settings.build())
db <- Resource.eval(client.getDatabase(conf.databaseName))
collection <- Resource.eval(db.getCollectionWithCirceCodecs[Order](conf.collectionName))
} yield collection
}
}
read a bunch of configs from the envinteract with a RabbitMQ broker2.1open a connection2.2receive a Stream of events from the given queue- interact with a MongoDB cluster
3.1
open a connection3.2 store the model to the given collection
[.code-highlight: 1, 12] [.code-highlight: 2] [.code-highlight: 3-11] [.code-highlight: all]
class EventRepository(collection: MongoCollection[Order]) {
def store(event: OrderCreatedEvent): IO[Unit] =
collection
.insertOne(
Order(
orderNo = OrderNo(event.id),
company = Company(event.company),
email = Email(event.email),
lines = event.lines.map(...)
)
)
}
[.code-highlight: 1] [.code-highlight: 10] [.code-highlight: all]
class OrderHistoryProjector(eventRepo: EventRepository,
consumer: Consumer,
acker: Acker,
logger: Logger) {
val project: IO[Unit] =
consumer.evalMap { envelope =>
envelope.payload match {
case Success(event) =>
logger.info("Received: " + envelope) *>
eventRepo.store(event) *>
acker(AckResult.Ack(envelope.deliveryTag))
case Failure(e) =>
logger.error(e)("Error while decoding") *>
acker(AckResult.NAck(envelope.deliveryTag))
}
}
.compile.drain
}
read a bunch of configs from the envinteract with a RabbitMQ broker2.1open a connection2.2receive a Stream of events from the given queueinteract with a MongoDB cluster3.1open a connection3.2store the model to the given collection
How to achieve separation of concerns and have a good modularity?
- JVM application lifecycle is not so complex
- No need for magic, each dependency can be explicitly injected
- Acquiring/releasing resources should be handled as an effect
- a class with a private constructor
- a companion object with a
fromX/make
method (smart constructor)- taking dependencies as input
- usually returning
IO
/Resource
of the component class
[.footer: My view of Constructor Injection for effectful applications]
[.code-highlight: 1] [.code-highlight: 1-5] [.code-highlight: 9-13] [.code-highlight: all]
class OrderHistoryProjector private (
eventRepo: EventRepository,
consumer: Consumer,
acker: Acker,
logger: Logger) {
...
}
object OrderHistoryProjector {
def fromConfigs(mongoConfig: Mongo.Config,
rabbitConfig: Rabbit.Config
): Resource[OrderHistoryProjector] = ...
}
object OrderHistoryProjector {
def fromConfigs(
mongoConfig: Mongo.Config,
rabbitConfig: Fs2RabbitConfig
): Resource[OrderHistoryProjector] =
for {
(acker, consumer) <- Rabbit.consumerFrom(rabbitConfig, eventDecoder)
collection <- Mongo.collectionFrom(mongoConfig)
repo = EventRepository.fromCollection(collection)
logger <- Resource.eval(Slf4jLogger.create)
} yield new OrderHistoryProjector(repo, consumer, acker, logger)
}
[.code-highlight: 8-10] [.code-highlight: all]
object OrderHistoryProjectorApp extends IOApp.Simple {
def run: IO[Unit] =
for {
mongoConfig <- Mongo.Config.load
rabbitConfig <- Rabbit.Config.load
_ <- OrderHistoryProjector
.fromConfigs(mongoConfig, rabbitConfig) // acquire the needed resources
.use(_.project) // start to process the stream of events
} yield ()
}
- How to handle concurrency, execution contexts, blocking ops?
- How to track and handle errors?
- Do we really need advanced techniques?
- a production-ready component in under 300 LOC
- only 3 main datatypes:
IO
,Resource
,Stream
- no variables, no mutable state
- not even the M word!
- I could have written almost the same code in Kotlin, Swift or.. Haskell!
https://github.com/AL333Z/fp-in-industry https://typelevel.org/cats-effect/ https://fs2.io/ https://fs2-rabbit.profunktor.dev/
In all the slides I always omitted the additional effect type parameter!
Resource[F, A]
Stream[F, A]
RabbitClient[F]
MongoCollection[F]
Footnotes
-
Example gently stolen from my dear friend https://github.com/matteobaglini/onion-with-functional-programming ↩