Skip to content
This repository has been archived by the owner. It is now read-only.

Commit

Permalink
Merge pull request #72 from fomkin/issue-71
Browse files Browse the repository at this point in the history
Fix #71: Korolev initialization hangs
  • Loading branch information
fomkin authored Feb 12, 2017
2 parents 9685033 + fef2720 commit bae798a
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 22 deletions.
8 changes: 4 additions & 4 deletions bin/travis_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

if [[ $TRAVIS_SCALA_VERSION == *"2.12"* ]]
then
sbt ++$TRAVIS_SCALA_VERSION -Dfile.encoding=UTF8 -J-XX:MaxPermSize=1024M test
./bin/sauce_setup.sh
./bin/sauce_start.sh
sbt ++$TRAVIS_SCALA_VERSION integration-tests/run
sbt ++$TRAVIS_SCALA_VERSION -Dfile.encoding=UTF8 -J-XX:MaxPermSize=1024M test && \
./bin/sauce_setup.sh && \
./bin/sauce_start.sh && \
sbt ++$TRAVIS_SCALA_VERSION integration-tests/run && \
./bin/sauce_stop.sh
else
sbt ++$TRAVIS_SCALA_VERSION -Dfile.encoding=UTF8 -J-XX:MaxPermSize=1024M test
Expand Down
12 changes: 6 additions & 6 deletions bridge/src/main/scala/bridge/JSAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger

import korolev.Async

import scala.collection.mutable
import scala.language.higherKinds
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -45,9 +46,9 @@ abstract class JSAccess[F[+_]: Async] { self ⇒
* List of promises of requests. Resolves by
* income messages
*/
@volatile protected var promises = Map.empty[Int, Async.Promise[F, Any]]
protected def promises: mutable.Map[Int, Async.Promise[F, Any]]

@volatile protected var callbacks = Map.empty[String, Any Unit]
protected def callbacks: mutable.Map[String, Any Unit]

/**
* Abstract method sends message to remote page
Expand All @@ -65,8 +66,7 @@ abstract class JSAccess[F[+_]: Async] { self ⇒
def request[A](args: Any*): F[A] = {
val promise = Async[F].promise[Any]
val requestId = lastReqId.getAndIncrement()
val pair = (requestId, promise)
promises += pair
promises.put(requestId, promise)

sendRequest(Request(requestId, Seq(requestId) ++ args, promise))

Expand Down Expand Up @@ -146,6 +146,7 @@ abstract class JSAccess[F[+_]: Async] { self ⇒
}
promises -= reqId
case None
println(s"Promise for $reqId not found")
}
}

Expand All @@ -155,8 +156,7 @@ abstract class JSAccess[F[+_]: Async] { self ⇒

def registerCallback[T](f: T Unit): F[JSObj[F]] = {
val callbackId = s"^cb${lastCallbackId.getAndIncrement()}"
val pair = (callbackId, f.asInstanceOf[Any Unit])
callbacks += pair
callbacks.put(callbackId, f.asInstanceOf[Any Unit])
request("registerCallback", callbackId)
}

Expand Down
5 changes: 5 additions & 0 deletions bridge/src/test/scala/bridge/JSAccessSuite.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package bridge

import korolev.Async.Promise
import utest._

import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

Expand All @@ -15,6 +18,8 @@ object JSAccessSuite extends TestSuite {
class TestJSAccess extends JSAccess[Future] {

var outgoing = List.empty[Seq[Any]]
protected val promises = mutable.Map.empty[Int, Promise[Future, Any]]
protected val callbacks = mutable.Map.empty[String, (Any) => Unit]

def receive(msg: Any*): Unit = {
val reqId = msg(0).asInstanceOf[Int]
Expand Down
25 changes: 25 additions & 0 deletions integration-tests/src/main/resources/static/debug-console.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
window.document.addEventListener("DOMContentLoaded", function() {
Bridge.setProtocolDebugEnabled(true);
var display = document.createElement("pre");
display.innerHTML = "<div><strong>Client log</strong></div>"
document.body.appendChild(display);
console.log = function() {
var line = document.createElement("div");
for (var i = 0; i < arguments.length; i++)
line.textContent = arguments[i];
display.appendChild(line);
}
console.error = function() {
var line = document.createElement("div");
line.setAttribute("style", "color: red");
for (var i = 0; i < arguments.length; i++)
line.textContent = arguments[i];
display.appendChild(line);
}
document.body.addEventListener('click', function(e) {
console.log('! click on ' + e.target);
});
window.onerror = function(e) {
console.error(e);
};
});
3 changes: 2 additions & 1 deletion integration-tests/src/main/scala/gp/GuineaPigServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ object GuineaPigServer {
'href /= "/main.css",
'rel /= "stylesheet",
'type /= "text/css"
)
),
'script('src /= "/debug-console.js")
),
render = {
case state =>
Expand Down
3 changes: 3 additions & 0 deletions integration-tests/src/main/scala/gp/GuineaPigTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package gp

import korolev.blazeServer._
import org.openqa.selenium.By
import slogging.{LoggerConfig, SLF4JLoggerFactory}
import tools._

import scala.collection.JavaConverters._
import scala.concurrent.duration._

object GuineaPigTests extends App {

LoggerConfig.factory = SLF4JLoggerFactory()

val server = {

import korolev.blazeServer.defaultExecutor
Expand Down
6 changes: 5 additions & 1 deletion korolev/src/test/scala/Issue14Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import bridge.JSAccess
import korolev.Effects.Event
import korolev._
import org.scalatest.{FlatSpec, Matchers}

import RunNowExecutionContext.instance
import korolev.Async.Promise

import scala.collection.mutable
import scala.concurrent.Future

/**
Expand All @@ -20,6 +22,8 @@ class Issue14Spec extends FlatSpec with Matchers {

val jSAccess = new JSAccess {
def send(args: Seq[Any]): Unit = {}
protected val promises = mutable.Map.empty[Int, Promise[Future, Any]]
protected val callbacks = mutable.Map.empty[String, (Any) => Unit]
implicit val executionContext = RunNowExecutionContext.instance
}

Expand Down
27 changes: 17 additions & 10 deletions server/src/main/scala/korolev/server/JsonQueuedJsAccess.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package korolev.server

import java.util.concurrent.ConcurrentLinkedQueue

import bridge.JSAccess
import korolev.Async
import korolev.Async.Promise

import scala.annotation.switch
import scala.collection.immutable.Queue
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.language.higherKinds

/**
* @author Aleksey Fomkin <[email protected]>
*/
case class JsonQueuedJsAccess[F[+_]: Async](sendJson: String => Unit) extends JSAccess[F] {

@volatile var queue = Queue.empty[String]
protected val promises = TrieMap.empty[Int, Promise[F, Any]]
protected val callbacks = TrieMap.empty[String, (Any) => Unit]
val queue = new ConcurrentLinkedQueue[String]()

def escape(sb: StringBuilder, s: String, unicode: Boolean): Unit = {
sb.append('"')
Expand Down Expand Up @@ -58,18 +63,20 @@ case class JsonQueuedJsAccess[F[+_]: Async](sendJson: String => Unit) extends JS
*/
def send(args: Seq[Any]): Unit = {
val message = seqToJSON(args)
queue = queue.enqueue(message)
queue.add(message)
}


override def flush(): Unit = {
val rawRequests = synchronized {
val items = queue
queue = Queue.empty
items
val buffer = mutable.Buffer.empty[String]
while (!queue.isEmpty) {
buffer += queue.poll()
}
if (buffer.size == 1) {
sendJson(buffer.head)
}
if (rawRequests.nonEmpty) {
val requests = rawRequests.mkString(",")
else if (buffer.nonEmpty) {
val requests = buffer.mkString(",")
sendJson(s"""["batch",$requests]""")
}
}
Expand Down

0 comments on commit bae798a

Please sign in to comment.