Skip to content

Commit

Permalink
Prepares the project for PubSub (renaming as kite and changing group …
Browse files Browse the repository at this point in the history
…to io.teris.kite)

- streamlines definitions,
- improves exceptions handling around missing resources or failed authentication,
- adds fasterxml serializer (incomplete due to gaps)
- improves documentation
  • Loading branch information
Oleg Sklyar committed Feb 2, 2018
1 parent 7554dcc commit 1329f9a
Show file tree
Hide file tree
Showing 90 changed files with 2,023 additions and 1,310 deletions.
520 changes: 316 additions & 204 deletions README.md

Large diffs are not rendered by default.

18 changes: 0 additions & 18 deletions amqp/build.gradle

This file was deleted.

43 changes: 0 additions & 43 deletions amqp/src/main/java/io/teris/rpc/amqp/AmqpServiceRouter.java

This file was deleted.

9 changes: 6 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ ext {
activemqBrokerModule = "org.apache.activemq:activemq-broker:5.15.2"
activemqClientModule = "org.apache.activemq:activemq-client:5.15.2"
activemqKahaDbModule = "org.apache.activemq:activemq-kahadb-store:5.15.2"
findbugsModule = "com.google.code.findbugs:annotations:3.0.1"
findbugsModule = "com.google.code.findbugs:jsr305:3.0.1"
geronimoJmsModule = "org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1"
gsonModule = "com.google.code.gson:gson:2.8.2"
jacksonDatabindModule = "com.fasterxml.jackson.core:jackson-databind:2.9.2"
jacksonDatatypeJSR310Module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.9.2"
logbackModule = "ch.qos.logback:logback-classic:1.2.3"
rabbitmqModule = "com.rabbitmq:amqp-client:5.1.2"
slf4jModule = "org.slf4j:slf4j-api:1.7.25"
vertxWebModule = "io.vertx:vertx-web:3.5.0"
vertxTestModule = "io.vertx:vertx-unit:3.5.0"
Expand All @@ -22,8 +25,8 @@ ext {
}

allprojects {
version = "0.4.0"
group = "io.teris.rpc"
version = "0.5.0"
group = "io.teris.kite"

plugins.apply(JacocoPlugin)

Expand Down
14 changes: 7 additions & 7 deletions integration-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ plugins.apply(JavaPlugin)
dependencies {
testCompile(junitModule)
testCompile(mockitoModule)
testCompile(project(":serialization-json"))

testCompile(project(":vertx"))

testCompile(project(":amqp"))

testCompile(project(":jms"))
testCompile(project(":kite"))
testCompile(project(":kite-gson"))
testCompile(project(":kite-fasterxml"))
testCompile(project(":kite-rpc"))
testCompile(project(":kite-rpc-vertx"))
testCompile(project(":kite-rpc-amqp"))
testCompile(project(":kite-rpc-jms"))
testCompile(activemqClientModule)
testCompile(activemqBrokerModule)
// amq persistence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
* Copyright (c) teris.io & Oleg Sklyar, 2017. All rights reserved
*/

package io.teris.rpc;
package io.teris.kite.rpc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -20,12 +22,25 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import io.teris.kite.Context;
import io.teris.kite.Serializer;
import io.teris.kite.gson.JsonSerializer;
import io.teris.kite.rpc.impl.AsyncServiceImpl;
import io.teris.kite.rpc.impl.SyncServiceImpl;
import io.teris.kite.rpc.impl.ThrowingServiceImpl;


public abstract class AbstractInvocationTestsuite {

@Rule
public ExpectedException exception = ExpectedException.none();

static int port;

static ServiceExporter exporter1;

static ServiceExporter exporter2;

static SyncService syncService;

static AsyncService asyncService;
Expand All @@ -36,6 +51,43 @@ public abstract class AbstractInvocationTestsuite {

private final int nrequests = 200;

static void preInit() {
while (true) {
try (ServerSocket socket = new ServerSocket((int) (49152 + Math.random() * (65535 - 49152)))) {
port = socket.getLocalPort();
break;
}
catch (IOException e) {
// repeat
}
}

Serializer serializer = JsonSerializer.builder().build();

exporter1 = ServiceExporter.serializer(serializer)
.preprocessor((context, data) -> {
CompletableFuture<Context> res = new CompletableFuture<>();
if (context.containsKey("x-technical-error")) {
res.completeExceptionally(new RuntimeException("BOOM"));
}
else if (context.containsKey("x-auth-error")) {
res.completeExceptionally(new AuthenticationException("BOOM"));
}
else {
res.complete(context);
}
return res;
})
.export(SyncService.class, new SyncServiceImpl("1"))
.export(AsyncService.class, new AsyncServiceImpl("2"))
.build();

exporter2 = ServiceExporter.serializer(serializer)
.export(ThrowingService.class, new ThrowingServiceImpl("3"))
.build();
}


@Test
public void roundtrip_single_sync_success() {
Context context = new Context();
Expand Down Expand Up @@ -75,7 +127,7 @@ public void roundtrip_dual_async_success() throws Exception {
}

@Test
public void roundtrip_exceptional_success() {
public void roundtrip_businessException() {
Context context = new Context();
CompletableFuture<Void> boomPromise = throwingService.boomThen(context);
try {
Expand All @@ -96,6 +148,49 @@ public void roundtrip_exceptional_success() {
}
}

@Test
public void roundtrip_technicalException() {
Context context = new Context();
context.put("x-technical-error", "yes");
try {
syncService.plus(context, Double.valueOf(341.2), Double.valueOf(359.3));
throw new AssertionError("unreachable code");
}
catch (TechnicalException ex) {
assertTrue(ex.getMessage().contains("BOOM"));
}
CompletableFuture<Double> promise = asyncService.plus(context, Double.valueOf(341.2), Double.valueOf(359.3));
try {
promise.get();
throw new AssertionError("unreachable code");
}
catch (ExecutionException | InterruptedException ex) {
assertTrue(ex.getMessage().contains("TechnicalException: BOOM"));
}
}

@Test
public void roundtrip_authenticationException() {
Context context = new Context();
context.put("x-auth-error", "yes");
try {
syncService.plus(context, Double.valueOf(341.2), Double.valueOf(359.3));
throw new AssertionError("unreachable code");
}
catch (AuthenticationException ex) {
assertEquals("BOOM", ex.getMessage());
}
CompletableFuture<Double> promise = asyncService.plus(context, Double.valueOf(341.2), Double.valueOf(359.3));
try {
promise.get();
throw new AssertionError("unreachable code");
}
catch (ExecutionException | InterruptedException ex) {
assertTrue(ex.getCause() instanceof AuthenticationException);
assertEquals("BOOM", ex.getCause().getMessage());
}
}

@Test
public void benchmark_async_invocationsNThreadsxMRequests() throws Exception {
List<Callable<Void>> callables = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
* Copyright (c) teris.io & Oleg Sklyar, 2017. All rights reserved
*/

package io.teris.rpc;
package io.teris.kite.rpc;

import java.util.concurrent.CompletableFuture;

import io.teris.kite.Context;
import io.teris.kite.Name;
import io.teris.kite.Service;


@Service
public interface AsyncService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
* Copyright (c) teris.io & Oleg Sklyar, 2017. All rights reserved
*/

package io.teris.rpc;
package io.teris.kite.rpc;

import io.teris.kite.Context;
import io.teris.kite.Name;
import io.teris.kite.Service;


@Service
public interface SyncService {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) teris.io & Oleg Sklyar, 2017. All rights reserved
*/

package io.teris.kite.rpc;

import org.junit.AfterClass;
import org.junit.BeforeClass;

import com.rabbitmq.client.ConnectionFactory;

import io.teris.kite.gson.JsonSerializer;
import io.teris.kite.rpc.amqp.AmqpServiceInvoker;
import io.teris.kite.rpc.amqp.AmqpServiceExporter;

//@Ignore
public class TestAmqpInvocationRoundtrip extends AbstractInvocationTestsuite {

private static final String requestExchange = "RPC";

private static AmqpServiceInvoker invoker;

private static AmqpServiceExporter provider;

@BeforeClass
public static void init() throws Exception {
preInit();
port = 5672;

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(port);

invoker = AmqpServiceInvoker.connectionFactory(connectionFactory)
.requestExchange(requestExchange)
.start();

ServiceFactory factory = ServiceFactory.invoker(invoker)
.serializer(JsonSerializer.builder().build())
.build();

syncService = factory.newInstance(SyncService.class);
asyncService = factory.newInstance(AsyncService.class);
throwingService = factory.newInstance(ThrowingService.class);

provider = AmqpServiceExporter.connectionFactory(connectionFactory)
.requestExchange(requestExchange)
.export(exporter1)
.export(exporter2)
.start();
}

@AfterClass
public static void teardown() throws Exception {
invoker.close().get();
provider.close().get();
}
}
Loading

0 comments on commit 1329f9a

Please sign in to comment.