Skip to content

Commit

Permalink
#374 Pass structures to aqua run as JSON from an argument or a file (
Browse files Browse the repository at this point in the history
  • Loading branch information
DieMyst authored Dec 10, 2021
1 parent 3e762d6 commit 71ea87a
Show file tree
Hide file tree
Showing 11 changed files with 520 additions and 186 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ val circeVersion = "0.14.1"
name := "aqua-hll"

val commons = Seq(
baseAquaVersion := "0.5.0",
baseAquaVersion := "0.5.1",
version := baseAquaVersion.value + "-" + sys.env.getOrElse("BUILD_NUMBER", "SNAPSHOT"),
scalaVersion := dottyVersion,
libraryDependencies ++= Seq(
Expand Down
40 changes: 40 additions & 0 deletions cli/.js/src/main/scala/aqua/builder/ArgumentGetter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package aqua.builder

import aqua.js.{CallJsFunction, CallServiceHandler, FluencePeer}
import aqua.model.func.Call
import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}

import scala.concurrent.Promise

// Service that can return argument to use it from a code
case class ArgumentGetter(serviceId: String, value: VarModel, arg: scalajs.js.Dynamic)
extends ServiceFunction {

def registerService(peer: FluencePeer): CallServiceHandler = {
CallJsFunction.registerService(
peer,
serviceId,
value.name,
_ => arg
)
}

def callTag(): CallServiceTag = {
CallServiceTag(
LiteralModel.quote(serviceId),
value.name,
Call(List.empty, List(Call.Export(value.name, value.`type`)))
)
}

}

object ArgumentGetter {

val ServiceId = "getDataSrv"

def apply(value: VarModel, arg: scalajs.js.Dynamic): ArgumentGetter = {
ArgumentGetter(ServiceId, value, arg)
}
}
35 changes: 35 additions & 0 deletions cli/.js/src/main/scala/aqua/builder/Console.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package aqua.builder

import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, CallServiceHandler, FluencePeer}
import aqua.model.func.Call
import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}

import scala.scalajs.js.JSON

// Function to print any variables that passed as arguments
class Console(serviceId: String, fnName: String) extends ServiceFunction {

def callTag(variables: List[VarModel]): CallServiceTag = {
CallServiceTag(
LiteralModel.quote(serviceId),
fnName,
Call(variables, Nil)
)
}

def registerService(peer: FluencePeer): CallServiceHandler = {
CallJsFunction.registerUnitService(
peer,
serviceId,
fnName,
args => {
val str = JSON.stringify(args, space = 2)
// if an input function returns a result, our success will be after it is printed
// otherwise finish after JS SDK will finish sending a request
OutputPrinter.print(str)
}
)
}
}
43 changes: 43 additions & 0 deletions cli/.js/src/main/scala/aqua/builder/Finisher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package aqua.builder

import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, FluencePeer}
import aqua.model.func.Call
import aqua.model.func.raw.CallServiceTag
import aqua.model.{LiteralModel, VarModel}

import scala.concurrent.Promise
import scala.scalajs.js.JSON

// Will finish promise on service call
case class Finisher private (
serviceId: String,
fnName: String,
promise: Promise[Unit]
) extends ServiceFunction {

def callTag(): CallServiceTag = {
CallServiceTag(
LiteralModel.quote(serviceId),
fnName,
Call(Nil, Nil)
)
}

def registerService(peer: FluencePeer) = {
CallJsFunction.registerUnitService(
peer,
serviceId,
fnName,
_ => {
promise.success(())
}
)
}
}

object Finisher {

def apply(serviceId: String, fnName: String) =
new Finisher(serviceId, fnName, Promise[Unit]())
}
7 changes: 7 additions & 0 deletions cli/.js/src/main/scala/aqua/builder/ServiceFunction.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package aqua.builder

import aqua.js.{CallServiceHandler, FluencePeer}

trait ServiceFunction {
def registerService(peer: FluencePeer): CallServiceHandler
}
27 changes: 22 additions & 5 deletions cli/.js/src/main/scala/aqua/js/CallJsFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,41 @@ import scala.scalajs.js.JSConverters.*

object CallJsFunction {

// Register a service that returns no result
def registerUnitService(
def registerService(
peer: FluencePeer,
serviceId: String,
fnName: String,
handler: (js.Array[js.Any]) => Unit
handler: js.Array[js.Any] => js.Dynamic
): CallServiceHandler = {
peer.internals.callServiceHandler.use((req, resp, next) => {
if (req.serviceId == serviceId && req.fnName == fnName) {
handler(req.args)
val result = handler(req.args)
resp.retCode = ResultCodes.success
resp.result = new js.Object {}
resp.result = result
}

next()
})
}

// Register a service that returns no result
def registerUnitService(
peer: FluencePeer,
serviceId: String,
fnName: String,
handler: js.Array[js.Any] => Unit
): CallServiceHandler = {
registerService(
peer,
serviceId,
fnName,
arr => {
handler(arr)
js.Dynamic.literal()
}
)
}

// Call a function with generated air script
def funcCallJs(
air: String,
Expand Down
77 changes: 77 additions & 0 deletions cli/.js/src/main/scala/aqua/run/FuncCaller.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package aqua.run

import aqua.LogLevelTransformer
import aqua.backend.FunctionDef
import aqua.builder.{Console, Finisher}
import aqua.io.OutputPrinter
import aqua.js.{CallJsFunction, Fluence, FluenceUtils, PeerConfig}
import aqua.run.RunCommand.createKeyPair
import cats.effect.{Resource, Sync}
import cats.effect.kernel.Async
import cats.syntax.applicative.*

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.scalajs.js.JSON

object FuncCaller {

/**
* Register services and call an air code with FluenceJS SDK.
* @param multiaddr relay to connect to
* @param air code to call
* @return
*/
def funcCall[F[_]: Async](
multiaddr: String,
air: String,
functionDef: FunctionDef,
config: RunConfig,
consoleService: Console,
finisherService: Finisher
)(implicit
ec: ExecutionContext
): F[Unit] = {
FluenceUtils.setLogLevel(LogLevelTransformer.logLevelToFluenceJS(config.logLevel))

// stops peer in any way at the end of execution
val resource = Resource.make(Fluence.getPeer().pure[F]) { peer =>
Async[F].fromFuture(Sync[F].delay(peer.stop().toFuture))
}

resource.use { peer =>
Async[F].fromFuture {
(for {
keyPair <- createKeyPair(config.secretKey)
_ <- Fluence
.start(
PeerConfig(
multiaddr,
config.timeout,
LogLevelTransformer.logLevelToAvm(config.logLevel),
keyPair.orNull
)
)
.toFuture
_ = OutputPrinter.print("Your peerId: " + peer.getStatus().peerId)
_ = consoleService.registerService(peer)
_ = finisherService.registerService(peer)
_ = config.argumentGetters.values.map(_.registerService(peer))
callFuture = CallJsFunction.funcCallJs(
air,
functionDef,
List.empty
)
_ <- Future.firstCompletedOf(finisherService.promise.future :: callFuture :: Nil)
} yield ()).recover(handleFuncCallErrors).pure[F]
}
}
}

private def handleFuncCallErrors: PartialFunction[Throwable, Unit] = { t =>
val message = if (t.getMessage.contains("Request timed out after")) {
"Function execution failed by timeout. You can increase timeout with '--timeout' option in milliseconds or check if your code can hang while executing."
} else JSON.stringify(t.toString)

OutputPrinter.error(message)
}
}
Loading

0 comments on commit 71ea87a

Please sign in to comment.