-
Notifications
You must be signed in to change notification settings - Fork 212
Example : async calculator
johnmcclean-aol edited this page Dec 13, 2015
·
7 revisions
A short microservice that asynchronously sums user inputs and returns the result to all participating users once all the numbers required for the sum have been entered.
In the example below, two users participate in each sum; both receive the answer when the last user enters their number.
package async.calculator;
...
/**
 * Example asynchronous calculator resource.
 *
 * <p>Each GET request to {@code /add} pushes its {@code num} query parameter,
 * paired with the request's suspended {@link AsyncResponse}, into the pipe
 * registered under the key {@code "add"}. A stream reading from that pipe
 * groups entries into batches of {@link #numbersToSum}, sums each batch and
 * resumes every response in the batch with the text {@code "a+b=total"}.
 */
@Rest
@Path("/add")
public class Calculator implements Reactive{

    /** Number of user-supplied values that make up one sum (one batch). */
    static int numbersToSum = 2;

    /**
     * Wires up the summing stream and then starts the microservice.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args){
        //create an asynchronous Stream from data piped in across threads via a wait-free Queue
        SequenceM<PTuple2<Integer,AsyncResponse>> stream =
                Pipes.registerForSequential("add", new Queue<>());

        //sum every group of entries of a set size in the Stream
        stream.batchBySize(numbersToSum)
              .map(Calculator::sum)
              // answer every caller that contributed to this batch
              .peek(p-> p.v2()
                         .forEach(response->response.resume(p.v1())))
              // have the Stream start processing on another thread
              .hotStream(Executors.newSingleThreadExecutor());

        //start a microservice on port 8080 with context 'calc'
        new MicroserverApp(()->"calc").run();
    }

    /**
     * Sums one batch of inputs.
     *
     * @param toSum the (number, response) pairs belonging to one batch
     * @return a tuple whose first element is the text {@code "n1+n2+...=total"}
     *         and whose second element is a stream of the responses waiting
     *         for that result
     */
    private static PTuple2<String,SequenceM<AsyncResponse>> sum(
            Collection<PTuple2<Integer,AsyncResponse>> toSum){
        //sum the numbers in the provided collection
        int sum = SequenceM.fromIterable(toSum)
                           .map(v->v.v1())
                           .reduce(Reducers.toTotalInt());
        //create a string representation of the sum
        String calc = SequenceM.fromIterable(toSum)
                               .map(v->v.v1())
                               .join("+");
        //collect the suspended responses that must receive the answer
        SequenceM<AsyncResponse> toRespond = SequenceM.fromIterable(toSum)
                                                      .map(v->v.v2());
        return PowerTuples.tuple(calc+"="+sum, toRespond);
    }

    /**
     * Accepts one number from a user and queues it for summing.
     *
     * <p>The response is left suspended on success; it is resumed by the stream
     * set up in {@link #main(String[])} once the batch is complete. If the
     * {@code "add"} pipe has not been registered, or it does not accept the
     * entry, the response is resumed immediately with an
     * {@link IllegalStateException} so the caller is not left waiting on a
     * request that can never be answered.
     *
     * @param number   the value supplied in the {@code num} query parameter
     * @param response the suspended response to resume with the result
     */
    @GET
    public void calc(@QueryParam(value = "num") int number,
                     @Suspended AsyncResponse response){
        //push all user input into our summing Stream via a pipe
        // NOTE(review): relies on offer(...) returning a boolean, as java.util.Queue does - confirm
        boolean accepted = Pipes.get("add")
                                .map(q->q.offer(PowerTuples.tuple(number,response)))
                                .orElse(false);
        if(!accepted){
            // nothing will ever resume this response, so fail it explicitly
            response.resume(new IllegalStateException(
                    "Calculator pipe 'add' is not available to accept input"));
        }
    }
}