Skip to content

Commit

Permalink
fix mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
dicko2 committed Dec 22, 2023
1 parent 6a3fc69 commit dd1a9f8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public static Task OnConsumeError(IMessageContext eventContextMessageContext, Ex
{
{"topic" , eventContextMessageContext.ConsumerContext.Topic},
{"partition" , eventContextMessageContext.ConsumerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ConsumerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public static Task OnProducerError(IMessageContext eventContextMessageContext, E
{
{"topic" , eventContextMessageContext.ProducerContext.Topic},
{"partition" , eventContextMessageContext.ProducerContext.Partition.ToString()},
{"offset" , eventContextMessageContext.ProducerContext.Offset.ToString()},
});
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
Expand Down
18 changes: 8 additions & 10 deletions src/KafkaFlow.ApplicationInsights/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,23 @@ public static IKafkaConfigurationBuilder AddAppInsightsInstrumentation(this IKaf
}

internal static string GetDependencyName(this IMessageContext eventContextMessageContext)
{
return eventContextMessageContext.ConsumerContext?.Topic ??
eventContextMessageContext.ProducerContext?.Topic;
}

internal static string GetData(this IMessageContext eventContextMessageContext)
{
var type = Constants.ConsumerType;
if (eventContextMessageContext.ConsumerContext == null)
{
type = Constants.ProducerType;
}

var topic = eventContextMessageContext.ConsumerContext?.Topic ??
eventContextMessageContext.ProducerContext?.Topic;

var partition = eventContextMessageContext.ConsumerContext?.Partition.ToString() ??
eventContextMessageContext.ProducerContext?.Partition.ToString();

return $"{type}/{topic}/{partition}";
}

internal static string GetOffset(this IMessageContext eventContextMessageContext)
{
return eventContextMessageContext.ConsumerContext?.Offset.ToString() ?? eventContextMessageContext.ProducerContext?.Offset.ToString();
return $"{type}/{partition}";
}

internal static string GetTarget(this IMessageContext eventContextMessageContext)
Expand All @@ -66,7 +64,7 @@ internal static void TrackKafkaDependency(this TelemetryClient telemetryClient,
telemetryClient.TrackDependency(Constants.DependencyType,
eventContextMessageContext.GetTarget(),
eventContextMessageContext.GetDependencyName(),
eventContextMessageContext.GetOffset(),
eventContextMessageContext.GetData(),
DateTimeOffset.UtcNow,
elapsedTime,
resultCode,
Expand Down

0 comments on commit dd1a9f8

Please sign in to comment.