adapter){
+ registered.put(key, adapter);
+ return LazyFutureStream.of(adapter.stream());
+ }
/**
- * Register a Queue
+ * Register a Queue, and get back a listening LazyFutureStream optimized for CPU Bound operations
+ *
+ * Convert the LazyFutureStream to async mode to fan out operations across threads, after the first fan out operation definition
+ * it should be converted to sync mode
+ *
+ *
+ * {@code
+ * LazyFutureStream stream = Pipes.registerForCPU("test", QueueFactories.
+ boundedNonBlockingQueue(100)
+ .build());
+ stream.filter(it->it!=null)
+ .async()
+ .peek(this::process)
+ .sync()
+ .forEach(System.out::println);
+ *
+ * }
+ * @param key : Adapter identifier
+ * @param adapter
+ * @return LazyFutureStream from supplied Queue, optimisied for CPU bound operation
+ */
+ public static LazyFutureStream registerForCPU(Object key, Adapter adapter){
+ registered.put(key, adapter);
+ return Reactors.cpuReact.of(adapter.stream());
+ }
+ /**
+ * Register a Queue, and get back a listening LazyFutureStream optimized for IO Bound operations
+ *
+ *
+ * {@code
+ * LazyFutureStream stream = Pipes.registerForIO("test", QueueFactories.
+ boundedNonBlockingQueue(100)
+ .build());
+ stream.filter(it->it!=null)
+ .async()
+ .peek(this::load)
+ .sync()
+ .run(System.out::println);
+ *
+ * }
*
* @param key : Adapter identifier
* @param adapter
* @return LazyFutureStream from supplied Queue
*/
- public static LazyFutureStream register(Object key, Adapter adapter){
+ public static LazyFutureStream registerForIO(Object key, Adapter adapter){
registered.put(key, adapter);
- return LazyFutureStream.of(adapter.stream());
+ return Reactors.ioReact.of(adapter.stream());
}
/**
* @param key : Queue identifier
@@ -47,6 +108,22 @@ public static LazyFutureStream register(Object key, Adapter adapter){
public static LazyFutureStream stream(Object key){
return LazyFutureStream.of(((Adapter)registered.get(key)).stream());
}
+ /**
+ * @param key : Queue identifier
+ * @return LazyFutureStream that reads from specified Queue
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static LazyFutureStream streamIOBound(Object key){
+ return Reactors.ioReact.of(((Adapter)registered.get(key)).stream());
+ }
+ /**
+ * @param key : Queue identifier
+ * @return LazyFutureStream that reads from specified Queue
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static LazyFutureStream streamCPUBound(Object key){
+ return Reactors.cpuReact.of(((Adapter)registered.get(key)).stream());
+ }
public static void clear() {
registered.clear();
diff --git a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java
index 0da5d3358..23659e8cd 100644
--- a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java
+++ b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java
@@ -15,10 +15,10 @@
public class ManyProducersToOneConsumerApp {
public static void main(String[] args){
- LazyFutureStream stream = Pipes.register("test", QueueFactories.
+ LazyFutureStream stream = Pipes.registerForIO("test", QueueFactories.
boundedNonBlockingQueue(100)
.build());
- stream.filter(it->it!=null).async().peek(System.out::println).run();
+ stream.filter(it->it!=null).async().peek(System.out::println).sync().run();
new MicroserverApp(()-> "simple-app").run();
}
diff --git a/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java b/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java
index b3c42e064..cc02cf152 100644
--- a/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java
+++ b/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java
@@ -1,16 +1,23 @@
package com.aol.micro.server.reactive;
-import static org.hamcrest.Matchers.*;
-import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import java.util.Arrays;
+import org.junit.Before;
import org.junit.Test;
import com.aol.simple.react.async.Queue;
+import com.aol.simple.react.stream.traits.LazyFutureStream;
public class PipesTest {
-
+ @Before
+ public void setup() {
+ Pipes.clear();
+ }
@Test
public void testGetAbsent() {
Pipes.clear();
@@ -29,5 +36,34 @@ public void testStream() {
Pipes.register("hello",queue);
assertThat(Pipes.stream("hello").limit(1).toList(),equalTo(Arrays.asList("world")));
}
-
+ @Test
+ public void testStreamIO() {
+ Queue queue = new Queue();
+ queue.add("world");
+ Pipes.register("hello",queue);
+ assertThat(Pipes.streamIOBound("hello").limit(1).toList(),equalTo(Arrays.asList("world")));
+ }
+ @Test
+ public void testStreamCPU() {
+ Queue queue = new Queue();
+ queue.add("world");
+ Pipes.register("hello",queue);
+ assertThat(Pipes.streamCPUBound("hello").limit(1).toList(),equalTo(Arrays.asList("world")));
+ }
+ @Test
+ public void cpuBound() {
+ Queue queue = new Queue();
+ LazyFutureStream stream = Pipes.registerForCPU("hello", queue);
+ queue.add("world");
+ assertTrue(Pipes.get("hello").isPresent());
+ assertThat(stream.limit(1).toList(),equalTo(Arrays.asList("world")));
+ }
+ @Test
+ public void ioBound() {
+ Queue queue = new Queue();
+ LazyFutureStream stream = Pipes.registerForIO("hello", queue);
+ queue.add("world");
+ assertTrue(Pipes.get("hello").isPresent());
+ assertThat(stream.limit(1).toList(),equalTo(Arrays.asList("world")));
+ }
}
diff --git a/micro-swagger/build.gradle b/micro-swagger/build.gradle
index 0e3441027..827eb6f3e 100644
--- a/micro-swagger/build.gradle
+++ b/micro-swagger/build.gradle
@@ -14,7 +14,7 @@ modifyPom {
inceptionYear '2015'
groupId 'com.aol.microservices'
- artifactId 'microserver-swagger'
+ artifactId 'micro-swagger'
version "$version"
diff --git a/micro-swagger/readme.md b/micro-swagger/readme.md
new file mode 100644
index 000000000..8b1f405bd
--- /dev/null
+++ b/micro-swagger/readme.md
@@ -0,0 +1,21 @@
+# Swagger plugin for Microserver
+
+[micro-swagger example apps](https://github.com/aol/micro-server/tree/master/micro-swagger/src/test/java/app/swagger/com/aol/micro/server)
+
+Also can run standalone outside of Microserver
+
+## To use
+
+Simply add to the classpath
+
+Maven
+
+
+ com.aol.microservices
+ micro-swagger
+ 0.62
+
+
+Gradle
+
+ compile 'com.aol.microservices:micro-swagger:0.62'
\ No newline at end of file