This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

execution engine
daniel-ciocirlan committed Jul 10, 2024
1 parent f8dc476 commit f4af4e5
Showing 1 changed file with 73 additions and 77 deletions.
150 changes: 73 additions & 77 deletions _posts/2024-06-14-remote-code-execution-engine.md
@@ -1,37 +1,33 @@
---
title: "Distributed Remote Code Execution Engine"
title: "A Distributed Code Execution Engine with Scala and Pekko"
date: 2024-06-14
header:
image: "https://res.cloudinary.com/dkoypjlgr/image/upload/f_auto,q_auto:good,c_auto,w_1200,h_300,g_auto,fl_progressive/v1715952116/blog_cover_large_phe6ch.jpg"
tags: [scala,pekko,pekko-http,pekko-stream,pekko-cluster,docker,docker-compose,scala3]
excerpt: "Practical guide to building the distributed remote code execution engine in Scala and Pekko"
excerpt: "A practical guide to building a distributed remote code execution engine in Scala and Pekko"
---

_by [Anzori (Nika) Ghurtchumelia](https://github.com/ghurtchu)_

## 1. Introduction

After a long hiatus, I am back with renewed passion and energy, eager to delve deeper into the Scala ecosystem. This time, I am committed to building something tangible and useful with the tools available. Let's embark on this exciting journey of exploration and learning together.
{% include video id="1uP6FTUn8_E" provider="youtube" %}

The greatest benefit of small side projects is the unique knowledge boost which can potentially be handy later in career.
## 1. Introduction

In this article we will attempt to build the remote code execution engine - the backend platform for websites such as [Hackerrank](https://hackerrank.com), [Leetcode](https://leetcode.com) and others.
In this article we will attempt to build a remote code execution engine: the kind of backend platform that powers websites such as [HackerRank](https://hackerrank.com), [LeetCode](https://leetcode.com) and others.

If, for some reason you're unfamiliar with the websites mentioned above, the basic usage flow is described below:
For such a platform, the basic usage flow is:
- Client sends code
- Backend runs it and responds with output

There you go, sounds simple, right?
Sounds simple, right? Right?...

Right, right...
Can you imagine how many things can go wrong here? The possibilities for failure are endless! However, we should address at least some of them.

Can you imagine how many things can go wrong here? It's the devil smirking in the corner, knowing, that the possibilities for failure are endless, however, we should address at least some of them.

To give you a quick idea: a separate blog post can be written only about the security, not to mention scalability, extensibility and a few other compulsory properties to make it production ready.
We could probably write a separate blog post about security, scalability, extensibility and the few other properties needed to make it production-ready.

The goal isn't to build the best one, nor is it to compete with the existing ones.

Put simply, the goal of this project is to get familiar with `Pekko` and its modules such as `pekko-http`, `pekko-stream`, `pekko-cluster` and a few interesting concepts revolving around actor model concurrency, such as:
Put simply, the goal of this project is to get familiar with [Apache Pekko](https://pekko.apache.org) (the open-source fork of Akka) and its modules such as `pekko-http`, `pekko-stream` and `pekko-cluster`, along with a few interesting concepts around actor-model concurrency, such as:
- cluster nodes and formation
- cluster aware routers
- remote worker actors
@@ -43,9 +39,11 @@ Put simply, the goal of this project is to get familiar with `Pekko` and its mod

Let's get started then, shall we?

> _Hey, it's Daniel here. Apache Pekko is great, and Nika has done a great job showcasing its features in a compact project we can explore exhaustively in this article. If you need to get started with Pekko, I've covered Pekko (and Akka) in a comprehensive bundle of courses about [Akka/Pekko Actors](https://rockthejvm.com/p/akka-essentials), [Akka/Pekko Streams](https://rockthejvm.com/p/akka-streams), and [Akka/Pekko HTTP](https://rockthejvm.com/p/akka-http), all of which are used in this article. Check them out if you're interested._
## 2. Project Structure - here

I recommend checking out [the project on GitHub](https://github.com/ghurtchu/braindrill/) and following along that way.
To get the most out of this article, I recommend checking out [the project on GitHub](https://github.com/ghurtchu/braindrill/) and following the code as you read; many things will make more sense that way.

We will use `Scala 3.4.1`, `sbt 1.9.9`, `docker`, `pekko` and its modules to complete our project.

@@ -64,7 +62,8 @@ The initial project skeleton looks like the following:
- `Dockerfile` blueprint for running the app inside container
- `README.md` instructions for setting up the project locally

Nothing fancy, let's move on `build.sbt`:
Let's start with `build.sbt`:

```scala
ThisBuild / scalaVersion := "3.4.1"

@@ -120,14 +119,13 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1") // SBT plugin for using

## 3. Project Architecture

After a few iterations I came up with the architecture that can be horizontally scaled, if required.
Ideally, such projects must be scaled easily as long as the load is increased.
After a few iterations I came up with an architecture that can be scaled horizontally if required. Ideally, such projects should scale easily as the load increases.

For that we use tools such as Kubernetes or other container orchestration platforms. To make local development and deployment simpler we'll be using docker containers. More precisely we'll be using `docker-compose` to run a few containers together so that they form the cluster.
For that we use tools such as Kubernetes or other container orchestration platforms. To make local development and deployment simpler we'll be using Docker containers. More precisely we'll be using `docker-compose` to run a few containers together so that they form the cluster.

`docker-compose` doesn't support scalability out of the box because it's static, it means that we can't magically add new `worker` node to the running system. Again, for that we'd use Kubernetes, but it is out of the scope of this project.
`docker-compose` doesn't support scalability out of the box because its setup is static: we can't magically add new `worker` nodes to the running system. Again, for that we'd use Kubernetes, but it is out of scope for this project.

We have a `master` node and its role is to be the task distributor among `worker` nodes.
We have a `master` node and its role is to be the task distributor among `worker` nodes.

HTTP is exposed on the `master` node, which acts as the gateway to the outside world.
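
To make the gateway role concrete, here is a minimal client-side sketch. It assumes the `POST /lang/<language>` route and the default port `8080` that appear in the `ClusterSystem` hunk of this diff; the `localhost` host and the helper names `submission` and `submit` are hypothetical and only serve the illustration.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Builds the submission request. `host` and `port` are assumptions for a local setup.
def submission(lang: String, code: String, host: String = "localhost", port: Int = 8080): HttpRequest =
  HttpRequest
    .newBuilder(URI.create(s"http://$host:$port/lang/$lang"))
    .POST(HttpRequest.BodyPublishers.ofString(code)) // the source code travels as the plain-text body
    .build()

// Sends the request and returns the response body as text.
def submit(request: HttpRequest): String =
  HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body()

@main def gatewayDemo(): Unit = {
  val request = submission("python", "print(21 * 2)")
  println(s"${request.method} ${request.uri}") // what the client would send to the master node
  // println(submit(request)) // uncomment once the cluster is running locally
}
```

The client only ever talks to the `master` node; which `worker` node ends up running the code is invisible from the outside.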

@@ -517,8 +515,8 @@ transformation {
load-balancer = 3
}
```
Here, it simply means that each node will have 32 worker actors and master node will have 3 load balancer actors.
In real world, choosing those numbers would depend on multiple variables that must be collected and analyzed in production.
Here, it means that each `worker` node will have 32 worker actors and the `master` node will have 3 load balancer actors.
In the real world, choosing those numbers would depend on multiple variables that must be collected and analyzed in production.
In my opinion, such numbers are best tuned from empirical evidence rather than derived from theory.
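
As a rough illustration of what the two numbers control, the sketch below uses plain Scala with no Pekko involved. `RoundRobin` is a hypothetical stand-in that only mimics how a round-robin pool cycles over its routees; the random choice of a load balancer mirrors the `Random.shuffle(loadBalancers).head` call in the HTTP route.

```scala
import scala.util.Random

// Hypothetical stand-in for a round-robin pool: cycles over `size` routees.
final class RoundRobin(size: Int) {
  require(size > 0, "pool must contain at least one routee")
  private var next = 0

  // Returns the index of the routee that receives the next message.
  def pick(): Int = {
    val chosen = next
    next = (next + 1) % size
    chosen
  }
}

@main def poolSizesDemo(): Unit = {
  val workersPerNode = 32 // transformation.workers-per-node
  val loadBalancers  = 3  // transformation.load-balancer

  // Send 64 submissions through one node's pool and count the hits per worker.
  val workerPool = new RoundRobin(workersPerNode)
  val hits = (1 to 64).map(_ => workerPool.pick()).groupBy(identity).view.mapValues(_.size).toMap
  println(s"workers used: ${hits.size}, submissions per worker: ${hits.values.toSet}")

  // Each HTTP request is handed to one load balancer chosen at random.
  val chosen = Random.shuffle((1 to loadBalancers).toList).head
  println(s"request handled by load-balancer-$chosen")
}
```

With round-robin routing, the pool size caps how many submissions a single node processes concurrently, which is one reason the right value is better found by measurement than by guesswork.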

### 4.2 Serialization
@@ -877,34 +875,34 @@ object CodeExecutor {
msg match {
case In.Execute(compiler, file, dockerImage, replyTo) =>
ctx.log.info(s"{}: executing submitted code", self)
val asyncExecuted: Future[In.Executed] = for
// timeout --signal=SIGKILL 2 docker run --rm --ulimit cpu=1 --memory=20m -v engine:/data -w /data rust rust /data/r.rust
ps <- run(
"timeout",
"--signal=SIGKILL",
"2", // 2 second timeout which sends SIGKILL if exceeded
"docker",
"run",
"--rm", // remove the container when it's done
"--ulimit", // set limits
"cpu=1", // 1 processor
"--memory=20m", // 20 M of memory
"-v", // bind volume
"engine:/data",
"-w", // set working directory to /data
"/data",
dockerImage,
compiler,
s"${file.getPath}"
)
val asyncExecuted: Future[In.Executed] = for {
// timeout --signal=SIGKILL 2 docker run --rm --ulimit cpu=1 --memory=20m -v engine:/data -w /data rust rust /data/r.rust
ps <- run(
"timeout",
"--signal=SIGKILL",
"2", // 2 second timeout which sends SIGKILL if exceeded
"docker",
"run",
"--rm", // remove the container when it's done
"--ulimit", // set limits
"cpu=1", // 1 processor
"--memory=20m", // 20 M of memory
"-v", // bind volume
"engine:/data",
"-w", // set working directory to /data
"/data",
dockerImage,
compiler,
s"${file.getPath}"
)
// error and success channels as streams
(successSource, errorSource) = src(ps.getInputStream) -> src(ps.getErrorStream)
((success, error), exitCode) <- successSource
.runWith(readOutput) // join success, error and exitCode
.zip(errorSource.runWith(readOutput))
.zip(Future(ps.waitFor))
_ = Future(file.delete) // remove file in the background to free up the memory
yield In.Executed(
} yield In.Executed(
output = if success.nonEmpty then success else error,
exitCode = exitCode,
replyTo = replyTo
@@ -913,7 +911,7 @@ object CodeExecutor {
ctx.pipeToSelf(asyncExecuted) {
case Success(executed) =>
ctx.log.info("{}: executed submitted code", self)
executed.exitCode match
executed.exitCode match {
case 124 | 137 =>
In.ExecutionFailed(
"The process was aborted because it exceeded the timeout",
@@ -925,6 +923,7 @@ object CodeExecutor {
replyTo
)
case _ => In.ExecutionSucceeded(executed.output, replyTo)
}
case Failure(exception) =>
ctx.log.warn("{}: execution failed due to {}", self, exception.getMessage)
In.ExecutionFailed(exception.getMessage, replyTo)
@@ -1103,26 +1102,27 @@ object ClusterSystem {
val node = cluster.selfMember
val cfg = ctx.system.settings.config

if node hasRole "worker" then
if (node hasRole "worker") {
val numberOfWorkers = Try(cfg.getInt("transformation.workers-per-node")).getOrElse(50)
// actor that sends StartExecution message to local Worker actors in a round robin fashion
val workerRouter = ctx.spawn(
behavior = Routers
.pool(numberOfWorkers) {
Behaviors
.supervise(Worker().narrow[StartExecution])
.onFailure(SupervisorStrategy.restart)
}
.withRoundRobinRouting(),
name = "worker-router"
behavior = Routers
.pool(numberOfWorkers) {
Behaviors
.supervise(Worker().narrow[StartExecution])
.onFailure(SupervisorStrategy.restart)
}
.withRoundRobinRouting(),
name = "worker-router"
)
// actors are registered to the ActorSystem receptionist using a special ServiceKey.
// All remote worker-routers will be registered to ClusterBootstrap actor system receptionist.
// When the "worker" node starts it registers the local worker-router to the Receptionist which is cluster-wide
// As a result "master" node can have access to remote worker-router and receive any updates about workers through worker-router
ctx.system.receptionist ! Receptionist.Register(Worker.WorkerRouterKey, workerRouter)

if node hasRole "master" then
}

if (node hasRole "master") {
given system: ActorSystem[Nothing] = ctx.system

given ec: ExecutionContextExecutor = ctx.executionContext
@@ -1139,21 +1139,21 @@ object ClusterSystem {
name = s"load-balancer-$n"
)
}

val route =
pathPrefix("lang" / Segment) { lang =>
post {
entity(as[String]) { code =>
val loadBalancer = Random.shuffle(loadBalancers).head
val asyncResponse = loadBalancer
.ask[ExecutionResult](StartExecution(code, lang, _))
.map(_.value)
.recover(_ => "something went wrong")

complete(asyncResponse)
}
}
}
}

val route =
pathPrefix("lang" / Segment) { lang =>
post {
entity(as[String]) { code =>
val loadBalancer = Random.shuffle(loadBalancers).head
val asyncResponse = loadBalancer
.ask[ExecutionResult](StartExecution(code, lang, _))
.map(_.value)
.recover(_ => "something went wrong")
complete(asyncResponse)
}
}
}

val host = Try(cfg.getString("http.host")).getOrElse("0.0.0.0")
val port = Try(cfg.getInt("http.port")).getOrElse(8080)
@@ -1332,11 +1332,12 @@ object Simulator extends App {

private def percentile(data: ArrayBuffer[Long], p: Double): Long =
if data.isEmpty then 0
else
else {
val sortedData = data.sorted
val k = (sortedData.length * (p / 100.0)).ceil.toInt - 1

sortedData(k)
}

enum Code(val value: String) {
case MemoryIntensive extends Code(Python.MemoryIntensive)
@@ -1465,8 +1466,3 @@ The design choices made in this project ensure that our remote code execution en
Building this distributed system with Scala 3 and Apache Pekko has been an enlightening experience. We've harnessed the power of actor-based concurrency, cluster management, and containerization to create a resilient and secure remote code execution engine. This project exemplifies how modern technologies can be integrated to solve complex problems in a scalable and efficient manner.

Whether you're looking to implement a similar system or seeking insights into distributed computing with Scala and Pekko, we hope this blog post has provided valuable knowledge and inspiration.

Additionally, you can check out:
- [Video demo](https://www.youtube.com/watch?v=sMlJC7Kr330) which includes running the `Simulator.scala`

Thank you for following along!
