Skip to content

Commit

Permalink
Adding filters should not force initialization of Agent. (#2418)
Browse files Browse the repository at this point in the history
Instead if the agent is not yet setup we simply buffer the filters until
the agent gets constructed.

This allows you to add filters before initializing the static agent.

This manifested itself in the hosted service integrations (.NET Core)
where the agent get's constructed slightly delayed.

And calling

```
Agent.AddFilter((ISpan span) =>
{
    return span;
});
```

Would win the race for `Agent.Instance` over the ASP.NET core
integrations.
  • Loading branch information
Mpdreamz authored Aug 15, 2024
1 parent 7dfd0ea commit 8e99d01
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 37 deletions.
62 changes: 52 additions & 10 deletions src/Elastic.Apm/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public interface IApmAgent : IApmAgentComponents
internal class ApmAgent : IApmAgent, IDisposable
{
internal readonly CompositeDisposable Disposables = new();

internal ApmAgent(AgentComponents agentComponents) => Components = agentComponents ?? new AgentComponents();

internal ICentralConfigurationFetcher CentralConfigurationFetcher => Components.CentralConfigurationFetcher;
Expand Down Expand Up @@ -72,16 +71,31 @@ public static class Agent
lock (InitializationLock)
{
var agent = new ApmAgent(Components);

agent.Logger?.Trace()
?.Log("Initialization - Agent instance initialized. Callstack: {callstack}", new StackTrace().ToString());

if (agent.Components.PayloadSender is not IPayloadSenderWithFilters sender)
return agent;

ErrorFilters.ForEach(f => sender.AddFilter(f));
TransactionFilters.ForEach(f => sender.AddFilter(f));
SpanFilters.ForEach(f => sender.AddFilter(f));
agent.Logger?.Trace()
?.Log(@"Initialization - Added filters to agent (errors:{{ErrorFilters}}, transactions:{TransactionFilters} spans:{SpanFilters}",
ErrorFilters.Count, TransactionFilters.Count, SpanFilters.Count);

return agent;
}
});

private static readonly object InitializationLock = new object();

private static readonly List<Func<IError, IError>> ErrorFilters = [];

private static readonly List<Func<ISpan, ISpan>> SpanFilters = [];

private static readonly List<Func<ITransaction, ITransaction>> TransactionFilters = [];

internal static AgentComponents Components { get; private set; }

public static IConfigurationReader Config => Instance.Configuration;
Expand Down Expand Up @@ -114,7 +128,16 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<ITransaction, ITransaction> filter) => CheckAndAddFilter(p => p.TransactionFilters.Add(filter));
public static bool AddFilter(Func<ITransaction, ITransaction> filter)
{
if (!IsConfigured)
{
TransactionFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

/// <summary>
/// Adds a filter which gets called before each span gets sent to APM Server.
Expand All @@ -133,7 +156,16 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<ISpan, ISpan> filter) => CheckAndAddFilter(p => p.SpanFilters.Add(filter));
public static bool AddFilter(Func<ISpan, ISpan> filter)
{
if (!IsConfigured)
{
SpanFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

/// <summary>
/// Adds a filter which gets called before each error gets sent to APM Server.
Expand All @@ -152,15 +184,23 @@ public static class Agent
/// <code>true</code> if the filter was added successfully, <code>false</code> otherwise. In case the method
/// returns <code>false</code> the filter won't be called.
/// </returns>
public static bool AddFilter(Func<IError, IError> filter) => CheckAndAddFilter(p => p.ErrorFilters.Add(filter));
public static bool AddFilter(Func<IError, IError> filter)
{
if (!IsConfigured)
{
ErrorFilters.Add(filter);
return true;
}

return CheckAndAddFilter(p => p.AddFilter(filter));
}

private static bool CheckAndAddFilter(Action<PayloadSenderV2> action)
private static bool CheckAndAddFilter(Func<IPayloadSenderWithFilters, bool> action)
{
if (!(Instance.PayloadSender is PayloadSenderV2 payloadSenderV2))
if (Instance.PayloadSender is not IPayloadSenderWithFilters sender)
return false;

action(payloadSenderV2);
return true;
return action(sender);
}

/// <summary>
Expand Down Expand Up @@ -199,10 +239,12 @@ public static void Setup(AgentComponents agentComponents)
return;
}

Components ??= agentComponents;

agentComponents?.Logger?.Trace()
?.Log("Initialization - Agent.Setup called");

Components = agentComponents;

// Force initialization
var _ = LazyApmAgent.Value;
}
Expand Down
9 changes: 9 additions & 0 deletions src/Elastic.Apm/Report/IPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using Elastic.Apm.Api;

namespace Elastic.Apm.Report
Expand All @@ -16,4 +18,11 @@ public interface IPayloadSender

void QueueTransaction(ITransaction transaction);
}

public interface IPayloadSenderWithFilters
{
bool AddFilter(Func<ITransaction, ITransaction> transactionFilter);
bool AddFilter(Func<ISpan, ISpan> spanFilter);
bool AddFilter(Func<IError, IError> errorFilter);
}
}
38 changes: 34 additions & 4 deletions src/Elastic.Apm/Report/PayloadSenderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ namespace Elastic.Apm.Report
/// Responsible for sending the data to APM server. Implements Intake V2.
/// Each instance creates its own thread to do the work. Therefore, instances should be reused if possible.
/// </summary>
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender
internal class PayloadSenderV2 : BackendCommComponentBase, IPayloadSender, IPayloadSenderWithFilters
{
private const string ThisClassName = nameof(PayloadSenderV2);
internal readonly List<Func<IError, IError>> ErrorFilters = new List<Func<IError, IError>>();
internal readonly List<Func<ISpan, ISpan>> SpanFilters = new List<Func<ISpan, ISpan>>();
internal readonly List<Func<ITransaction, ITransaction>> TransactionFilters = new List<Func<ITransaction, ITransaction>>();
internal readonly List<Func<IError, IError>> ErrorFilters = new();
internal readonly List<Func<ISpan, ISpan>> SpanFilters = new();
internal readonly List<Func<ITransaction, ITransaction>> TransactionFilters = new();

private readonly IApmServerInfo _apmServerInfo;

Expand Down Expand Up @@ -148,6 +148,7 @@ IApmLogger logger

private bool _getApmServerVersion;
private bool _getCloudMetadata;
private bool _allowFilterAdd;
private static readonly UTF8Encoding Utf8Encoding;
private static readonly MediaTypeHeaderValue MediaTypeHeaderValue;

Expand Down Expand Up @@ -244,6 +245,8 @@ protected override void WorkLoopIteration()
}

var batch = ReceiveBatch();
if (_allowFilterAdd && batch is { Length: > 0 })
_allowFilterAdd = false;
if (batch != null)
ProcessQueueItems(batch);
}
Expand Down Expand Up @@ -497,5 +500,32 @@ private T TryExecuteFilter<T>(List<Func<T, T>> filters, T item) where T : class

return item;
}

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter)
{
if (!_allowFilterAdd)
return false;

TransactionFilters.Add(transactionFilter);
return true;
}

public bool AddFilter(Func<ISpan, ISpan> spanFilter)
{
if (!_allowFilterAdd)
return false;

SpanFilters.Add(spanFilter);
return true;
}

public bool AddFilter(Func<IError, IError> errorFilter)
{
if (!_allowFilterAdd)
return false;

ErrorFilters.Add(errorFilter);
return true;
}
}
}
37 changes: 28 additions & 9 deletions test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace Elastic.Apm.Tests.Utilities
{
internal class MockPayloadSender : IPayloadSender
internal class MockPayloadSender : IPayloadSender, IPayloadSenderWithFilters
{
private static readonly JObject JsonSpanTypesData =
JObject.Parse(File.ReadAllText(Path.Combine(SolutionPaths.Root, "test/Elastic.Apm.Tests.Utilities/TestResources/json-specs/span_types.json")));
Expand Down Expand Up @@ -211,7 +211,7 @@ public IReadOnlyList<IMetricSet> Metrics
get
{
lock (_metricsLock)
return CreateImmutableSnapshot<IMetricSet>(_metrics);
return CreateImmutableSnapshot(_metrics);
}
}

Expand All @@ -220,7 +220,7 @@ public IReadOnlyList<ISpan> Spans
get
{
lock (_spanLock)
return CreateImmutableSnapshot<ISpan>(_spans);
return CreateImmutableSnapshot(_spans);
}
}

Expand All @@ -229,7 +229,7 @@ public IReadOnlyList<ITransaction> Transactions
get
{
lock (_transactionLock)
return CreateImmutableSnapshot<ITransaction>(_transactions);
return CreateImmutableSnapshot(_transactions);
}
}

Expand All @@ -240,8 +240,8 @@ public void QueueError(IError error)
{
lock (_errorLock)
{
error = _errorFilters.Aggregate(error,
(current, filter) => filter(current));
error = _errorFilters
.Aggregate(error, (current, filter) => filter(current));
_errors.Add(error);
_errorWaitHandle.Set();
}
Expand All @@ -251,8 +251,8 @@ public virtual void QueueTransaction(ITransaction transaction)
{
lock (_transactionLock)
{
transaction = _transactionFilters.Aggregate(transaction,
(current, filter) => filter(current));
transaction = _transactionFilters
.Aggregate(transaction, (current, filter) => filter(current));
_transactions.Add(transaction);
_transactionWaitHandle.Set();
}
Expand All @@ -263,7 +263,8 @@ public void QueueSpan(ISpan span)
VerifySpan(span);
lock (_spanLock)
{
span = _spanFilters.Aggregate(span, (current, filter) => filter(current));
span = _spanFilters
.Aggregate(span, (current, filter) => filter(current));
_spans.Add(span);
_spanWaitHandle.Set();
}
Expand Down Expand Up @@ -305,6 +306,24 @@ private void VerifySpan(ISpan span)
}
}

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter)
{
_transactionFilters.Add(transactionFilter);
return true;
}

public bool AddFilter(Func<ISpan, ISpan> spanFilter)
{
_spanFilters.Add(spanFilter);
return true;
}

public bool AddFilter(Func<IError, IError> errorFilter)
{
_errorFilters.Add(errorFilter);
return true;
}

public void QueueMetrics(IMetricSet metricSet)
{
lock (_metricsLock)
Expand Down
11 changes: 10 additions & 1 deletion test/Elastic.Apm.Tests.Utilities/NoopPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using Elastic.Apm.Api;
using Elastic.Apm.Report;

namespace Elastic.Apm.Tests.Utilities
{
public class NoopPayloadSender : IPayloadSender
public class NoopPayloadSender : IPayloadSender, IPayloadSenderWithFilters
{
public void QueueError(IError error) { }

Expand All @@ -16,5 +18,12 @@ public void QueueTransaction(ITransaction transaction) { }
public void QueueSpan(ISpan span) { }

public void QueueMetrics(IMetricSet metricSet) { }

public bool AddFilter(Func<ITransaction, ITransaction> transactionFilter) => true;

public bool AddFilter(Func<ISpan, ISpan> spanFilter) => true;

public bool AddFilter(Func<IError, IError> errorFilter) => true;

}
}
Loading

0 comments on commit 8e99d01

Please sign in to comment.