Skip to content

Commit

Permalink
c2 signaling via websockets for webrtc
Browse files Browse the repository at this point in the history
  • Loading branch information
pschichtel committed Oct 5, 2024
1 parent 35e1917 commit 45b8577
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 6 deletions.
6 changes: 4 additions & 2 deletions app/Banana4Loader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Banana4Components(context: ApplicationLoader.Context)
private val youtubeService = new YoutubeService(configuration, executionContext)
private val twitchService = new TwitchService(configuration, wsClient, executionContext)
private val searchIndexService = new SearchIndex
private val ld56MasterServer = new Ld56MasterServer
//private val ld56MasterServer = new Ld56MasterServer

// ActionBuilders
private val cached = new Cached(defaultCacheApi)
Expand All @@ -41,12 +41,14 @@ class Banana4Components(context: ApplicationLoader.Context)
private val errorHandler = new ErrorHandler(environment, configuration, devContext.map(_.sourceMapper), Some(router))
private val blogController = new BlogController(cached, githubService, tumblrService, ldjamService, youtubeService, twitchService, searchIndexService, executionContext, controllerComponents)
private val mainController = new MainController(cached, githubService, tumblrService, ldjamService, youtubeService, twitchService, searchIndexService, executionContext, controllerComponents)
private val ld56C2Controller = new Ld56C2Controller(controllerComponents)(actorSystem, materializer)

// The router
override def router: Router = new _root_.router.Routes(
errorHandler,
mainController,
blogController,
assets
assets,
ld56C2Controller,
)
}
149 changes: 149 additions & 0 deletions app/controllers/Ld56C2Controller.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package controllers

import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import org.apache.pekko.stream.Materializer
import play.api.Logger
import play.api.libs.json.{Format, JsError, JsObject, JsResult, JsString, JsValue, Json}
import play.api.libs.streams.ActorFlow
import play.api.mvc.{ControllerComponents, WebSocket}

import java.net.InetAddress
import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.mutable
import scala.math.Numeric.Implicits.infixNumericOps
import scala.math.Ordering.Implicits.infixOrderingOps

private val logger = Logger(classOf[Ld56C2Controller])

final case class JoinRequestMessage(id: UUID, offer: String)
object JoinRequestMessage {
implicit val format: Format[JoinRequestMessage] = Json.format
}

final case class JoinAcceptMessage(answer: String)
object JoinAcceptMessage {
implicit val format: Format[JoinAcceptMessage] = Json.format
}

sealed trait HosterMessage
final case class HostingMessage(playerCount: Int) extends HosterMessage
final case class HostAcceptsJoinMessage(id: UUID, answer: String) extends HosterMessage


object HosterMessage {
implicit val hostingFormat: Format[HostingMessage] = Json.format
implicit val hostAcceptsJoinMessageFormat: Format[HostAcceptsJoinMessage] = Json.format
implicit val hosterMessageFormat: Format[HosterMessage] = Json.format
}

sealed trait JoinerMessage
final case class JoinMessage(offer: String) extends JoinerMessage

object JoinerMessage {
implicit val joinFormat: Format[JoinMessage] = Json.format
implicit val joinerFormat: Format[JoinerMessage] = Json.format
}

final case class GameHost(id: UUID, playerCount: Int, lastUpdated: Instant)

class Ld56C2Controller(cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer) {
private val hosterConnections = ConcurrentHashMap[UUID, ActorRef]()
private val joinerConnections = ConcurrentHashMap[UUID, ActorRef]()
private val hosts = ConcurrentHashMap[UUID, GameHost]()

def signalHost(id: String) = WebSocket.accept[String, String] { request =>
val clientId = UUID.fromString(id)
ActorFlow.actorRef { out => Props(HostHandler(out, hosterConnections, joinerConnections, clientId, request.connection.remoteAddress, hosts)) }
}

def signalJoin(id: String) = WebSocket.accept[String, String] { request =>
val clientId = UUID.fromString(id)
ActorFlow.actorRef { out => Props(JoinHandler(out, hosterConnections, joinerConnections, clientId, request.connection.remoteAddress, hosts)) }
}
}

abstract class SignalActor(val out: ActorRef, val id: UUID, val remote: InetAddress, private val connections: ConcurrentMap[UUID, ActorRef]) extends Actor {

override def preStart(): Unit = {
logger.info(s"$id - $remote - Connected!")
connections.put(id, out)
}

final def receive = {
case msg: String =>
logger.info(s"$id - $remote - Received: $msg")
receiveText(msg)
}

def receiveText(text: String): Unit

override def postStop() = {
logger.info(s"$id - $remote - Connection closed!")
connections.remove(id)
}
}

class HostHandler(out: ActorRef,
hosters: ConcurrentMap[UUID, ActorRef],
private val joiners: ConcurrentMap[UUID, ActorRef],
id: UUID,
remote: InetAddress,
private val hosts: ConcurrentMap[UUID, GameHost]) extends SignalActor(out, id, remote, hosters) {
override def receiveText(text: String) = {
try
Json.parse(text).as[HosterMessage] match
case HostingMessage(playerCount) =>
hosts.put(id, GameHost(id, playerCount, Instant.now()))
case controllers.HostAcceptsJoinMessage(id, answer) =>
val joiner = joiners.get(id)
if (joiner != null) {
joiner ! Json.toJson(JoinAcceptMessage(answer)).toString
}
catch
case e: Exception =>
logger.error("Kaputt", e)
}

override def postStop(): Unit = {
super.postStop()
hosts.remove(id)
}
}

class JoinHandler(out: ActorRef,
private val hosters: ConcurrentMap[UUID, ActorRef],
joiners: ConcurrentMap[UUID, ActorRef],
id: UUID,
remote: InetAddress,
private val hosts: ConcurrentMap[UUID, GameHost]) extends SignalActor(out, id, remote, joiners) {
override def receiveText(text: String) = {
logger.info(Json.toJson[JoinerMessage](JoinMessage("SOME OFFER")).toString)
Json.parse(text).as[JoinerMessage] match
case JoinMessage(offer) =>
val it = hosts.entrySet().iterator()
var latestUpdate = Instant.MIN
var latestHost: GameHost = null
while (it.hasNext) {
val entry = it.next()
val host = entry.getValue
if (host.lastUpdated < Instant.now().minus(Duration.ofSeconds(1000))) {
val actor = hosters.remove(entry.getKey)

it.remove()
actor ! PoisonPill
} else {
if (host.lastUpdated > latestUpdate) {
latestUpdate = host.lastUpdated
latestHost = host
}
}
}
if (latestHost != null) {
hosters.get(latestHost.id) ! Json.toJson(JoinRequestMessage(id, offer)).toString
} else {
logger.warn("No host available!")
}
}
}
3 changes: 0 additions & 3 deletions app/service/Ld56MasterServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import play.api.libs.json.*
import java.net.{InetSocketAddress, SocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
import java.nio.charset.StandardCharsets
import java.time.{Duration, Instant}
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.math.Ordering.Implicits.infixOrderingOps
import scala.util.Random

sealed trait ResponseMessage
final case class HostResponseMessage(host: String, port: Int) extends ResponseMessage
Expand Down Expand Up @@ -63,7 +61,6 @@ class Ld56MasterServer {
private val channel = DatagramChannel.open()
channel.bind(InetSocketAddress(39875))
private val readBuffer = ByteBuffer.allocateDirect(8196)
private val writeBuffer = ByteBuffer.allocateDirect(8196)
private val hostMap = ConcurrentHashMap[String, HostingHost]()
logger.info("LD56 C2 Server listening!")

Expand Down
5 changes: 4 additions & 1 deletion conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ GET /search controllers.MainController.search(q
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

GET /dev controllers.MainController.dev()
GET /dev controllers.MainController.dev()

GET /ld56/signal/:id/host controllers.Ld56C2Controller.signalHost(id)
GET /ld56/signal/:id/join controllers.Ld56C2Controller.signalJoin(id)

0 comments on commit 45b8577

Please sign in to comment.