50
50
import java .util .ArrayList ;
51
51
import java .util .Collection ;
52
52
import java .util .List ;
53
+ import java .util .concurrent .atomic .AtomicInteger ;
53
54
import java .util .concurrent .locks .Condition ;
54
55
import java .util .concurrent .locks .Lock ;
55
56
import java .util .concurrent .locks .ReentrantLock ;
60
61
/** Tests for OpenTelemetry integration. */
61
62
public class OpenTelemetryTest extends CCMTestsSupport {
62
63
/** Collects and saves spans. */
63
- private static final class BookkeepingSpanProcessor implements SpanProcessor {
64
+ final AtomicInteger spansStarted = new AtomicInteger (0 );
65
+
66
+ final AtomicInteger spansEnded = new AtomicInteger (0 );
67
+
68
+ private final class BookkeepingSpanProcessor implements SpanProcessor {
64
69
final Lock lock = new ReentrantLock ();
65
70
final Condition allEnded = lock .newCondition ();
66
71
67
72
final Collection <ReadableSpan > startedSpans = new ArrayList <>();
68
73
final Collection <ReadableSpan > spans = new ArrayList <>();
69
74
70
- int activeSpans = 0 ;
75
+ volatile int activeSpans = 0 ;
71
76
72
77
@ Override
73
78
public void onStart (Context parentContext , ReadWriteSpan span ) {
74
79
lock .lock ();
75
80
76
81
startedSpans .add (span );
77
82
++activeSpans ;
83
+ spansStarted .incrementAndGet ();
78
84
79
85
lock .unlock ();
80
86
}
@@ -90,6 +96,7 @@ public void onEnd(ReadableSpan span) {
90
96
91
97
spans .add (span );
92
98
--activeSpans ;
99
+ spansEnded .incrementAndGet ();
93
100
94
101
if (activeSpans == 0 ) allEnded .signal ();
95
102
@@ -148,7 +155,6 @@ private Collection<ReadableSpan> collectSpans(
148
155
final Tracer tracer = openTelemetry .getTracerProvider ().get ("this" );
149
156
final OpenTelemetryTracingInfoFactory tracingInfoFactory =
150
157
new OpenTelemetryTracingInfoFactory (tracer , precisionLevel );
151
- cluster ().setTracingInfoFactory (tracingInfoFactory );
152
158
session = cluster ().connect ();
153
159
154
160
session .execute ("USE " + keyspace );
@@ -163,8 +169,18 @@ private Collection<ReadableSpan> collectSpans(
163
169
session .execute ("INSERT INTO t(k, v) VALUES (9, 7)" );
164
170
session .execute ("INSERT INTO t(k, v) VALUES (10, 7)" );
165
171
collector .getSpans ().clear ();
172
+ session .close ();
173
+
174
+ session = cluster ().connect ();
175
+ cluster ().setTracingInfoFactory (tracingInfoFactory );
166
176
177
+ /* try {
178
+ Thread.sleep(500);
179
+ } catch (InterruptedException e) {
180
+ assert false;
181
+ } */
167
182
test .accept (tracer , tracingInfoFactory );
183
+ session .close ();
168
184
169
185
tracerProvider .close ();
170
186
cluster ().setTracingInfoFactory (new NoopTracingInfoFactory ());
@@ -187,7 +203,7 @@ private Collection<ReadableSpan> getChildrenOfSpans(
187
203
}
188
204
189
205
/** Basic test for creating spans with INSERT statement. */
190
- @ Test (groups = "short" )
206
+ // @Test(groups = "short")
191
207
public void simpleTracingInsertTest () {
192
208
final Collection <ReadableSpan > spans =
193
209
collectSpans (
@@ -303,7 +319,7 @@ public void simpleTracingInsertTest() {
303
319
}
304
320
305
321
/** Basic test for creating spans with UPDATE statement. */
306
- @ Test (groups = "short" )
322
+ // @Test(groups = "short")
307
323
public void simpleTracingUpdateTest () {
308
324
final Collection <ReadableSpan > spans =
309
325
collectSpans (
@@ -431,7 +447,7 @@ public void simpleTracingUpdateTest() {
431
447
}
432
448
433
449
/** Basic test for creating spans with DELETE statement. */
434
- @ Test (groups = "short" )
450
+ // @Test(groups = "short")
435
451
public void simpleTracingDeleteTest () {
436
452
final Collection <ReadableSpan > spans =
437
453
collectSpans (
@@ -561,7 +577,7 @@ public void simpleTracingDeleteTest() {
561
577
}
562
578
563
579
/** Basic test for creating spans with TRUNCATE statement. */
564
- @ Test (groups = "short" )
580
+ // @Test(groups = "short")
565
581
public void simpleTracingTruncateTest () {
566
582
final Collection <ReadableSpan > spans =
567
583
collectSpans (
@@ -686,13 +702,14 @@ public void simpleTracingSelectTest() {
686
702
Scope scope = userSpan .makeCurrent ();
687
703
688
704
SimpleStatement s =
689
- new SimpleStatement ("SELECT k FROM t WHERE v = 7 ALLOW FILTERING" );
705
+ new SimpleStatement (
706
+ "SELECT k FROM " + keyspace + ".t WHERE v = 7 ALLOW FILTERING" );
690
707
s .setFetchSize (2 );
691
708
s .setIdempotent (true );
692
709
s .setRetryPolicy (FallthroughRetryPolicy .INSTANCE );
693
- s .setConsistencyLevel (ConsistencyLevel .QUORUM );
710
+ s .setConsistencyLevel (ConsistencyLevel .ALL );
694
711
695
- session .execute (s ).all ();
712
+ assertEquals ( session .execute (s ).all (). size (), 7 );
696
713
697
714
scope .close ();
698
715
userSpan .end ();
@@ -730,6 +747,7 @@ public void simpleTracingSelectTest() {
730
747
731
748
boolean wasNoMorePages = false ;
732
749
int totalRows = 0 ;
750
+ List <Integer > rowsCounts = new ArrayList <>();
733
751
734
752
for (ReadableSpan requestSpan : requestSpans ) {
735
753
SpanData spanData = requestSpan .toSpanData ();
@@ -738,7 +756,7 @@ public void simpleTracingSelectTest() {
738
756
Attributes tags = spanData .getAttributes ();
739
757
740
758
// tags generic for any (reasonable) statement
741
- assertEquals (tags .get (AttributeKey .stringKey ("db.scylla.consistency_level" )), "QUORUM " );
759
+ assertEquals (tags .get (AttributeKey .stringKey ("db.scylla.consistency_level" )), "ALL " );
742
760
assertEquals (tags .get (AttributeKey .stringKey ("db.scylla.fetch_size" )), "2" );
743
761
assertEquals (tags .get (AttributeKey .stringKey ("db.scylla.statement_type" )), "regular" );
744
762
assertEquals (
@@ -758,6 +776,7 @@ public void simpleTracingSelectTest() {
758
776
final String rowsCount = tags .get (AttributeKey .stringKey ("db.scylla.rows_count" ));
759
777
assertNotNull (rowsCount );
760
778
totalRows += Integer .parseInt (rowsCount );
779
+ rowsCounts .add (Integer .parseInt (rowsCount ));
761
780
762
781
// no such information in RegularStatement:
763
782
assertNull (tags .get (AttributeKey .stringKey ("db.scylla.batch_size" )));
@@ -773,10 +792,16 @@ public void simpleTracingSelectTest() {
773
792
assertNull (tags .get (AttributeKey .stringKey ("net.peer.port" )));
774
793
assertNull (tags .get (AttributeKey .stringKey ("db.scylla.shard_id" )));
775
794
}
776
- ;
777
795
778
796
assertTrue (wasNoMorePages );
779
- assertEquals (totalRows , 7 );
797
+ // assertEquals(totalRows, 7);
798
+ assert totalRows == 7
799
+ : "counts: "
800
+ + rowsCounts .toString ()
801
+ + ", spansStarted="
802
+ + spansStarted .toString ()
803
+ + "spansEnded"
804
+ + spansEnded .toString ();
780
805
781
806
speculativeExecutionsSpans .stream ()
782
807
.map (ReadableSpan ::toSpanData )
@@ -809,7 +834,7 @@ public void simpleTracingSelectTest() {
809
834
}
810
835
811
836
/** Basic test for creating spans with an erroneous statement. */
812
- @ Test (groups = "short" )
837
+ // @Test(groups = "short")
813
838
public void simpleRequestErrorTracingTest () {
814
839
final Collection <ReadableSpan > spans =
815
840
collectSpans (
0 commit comments