Skip to content

Commit

Permalink
NLog Elastic Target supports IndexFormat + IndexOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
snakefoot committed Sep 4, 2024
1 parent 4c786ea commit 3c248ec
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
77 changes: 66 additions & 11 deletions src/Elastic.NLog.Targets/ElasticsearchTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
Expand All @@ -24,7 +25,7 @@ public class ElasticsearchTarget : TargetWithLayout
/// <inheritdoc />
public override Layout Layout { get => _layout; set => _layout = value as Elastic.CommonSchema.NLog.EcsLayout ?? _layout; }
private Elastic.CommonSchema.NLog.EcsLayout _layout = new Elastic.CommonSchema.NLog.EcsLayout();
private EcsDataStreamChannel<NLogEcsDocument>? _channel;
private IBufferedChannel<NLogEcsDocument>? _channel;

/// <summary>
/// Gets or sets the connection pool type. Default for multiple nodes is <c>Sniffing</c>; other supported values are
Expand All @@ -51,6 +52,27 @@ public class ElasticsearchTarget : TargetWithLayout
/// <summary> User-configurable arbitrary grouping</summary>
public Layout? DataStreamNamespace { get; set; } = "default";

/// <summary>
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter 0.
///
/// <para> Example: "dotnet-{0:yyyy.MM.dd}"</para>
/// <para> If no {0} parameter is defined the index name is effectively fixed</para>
/// </summary>
public Layout? IndexFormat { get; set; }

/// <summary>
/// Gets or sets the offset to use for the index <c>DateTimeOffset</c>. Default value is null, which uses the system local offset.
/// Use "0" for UTC.
/// </summary>
public Layout? IndexOffsetHours { get; set; }

/// <summary>
/// Control the operation header for each bulk operation. Default value is Auto.
///
/// <para> Can explicit specify Auto, Index or Create</para>
/// </summary>
public OperationMode IndexOperation { get; set; }

/// <summary>
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
/// <para>Defaults to <c>100_000</c></para>
Expand Down Expand Up @@ -124,18 +146,17 @@ public Layout? CloudId
/// <summary>
/// Provide callbacks to further configure <see cref="DataStreamChannelOptions{TEvent}"/>
/// </summary>
public Action<DataStreamChannelOptions<NLogEcsDocument>>? ConfigureChannel { get; set; }
public Action<ElasticsearchChannelOptionsBase<NLogEcsDocument>>? ConfigureChannel { get; set; }

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener => _channel?.DiagnosticsListener;

/// <inheritdoc />
protected override void InitializeTarget()
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexFormat = IndexFormat?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffsetHours = IndexOffsetHours?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffset = string.IsNullOrEmpty(indexOffsetHours) ? default(TimeSpan?) : TimeSpan.FromHours(int.Parse(indexOffsetHours));

var connectionPool = CreateNodePool();
var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
Expand All @@ -144,11 +165,18 @@ protected override void InitializeTarget()
config = SetAuthenticationOnTransport(config);

var transport = new DistributedTransport<TransportConfiguration>(config);
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
if (!string.IsNullOrEmpty(indexFormat))
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
_channel = CreateIndexChannel(transport, indexFormat, indexOffset, IndexOperation);
}
else
{
_channel = CreateDataStreamChannel(transport);
}
}

private void SetupChannelOptions(ElasticsearchChannelOptionsBase<NLogEcsDocument> channelOptions)
{
if (InboundBufferMaxSize > 0)
channelOptions.BufferOptions.InboundBufferMaxSize = InboundBufferMaxSize;
if (OutboundBufferMaxSize > 0)
Expand All @@ -160,10 +188,37 @@ protected override void InitializeTarget()
if (ExportMaxRetries >= 0)
channelOptions.BufferOptions.ExportMaxRetries = ExportMaxRetries;
ConfigureChannel?.Invoke(channelOptions);
}

private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(DistributedTransport<TransportConfiguration> transport)
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
SetupChannelOptions(channelOptions);
var channel = new EcsDataStreamChannel<NLogEcsDocument>(channelOptions, new[] { new InternalLoggerCallbackListener<NLogEcsDocument>() });
channel.BootstrapElasticsearch(BootstrapMethod, ilmPolicy);
_channel = channel;
return channel;
}

private EcsIndexChannel<NLogEcsDocument> CreateIndexChannel(DistributedTransport<TransportConfiguration> transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation)
{
var indexChannelOptions = new IndexChannelOptions<NLogEcsDocument>(transport)
{
IndexFormat = indexFormat,
IndexOffset = indexOffset,
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
TimestampLookup = l => l.Timestamp,
OperationMode = indexOperation,
};
SetupChannelOptions(indexChannelOptions);
return new EcsIndexChannel<NLogEcsDocument>(indexChannelOptions);
}

/// <inheritdoc />
Expand Down
7 changes: 6 additions & 1 deletion src/Elastic.NLog.Targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ var logger = LogManager.GetCurrentClassLogger();
- _ExportMaxRetries_ - Max number of times to retry an export. Default = 3

* **Export DataStream**
- _DataStreamType_ - Generic type describing the data. Defaults = 'logs'
- _DataStreamType_ - Generic type describing the data. Default = 'logs'
- _DataStreamSet_ - Describes the data ingested and its structure. Default = 'dotnet'
- _DataStreamNamespace_ - User-configurable arbitrary grouping. Default = 'default'

* **Export Index**
- _IndexFormat_ - Format string for the Elastic search index (Ex. `dotnet-{0:yyyy.MM.dd}` or blank means disabled). Default = ''
- _IndexOffsetHours_ - Time offset to use for the index (Ex. `0` for UTC or blank means system local). Default = ''
- _IndexOperation_ - Operation header for each bulk operation (Auto, Index, Create). Default = 'Auto'

Notice that export depends on in-memory queue, that is lost on application-crash / -exit.
If higher gurantee of delivery is required, then consider using [Elastic.CommonSchema.NLog](https://www.nuget.org/packages/Elastic.CommonSchema.NLog)
together with NLog FileTarget and use [filebeat](https://www.elastic.co/beats/filebeat) to ship these logs.
Expand Down

0 comments on commit 3c248ec

Please sign in to comment.