Skip to content

Commit 80a3bd0

Browse files
authored
Add tracing for multi stream append (#338)
1 parent 891abe8 commit 80a3bd0

File tree

8 files changed

+247
-17
lines changed

8 files changed

+247
-17
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ dependencies {
8484
testImplementation 'org.slf4j:slf4j-simple:2.0.17'
8585
testImplementation "io.opentelemetry:opentelemetry-sdk"
8686
testImplementation "io.opentelemetry:opentelemetry-sdk-testing"
87-
testImplementation 'io.opentelemetry:opentelemetry-exporter-logging:1.38.0'
88-
testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp-trace:1.14.0'
87+
testImplementation "io.opentelemetry:opentelemetry-exporter-logging"
88+
testImplementation "io.opentelemetry:opentelemetry-exporter-otlp"
8989
}
9090

9191
tasks.withType(Test).configureEach {

src/main/java/io/kurrent/dbclient/AppendStreamFailure.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.kurrent.dbclient;
22

3+
import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase;
4+
35
public class AppendStreamFailure {
46
private final io.kurrentdb.protocol.streams.v2.AppendStreamFailure inner;
57

@@ -12,21 +14,21 @@ public String getStreamName() {
1214
}
1315

1416
public void visit(MultiAppendStreamErrorVisitor visitor) {
15-
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_REVISION_CONFLICT) {
17+
if (this.inner.getErrorCase() == ErrorCase.STREAM_REVISION_CONFLICT) {
1618
visitor.onWrongExpectedRevision(this.inner.getStreamRevisionConflict().getStreamRevision());
1719
return;
1820
}
1921

20-
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.ACCESS_DENIED) {
22+
if (this.inner.getErrorCase() == ErrorCase.ACCESS_DENIED) {
2123
visitor.onAccessDenied(this.inner.getAccessDenied());
2224
}
2325

24-
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.STREAM_DELETED) {
26+
if (this.inner.getErrorCase() == ErrorCase.STREAM_DELETED) {
2527
visitor.onStreamDeleted();
2628
return;
2729
}
2830

29-
if (this.inner.getErrorCase() == io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
31+
if (this.inner.getErrorCase() == ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED) {
3032
visitor.onTransactionMaxSizeExceeded(this.inner.getTransactionMaxSizeExceeded().getMaxSize());
3133
return;
3234
}

src/main/java/io/kurrent/dbclient/ClientTelemetry.java

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
import com.fasterxml.jackson.databind.node.ObjectNode;
66
import io.grpc.ManagedChannel;
77
import io.opentelemetry.api.GlobalOpenTelemetry;
8+
import io.opentelemetry.api.common.AttributeKey;
9+
import io.opentelemetry.api.common.Attributes;
810
import io.opentelemetry.api.trace.*;
911
import io.opentelemetry.context.Context;
1012
import io.opentelemetry.context.Scope;
1113

12-
import java.util.ArrayList;
13-
import java.util.List;
14-
import java.util.Map;
15-
import java.util.Objects;
14+
import java.util.*;
1615
import java.util.concurrent.CompletableFuture;
1716
import java.util.concurrent.CompletionException;
1817
import java.util.function.BiFunction;
1918

19+
import static io.kurrentdb.protocol.streams.v2.AppendStreamFailure.*;
20+
2021
class ClientTelemetry {
2122
private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{
2223
put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
@@ -122,6 +123,93 @@ static CompletableFuture<WriteResult> traceAppend(
122123
}
123124
}
124125

126+
static CompletableFuture<MultiAppendWriteResult> traceMultiStreamAppend(
127+
BiFunction<WorkItemArgs, Iterator<AppendStreamRequest>, CompletableFuture<MultiAppendWriteResult>> multiAppendOperation,
128+
WorkItemArgs args,
129+
Iterator<AppendStreamRequest> requests, KurrentDBClientSettings settings) {
130+
131+
List<AppendStreamRequest> requestsWithTracing = new ArrayList<>();
132+
133+
Span span = createSpan(
134+
ClientTelemetryConstants.Operations.MULTI_APPEND,
135+
SpanKind.CLIENT,
136+
null,
137+
ClientTelemetryTags.builder()
138+
.withServerTagsFromGrpcChannel(args.getChannel())
139+
.withServerTagsFromClientSettings(settings)
140+
.withOptionalDatabaseUserTag(settings.getDefaultCredentials())
141+
.build());
142+
143+
while (requests.hasNext()) {
144+
AppendStreamRequest request = requests.next();
145+
146+
List<EventData> eventsWithTracing = new ArrayList<>();
147+
while (request.getEvents().hasNext())
148+
eventsWithTracing.add(request.getEvents().next());
149+
150+
List<EventData> tracedEvents = tryInjectTracingContext(span, eventsWithTracing);
151+
152+
requestsWithTracing.add(new AppendStreamRequest(
153+
request.getStreamName(),
154+
tracedEvents.iterator(),
155+
request.getExpectedState()
156+
));
157+
}
158+
159+
return multiAppendOperation.apply(args, requestsWithTracing.iterator())
160+
.handle((result, throwable) -> {
161+
if (throwable != null) {
162+
span.setStatus(StatusCode.ERROR);
163+
span.recordException(throwable);
164+
span.end();
165+
throw new CompletionException(throwable);
166+
} else {
167+
if (result.getFailures().isPresent()) {
168+
for (AppendStreamFailure failure : result.getFailures().get()) {
169+
failure.visit(new MultiAppendStreamErrorVisitor() {
170+
@Override
171+
public void onWrongExpectedRevision(long streamRevision) {
172+
span.addEvent("exception", Attributes.of(
173+
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_REVISION_CONFLICT.toString(),
174+
AttributeKey.longKey("exception.revision"), streamRevision
175+
));
176+
}
177+
178+
@Override
179+
public void onAccessDenied(io.kurrentdb.protocol.streams.v2.ErrorDetails.AccessDenied detail) {
180+
span.addEvent("exception", Attributes.of(
181+
AttributeKey.stringKey("exception.type"), ErrorCase.ACCESS_DENIED.toString()
182+
));
183+
}
184+
185+
@Override
186+
public void onStreamDeleted() {
187+
span.addEvent("exception", Attributes.of(
188+
AttributeKey.stringKey("exception.type"), ErrorCase.STREAM_DELETED.toString()
189+
));
190+
}
191+
192+
@Override
193+
public void onTransactionMaxSizeExceeded(int maxSize) {
194+
span.addEvent("exception", Attributes.of(
195+
AttributeKey.stringKey("exception.type"), ErrorCase.TRANSACTION_MAX_SIZE_EXCEEDED.toString(),
196+
AttributeKey.longKey("exception.maxSize"), (long) maxSize
197+
));
198+
}
199+
});
200+
}
201+
span.setStatus(StatusCode.ERROR);
202+
span.end();
203+
} else if (result.getSuccesses().isPresent()) {
204+
span.setStatus(StatusCode.OK);
205+
span.end();
206+
}
207+
208+
return result;
209+
}
210+
});
211+
}
212+
125213
static void traceSubscribe(Runnable tracedOperation, String subscriptionId, ManagedChannel channel,
126214
KurrentDBClientSettings settings,
127215
UserCredentials optionalCallCredentials, RecordedEvent event) {

src/main/java/io/kurrent/dbclient/ClientTelemetryConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public static class Metadata {
1010

1111
public static class Operations {
1212
public static final String APPEND = "streams.append";
13+
public static final String MULTI_APPEND = "streams.multi-append";
1314
public static final String SUBSCRIBE = "streams.subscribe";
1415
}
1516
}

src/main/java/io/kurrent/dbclient/MultiStreamAppend.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@ public MultiStreamAppend(GrpcClient client, Iterator<AppendStreamRequest> reques
2525
}
2626

2727
public CompletableFuture<MultiAppendWriteResult> execute() {
28-
return this.client.runWithArgs(this::append);
28+
return this.client.runWithArgs(args -> ClientTelemetry.traceMultiStreamAppend(
29+
this::append,
30+
args,
31+
this.requests,
32+
this.client.getSettings()));
2933
}
3034

31-
private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
35+
private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args, Iterator<AppendStreamRequest> requests) {
3236
CompletableFuture<MultiAppendWriteResult> result = new CompletableFuture<>();
3337

3438
if (!args.supportFeature(FeatureFlags.MULTI_STREAM_APPEND)) {
@@ -40,8 +44,8 @@ private CompletableFuture<MultiAppendWriteResult> append(WorkItemArgs args) {
4044
StreamObserver<io.kurrentdb.protocol.streams.v2.AppendStreamRequest> requestStream = client.multiStreamAppendSession(GrpcUtils.convertSingleResponse(result, this::onResponse));
4145

4246
try {
43-
while (this.requests.hasNext()) {
44-
AppendStreamRequest request = this.requests.next();
47+
while (requests.hasNext()) {
48+
AppendStreamRequest request = requests.next();
4549
io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder()
4650
.setExpectedRevision(request.getExpectedState().toRawLong())
4751
.setStream(request.getStreamName());

src/test/java/io/kurrent/dbclient/TelemetryTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
import io.kurrent.dbclient.telemetry.TracingContextInjectionTests;
77
import io.opentelemetry.api.GlobalOpenTelemetry;
88
import io.opentelemetry.api.common.AttributeKey;
9+
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
910
import io.opentelemetry.sdk.OpenTelemetrySdk;
11+
import io.opentelemetry.sdk.resources.Resource;
1012
import io.opentelemetry.sdk.trace.ReadableSpan;
1113
import io.opentelemetry.sdk.trace.SdkTracerProvider;
14+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
1215
import org.junit.jupiter.api.AfterAll;
1316
import org.junit.jupiter.api.BeforeAll;
1417
import org.junit.jupiter.api.BeforeEach;
@@ -21,6 +24,8 @@
2124
import java.util.function.Consumer;
2225
import java.util.stream.Collectors;
2326

27+
import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME;
28+
2429
public class TelemetryTests implements StreamsTracingInstrumentationTests, PersistentSubscriptionsTracingInstrumentationTests, TracingContextInjectionTests {
2530
static private Database database;
2631
static private Logger logger;
@@ -39,10 +44,20 @@ public void beforeEach() {
3944
GlobalOpenTelemetry.resetForTest();
4045
spanEndedHooks.add(recordedSpans::add);
4146

47+
OtlpGrpcSpanExporter otlpExporter = OtlpGrpcSpanExporter.builder()
48+
.setEndpoint("http://localhost:4317")
49+
.build();
50+
51+
Resource resource = Resource.getDefault().toBuilder()
52+
.put(SERVICE_NAME, "kurrentdb")
53+
.build();
54+
4255
OpenTelemetrySdk.builder()
4356
.setTracerProvider(SdkTracerProvider
4457
.builder()
4558
.addSpanProcessor(new SpanProcessorSpy(spanEndedHooks))
59+
.addSpanProcessor(SimpleSpanProcessor.create(otlpExporter))
60+
.setResource(resource)
4661
.build())
4762
.buildAndRegisterGlobal();
4863
}

src/test/java/io/kurrent/dbclient/databases/DockerContainerDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import java.util.Map;
1313

1414
public class DockerContainerDatabase extends GenericContainer<DockerContainerDatabase> implements Database {
15-
public static final String DEFAULT_IMAGE = "docker.kurrent.io/eventstore/eventstoredb-ee:lts";
15+
public static final String DEFAULT_IMAGE = "docker.cloudsmith.io/eventstore/kurrent-staging/kurrentdb:ci";
1616

1717
public static class Builder {
1818
String image;

src/test/java/io/kurrent/dbclient/telemetry/StreamsTracingInstrumentationTests.java

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
import io.kurrent.dbclient.*;
44
import com.fasterxml.jackson.databind.JsonNode;
55
import com.fasterxml.jackson.databind.node.ObjectNode;
6+
import io.kurrentdb.protocol.streams.v2.AppendStreamFailure.ErrorCase;
7+
import io.opentelemetry.api.common.AttributeKey;
68
import io.opentelemetry.api.trace.SpanContext;
9+
import io.opentelemetry.api.trace.SpanKind;
10+
import io.opentelemetry.api.trace.StatusCode;
711
import io.opentelemetry.sdk.trace.ReadableSpan;
812
import org.junit.jupiter.api.Assertions;
13+
import org.junit.jupiter.api.Assumptions;
914
import org.junit.jupiter.api.Test;
1015
import org.junit.jupiter.api.Timeout;
1116

12-
import java.util.List;
13-
import java.util.UUID;
17+
import java.util.*;
1418
import java.util.concurrent.CountDownLatch;
1519
import java.util.concurrent.ExecutionException;
1620
import java.util.concurrent.TimeUnit;
@@ -289,4 +293,120 @@ public void onEvent(Subscription subscription, ResolvedEvent event) {
289293
List<ReadableSpan> subscribeSpans = getSpansForOperation(ClientTelemetryConstants.Operations.SUBSCRIBE);
290294
Assertions.assertTrue(subscribeSpans.isEmpty(), "No spans should be recorded for deleted events");
291295
}
296+
297+
@Test
298+
default void testMultiStreamAppendIsInstrumentedWithTracingAsExpected() throws Throwable {
299+
KurrentDBClient client = getDefaultClient();
300+
301+
Optional<ServerVersion> version = client.getServerVersion().get();
302+
303+
Assumptions.assumeTrue(
304+
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
305+
"Multi-stream append is not supported server versions below 25.0.0"
306+
);
307+
308+
String streamName1 = generateName();
309+
String streamName2 = generateName();
310+
311+
EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
312+
.eventId(UUID.randomUUID())
313+
.build();
314+
315+
EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
316+
.eventId(UUID.randomUUID())
317+
.build();
318+
319+
AppendStreamRequest request1 = new AppendStreamRequest(
320+
streamName1,
321+
Collections.singletonList(event1).iterator(),
322+
StreamState.noStream()
323+
);
324+
325+
AppendStreamRequest request2 = new AppendStreamRequest(
326+
streamName2,
327+
Collections.singletonList(event2).iterator(),
328+
StreamState.noStream()
329+
);
330+
331+
MultiAppendWriteResult result = client.multiStreamAppend(
332+
Arrays.asList(request1, request2).iterator()
333+
).get();
334+
335+
Assertions.assertNotNull(result);
336+
Assertions.assertTrue(result.getSuccesses().isPresent());
337+
338+
List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
339+
Assertions.assertEquals(1, spans.size());
340+
341+
assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
342+
assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.OPERATION, ClientTelemetryConstants.Operations.MULTI_APPEND);
343+
assertSpanAttributeEquals(spans.get(0), ClientTelemetryAttributes.Database.USER, "admin");
344+
Assertions.assertEquals(StatusCode.OK, spans.get(0).toSpanData().getStatus().getStatusCode());
345+
Assertions.assertEquals(SpanKind.CLIENT, spans.get(0).getKind());
346+
}
347+
348+
@Test
349+
default void testMultiStreamAppendIsInstrumentedWithFailures() throws Throwable {
350+
KurrentDBClient client = getDefaultClient();
351+
352+
Optional<ServerVersion> version = client.getServerVersion().get();
353+
354+
Assumptions.assumeTrue(
355+
version.isPresent() && version.get().isGreaterOrEqualThan(25, 0),
356+
"Multi-stream append is not supported server versions below 25.0.0"
357+
);
358+
359+
String streamName1 = generateName();
360+
String streamName2 = generateName();
361+
362+
EventData event1 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
363+
.eventId(UUID.randomUUID())
364+
.build();
365+
366+
EventData event2 = EventData.builderAsJson("TestEvent", mapper.writeValueAsBytes(new Foo()))
367+
.eventId(UUID.randomUUID())
368+
.build();
369+
370+
AppendStreamRequest request1 = new AppendStreamRequest(
371+
streamName1,
372+
Collections.singletonList(event1).iterator(),
373+
StreamState.noStream()
374+
);
375+
376+
AppendStreamRequest request2 = new AppendStreamRequest(
377+
streamName2,
378+
Collections.singletonList(event2).iterator(),
379+
StreamState.streamExists()
380+
);
381+
382+
MultiAppendWriteResult result = client.multiStreamAppend(
383+
Arrays.asList(request1, request2).iterator()
384+
).get();
385+
386+
Assertions.assertNotNull(result);
387+
Assertions.assertFalse(result.getSuccesses().isPresent());
388+
Assertions.assertTrue(result.getFailures().isPresent());
389+
390+
List<ReadableSpan> spans = getSpansForOperation(ClientTelemetryConstants.Operations.MULTI_APPEND);
391+
Assertions.assertEquals(1, spans.size());
392+
393+
ReadableSpan span = spans.get(0);
394+
395+
Assertions.assertEquals(StatusCode.ERROR, span.toSpanData().getStatus().getStatusCode());
396+
Assertions.assertEquals("", span.toSpanData().getStatus().getDescription());
397+
398+
List<io.opentelemetry.sdk.trace.data.EventData> events = span.toSpanData().getEvents();
399+
400+
Assertions.assertEquals(1, events.size());
401+
402+
io.opentelemetry.sdk.trace.data.EventData failureEvent = events.get(0);
403+
404+
Assertions.assertEquals("exception", failureEvent.getName());
405+
406+
Assertions.assertEquals(ErrorCase.STREAM_REVISION_CONFLICT.toString(),
407+
failureEvent.getAttributes().get(AttributeKey.stringKey("exception.type")));
408+
409+
Assertions.assertNotNull(failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision")));
410+
Assertions.assertEquals(-1L, failureEvent.getAttributes().get(AttributeKey.longKey("exception.revision")));
411+
}
292412
}

0 commit comments

Comments
 (0)