Skip to content

Commit f838d27

Browse files
committed
Tracing: added OpenTelemetry integration test
The test verifies that span tree structure and status code are valid. Speculative executions are run parallel to the main thread, so some of them can finish only after query result has been returned. Thus, in order to collect span data from entire request, we decided to wait until all speculative executions end. The main thread uses conditional variable `allEnded` to wait for them and lock is used for concurrent mutation of activeSpans.
1 parent e0842d7 commit f838d27

File tree

1 file changed

+302
-0
lines changed

1 file changed

+302
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,302 @@
1+
/*
2+
* Copyright (C) 2021 ScyllaDB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.opentelemetry;
17+
18+
import static org.testng.Assert.assertEquals;
19+
import static org.testng.Assert.assertTrue;
20+
21+
import com.datastax.driver.core.CCMTestsSupport;
22+
import com.datastax.driver.core.Session;
23+
import com.datastax.driver.core.exceptions.InvalidQueryException;
24+
import com.datastax.driver.core.exceptions.SyntaxError;
25+
import com.datastax.driver.core.tracing.NoopTracingInfoFactory;
26+
import com.datastax.driver.core.tracing.PrecisionLevel;
27+
import com.datastax.driver.core.tracing.TracingInfoFactory;
28+
import io.opentelemetry.api.common.AttributeKey;
29+
import io.opentelemetry.api.common.Attributes;
30+
import io.opentelemetry.api.trace.Span;
31+
import io.opentelemetry.api.trace.StatusCode;
32+
import io.opentelemetry.api.trace.Tracer;
33+
import io.opentelemetry.context.Context;
34+
import io.opentelemetry.context.Scope;
35+
import io.opentelemetry.sdk.OpenTelemetrySdk;
36+
import io.opentelemetry.sdk.resources.Resource;
37+
import io.opentelemetry.sdk.trace.ReadWriteSpan;
38+
import io.opentelemetry.sdk.trace.ReadableSpan;
39+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
40+
import io.opentelemetry.sdk.trace.SpanProcessor;
41+
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
42+
import java.util.ArrayList;
43+
import java.util.Collection;
44+
import java.util.List;
45+
import java.util.concurrent.locks.Condition;
46+
import java.util.concurrent.locks.Lock;
47+
import java.util.concurrent.locks.ReentrantLock;
48+
import java.util.function.BiConsumer;
49+
import java.util.stream.Collectors;
50+
import org.testng.annotations.Test;
51+
52+
/** Tests for OpenTelemetry integration. */
53+
public class OpenTelemetryTest extends CCMTestsSupport {
54+
/** Collects and saves spans. */
55+
private static final class BookkeepingSpanProcessor implements SpanProcessor {
56+
final Lock lock = new ReentrantLock();
57+
final Condition allEnded = lock.newCondition();
58+
59+
final Collection<ReadableSpan> startedSpans = new ArrayList<>();
60+
final Collection<ReadableSpan> spans = new ArrayList<>();
61+
62+
int activeSpans = 0;
63+
64+
@Override
65+
public void onStart(Context parentContext, ReadWriteSpan span) {
66+
lock.lock();
67+
68+
startedSpans.add(span);
69+
++activeSpans;
70+
71+
lock.unlock();
72+
}
73+
74+
@Override
75+
public boolean isStartRequired() {
76+
return true;
77+
}
78+
79+
@Override
80+
public void onEnd(ReadableSpan span) {
81+
lock.lock();
82+
83+
spans.add(span);
84+
--activeSpans;
85+
86+
if (activeSpans == 0) allEnded.signal();
87+
88+
lock.unlock();
89+
}
90+
91+
@Override
92+
public boolean isEndRequired() {
93+
return true;
94+
}
95+
96+
public Collection<ReadableSpan> getSpans() {
97+
lock.lock();
98+
99+
try {
100+
while (activeSpans > 0) allEnded.await();
101+
102+
for (ReadableSpan span : startedSpans) {
103+
assertTrue(span.hasEnded());
104+
}
105+
} catch (InterruptedException e) {
106+
assert false;
107+
} finally {
108+
lock.unlock();
109+
}
110+
111+
return spans;
112+
}
113+
}
114+
115+
private Session session;
116+
117+
/**
118+
* Prepare OpenTelemetry configuration and run test with it.
119+
*
120+
* @param precisionLevel precision level of tracing for the tests.
121+
* @param test test to run.
122+
* @return collected spans.
123+
*/
124+
private Collection<ReadableSpan> collectSpans(
125+
PrecisionLevel precisionLevel, BiConsumer<Tracer, TracingInfoFactory> test) {
126+
final Resource serviceNameResource =
127+
Resource.create(
128+
Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test"));
129+
130+
final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor();
131+
132+
final SdkTracerProvider tracerProvider =
133+
SdkTracerProvider.builder()
134+
.addSpanProcessor(collector)
135+
.setResource(Resource.getDefault().merge(serviceNameResource))
136+
.build();
137+
final OpenTelemetrySdk openTelemetry =
138+
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
139+
140+
final Tracer tracer = openTelemetry.getTracerProvider().get("this");
141+
final OpenTelemetryTracingInfoFactory tracingInfoFactory =
142+
new OpenTelemetryTracingInfoFactory(tracer, precisionLevel);
143+
cluster().setTracingInfoFactory(tracingInfoFactory);
144+
session = cluster().connect();
145+
146+
session.execute("USE " + keyspace);
147+
session.execute("DROP TABLE IF EXISTS t");
148+
session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)");
149+
collector.getSpans().clear();
150+
151+
test.accept(tracer, tracingInfoFactory);
152+
153+
tracerProvider.close();
154+
cluster().setTracingInfoFactory(new NoopTracingInfoFactory());
155+
156+
return collector.getSpans();
157+
}
158+
159+
/** Basic test for creating spans. */
160+
@Test(groups = "short")
161+
public void simpleTracingTest() {
162+
for (int i = 0; i < 10; i++) {
163+
final Collection<ReadableSpan> spans =
164+
collectSpans(
165+
PrecisionLevel.NORMAL,
166+
(tracer, tracingInfoFactory) -> {
167+
Span userSpan = tracer.spanBuilder("user span").startSpan();
168+
Scope scope = userSpan.makeCurrent();
169+
170+
session.execute("INSERT INTO t(k, v) VALUES (4, 2)");
171+
session.execute("INSERT INTO t(k, v) VALUES (2, 1)");
172+
173+
scope.close();
174+
userSpan.end();
175+
});
176+
177+
// Retrieve span created directly by tracer.
178+
final List<ReadableSpan> userSpans =
179+
spans.stream()
180+
.filter(span -> !span.getParentSpanContext().isValid())
181+
.collect(Collectors.toList());
182+
assertEquals(userSpans.size(), 1);
183+
final ReadableSpan userSpan = userSpans.get(0);
184+
185+
for (ReadableSpan span : spans) {
186+
assertTrue(span.getSpanContext().isValid());
187+
assertTrue(
188+
span.getSpanContext().equals(userSpan.getSpanContext())
189+
|| span.getParentSpanContext().isValid());
190+
}
191+
192+
// Retrieve spans representing requests.
193+
final Collection<ReadableSpan> rootSpans =
194+
spans.stream()
195+
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
196+
.collect(Collectors.toList());
197+
assertEquals(rootSpans.size(), 2);
198+
199+
rootSpans.stream()
200+
.map(ReadableSpan::toSpanData)
201+
.forEach(
202+
spanData -> {
203+
assertEquals(spanData.getName(), "request");
204+
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK);
205+
Attributes tags = spanData.getAttributes();
206+
207+
assertEquals(
208+
tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE");
209+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.batch_size")), "1");
210+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.query_paged")), "false");
211+
assertEquals(
212+
tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "regular");
213+
assertEquals(
214+
tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")),
215+
"PagingOptimizingLoadBalancingPolicy");
216+
assertEquals(
217+
tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")),
218+
"NoSpeculativeExecutionPolicy");
219+
220+
// no such information in RegularStatement:
221+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.keyspace")), null); //
222+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.table")), null);
223+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.partition_key")), null);
224+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.db.operation")), null);
225+
// no such information with PrecisionLevel.NORMAL:
226+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement")), null);
227+
// no such information with operation INSERT:
228+
assertEquals(tags.get(AttributeKey.stringKey("db.scylla.rows_count")), null);
229+
230+
// TODO: more
231+
});
232+
}
233+
}
234+
235+
/** Basic test for creating spans. */
236+
@Test(groups = "short")
237+
public void simpleRequestErrorTracingTest() {
238+
for (int i = 0; i < 10; i++) {
239+
final Collection<ReadableSpan> spans =
240+
collectSpans(
241+
PrecisionLevel.FULL,
242+
(tracer, tracingInfoFactory) -> {
243+
Span userSpan = tracer.spanBuilder("user span").startSpan();
244+
Scope scope = userSpan.makeCurrent();
245+
246+
try {
247+
session.execute("INSERT ONTO t(k, v) VALUES (4, 2)");
248+
// ^ syntax error here
249+
assert false; // exception should be thrown before this line is executed
250+
} catch (SyntaxError error) {
251+
// pass
252+
}
253+
254+
try {
255+
session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)");
256+
// ^ too many values
257+
assert false; // exception should be thrown before this line is executed
258+
} catch (InvalidQueryException error) {
259+
// pass
260+
}
261+
262+
scope.close();
263+
userSpan.end();
264+
});
265+
266+
// Retrieve span created directly by tracer.
267+
final List<ReadableSpan> userSpans =
268+
spans.stream()
269+
.filter(span -> !span.getParentSpanContext().isValid())
270+
.collect(Collectors.toList());
271+
assertEquals(userSpans.size(), 1);
272+
final ReadableSpan userSpan = userSpans.get(0);
273+
274+
for (ReadableSpan span : spans) {
275+
assertTrue(span.getSpanContext().isValid());
276+
assertTrue(
277+
span.getSpanContext().equals(userSpan.getSpanContext())
278+
|| span.getParentSpanContext().isValid());
279+
}
280+
281+
// Retrieve spans representing requests.
282+
final Collection<ReadableSpan> rootSpans =
283+
spans.stream()
284+
.filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext()))
285+
.collect(Collectors.toList());
286+
assertEquals(rootSpans.size(), 2);
287+
288+
rootSpans.stream()
289+
.map(ReadableSpan::toSpanData)
290+
.forEach(
291+
spanData -> {
292+
assertEquals(spanData.getName(), "request");
293+
assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR);
294+
final String collectedStatement =
295+
spanData.getAttributes().get(AttributeKey.stringKey("db.scylla.statement"));
296+
assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)")
297+
|| collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)")
298+
: "Bad statement gathered";
299+
});
300+
}
301+
}
302+
}

0 commit comments

Comments
 (0)