Skip to content

Commit

Permalink
tweaks to ckozak's DetachedSpan & demo usage (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
iamdanfox authored and bulldozer-bot[bot] committed Sep 2, 2019
1 parent d074362 commit bc20b95
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 267 deletions.
177 changes: 111 additions & 66 deletions tracing-demos/src/test/java/com/palantir/tracing/TracingDemos.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
Expand All @@ -48,16 +48,17 @@ void handles_async_spans() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);

try (CloseableTracer root = CloseableTracer.startSpan("root")) {
IntStream.range(0, numTasks).forEach(i -> {
// DetachedSpan detachedSpan = DetachedSpan.start("task-queue-time" + i);
executorService.submit(() -> {
// detachedSpan.close();
IntStream.range(0, numTasks).forEach(i -> {
Tracer.clearCurrentTrace(); // just pretending all these tasks are on a fresh request

DetachedSpan crossThread = DetachedSpan.start("task-queue-time" + i);
executorService.submit(() -> {
try (CloseableSpan t = crossThread.completeAndStartChild("task" + i)) {
emit_nested_spans();
countDownLatch.countDown();
});
}
});
}
});

assertThat(countDownLatch.await(expectedDurationMillis + 1000, TimeUnit.MILLISECONDS)).isTrue();
}
Expand All @@ -67,29 +68,42 @@ void handles_async_spans() throws Exception {
void async_future() throws InterruptedException {
int numThreads = 2;
int numCallbacks = 10;
ExecutorService executorService = Tracers.wrap(Executors.newFixedThreadPool(numThreads));
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
final SettableFuture<Object> future = SettableFuture.create();
CountDownLatch latch = new CountDownLatch(numCallbacks);

IntStream.range(0, numCallbacks).forEach(i ->
try (CloseableTracer tracer = CloseableTracer.startSpan("I am a root span")) {
String traceId = Tracer.getTraceId();

IntStream.range(0, numCallbacks).forEach(i -> {

DetachedSpan span = DetachedSpan.start("callback-pending" + i + " (cross thread span)");

Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
sleep(10, "success" + i);
latch.countDown();
assertThat(Tracer.hasTraceId()).isFalse();
try (CloseableSpan tracer = span.completeAndStartChild("success" + i)) {
assertThat(Tracer.getTraceId()).isEqualTo(traceId);
sleep(10);
latch.countDown();
}
}

@Override
public void onFailure(Throwable throwable) {
Assertions.fail();
}
}, executorService));
}, executorService);
});

executorService.submit(() -> {
try (CloseableTracer root = CloseableTracer.startSpan("root")) {
future.set(null);
try (CloseableTracer root = CloseableTracer.startSpan("bbb")) {
executorService.submit(() -> {
future.set(null);
});
}
});

}
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

Expand All @@ -98,28 +112,42 @@ public void onFailure(Throwable throwable) {
void multi_producer_single_consumer() throws InterruptedException {
int numProducers = 2;
int numElem = 20;
PriorityBlockingQueue<String> work = new PriorityBlockingQueue<>();
ArrayBlockingQueue<QueuedWork> work = new ArrayBlockingQueue<QueuedWork>(numElem);

CountDownLatch submitLatch = new CountDownLatch(numElem);
CountDownLatch consumeLatch = new CountDownLatch(numElem);
ExecutorService producerExecutorService = Tracers.wrap(Executors.newFixedThreadPool(numProducers));
ExecutorService consumerExecutorService = Tracers.wrap(Executors.newFixedThreadPool(1));
ExecutorService producerExecutorService = Executors.newFixedThreadPool(numProducers);
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(1);

try (CloseableTracer submit = CloseableTracer.startSpan("submit")) {
IntStream.range(0, numElem).forEach(i -> {

Tracer.clearCurrentTrace(); // just pretending all these tasks are on a fresh request

DetachedSpan span = DetachedSpan.start("callback-pending" + i + " (cross thread span)");
producerExecutorService.submit(() -> {
try (CloseableTracer closeableTracer = CloseableTracer.startSpan("submit-work" + i)) {
work.add("work" + i);
submitLatch.countDown();
}
work.add(new QueuedWork() {
@Override
public String name() {
return "work" + i;
}

@Override
public DetachedSpan span() {
return span;
}
});
submitLatch.countDown();
});
});
assertThat(submitLatch.await(10, TimeUnit.SECONDS)).isTrue();

consumerExecutorService.submit(() -> {
for (int i = 0; i < numElem; i++) {
String poll = work.take();
sleep(10, "processing" + poll);
QueuedWork queuedWork = work.take();
try (CloseableSpan span = queuedWork.span().completeAndStartChild("consume" + queuedWork.name())) {
Thread.sleep(10);
}
consumeLatch.countDown();
}
return null;
Expand All @@ -129,35 +157,30 @@ void multi_producer_single_consumer() throws InterruptedException {
}

@Test
@TestTracing(snapshot = true, layout = LayoutStrategy.SPLIT_BY_TRACE)
@TestTracing(snapshot = true, layout = LayoutStrategy.CHRONOLOGICAL)
void backoffs_on_a_scheduled_executor() throws InterruptedException {
ScheduledExecutorService executor = Tracers.wrap(Executors.newScheduledThreadPool(2));
CountDownLatch latch = new CountDownLatch(1);

try (CloseableTracer t = CloseableTracer.startSpan("some-request")) {
executor.execute(() -> {
// first attempt at a network call
sleep(100, "first attempt");

executor.schedule(() -> {
// attempt number 2
sleep(100, "second attempt");

executor.schedule(() -> {
// attempt number 3
sleep(100, "final attempt");
DetachedSpan overall = DetachedSpan.start("overall request");
executor.execute(() -> {

latch.countDown();
}, 100, TimeUnit.MILLISECONDS);
try (CloseableTracer t = CloseableTracer.startSpan("first network call (pretending this fails)")) {
sleep(100);
}

sleep(200, "second tidying");
}, 100, TimeUnit.MILLISECONDS);
DetachedSpan backoff = overall.childDetachedSpan("backoff");
executor.schedule(() -> {
try (CloseableSpan attempt2 = backoff.completeAndStartChild("secondAttempt")) {
sleep(100);
overall.complete();
latch.countDown();

sleep(200, "first tidying");
});
}
}, 20, TimeUnit.MILLISECONDS);
});

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS);
}
Expand All @@ -167,54 +190,76 @@ void backoffs_on_a_scheduled_executor() throws InterruptedException {
@SuppressWarnings("CheckReturnValue")
void transformed_future() throws InterruptedException {
SettableFuture<Object> future = SettableFuture.create();
ScheduledExecutorService executor = Tracers.wrap(Executors.newScheduledThreadPool(2));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
CountDownLatch latch = new CountDownLatch(1);

DetachedSpan foo = DetachedSpan.start("foo");
FluentFuture.from(future)
.transform(result -> {
sleep(100, "first");
return result;
try (CloseableSpan t = foo.childSpan("first transform")) {
sleep(1000);
return result;
}
}, executor)
.transform(result -> {
sleep(100, "second");
latch.countDown();
return result;
try (CloseableSpan t = foo.childSpan("second transform")) {
sleep(1000);
latch.countDown();
return result;
}
}, executor)
.addCallback(new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
foo.complete();
}

@Override
public void onFailure(Throwable throwable) {
foo.complete();
}
}, executor);

executor.submit(() -> {
try (CloseableTracer root = CloseableTracer.startSpan("root")) {
future.set(null);
}
future.set(null);
});

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

private static void sleep(int millis, String operation) {
try (CloseableTracer t = CloseableTracer.startSpan(operation)) {
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException("dont care", e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

private static void sleep(int millis) {
sleep(millis, "sleep " + millis);
private static void sleepSpan(int millis) {
try (CloseableTracer t = CloseableTracer.startSpan("sleep " + millis)) {
sleep(millis);
}
}

@SuppressWarnings("NestedTryDepth")
private static void emit_nested_spans() {
try (CloseableTracer root = CloseableTracer.startSpan("root")) {
try (CloseableTracer root = CloseableTracer.startSpan("emit_nested_spans")) {
try (CloseableTracer first = CloseableTracer.startSpan("first")) {
sleep(100);
sleepSpan(100);
try (CloseableTracer nested = CloseableTracer.startSpan("nested")) {
sleep(90);
sleepSpan(90);
}
sleep(10);
sleepSpan(10);
}
try (CloseableTracer second = CloseableTracer.startSpan("second")) {
sleep(100);
sleepSpan(100);
}
}
}

interface QueuedWork {
String name();
DetachedSpan span();
}
}
Loading

0 comments on commit bc20b95

Please sign in to comment.