Skip to content

Commit

Permalink
Merge pull request #341 from aol/reactiveAsync
Browse files Browse the repository at this point in the history
Support async NIO requests via reactive-streams
  • Loading branch information
johnmcclean authored May 15, 2017
2 parents 6ce6304 + 456a7c4 commit 45c1860
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package app.publisher.binder.direct;

import com.aol.micro.server.MicroserverApp;
import com.aol.micro.server.config.Microserver;
import com.aol.micro.server.module.ConfigurableModule;
import com.aol.micro.server.rest.jersey.AsyncBinder;
import com.aol.micro.server.testing.RestAgent;
import org.glassfish.jersey.server.ResourceConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

@Microserver
public class AsyncPublisherTest {
RestAgent rest = new RestAgent();
MicroserverApp server;

@Before
public void startServer() {

server = new MicroserverApp(() -> "binder");
server.start();

}

@After
public void stopServer() {
server.stop();
}

@Test
public void runAppAndBasicTest() throws InterruptedException, ExecutionException {
assertThat(rest.get("http://localhost:8080/binder/test/myEndPoint"), is("hello world!"));
}
@Test
public void runAppAndBasicTest2() throws InterruptedException, ExecutionException {
assertThat(rest.get("http://localhost:8080/binder/test/async2"), is("hello"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package app.publisher.binder.direct;

import com.aol.micro.server.auto.discovery.Rest;
import cyclops.async.Future;
import cyclops.stream.ReactiveSeq;
import cyclops.stream.Spouts;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.util.concurrent.Executors;
import java.util.stream.Stream;


@Rest
@Path("/test")
public class AsyncResource {

private void sleep() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@GET
@Path("myEndPoint")
public Future<String> myEndPoint() {
return Future.ofSupplier(() -> {
sleep();
return "hello world!";
}, Executors.newFixedThreadPool(1));
}

@GET
@Path("async2")
public ReactiveSeq<String> async2() {
return Spouts.publishOn(Stream.of("hello"), Executors.newFixedThreadPool(1));
}


}
28 changes: 28 additions & 0 deletions micro-jersey/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,32 @@ Maven
Gradle
```groovy
compile 'com.aol.microservices:micro-jersey:x.yz'
```

## Baked in async NIO based REST

Return any reactive-streams Publisher from your REST end point to make them execute asynchronously automatically.

E.g. Using Future from [cyclops-react](cyclops-react.io)
```java
@GET
public Future<String> myEndPoint(){
return Future.ofSupplier(()->{
sleep();
return "hello world!";
}, Executors.newFixedThreadPool(1));
}
```

Would be equivalent to the following code

```java
@GET
public void myEndPoint(@Suspended AsyncResponse asyncResponse){
Future.ofSupplier(()->{
sleep();
asyncResponse.resume("hello world!");
return 1;
}, Executors.newFixedThreadPool(1));
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.aol.micro.server.rest.jersey;

import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.spi.internal.ResourceMethodDispatcher;
import org.glassfish.jersey.server.spi.internal.ResourceMethodInvocationHandlerProvider;

import javax.inject.Singleton;


public class AsyncBinder extends AbstractBinder {

@Override
protected void configure() {
bind(AsyncDispatcher.AsyncDispatcherProvider.class).to(
ResourceMethodDispatcher.Provider.class).in(Singleton.class)
.ranked(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.aol.micro.server.rest.jersey;

import cyclops.stream.ReactiveSeq;
import cyclops.stream.Spouts;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.glassfish.hk2.api.ServiceHandle;
import org.glassfish.hk2.api.ServiceLocator;
import org.glassfish.jersey.server.ContainerRequest;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.inject.ConfiguredValidator;
import org.glassfish.jersey.server.internal.process.AsyncContext;
import javax.inject.Provider;

import org.glassfish.jersey.server.model.Invocable;
import org.glassfish.jersey.server.spi.internal.ResourceMethodDispatcher;
import org.reactivestreams.Publisher;


import javax.ws.rs.ProcessingException;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.lang.reflect.InvocationHandler;
import java.util.Set;
import java.util.stream.Collectors;

public class AsyncDispatcher implements ResourceMethodDispatcher {

private final ResourceMethodDispatcher originalDispatcher;

@Context
private javax.inject.Provider<AsyncContext> asyncContext;
@Context
private javax.inject.Provider<ContainerRequestContext> containerRequestContext;


public AsyncDispatcher(ResourceMethodDispatcher originalDispatcher) {
this.originalDispatcher = originalDispatcher;
}

@AllArgsConstructor
@NoArgsConstructor
static class AsyncDispatcherProvider implements Provider{
@Context
private ServiceLocator serviceLocator;
@Override
public ResourceMethodDispatcher create(Invocable method, InvocationHandler handler, ConfiguredValidator validator) {
final Class<?> returnType = method.getHandlingMethod().getReturnType();
if(Publisher.class.isAssignableFrom(returnType)){
Set<Provider> providers = serviceLocator.getAllServiceHandles(ResourceMethodDispatcher.Provider.class)
.stream()
.filter(h->!h.getActiveDescriptor()
.getImplementationClass()
.equals(AsyncDispatcherProvider.class))
.map(ServiceHandle::getService)
.collect(Collectors.toSet());

for (ResourceMethodDispatcher.Provider provider : providers) {
ResourceMethodDispatcher dispatcher = provider.create(method, handler, validator);
if (dispatcher != null) {
AsyncDispatcher result = new AsyncDispatcher(dispatcher);
serviceLocator.inject(result);
return result;
}
}

}
return null;
}
}
@Override
public Response dispatch(Object resource, ContainerRequest request) throws ProcessingException {
final AsyncContext context = this.asyncContext.get();
if(!context.suspend())
throw new ProcessingException(LocalizationMessages.ERROR_SUSPENDING_ASYNC_REQUEST());
final ContainerRequestContext requestContext = containerRequestContext.get();

Publisher pub = (Publisher)originalDispatcher.dispatch(resource, request)
.getEntity();
Spouts.from(pub).onEmptySwitch(()->Spouts.of(Response.noContent().build()))
.forEach(1,context::resume, context::resume);

return null;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public JerseyRestApplication(List<Object> allResources,List<String> packages, Li
register(next.getClass());
}
}
register(new AsyncBinder());

if (serverProperties.isEmpty()) {
property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true);
Expand Down
2 changes: 2 additions & 0 deletions micro-reactive/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

The micro-reactive plugin integrates [cyclops-react](https://github.com/aol/cyclops-react) and [Pivotal Reactor](http://projectreactor.io/) to provide a very rich integrated reactive programming environment on top of Spring.

*NB* Microserver's Jersey plugin already makes Publisher a valid return type, converts them to asynchronously executing REST End points

Why?

cyclops-react offers a range of functional datatypes and datastructures, many of which act as reactive-streams Publishers /subscribers. Pivotal Reactor offer advanced / specialized processing capabilities for reactive-streams Publishers and subscribers.
Expand Down
29 changes: 29 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,35 @@ See the response *hello world!*

Add plugins by adding them to your build file - rerun the app to get new end points, Spring beans and more!

## Easy to use async NIO based REST

Return any reactive-streams Publisher from your REST end point to make them execute asynchronously automatically.

E.g. Using Future from [cyclops-react](cyclops-react.io)

```java
@GET
public Future<String> myEndPoint(){
return Future.ofSupplier(()->{
sleep();
return "hello world!";
}, Executors.newFixedThreadPool(1));
}
```

Would be equivalent to the following code

```java
@GET
public void myEndPoint(@Suspended AsyncResponse asyncResponse){
Future.ofSupplier(()->{
sleep();
asyncResponse.resume("hello world!");
return 1;
}, Executors.newFixedThreadPool(1));
}
```

# Why Microserver?

Microserver is a plugin engine for building Spring and Spring Boot based microservices. Microserver supports pure microservice and micro-monolith development styles. The micro-monolith style involves packaging multiple services into a single deployment - offering developers the productivity of microservice development without the operational risk. This can help teams adopt a Microservices architecture on projects that are currently monoliths.
Expand Down

0 comments on commit 45c1860

Please sign in to comment.