Skip to content

Commit a057ae2

Browse files
committed
feat: add activity on connection
1 parent 9ecad93 commit a057ae2

8 files changed

+244
-36
lines changed

projects/Directory.Packages.props

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<PackageVersion Include="OpenTelemetry.Api" Version="1.9.0" />
1111
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.9.0" />
1212
<PackageVersion Include="System.Collections.Immutable" Version="8.0.0" />
13+
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.4" />
1314
<!--
1415
Note: do NOT upgrade the System.IO.Pipelines dependency unless necessary
1516
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
@@ -44,4 +45,4 @@
4445
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
4546
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
4647
</ItemGroup>
47-
</Project>
48+
</Project>

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

+2-6
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6060

6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

63-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
65-
: default;
63+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length);
6664

6765
ulong publishSequenceNumber = 0;
6866
if (publisherConfirmationInfo is not null)
@@ -115,9 +113,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
115113

116114
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117115

118-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
120-
: default;
116+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length);
121117

122118
ulong publishSequenceNumber = 0;
123119
if (publisherConfirmationInfo is not null)

projects/RabbitMQ.Client/Impl/Connection.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,10 @@ internal void TakeOver(Connection other)
228228
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
229229
{
230230
cancellationToken.ThrowIfCancellationRequested();
231-
231+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler);
232232
try
233233
{
234234
RabbitMqClientEventSource.Log.ConnectionOpened();
235-
236235
cancellationToken.ThrowIfCancellationRequested();
237236

238237
// Note: this must happen *after* the frame handler is started
@@ -250,8 +249,10 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
250249

251250
return this;
252251
}
253-
catch
252+
catch (Exception ex)
254253
{
254+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
255+
connectionActivity?.AddException(ex);
255256
try
256257
{
257258
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

+17-25
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,20 @@ public static class RabbitMQActivitySource
4343
private static readonly ActivitySource s_subscriberSource =
4444
new ActivitySource(SubscriberSourceName, AssemblyVersion);
4545

46+
private static readonly ActivitySource s_connectionSource =
47+
new ActivitySource(ConnectionSourceName, AssemblyVersion);
48+
4649
public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
4750
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
51+
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";
4852

49-
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
53+
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
54+
DefaultContextInjector;
5055

5156
public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
5257
DefaultContextExtractor;
5358

5459
public static bool UseRoutingKeyAsOperationName { get; set; } = true;
55-
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();
5660

5761
internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
5862
{
@@ -61,14 +65,18 @@ public static class RabbitMQActivitySource
6165
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6266
};
6367

68+
internal static Activity? OpenConnection(IFrameHandler frameHandler)
69+
{
70+
Activity? connectionActivity =
71+
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client);
72+
connectionActivity?
73+
.SetNetworkTags(frameHandler);
74+
return connectionActivity;
75+
}
76+
6477
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
6578
ActivityContext linkedContext = default)
6679
{
67-
if (!s_publisherSource.HasListeners())
68-
{
69-
return null;
70-
}
71-
7280
Activity? activity = linkedContext == default
7381
? s_publisherSource.StartRabbitMQActivity(
7482
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
@@ -82,16 +90,10 @@ public static class RabbitMQActivitySource
8290
}
8391

8492
return activity;
85-
8693
}
8794

8895
internal static Activity? BasicGetEmpty(string queue)
8996
{
90-
if (!s_subscriberSource.HasListeners())
91-
{
92-
return null;
93-
}
94-
9597
Activity? activity = s_subscriberSource.StartRabbitMQActivity(
9698
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
9799
ActivityKind.Consumer);
@@ -109,11 +111,6 @@ public static class RabbitMQActivitySource
109111
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
110112
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
111113
{
112-
if (!s_subscriberSource.HasListeners())
113-
{
114-
return null;
115-
}
116-
117114
// Extract the PropagationContext of the upstream parent from the message headers.
118115
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
119116
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
@@ -130,11 +127,6 @@ public static class RabbitMQActivitySource
130127
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
131128
IReadOnlyBasicProperties basicProperties, int bodySize)
132129
{
133-
if (!s_subscriberSource.HasListeners())
134-
{
135-
return null;
136-
}
137-
138130
// Extract the PropagationContext of the upstream parent from the message headers.
139131
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
140132
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
@@ -197,15 +189,15 @@ private static void PopulateMessagingTags(string operationType, string operation
197189

198190
internal static void PopulateMessageEnvelopeSize(Activity? activity, int size)
199191
{
200-
if (activity != null && activity.IsAllDataRequested && PublisherHasListeners)
192+
if (activity?.IsAllDataRequested ?? false)
201193
{
202194
activity.SetTag(MessagingEnvelopeSize, size);
203195
}
204196
}
205197

206198
internal static void SetNetworkTags(this Activity? activity, IFrameHandler frameHandler)
207199
{
208-
if (PublisherHasListeners && activity != null && activity.IsAllDataRequested)
200+
if (activity?.IsAllDataRequested ?? false)
209201
{
210202
switch (frameHandler.RemoteEndPoint.AddressFamily)
211203
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string!

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
<PackageReference Include="System.IO.Pipelines" />
6868
<PackageReference Include="System.Threading.RateLimiting" />
6969
<PackageReference Include="Nullable" PrivateAssets="all" />
70+
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
7071
</ItemGroup>
7172

7273
<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
@@ -76,7 +77,6 @@
7677
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
7778
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
7879
-->
79-
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
8080
<PackageReference Include="System.Memory" />
8181
<PackageReference Include="System.Threading.Channels" />
8282
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Diagnostics;
35+
using System.Linq;
36+
using System.Text;
37+
using System.Threading;
38+
using Xunit;
39+
40+
namespace Test
41+
{
42+
public class ActivityRecorder : IDisposable
43+
{
44+
private string _activitySourceName;
45+
private string _activityName;
46+
47+
private readonly ActivityListener _listener;
48+
private List<Activity> _finishedActivities = new();
49+
50+
private int _started;
51+
private int _stopped;
52+
53+
public int Started => _started;
54+
public int Stopped => _stopped;
55+
56+
public Predicate<Activity> Filter { get; set; } = _ => true;
57+
public bool VerifyParent { get; set; } = true;
58+
public Activity ExpectedParent { get; set; }
59+
60+
public Activity LastStartedActivity { get; private set; }
61+
public Activity LastFinishedActivity { get; private set; }
62+
public IEnumerable<Activity> FinishedActivities => _finishedActivities;
63+
64+
public ActivityRecorder(string activitySourceName, string activityName)
65+
{
66+
_activitySourceName = activitySourceName;
67+
_activityName = activityName;
68+
_listener = new ActivityListener
69+
{
70+
ShouldListenTo = (activitySource) => activitySource.Name == _activitySourceName,
71+
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
72+
ActivityStarted = (activity) =>
73+
{
74+
if (activity.OperationName == _activityName && Filter(activity))
75+
{
76+
if (VerifyParent)
77+
{
78+
Assert.Same(ExpectedParent, activity.Parent);
79+
}
80+
81+
Interlocked.Increment(ref _started);
82+
83+
LastStartedActivity = activity;
84+
}
85+
},
86+
ActivityStopped = (activity) =>
87+
{
88+
if (activity.OperationName == _activityName && Filter(activity))
89+
{
90+
if (VerifyParent)
91+
{
92+
Assert.Same(ExpectedParent, activity.Parent);
93+
}
94+
95+
Interlocked.Increment(ref _stopped);
96+
97+
lock (_finishedActivities)
98+
{
99+
LastFinishedActivity = activity;
100+
_finishedActivities.Add(activity);
101+
}
102+
}
103+
}
104+
};
105+
106+
ActivitySource.AddActivityListener(_listener);
107+
}
108+
109+
public void Dispose() => _listener.Dispose();
110+
111+
public void VerifyActivityRecorded(int times)
112+
{
113+
Assert.Equal(times, Started);
114+
Assert.Equal(times, Stopped);
115+
}
116+
117+
public Activity VerifyActivityRecordedOnce()
118+
{
119+
VerifyActivityRecorded(1);
120+
return LastFinishedActivity;
121+
}
122+
}
123+
124+
public static class ActivityAssert
125+
{
126+
public static KeyValuePair<string, object> HasTag(this Activity activity, string name)
127+
{
128+
KeyValuePair<string, object> tag = activity.TagObjects.SingleOrDefault(t => t.Key == name);
129+
if (tag.Key is null)
130+
{
131+
Assert.Fail($"The Activity tags should contain {name}.");
132+
}
133+
134+
return tag;
135+
}
136+
137+
public static void HasTag<T>(this Activity activity, string name, T expectedValue)
138+
{
139+
KeyValuePair<string, object> tag = HasTag(activity, name);
140+
Assert.Equal(expectedValue, (T)tag.Value);
141+
}
142+
143+
public static void HasRecordedException(this Activity activity, Exception exception)
144+
{
145+
var exceptionEvent = activity.Events.First();
146+
Assert.Equal("exception", activity.Events.First().Name);
147+
Assert.Equal(exception.GetType().ToString(),
148+
exceptionEvent.Tags.SingleOrDefault(t => t.Key == "exception.type").Value);
149+
}
150+
151+
public static void IsInError(this Activity activity)
152+
{
153+
Assert.Equal(ActivityStatusCode.Error, activity.Status);
154+
}
155+
156+
public static void HasNoTag(this Activity activity, string name)
157+
{
158+
bool contains = activity.TagObjects.Any(t => t.Key == name);
159+
Assert.False(contains, $"The Activity tags should not contain {name}.");
160+
}
161+
162+
public static void FinishedInOrder(this Activity first, Activity second)
163+
{
164+
Assert.True(first.StartTimeUtc + first.Duration < second.StartTimeUtc + second.Duration,
165+
$"{first.OperationName} should stop before {second.OperationName}");
166+
}
167+
168+
public static string CamelToSnake(string camel)
169+
{
170+
if (string.IsNullOrEmpty(camel)) return camel;
171+
StringBuilder bld = new();
172+
bld.Append(char.ToLower(camel[0]));
173+
for (int i = 1; i < camel.Length; i++)
174+
{
175+
char c = camel[i];
176+
if (char.IsUpper(c))
177+
{
178+
bld.Append('_');
179+
}
180+
181+
bld.Append(char.ToLower(c));
182+
}
183+
184+
return bld.ToString();
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)