diff --git a/gradle.properties b/gradle.properties index aebd01dab..fadc1b36d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,8 +1,8 @@ version=0.63 -springVersion=4.2.0.RELEASE -jerseyVersion=2.19 -grizzlyVersion=2.3.21 -simpleReactVersion=0.98 -cyclopsVersion=5.1.0 -jacksonVersion=2.5.2 +springVersion=4.2.2.RELEASE +jerseyVersion=2.21 +grizzlyVersion=2.3.23 +simpleReactVersion=0.99.3 +cyclopsVersion=6.0.2 +jacksonVersion=2.6.3 diff --git a/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ApplicationRegisterImpl.java b/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ApplicationRegisterImpl.java index 468998b18..9eb95d11a 100644 --- a/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ApplicationRegisterImpl.java +++ b/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ApplicationRegisterImpl.java @@ -9,10 +9,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.servers.ApplicationRegister; import com.aol.micro.server.servers.model.ServerData; @@ -38,7 +37,7 @@ public void register(ServerData[] data) { .getModule().getContext(), null)).collect(Collectors.toList())); logger.info("Registered application {} ", application); } catch (UnknownHostException e) { - ExceptionSoftener.singleton.factory.getInstance().throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } } } diff --git a/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ServiceRegistryResource.java b/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ServiceRegistryResource.java index c533f7410..6258a0b62 100644 --- a/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ServiceRegistryResource.java +++ b/micro-application-register/src/main/java/com/aol/micro/server/application/registry/ServiceRegistryResource.java @@ -42,7 +42,7 @@ public ServiceRegistryResource(Cleaner cleaner, Finder finder, Register register @Path("/list") @Produces("application/json") public void list(@Suspended AsyncResponse response) { - this.ioStream().of(this).forEach(next -> { + this.ioStreamBuilder().of(this).forEach(next -> { try{ cleaner.clean(); response.resume(finder.find()); @@ -59,7 +59,7 @@ public void list(@Suspended AsyncResponse response) { @Consumes("application/json") @Produces("application/json") public void schedule(@Suspended AsyncResponse response) { - this.ioStream().of(this).forEach(next -> { + this.ioStreamBuilder().of(this).forEach(next -> { try{ job.schedule(); response.resume(HashMapBuilder.of("status", "success")); @@ -76,7 +76,7 @@ public void schedule(@Suspended AsyncResponse response) { @Consumes("application/json") @Produces("application/json") public void register(@Suspended AsyncResponse response,RegisterEntry entry) { - this.ioStream().of(this).forEach(next -> { + this.ioStreamBuilder().of(this).forEach(next -> { try{ register.register(entry); response.resume(HashMapBuilder.of("status", "complete")); diff --git a/micro-application-register/src/test/java/app/registry/com/aol/micro/server/RegistryResource.java b/micro-application-register/src/test/java/app/registry/com/aol/micro/server/RegistryResource.java index d4b13131e..3a435c630 100644 --- a/micro-application-register/src/test/java/app/registry/com/aol/micro/server/RegistryResource.java +++ b/micro-application-register/src/test/java/app/registry/com/aol/micro/server/RegistryResource.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.auto.discovery.RestResource; import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; @@ -34,11 +34,12 @@ public class RegistryResource implements RestResource{ @Produces("text/plain") public void expensive(@Suspended AsyncResponse asyncResponse){ - LazyFutureStream.ofIterable(urls) + LazyFutureStream.lazyFutureStreamFromIterable(urls) .then(it->client.get(it)) .onFail(it -> "") .peek(it -> System.out.println(it)) + .convertToSimpleReact() .allOf(data -> { System.out.println(data); return asyncResponse.resume(SequenceM.fromIterable(data).join(";")); }); diff --git a/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootApp.java b/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootApp.java index 646cb98a4..96f92ea5f 100644 --- a/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootApp.java +++ b/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootApp.java @@ -12,8 +12,8 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; -import com.aol.cyclops.lambda.monads.SequenceM; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.IncorrectNumberOfServersConfiguredException; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; @@ -35,8 +35,7 @@ public class MicrobootApp { private final List modules; private final CompletableFuture end = new CompletableFuture(); - private final ExceptionSoftener softener = ExceptionSoftener.singleton.factory - .getInstance(); + @Getter private final ApplicationContext springContext; @@ -91,7 +90,7 @@ private Class extractClass() { return Class.forName(new Exception().getStackTrace()[2] .getClassName()); } catch (ClassNotFoundException e) { - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } return null; // unreachable normally } @@ -154,7 +153,7 @@ private void join(Thread thread) { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } } diff --git a/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootConfigurator.java b/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootConfigurator.java index 140e56fd6..42ccd4dc1 100644 --- a/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootConfigurator.java +++ b/micro-boot/src/main/java/com/aol/micro/server/boot/config/MicrobootConfigurator.java @@ -9,12 +9,11 @@ import org.pcollections.HashTreePMap; import org.pcollections.HashTreePSet; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.aol.micro.server.config.Config; import com.aol.micro.server.config.Configurer; - import com.nurkiewicz.lazyseq.LazySeq; public class MicrobootConfigurator implements Configurer{ diff --git a/micro-client/src/test/java/app/async/com/aol/micro/server/AsyncResource.java b/micro-client/src/test/java/app/async/com/aol/micro/server/AsyncResource.java index 893611f65..acdd08268 100644 --- a/micro-client/src/test/java/app/async/com/aol/micro/server/AsyncResource.java +++ b/micro-client/src/test/java/app/async/com/aol/micro/server/AsyncResource.java @@ -33,14 +33,16 @@ public class AsyncResource implements RestResource,Reactive{ @Produces("text/plain") public void expensive(@Suspended AsyncResponse asyncResponse){ - this.async(lr -> lr.fromStream(urls.stream() + this.ioStreamBuilder().fromStream(urls.stream() .>map(it -> client.get(it))) .onFail(it -> "") .peek(it -> System.out.println(it)) + .convertToSimpleReact() .allOf(data -> { System.out.println(data); - return asyncResponse.resume(String.join(";", (List)data)); })).run(); + return asyncResponse.resume(String.join(";", (List)data)); }) + .convertToLazyStream().run(); } diff --git a/micro-client/src/test/java/com/aol/micro/server/client/ClientModuleTest.java b/micro-client/src/test/java/com/aol/micro/server/client/ClientModuleTest.java index b22b56629..535603a48 100644 --- a/micro-client/src/test/java/com/aol/micro/server/client/ClientModuleTest.java +++ b/micro-client/src/test/java/com/aol/micro/server/client/ClientModuleTest.java @@ -7,7 +7,7 @@ import org.junit.Test; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.module.ConfigurableModule; import com.aol.micro.server.module.Module; diff --git a/micro-core/src/main/java/com/aol/micro/server/MicroserverApp.java b/micro-core/src/main/java/com/aol/micro/server/MicroserverApp.java index e3d318fdf..586badc4b 100644 --- a/micro-core/src/main/java/com/aol/micro/server/MicroserverApp.java +++ b/micro-core/src/main/java/com/aol/micro/server/MicroserverApp.java @@ -12,8 +12,8 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; -import com.aol.cyclops.lambda.monads.SequenceM; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.config.Config; import com.aol.micro.server.config.MicroserverConfigurer; import com.aol.micro.server.module.Module; @@ -35,8 +35,7 @@ public class MicroserverApp { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final List modules; private final CompletableFuture end = new CompletableFuture(); - private final ExceptionSoftener softener = ExceptionSoftener.singleton.factory - .getInstance(); + @Getter private final ApplicationContext springContext; @@ -84,7 +83,7 @@ private Class extractClass() { return Class.forName(new Exception().getStackTrace()[2] .getClassName()); } catch (ClassNotFoundException e) { - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } return null; // unreachable normally } @@ -150,7 +149,7 @@ private void join(Thread thread) { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } } diff --git a/micro-core/src/main/java/com/aol/micro/server/PluginLoader.java b/micro-core/src/main/java/com/aol/micro/server/PluginLoader.java index 38de0ec7f..05234471e 100644 --- a/micro-core/src/main/java/com/aol/micro/server/PluginLoader.java +++ b/micro-core/src/main/java/com/aol/micro/server/PluginLoader.java @@ -7,15 +7,15 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; -import com.aol.cyclops.functions.Memoise; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.functions.caching.Memoize; +import com.aol.cyclops.sequence.SequenceM; @NoArgsConstructor(access=AccessLevel.PRIVATE) public class PluginLoader { public final static PluginLoader INSTANCE = new PluginLoader(); - public final Supplier> plugins = Memoise.memoiseSupplier(this::load); + public final Supplier> plugins = Memoize.memoizeSupplier(this::load); private List load(){ return SequenceM.fromIterable(ServiceLoader.load(Plugin.class)).toList(); diff --git a/micro-core/src/main/java/com/aol/micro/server/config/MicroserverConfigurer.java b/micro-core/src/main/java/com/aol/micro/server/config/MicroserverConfigurer.java index 1d8c7311b..4ab07d331 100644 --- a/micro-core/src/main/java/com/aol/micro/server/config/MicroserverConfigurer.java +++ b/micro-core/src/main/java/com/aol/micro/server/config/MicroserverConfigurer.java @@ -9,7 +9,7 @@ import org.pcollections.HashTreePMap; import org.pcollections.HashTreePSet; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.nurkiewicz.lazyseq.LazySeq; diff --git a/micro-core/src/main/java/com/aol/micro/server/module/Module.java b/micro-core/src/main/java/com/aol/micro/server/module/Module.java index 32a00a9f0..1d2544fc7 100644 --- a/micro-core/src/main/java/com/aol/micro/server/module/Module.java +++ b/micro-core/src/main/java/com/aol/micro/server/module/Module.java @@ -19,7 +19,7 @@ import org.springframework.web.context.ContextLoaderListener; import org.springframework.web.context.WebApplicationContext; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.aol.micro.server.auto.discovery.Rest; diff --git a/micro-core/src/main/java/com/aol/micro/server/module/RestResourceTagBuilder.java b/micro-core/src/main/java/com/aol/micro/server/module/RestResourceTagBuilder.java index bb19b866a..352114608 100644 --- a/micro-core/src/main/java/com/aol/micro/server/module/RestResourceTagBuilder.java +++ b/micro-core/src/main/java/com/aol/micro/server/module/RestResourceTagBuilder.java @@ -12,13 +12,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.auto.discovery.CommonRestResource; public class RestResourceTagBuilder { - private final static ExceptionSoftener softener = ExceptionSoftener.singleton.factory.getInstance(); private final static Logger logger = LoggerFactory.getLogger(RestResourceTagBuilder.class); @Setter @@ -36,7 +35,7 @@ private static Class toClass(String cl) { return Class.forName(cl); } catch (ClassNotFoundException e) { logger.error("Class not found for {}", cl); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } return null; } diff --git a/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonFeature.java b/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonFeature.java index 75f2b5aa6..1149ce4ab 100644 --- a/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonFeature.java +++ b/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonFeature.java @@ -6,8 +6,7 @@ import javax.ws.rs.core.FeatureContext; import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.MessageBodyWriter; - -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider; diff --git a/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonUtil.java b/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonUtil.java index 78356960a..83096db12 100644 --- a/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonUtil.java +++ b/micro-core/src/main/java/com/aol/micro/server/rest/jackson/JacksonUtil.java @@ -6,9 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.aol.cyclops.lambda.monads.SequenceM; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; -import com.aol.micro.server.Plugin; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.PluginLoader; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; @@ -18,7 +16,7 @@ public final class JacksonUtil { - private final static ExceptionSoftener softener = ExceptionSoftener.singleton.factory.getInstance(); + private static ObjectMapper mapper = null; private static final Logger logger = LoggerFactory.getLogger(JacksonUtil.class); @@ -62,7 +60,7 @@ public static String serializeToJson(final Object data) { try { jsonString = getMapper().writeValueAsString(data); } catch (final Exception ex) { - softener.throwSoftenedException(ex); + ExceptionSoftener.throwSoftenedException(ex); } return jsonString; } @@ -74,7 +72,7 @@ public static T convertFromJson(final String jsonString, final Class type return getMapper().readValue(jsonString, type); } catch (final Exception ex) { - softener.throwSoftenedException(ex); + ExceptionSoftener.throwSoftenedException(ex); } return null; } @@ -85,7 +83,7 @@ public static T convertFromJson(final String jsonString, final JavaType type return getMapper().readValue(jsonString, type); } catch (final Exception ex) { - softener.throwSoftenedException(ex); + ExceptionSoftener.throwSoftenedException(ex); } return null; diff --git a/micro-core/src/main/java/com/aol/micro/server/servers/JaxRsServletConfigurer.java b/micro-core/src/main/java/com/aol/micro/server/servers/JaxRsServletConfigurer.java index 24fff967c..a0455570c 100644 --- a/micro-core/src/main/java/com/aol/micro/server/servers/JaxRsServletConfigurer.java +++ b/micro-core/src/main/java/com/aol/micro/server/servers/JaxRsServletConfigurer.java @@ -5,7 +5,7 @@ import javax.servlet.ServletContext; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.aol.micro.server.module.IncorrectJaxRsPluginsException; diff --git a/micro-core/src/main/java/com/aol/micro/server/spring/SpringApplicationConfigurator.java b/micro-core/src/main/java/com/aol/micro/server/spring/SpringApplicationConfigurator.java index 4ae1c2828..841a18f7a 100644 --- a/micro-core/src/main/java/com/aol/micro/server/spring/SpringApplicationConfigurator.java +++ b/micro-core/src/main/java/com/aol/micro/server/spring/SpringApplicationConfigurator.java @@ -8,7 +8,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.web.context.support.AnnotationConfigWebApplicationContext; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; import com.aol.micro.server.PluginLoader; import com.aol.micro.server.config.Config; diff --git a/micro-core/src/main/java/com/aol/micro/server/spring/SpringContextFactory.java b/micro-core/src/main/java/com/aol/micro/server/spring/SpringContextFactory.java index d7c4a50da..4ee9251d5 100644 --- a/micro-core/src/main/java/com/aol/micro/server/spring/SpringContextFactory.java +++ b/micro-core/src/main/java/com/aol/micro/server/spring/SpringContextFactory.java @@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.ErrorCode; import com.aol.micro.server.config.Config; @@ -21,7 +21,6 @@ public class SpringContextFactory { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final ExceptionSoftener softener = ExceptionSoftener.singleton.factory.getInstance(); private final PSet classes; private final Config config; @Wither @@ -50,7 +49,7 @@ public ApplicationContext createSpringContext() { return springContext; } catch (Exception e) { logger.error( ErrorCode.STARTUP_FAILED_SPRING_INITIALISATION.toString(),e.getMessage()); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } return null; } diff --git a/micro-core/src/test/java/com/aol/micro/server/module/ModuleTest.java b/micro-core/src/test/java/com/aol/micro/server/module/ModuleTest.java index de8b17c75..ef5bdfdc1 100644 --- a/micro-core/src/test/java/com/aol/micro/server/module/ModuleTest.java +++ b/micro-core/src/test/java/com/aol/micro/server/module/ModuleTest.java @@ -7,7 +7,7 @@ import org.junit.Test; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.Plugin; public class ModuleTest { diff --git a/micro-couchbase/readme.md b/micro-couchbase/readme.md index 9f2d4c159..2164ceccd 100644 --- a/micro-couchbase/readme.md +++ b/micro-couchbase/readme.md @@ -2,10 +2,36 @@ Basically Available Soft statE -Manifest comparator - - Versioned key for loading refreshed state +* Simple Couchbase Client +* Manifest comparator : Versioned key for loading refreshed state +* Simple Distributed lock implementation +# Manifest comparison -Simple Couchbase Client +Manifest comparison stores a manifest along with each value. The manifest contains the version for the value, if the version has changed, the latest verson of the value will be loaded. -Distributed lock \ No newline at end of file + + key : manifest [contains version info] + versionedKey : value + +This allows large immutable datastructures to be stored in as a key/value pair (with separate key/value pairing for version info), and reloaded automatically on change. + +# Distributed Lock + +A simple distributed lock implementation that could be used to select a single leader from multiple members in a cluster. + +## Getting The Microserver Couchbase Plugin + +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-couchbase/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-couchbase) + +### Maven +```xml + + com.aol.microservices + micro-couchbase + x.yz + +``` +### Gradle + + compile 'com.aol.microservices:micro-couchbase:x.yz' \ No newline at end of file diff --git a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/SimpleCouchbaseClient.java b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/SimpleCouchbaseClient.java index c7a81a588..8ce16a533 100644 --- a/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/SimpleCouchbaseClient.java +++ b/micro-couchbase/src/main/java/com/aol/micro/server/couchbase/SimpleCouchbaseClient.java @@ -6,7 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.couchbase.client.CouchbaseClient; public class SimpleCouchbaseClient { @@ -31,7 +31,7 @@ private boolean putInternal(final CouchbaseClient client, final String key, fina try{ return client.set(key, value).get(); } catch (InterruptedException | ExecutionException e) { - ExceptionSoftener.singleton.factory.getInstance().throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); return false;//unreachable } } diff --git a/micro-data/src/main/java/com/aol/micro/server/spring/datasource/hibernate/HibernateSessionBuilder.java b/micro-data/src/main/java/com/aol/micro/server/spring/datasource/hibernate/HibernateSessionBuilder.java index 0bf0a98c8..aa5188e3c 100644 --- a/micro-data/src/main/java/com/aol/micro/server/spring/datasource/hibernate/HibernateSessionBuilder.java +++ b/micro-data/src/main/java/com/aol/micro/server/spring/datasource/hibernate/HibernateSessionBuilder.java @@ -14,7 +14,7 @@ import org.springframework.orm.hibernate4.HibernateTransactionManager; import org.springframework.orm.hibernate4.LocalSessionFactoryBean; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.spring.datasource.JdbcConfig; @@ -22,7 +22,6 @@ @AllArgsConstructor public class HibernateSessionBuilder { private final Logger logger = LoggerFactory.getLogger( getClass()); - ExceptionSoftener softener = ExceptionSoftener.singleton.factory.getInstance(); private final JdbcConfig env; @@ -57,14 +56,14 @@ public SessionFactory sessionFactory() { sessionFactoryBean.afterPropertiesSet(); } catch (Exception e) { logger.error(e.getMessage(),e); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } try{ return sessionFactoryBean.getObject(); }catch(Exception e){ logger.error(e.getMessage(),e); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } return null; } diff --git a/micro-events/src/main/java/com/aol/micro/server/rest/resources/ActiveResource.java b/micro-events/src/main/java/com/aol/micro/server/rest/resources/ActiveResource.java index 4d6ba6243..6f07ae929 100644 --- a/micro-events/src/main/java/com/aol/micro/server/rest/resources/ActiveResource.java +++ b/micro-events/src/main/java/com/aol/micro/server/rest/resources/ActiveResource.java @@ -39,17 +39,15 @@ public ActiveResource(List activeQueries,JobsBeingExecute this.activeJobs = activeJobs; } - private void updateLogLevel(String level){ - - } + @GET @Produces("application/json") @Path("/requests") public void activeRequests(@Suspended AsyncResponse asyncResponse,@QueryParam("type") final String type) { - this.sync(lr-> lr.of((type == null ? "default" : type)) + this.cpuStreamBuilder().of((type == null ? "default" : type)) .map(typeToUse->activeQueries.get(typeToUse).toString()) - .peek(result->asyncResponse.resume(result))) + .peek(result->asyncResponse.resume(result)) .run(); } @@ -60,7 +58,7 @@ public void activeRequests(@Suspended AsyncResponse asyncResponse,@QueryParam("t @Path("/jobs") public void activeJobs(@Suspended AsyncResponse asyncResponse) { - this.cpuStream().of(this.activeJobs) + this.cpuStreamBuilder().of(this.activeJobs) .then(JobsBeingExecuted::toString) .then(str->asyncResponse.resume(str)) .run(); diff --git a/micro-events/src/main/java/com/aol/micro/server/rest/resources/ManifestResource.java b/micro-events/src/main/java/com/aol/micro/server/rest/resources/ManifestResource.java index 4c506168b..7384c59da 100644 --- a/micro-events/src/main/java/com/aol/micro/server/rest/resources/ManifestResource.java +++ b/micro-events/src/main/java/com/aol/micro/server/rest/resources/ManifestResource.java @@ -35,7 +35,7 @@ public class ManifestResource implements CommonRestResource, SingletonRestResour @Produces("application/json") public void mainfest(@Suspended AsyncResponse asyncResponse, @Context ServletContext context) { - this.ioStream().of("/META-INF/MANIFEST.MF") + this.ioStreamBuilder().of("/META-INF/MANIFEST.MF") .map(url->context.getResourceAsStream(url)) .map(this::getManifest) .peek(result->asyncResponse.resume(result)) diff --git a/micro-grizzly-with-jersey/src/test/java/app/async/com/aol/micro/server/AsyncResource.java b/micro-grizzly-with-jersey/src/test/java/app/async/com/aol/micro/server/AsyncResource.java index a86a62d96..aa167f6f2 100644 --- a/micro-grizzly-with-jersey/src/test/java/app/async/com/aol/micro/server/AsyncResource.java +++ b/micro-grizzly-with-jersey/src/test/java/app/async/com/aol/micro/server/AsyncResource.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.auto.discovery.RestResource; import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; @@ -34,11 +34,12 @@ public class AsyncResource implements RestResource{ @Produces("text/plain") public void expensive(@Suspended AsyncResponse asyncResponse){ - LazyFutureStream.ofIterable(urls) + LazyFutureStream.lazyFutureStreamFromIterable(urls) .then(it->client.get(it)) .onFail(it -> "") .peek(it -> System.out.println(it)) + .convertToSimpleReact() .allOf(data -> { System.out.println(data); return asyncResponse.resume(SequenceM.fromIterable(data).join(";")); }); diff --git a/micro-grizzly-with-jersey/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java b/micro-grizzly-with-jersey/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java index 6b02f15b7..9711f49e3 100644 --- a/micro-grizzly-with-jersey/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java +++ b/micro-grizzly-with-jersey/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java @@ -11,7 +11,7 @@ import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; -import com.aol.simple.react.stream.traits.EagerFutureStream; +import com.aol.simple.react.stream.traits.LazyFutureStream; @Component @Path("/test-status") public class TestAppResource implements TestAppRestResource { @@ -37,7 +37,7 @@ public String ping() { @Path("/rest-calls") public String restCallResult(){ - return EagerFutureStream.ofIterable(urls) + return LazyFutureStream.lazyFutureStreamFromIterable(urls) .map(it ->template.get(it)) .then(it -> "*"+it) .peek(loadedAndModified -> System.out.println(loadedAndModified)) diff --git a/micro-grizzly/src/main/java/com/aol/micro/server/servers/grizzly/GrizzlyApplication.java b/micro-grizzly/src/main/java/com/aol/micro/server/servers/grizzly/GrizzlyApplication.java index b5d72ce19..0ca2df8d5 100644 --- a/micro-grizzly/src/main/java/com/aol/micro/server/servers/grizzly/GrizzlyApplication.java +++ b/micro-grizzly/src/main/java/com/aol/micro/server/servers/grizzly/GrizzlyApplication.java @@ -20,7 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.aol.cyclops.lambda.utils.ExceptionSoftener; +import com.aol.cyclops.invokedynamic.ExceptionSoftener; import com.aol.micro.server.ErrorCode; import com.aol.micro.server.config.SSLProperties; import com.aol.micro.server.module.WebServerProvider; @@ -39,7 +39,6 @@ public class GrizzlyApplication implements ServerApplication { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ExceptionSoftener softener = ExceptionSoftener.singleton.factory.getInstance(); @Getter private final ServerData serverData; @@ -98,12 +97,12 @@ private void startServer(WebappContext webappContext, HttpServer httpServer, Com end.get(); } catch (IOException e) { - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } catch (ExecutionException e) { - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - softener.throwSoftenedException(e); + ExceptionSoftener.throwSoftenedException(e); } finally { httpServer.stop(); } diff --git a/micro-grizzly/src/test/java/app/async/com/aol/micro/server/AsyncResource.java b/micro-grizzly/src/test/java/app/async/com/aol/micro/server/AsyncResource.java index a86a62d96..aa167f6f2 100644 --- a/micro-grizzly/src/test/java/app/async/com/aol/micro/server/AsyncResource.java +++ b/micro-grizzly/src/test/java/app/async/com/aol/micro/server/AsyncResource.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.auto.discovery.RestResource; import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; @@ -34,11 +34,12 @@ public class AsyncResource implements RestResource{ @Produces("text/plain") public void expensive(@Suspended AsyncResponse asyncResponse){ - LazyFutureStream.ofIterable(urls) + LazyFutureStream.lazyFutureStreamFromIterable(urls) .then(it->client.get(it)) .onFail(it -> "") .peek(it -> System.out.println(it)) + .convertToSimpleReact() .allOf(data -> { System.out.println(data); return asyncResponse.resume(SequenceM.fromIterable(data).join(";")); }); diff --git a/micro-grizzly/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java b/micro-grizzly/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java index 6b02f15b7..9711f49e3 100644 --- a/micro-grizzly/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java +++ b/micro-grizzly/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java @@ -11,7 +11,7 @@ import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; -import com.aol.simple.react.stream.traits.EagerFutureStream; +import com.aol.simple.react.stream.traits.LazyFutureStream; @Component @Path("/test-status") public class TestAppResource implements TestAppRestResource { @@ -37,7 +37,7 @@ public String ping() { @Path("/rest-calls") public String restCallResult(){ - return EagerFutureStream.ofIterable(urls) + return LazyFutureStream.lazyFutureStreamFromIterable(urls) .map(it ->template.get(it)) .then(it -> "*"+it) .peek(loadedAndModified -> System.out.println(loadedAndModified)) diff --git a/micro-guava/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java b/micro-guava/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java index 20d942177..16cc34f42 100644 --- a/micro-guava/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java +++ b/micro-guava/src/test/java/app/embedded/com/aol/micro/server/TestAppResource.java @@ -1,7 +1,5 @@ package app.embedded.com.aol.micro.server; -import java.util.concurrent.CompletableFuture; - import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -10,7 +8,7 @@ import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; -import com.aol.simple.react.stream.traits.EagerFutureStream; +import com.aol.simple.react.stream.traits.LazyFutureStream; import com.google.common.collect.ImmutableList; @Component @Path("/test-status") @@ -37,7 +35,7 @@ public String ping() { @Path("/rest-calls") public String restCallResult(){ - return EagerFutureStream.ofIterable(urls) + return LazyFutureStream.lazyFutureStreamFromIterable(urls) .map(it ->template.get(it)) .then(it -> "*"+it) .peek(loadedAndModified -> System.out.println(loadedAndModified)) diff --git a/micro-ip-tracker/readme.md b/micro-ip-tracker/readme.md index 10c9354a6..cf1b625bf 100644 --- a/micro-ip-tracker/readme.md +++ b/micro-ip-tracker/readme.md @@ -8,14 +8,16 @@ The IP Address is stored in a thread local variable & available via QueryIPRetri Simply add to the classpath -Maven +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-ip-tracker/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-ip-tracker) + +### Maven com.aol.microservices micro-ip-tracker - 0.62 + x.yx -Gradle +### Gradle - compile 'com.aol.microservices:micro-ip-tracker:0.62' \ No newline at end of file + compile 'com.aol.microservices:micro-ip-tracker:x.yz' \ No newline at end of file diff --git a/micro-mysql/readme.md b/micro-mysql/readme.md index 8022bded1..b76c2fb03 100644 --- a/micro-mysql/readme.md +++ b/micro-mysql/readme.md @@ -1,3 +1,21 @@ # Distributed lock plugin for MySQL Allows MySQL to be used to create distributed locks. Not suitable for use against a MySQL cluster prior to version . + +Autowire com.aol.micro.server.utility.DistributedLockService into your beans to make use of Distributed locking. If you also have the couchbase plugin installed autowire com.aol.micro.server.mysql.distlock.DistributedLockServiceMySqlImpl instead. + +## Getting The Microserver MySql Plugin + +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-mysql/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.aol.microservices/micro-mysql) + +### Maven +```xml + + com.aol.microservices + micro-mysql + x.yz + +``` +### Gradle + + compile 'com.aol.microservices:micro-mysql:x.yz' \ No newline at end of file diff --git a/micro-mysql/src/main/java/com/aol/micro/server/mysql/distlock/DistributedLockServiceMySqlImpl.java b/micro-mysql/src/main/java/com/aol/micro/server/mysql/distlock/DistributedLockServiceMySqlImpl.java index eeb42bf80..7c8f399dc 100644 --- a/micro-mysql/src/main/java/com/aol/micro/server/mysql/distlock/DistributedLockServiceMySqlImpl.java +++ b/micro-mysql/src/main/java/com/aol/micro/server/mysql/distlock/DistributedLockServiceMySqlImpl.java @@ -18,7 +18,7 @@ public class DistributedLockServiceMySqlImpl implements DistributedLockService { volatile JdbcTemplate jdbcTemplate; @Autowired(required = false) - @Qualifier("dataSourceForDistributedLocking") + @Qualifier("distLockingDataSource") public void setSmartDataSource(DataSource dataSource) { if (dataSource != null) this.jdbcTemplate = new JdbcTemplate(dataSource); diff --git a/micro-reactive/src/main/java/com/aol/micro/server/reactive/MicroLazyReact.java b/micro-reactive/src/main/java/com/aol/micro/server/reactive/MicroLazyReact.java deleted file mode 100644 index 2f05ef953..000000000 --- a/micro-reactive/src/main/java/com/aol/micro/server/reactive/MicroLazyReact.java +++ /dev/null @@ -1,203 +0,0 @@ -package com.aol.micro.server.reactive; - -import java.util.Collection; -import java.util.function.Supplier; -import java.util.stream.Stream; - -import lombok.AllArgsConstructor; - -import com.aol.cyclops.streams.StreamUtils; -import com.aol.simple.react.stream.lazy.LazyReact; -import com.aol.simple.react.stream.traits.LazyFutureStream; - -/** - * Builder of LazyFutureStreams - * - * @author johnmcclean - * - */ -@AllArgsConstructor -public class MicroLazyReact{ - - private final LazyReact react; - - private Supplier supplier(T element){ - return ()->element; - } - /** - * Generate a FutureStream from the specified elements. Each element will be wrapped in a Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - * - * E.g. given 3 URLs, we can use this method to move each call onto a separte thread, but work will continue on the same thread - * once complete - *
-	 * {@code
-	 *  
-	 *       microReact.of(ioCallURL1,ioCallURL2,ioCallURL3)  //each URL is wrapped in a task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * @param elements to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream of(T... elements){ - - return react(Stream.of(elements).map(e->supplier(e))); - } - /** - * Generate a FutureStream from the specified Stream. Each element in the Stream will be wrapped in a Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - * - * E.g. given 3 URLs, we can use this method to move each call onto a separte thread, but work will continue on the same thread - * once complete - *
-	 * {@code
-	 *  
-	 *       microReact.of(Stream.of(ioCallURL1,ioCallURL2,ioCallURL3))  //each URL is wrapped in a task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * @param stream to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream of(Stream stream){ - return react(stream.map(e->supplier(e))); - } - /** - * Generate a FutureStream from the specified Collection. Each element in the Collection will be wrapped in a Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - * - * E.g. given 3 URLs, we can use this method to move each call onto a separte thread, but work will continue on the same thread - * once complete - *
-	 * {@code
-	 *  
-	 *       microReact.of(Arrays.asList(ioCallURL1,ioCallURL2,ioCallURL3))  //each URL is wrapped in a task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * @param collection to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream of(Collection collection){ - return react(collection.stream().map(e->supplier(e))); - } - /** - * Generate a FutureStream from the specified Iterable. Each element in the Iterable will be wrapped in a Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - * - * E.g. given 3 URLs, we can use this method to move each call onto a separte thread, but work will continue on the same thread - * once complete - *
-	 * {@code
-	 *  
-	 *       microReact.ofIterable(Arrays.asList(ioCallURL1,ioCallURL2,ioCallURL3))  //each URL is wrapped in a task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * @param iterable to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream ofIterable(Iterable iterable){ - return react(StreamUtils.stream(iterable).map(e->supplier(e))); - } - /** - * Generate a FutureStream from the specified Suppliers. Each Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - *
-	 * {@code
-	 *  
-	 *       microReact.react(()->ioCallURL1,()->ioCallURL2,()->ioCallURL3)  //each URL Supplier is an Async task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- - * - * @param suppliers to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream react(Supplier... suppliers){ - return react.react(suppliers).sync(); - } - /** - * Generate a FutureStream from the specified Stream of Suppliers. Each Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - *
-	 * {@code
-	 *  
-	 *       microReact.react(Stream.of(()->ioCallURL1,()->ioCallURL2,()->ioCallURL3))  //each URL Supplier is an Async task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * @param suppliers to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream react(Stream> suppliers){ - return react.react(suppliers).sync(); - } - /** - * Generate a FutureStream from the specified Collection of Suppliers. Each Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - *
-	 * {@code
-	 *  
-	 *       microReact.react(Arrays.asList(()->ioCallURL1,()->ioCallURL2,()->ioCallURL3))  //each URL Supplier is an Async task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * - * @param suppliers to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream react(Collection> suppliers){ - return react.react(suppliers).sync(); - } - /** - * Generate a FutureStream from the specified Iterable of Suppliers. Each Supplier and submitted to a task executor for - * execution, the returned Stream will be in synchronous mode, where any subsequent operations performed on task results will occur on the - * same thread without involving a task executor (performance difference between submitting non-tasks and continuing on calling thread is an - * order of magnitude). - *
-	 * {@code
-	 *  
-	 *       microReact.reactIterable(Arrays.asList(()->ioCallURL1,()->ioCallURL2,()->ioCallURL3))  //each URL Supplier is an Async task to be recieved on potentially different threads
-	 *       		   .map(this::doIO)             //each I/O Call can run on a separate thread, the calling thread always executes
-	 *                 .map(this::processResult)     //map occurs on the calling thread
-	 *                 .forEach(System.out::println);
-	 * }
- * - * - * @param suppliers to create LazyFutureStream from - * @return LazyFutureStream - */ - public LazyFutureStream reactIterable(Iterable> suppliers){ - return react.reactIterable(suppliers).sync(); - } -} \ No newline at end of file diff --git a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Pipes.java b/micro-reactive/src/main/java/com/aol/micro/server/reactive/Pipes.java index 7b525dd85..4f23b3475 100644 --- a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Pipes.java +++ b/micro-reactive/src/main/java/com/aol/micro/server/reactive/Pipes.java @@ -1,12 +1,11 @@ package com.aol.micro.server.reactive; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import com.aol.cyclops.sequence.SequenceM; import com.aol.simple.react.async.Adapter; -import com.aol.simple.react.async.QueueFactories; +import com.aol.simple.react.async.pipes.LazyReactors; +import com.aol.simple.react.async.subscription.Subscription; import com.aol.simple.react.stream.traits.LazyFutureStream; /** @@ -15,46 +14,45 @@ * @author johnmcclean * */ -public class Pipes { - - private static final ConcurrentMap> registered = new ConcurrentHashMap<>(); - +public class Pipes extends com.aol.simple.react.async.pipes.Pipes{ + /** * @param key : Adapter identifier * @return selected Queue */ @SuppressWarnings({ "unchecked", "rawtypes" }) public static Optional> get(K key){ - return Optional.ofNullable((Adapter)registered.get(key)); + return com.aol.simple.react.async.pipes.Pipes.get(key); } /** - * Register a Queue, and get back a listening LazyFutureStream that runs on a single thread - * (not the calling thread) + * Register a Queue, and get back a listening SequenceM * - *
+	 * 
+	 * 
+	 *  
 	 * {@code
-	 * LazyFutureStream stream = Pipes.register("test", QueueFactories.
+	 * SequenceM stream = Pipes.registerForSequential("test", QueueFactories.
 											boundedNonBlockingQueue(100)
 												.build());
-		stream.filter(it->it!=null).peek(System.out::println).run();
+		stream.filter(it->it!=null)
+		      .peek(this::process)
+		      .sync()
+		      .forEach(System.out::println);
 	 * 
 	 * }
- * * @param key : Adapter identifier * @param adapter * @return LazyFutureStream from supplied Queue, optimisied for CPU bound operation */ - public static LazyFutureStream register(Object key, Adapter adapter){ - registered.put(key, adapter); - return LazyFutureStream.of(adapter.stream()); + public static SequenceM registerForSequential(Object key, Adapter adapter){ + com.aol.simple.react.async.pipes.Pipes.register(key, adapter); + return adapter.stream(); } /** * Register a Queue, and get back a listening LazyFutureStream optimized for CPU Bound operations * - * Convert the LazyFutureStream to async mode to fan out operations across threads, after the first fan out operation definition - * it should be converted to sync mode * *
 	 * {@code
@@ -62,9 +60,7 @@ public static  LazyFutureStream register(Object key, Adapter adapter){
 											boundedNonBlockingQueue(100)
 												.build());
 		stream.filter(it->it!=null)
-		      .async()
 		      .peek(this::process)
-		      .sync()
 		      .forEach(System.out::println);
 	 * 
 	 * }
@@ -73,8 +69,9 @@ public static LazyFutureStream register(Object key, Adapter adapter){ * @return LazyFutureStream from supplied Queue, optimisied for CPU bound operation */ public static LazyFutureStream registerForCPU(Object key, Adapter adapter){ - registered.put(key, adapter); - return Reactors.cpuReact.of(adapter.stream()); + com.aol.simple.react.async.pipes.Pipes.register(key, adapter); + Subscription sub = new Subscription(); + return LazyReactors.cpuReact.from(adapter.stream(sub)).withSubscription(sub); } /** * Register a Queue, and get back a listening LazyFutureStream optimized for IO Bound operations @@ -85,9 +82,7 @@ public static LazyFutureStream registerForCPU(Object key, Adapter adap boundedNonBlockingQueue(100) .build()); stream.filter(it->it!=null) - .async() .peek(this::load) - .sync() .run(System.out::println); * * }
@@ -97,37 +92,37 @@ public static LazyFutureStream registerForCPU(Object key, Adapter adap * @return LazyFutureStream from supplied Queue */ public static LazyFutureStream registerForIO(Object key, Adapter adapter){ - registered.put(key, adapter); - return Reactors.ioReact.of(adapter.stream()); + com.aol.simple.react.async.pipes.Pipes.register(key, adapter); + Subscription sub = new Subscription(); + return LazyReactors.ioReact.from(adapter.stream(sub)).withSubscription(sub); } /** * @param key : Queue identifier * @return LazyFutureStream that reads from specified Queue */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public static LazyFutureStream stream(Object key){ - return LazyFutureStream.of(((Adapter)registered.get(key)).stream()); + public static SequenceM stream(Object key){ + return Pipes.get(key).get().stream(); } /** * @param key : Queue identifier * @return LazyFutureStream that reads from specified Queue */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public static LazyFutureStream streamIOBound(Object key){ - return Reactors.ioReact.of(((Adapter)registered.get(key)).stream()); + public static LazyFutureStream futureStreamIOBound(Object key){ + Subscription sub = new Subscription(); + return LazyReactors.ioReact.from(Pipes.get(key).get().stream(sub)).withSubscription(sub); } /** * @param key : Queue identifier * @return LazyFutureStream that reads from specified Queue */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public static LazyFutureStream streamCPUBound(Object key){ - return Reactors.cpuReact.of(((Adapter)registered.get(key)).stream()); + public static LazyFutureStream futureStreamCPUBound(Object key){ + Subscription sub = new Subscription(); + return LazyReactors.cpuReact.from(Pipes.get(key).get().stream(sub)).withSubscription(sub); } - public static void clear() { - registered.clear(); - - } + } diff --git a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactive.java b/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactive.java index d1f994fcc..2fb3a9031 100644 --- a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactive.java +++ b/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactive.java @@ -3,10 +3,12 @@ import java.util.Optional; import java.util.function.Function; +import com.aol.cyclops.sequence.SequenceM; import com.aol.cyclops.trycatch.Failure; import com.aol.cyclops.trycatch.Success; import com.aol.cyclops.trycatch.Try; import com.aol.simple.react.async.Adapter; +import com.aol.simple.react.async.pipes.LazyReactors; import com.aol.simple.react.stream.lazy.LazyReact; import com.aol.simple.react.stream.traits.LazyFutureStream; import com.aol.simple.react.threads.ParallelElasticPools; @@ -37,25 +39,25 @@ default Try enqueue(K key,V value){ } + default LazyFutureStream ioFutureStream(K key){ + return Pipes.futureStreamIOBound(key); + } + default SequenceM sequentialStream(K key){ + return Pipes.stream(key); + } + default SequenceM cpuFutureStream(K key){ + return Pipes.futureStreamCPUBound(key); + } + default LazyReact ioStreamBuilder(){ + return LazyReactors.ioReact; + } - /** - * - * Generate a sequentially executing single-threaded a LazyFutureStream that executes all tasks directly without involving - * a task executor between each stage (unless async operator invoked). A preconfigured LazyReact builder that will be supplied as - * input to the function supplied. The user Function should create a LazyFutureStream with any - * business logic stages predefined. This method will handle elastic scaling and pooling of Executor - * services. User code should call a terminal op on the returned LazyFutureStream - * @see Reactive#run(com.aol.simple.react.stream.traits.LazyFutureStream) - * - * @param react Function that generates a LazyFutureStream from a LazyReact builder - * @return Generated LazyFutureStream - */ - default LazyFutureStream sync(Function> react){ - LazyReact r =SequentialElasticPools.lazyReact.nextReactor().withAsync(false); - return react.apply( r) - .onFail(e->{ SequentialElasticPools.lazyReact.populate(r); throw e;}) - .peek(i->SequentialElasticPools.lazyReact.populate(r)); - + default LazyReact cpuStreamBuilder(){ + return LazyReactors.cpuReact; + } + + default SequenceM switchToSequential(LazyFutureStream stream){ + return SequenceM.fromStream(stream); } /** @@ -64,8 +66,8 @@ default LazyFutureStream sync(Function> rea * @param stream to convert to IO mode * @return LazyFutureStream in IO mode */ - default LazyFutureStream swithToIO(LazyFutureStream stream){ - LazyReact react = Reactors.ioReact; + default LazyFutureStream switchToIO(LazyFutureStream stream){ + LazyReact react = LazyReactors.ioReact; return stream.withTaskExecutor(react.getExecutor()).withRetrier(react.getRetrier()); } /** @@ -74,48 +76,10 @@ default LazyFutureStream swithToIO(LazyFutureStream stream){ * @param stream to convert to CPU bound mode * @return LazyFutureStream in CPU bound mode */ - default LazyFutureStream swithToCPU(LazyFutureStream stream){ - LazyReact react = Reactors.cpuReact; + default LazyFutureStream switchToCPU(LazyFutureStream stream){ + LazyReact react = LazyReactors.cpuReact; return stream.withTaskExecutor(react.getExecutor()).withRetrier(react.getRetrier()); } - /** - * @return Stream builder for IO Bound Streams - */ - default MicroLazyReact ioStream(){ - return new MicroLazyReact(Reactors.ioReact); - } - /** - * @return Stream builder for CPU Bound Streams - */ - default MicroLazyReact cpuStream(){ - return new MicroLazyReact(Reactors.cpuReact); - } - /** - * Generate a multi-threaded LazyFutureStream that executes all tasks via - * a task executor between each stage (unless sync operator invoked). - * A preconfigured LazyReact builder that will be supplied as - * input to the function supplied. The user Function should create a LazyFutureStream with any - * business logic stages predefined. This method will handle elastic scaling and pooling of Executor - * services. User code should call a terminal op on the returned LazyFutureStream - * @see Reactive#run(com.aol.simple.react.stream.traits.LazyFutureStream) - * - * @param react Function that generates a LazyFutureStream from a LazyReact builder - * @return Generated LazyFutureStream - */ - default LazyFutureStream async(Function> react){ - LazyReact r =ParallelElasticPools.lazyReact.nextReactor().withAsync(true); - return react.apply( r) - .onFail(e->{ SequentialElasticPools.lazyReact.populate(r); throw e;}) - .peek(i->SequentialElasticPools.lazyReact.populate(r)); - - } - /** - * Convenience method that runs a LazyFutureStream without blocking the current thread - * @param stream to execute - */ - default void run(LazyFutureStream stream){ - stream.run(); - } } diff --git a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactors.java b/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactors.java deleted file mode 100644 index 7c5c660f6..000000000 --- a/micro-reactive/src/main/java/com/aol/micro/server/reactive/Reactors.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.aol.micro.server.reactive; - -import com.aol.simple.react.stream.lazy.LazyReact; - -public class Reactors { - private static volatile int IOThreadPoolSize=100; - private static volatile int CPUBoundThreadPoolSize=Runtime.getRuntime().availableProcessors(); - private static void setIOThreadPoolSize(int size){ - IOThreadPoolSize=size; - } - private static void setCPUBoundThreadPoolSize(int size){ - CPUBoundThreadPoolSize=size; - } - public final static LazyReact ioReact = new LazyReact(IOThreadPoolSize,IOThreadPoolSize); - public final static LazyReact cpuReact = new LazyReact(CPUBoundThreadPoolSize,CPUBoundThreadPoolSize); -} diff --git a/micro-reactive/src/test/java/app/async/com/aol/micro/server/AsyncResource.java b/micro-reactive/src/test/java/app/async/com/aol/micro/server/AsyncResource.java index 3c778ddb0..497cfe1f6 100644 --- a/micro-reactive/src/test/java/app/async/com/aol/micro/server/AsyncResource.java +++ b/micro-reactive/src/test/java/app/async/com/aol/micro/server/AsyncResource.java @@ -12,7 +12,7 @@ import org.pcollections.PStack; import org.springframework.stereotype.Component; -import com.aol.cyclops.lambda.monads.SequenceM; +import com.aol.cyclops.sequence.SequenceM; import com.aol.micro.server.auto.discovery.RestResource; import com.aol.micro.server.testing.RestAgent; import com.aol.simple.react.stream.simple.SimpleReact; @@ -35,11 +35,12 @@ public class AsyncResource implements RestResource{ @Produces("text/plain") public void expensive(@Suspended AsyncResponse asyncResponse){ - LazyFutureStream.ofIterable(urls) + LazyFutureStream.lazyFutureStreamFromIterable(urls) .then(it->client.get(it)) .onFail(it -> "") .peek(it -> System.out.println(it)) + .convertToSimpleReact() .allOf(data -> { System.out.println(data); return asyncResponse.resume(SequenceM.fromIterable(data).join(";")); }); diff --git a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java index 32223746a..166493bfe 100644 --- a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java +++ b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/ManyProducersToOneConsumerApp.java @@ -2,7 +2,7 @@ import com.aol.micro.server.MicroserverApp; import com.aol.micro.server.reactive.Pipes; -import com.aol.simple.react.async.QueueFactories; +import com.aol.simple.react.async.factories.QueueFactories; import com.aol.simple.react.stream.traits.LazyFutureStream; /** @@ -15,9 +15,10 @@ public class ManyProducersToOneConsumerApp { public static void main(String[] args){ - LazyFutureStream stream = Pipes.register("test", QueueFactories. + Pipes.register("test", QueueFactories. boundedNonBlockingQueue(100) .build()); + LazyFutureStream stream = Pipes.futureStreamCPUBound("test"); stream.filter(it->it!=null).peek(System.out::println).run(); new MicroserverApp(()-> "simple-app").run(); } diff --git a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/PipesRunnerTest.java b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/PipesRunnerTest.java index 0eccc4a2b..3b6ca87f1 100644 --- a/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/PipesRunnerTest.java +++ b/micro-reactive/src/test/java/app/reactive/pipes/com/aol/micro/server/PipesRunnerTest.java @@ -27,8 +27,8 @@ public class PipesRunnerTest { LazyFutureStream stream; @Before public void startServer(){ - stream = Pipes.register("test", new Queue()); - + Pipes.register("test", new Queue()); + stream = Pipes.futureStreamCPUBound("test"); server = new MicroserverApp(()->"simple-app"); server.start(); diff --git a/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java b/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java index cc02cf152..51f21e5af 100644 --- a/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java +++ b/micro-reactive/src/test/java/com/aol/micro/server/reactive/PipesTest.java @@ -37,18 +37,25 @@ public void testStream() { assertThat(Pipes.stream("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); } @Test + public void testStreamSequential() { + Queue queue = new Queue(); + queue.add("world"); + Pipes.register("hello",queue); + assertThat(Pipes.stream("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); + } + @Test public void testStreamIO() { Queue queue = new Queue(); queue.add("world"); Pipes.register("hello",queue); - assertThat(Pipes.streamIOBound("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); + assertThat(Pipes.futureStreamIOBound("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); } @Test public void testStreamCPU() { Queue queue = new Queue(); queue.add("world"); Pipes.register("hello",queue); - assertThat(Pipes.streamCPUBound("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); + assertThat(Pipes.futureStreamCPUBound("hello").limit(1).toList(),equalTo(Arrays.asList("world"))); } @Test public void cpuBound() { diff --git a/micro-reactive/src/test/java/com/aol/micro/server/reactive/ReactiveTest.java b/micro-reactive/src/test/java/com/aol/micro/server/reactive/ReactiveTest.java index 9b1d69d87..7dcf2d47f 100644 --- a/micro-reactive/src/test/java/com/aol/micro/server/reactive/ReactiveTest.java +++ b/micro-reactive/src/test/java/com/aol/micro/server/reactive/ReactiveTest.java @@ -27,10 +27,11 @@ public void testNoPipe() { } @Test public void testPipe() { - LazyFutureStream stream = Pipes.register("hello", new Queue()); + Pipes.register("hello", new Queue()); + LazyFutureStream stream = Pipes.futureStreamIOBound("hello"); assertTrue(new MyResource().queue().isSuccess()); - assertThat(stream.limit(1).toList(),equalTo(Arrays.asList("world"))); + assertThat(stream.peek(System.out::println).limit(1).peek(System.out::println).toList(),equalTo(Arrays.asList("world"))); } @Test @@ -39,7 +40,7 @@ public void testCPUStreamIsInSyncMode() { MyResource resource = new MyResource(); LazyFutureStream lfs = resource.asyncCPUStream(); - assertFalse(lfs.isAsync()); + assertTrue(lfs.isAsync()); } @Test @@ -48,7 +49,7 @@ public void testIOStreamIsInSyncMode() { MyResource resource = new MyResource(); LazyFutureStream lfs = resource.asyncIOStream(); - assertFalse(lfs.isAsync()); + assertTrue(lfs.isAsync()); } @Test @@ -87,22 +88,7 @@ public void testAsyncCPU() { assertThat(resource.getVal(),equalTo("HELLO")); } - @Test - public void testAsync() { - MyResource resource = new MyResource(); - resource.async(); - - assertThat(resource.getVal(),equalTo("HELLO")); - - } - @Test - public void testSync() { - MyResource resource = new MyResource(); - resource.sync(); - - assertThat(resource.getVal(),equalTo("HELLO")); - - } + static class MyResource implements RestResource,Reactive{ @Getter String val; @@ -115,38 +101,33 @@ public LazyFutureStream asyncIOStream(){ List collection = new ArrayList<>(); for(int i=0;i<1000;i++) collection.add("hello"); - return this.ioStream().of(collection).map(String::toUpperCase); + return this.ioStreamBuilder().from(collection).map(String::toUpperCase); } public LazyFutureStream asyncCPUStream(){ List collection = new ArrayList<>(); for(int i=0;i<1000;i++) collection.add("hello"); - return this.cpuStream().of(collection).map(String::toUpperCase); + return this.cpuStreamBuilder().from(collection).map(String::toUpperCase); } public Set asyncIOFanout(){ List collection = new ArrayList<>(); for(int i=0;i<1000;i++) collection.add("hello"); - return this.ioStream().of(collection).map(str-> Thread.currentThread().getId()).toSet(); + return this.ioStreamBuilder().from(collection).map(str-> Thread.currentThread().getId()).toSet(); } public Set asyncCPUFanout(){ List collection = new ArrayList<>(); for(int i=0;i<1000;i++) collection.add("hello"); - return this.cpuStream().of(collection).map(str-> Thread.currentThread().getId()).toSet(); + return this.cpuStreamBuilder().from(collection).map(str-> Thread.currentThread().getId()).toSet(); } public void asyncIO(){ - this.ioStream().of("hello").map(String::toUpperCase).peek(str->val=str).block(); + this.ioStreamBuilder().of("hello").map(String::toUpperCase).peek(str->val=str).block(); } public void asyncCPU(){ - this.cpuStream().of("hello").map(String::toUpperCase).peek(str->val=str).block(); - } - public void async(){ - this.sync(lr->lr.of("hello").map(String::toUpperCase).peek(str->val=str)).block(); - } - public void sync(){ - this.sync(lr->lr.of("hello").map(String::toUpperCase).peek(str->val=str)).block(); + this.cpuStreamBuilder().of("hello").map(String::toUpperCase).peek(str->val=str).block(); } + } } diff --git a/micro-tutorial/src/main/java/app1/simple/MyRestEndPoint.java b/micro-tutorial/src/main/java/app1/simple/MyRestEndPoint.java index 0cee9a69b..f952921b9 100644 --- a/micro-tutorial/src/main/java/app1/simple/MyRestEndPoint.java +++ b/micro-tutorial/src/main/java/app1/simple/MyRestEndPoint.java @@ -19,7 +19,8 @@ import com.aol.micro.server.ip.tracker.QueryIPRetriever; import com.aol.micro.server.rest.jackson.JacksonUtil; import com.aol.micro.server.spring.datasource.jdbc.SQL; -import com.aol.simple.react.stream.traits.EagerFutureStream; +import com.aol.simple.react.stream.lazy.LazyReact; +import com.aol.simple.react.stream.traits.LazyFutureStream; import com.google.common.collect.ImmutableList; import com.google.common.eventbus.EventBus; import com.wordnik.swagger.annotations.Api; @@ -99,8 +100,7 @@ public ImmutableList findByName(@QueryParam("name")String name) { @Produces("application/json") @ApiOperation(value = "Do Expensive operation", response = List.class) public void expensiveDb(@Suspended AsyncResponse asyncResponse){ - EagerFutureStream.sequentialBuilder() - .react(()-> dataService.findAll("time")) + new LazyReact(1,10).react(()-> dataService.findAll("time")) .map(list -> JacksonUtil.serializeToJson(list)) .peek(asyncResponse::resume);