diff --git a/tracing/src/main/java/com/palantir/tracing/Tracers.java b/tracing/src/main/java/com/palantir/tracing/Tracers.java index a5859af92..dc059050e 100644 --- a/tracing/src/main/java/com/palantir/tracing/Tracers.java +++ b/tracing/src/main/java/com/palantir/tracing/Tracers.java @@ -16,11 +16,13 @@ package com.palantir.tracing; +import com.google.common.util.concurrent.FutureCallback; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; +import org.checkerframework.checker.nullness.compatqual.NullableDecl; /** Utility methods for making {@link ExecutorService} and {@link Runnable} instances tracing-aware. */ public final class Tracers { @@ -133,6 +135,11 @@ public static Runnable wrap(Runnable delegate) { return new TracingAwareRunnable(Optional.empty(), delegate); } + /** Like {@link #wrap(String, Callable)}, but for Guava's FutureCallback. */ + public static FutureCallback wrap(String operation, FutureCallback delegate) { + return new TracingAwareFutureCallback<>(operation, delegate); + } + /** * Like {@link #wrap(Runnable)}, but using the given {@link String operation} is used to create a span for the * execution. @@ -352,6 +359,37 @@ public void run() { } } + /** + * Wrap a given guava future callback such that its execution operated with the {@link Trace thread-local Trace} of + * the thread the constructs the {@link TracingAwareFutureCallback} instance rather than the thread that executes + * the callback. + */ + private static class TracingAwareFutureCallback implements FutureCallback { + private final FutureCallback delegate; + private DeferredTracer deferredTracer; + + TracingAwareFutureCallback(String operation, FutureCallback delegate) { + this.delegate = delegate; + this.deferredTracer = new DeferredTracer(operation); + } + + @Override + public void onSuccess(@NullableDecl V result) { + deferredTracer.withTrace(() -> { + delegate.onSuccess(result); + return null; + }); + } + + @Override + public void onFailure(Throwable throwable) { + deferredTracer.withTrace(() -> { + delegate.onFailure(throwable); + return null; + }); + } + } + public interface ThrowingCallable { T call() throws E; } diff --git a/tracing/src/test/java/com/palantir/tracing/TracersTest.java b/tracing/src/test/java/com/palantir/tracing/TracersTest.java index 948213a09..955b748bd 100644 --- a/tracing/src/test/java/com/palantir/tracing/TracersTest.java +++ b/tracing/src/test/java/com/palantir/tracing/TracersTest.java @@ -20,18 +20,27 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.palantir.tracing.api.OpenSpan; +import com.palantir.tracing.api.Span; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -500,6 +509,150 @@ public void testWrapRunnableWithAlternateTraceId_traceStateRestoredToCleared() { assertThat(Tracer.hasTraceId()).isFalse(); } + @Test + public void testWrappingFutureCallback_futureCallbackTraceIsIsolated_success() throws Exception { + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> Tracer.startSpan("inside")); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> null); + + Tracer.startSpan("outside"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + // Using direct executor to use same thread to verify callback doesn't modify thread state + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); + } + + @Test + public void testWrappingFutureCallback_futureCallbackTraceIsIsolated_failure() throws Exception { + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> Tracer.startSpan("inside")); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Tracer.startSpan("outside"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + // Using direct executor to use same thread to verify callback doesn't modify thread state + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_success_sameThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> null); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_success_differentThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> null); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, Executors.newSingleThreadExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_failure_sameThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_failure_differentThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, Executors.newSingleThreadExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + @Test public void testTraceIdGeneration() throws Exception { assertThat(Tracers.randomId()).hasSize(16); // fails with p=1/16 if generated string is not padded @@ -615,4 +768,71 @@ private static List getCurrentTrace() { return Lists.reverse(spans); }).orElse(Collections.emptyList()); } + + private static CompletionAwareFutureCallback createFutureCallbackWithFunction(Runnable runnable) { + return new CompletionAwareFutureCallback<>(new FutureCallback() { + @Override + public void onSuccess(@NullableDecl V result) { + runnable.run(); + } + + @Override + public void onFailure(Throwable throwable) { + runnable.run(); + } + }); + } + + /** + * Defines an interface for providing information on completion of execution and the ability to wait on completion + * for use with tests involving FutureCallback. + */ + private interface CompletionAware { + boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException; + boolean wasSuccess(); + } + + /** + * There is no guarantee when a FutureCallback will actually run. In order to verify state from inside the + * FutureCallback, a CountDownLatch is used to track when the FutureCallback has finished executing and to provide + * a condition to wait on. + */ + private static final class CompletionAwareFutureCallback implements FutureCallback, CompletionAware { + private final CountDownLatch latch = new CountDownLatch(1); + private final FutureCallback delegate; + private AtomicReference success = new AtomicReference<>(Boolean.FALSE); + + CompletionAwareFutureCallback(FutureCallback delegate) { + this.delegate = delegate; + } + + @Override + public void onSuccess(@NullableDecl V result) { + try { + success.set(Boolean.TRUE); + delegate.onSuccess(result); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Throwable throwable) { + try { + delegate.onFailure(throwable); + } finally { + latch.countDown(); + } + } + + @Override + public boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + @Override + public boolean wasSuccess() { + return success.get(); + } + } }