Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NLog Elastic Target supports EcsIndexChannel with IndexFormat #429

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 87 additions & 11 deletions src/Elastic.NLog.Targets/ElasticsearchTarget.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
using Elastic.Channels;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema.NLog;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
Expand All @@ -24,7 +26,7 @@ public class ElasticsearchTarget : TargetWithLayout
/// <inheritdoc />
public override Layout Layout { get => _layout; set => _layout = value as Elastic.CommonSchema.NLog.EcsLayout ?? _layout; }
private Elastic.CommonSchema.NLog.EcsLayout _layout = new Elastic.CommonSchema.NLog.EcsLayout();
private EcsDataStreamChannel<NLogEcsDocument>? _channel;
private IBufferedChannel<NLogEcsDocument>? _channel;

/// <summary>
/// Gets or sets the connection pool type. Default for multiple nodes is <c>Sniffing</c>; other supported values are
Expand All @@ -51,6 +53,41 @@ public class ElasticsearchTarget : TargetWithLayout
/// <summary> User-configurable arbitrary grouping</summary>
public Layout? DataStreamNamespace { get; set; } = "default";

/// <summary>
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter 0.
///
/// <para> Example: "dotnet-{0:yyyy.MM.dd}"</para>
/// <para> If no {0} parameter is defined the index name is effectively fixed</para>
/// </summary>
public Layout? IndexFormat { get; set; }

/// <summary>
/// Gets or sets the offset to use for the index <c>DateTimeOffset</c>. Default value is null, which uses the system local offset.
/// Use "0" for UTC.
/// </summary>
public Layout? IndexOffsetHours { get; set; }

/// <summary>
/// Control the operation header for each bulk operation. Default value is Auto.
///
/// <para> Can explicit specify Auto, Index or Create</para>
/// </summary>
public OperationMode IndexOperation { get; set; }

/// <summary>
/// Gets or sets the optional override of the per document `_id`.
/// </summary>
public Layout? IndexEventId
{
get => _layout.EventId;
set
{
_layout.EventId = value;
_hasIndexEventId = value is not null;
}
}
private bool _hasIndexEventId;

/// <summary>
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
/// <para>Defaults to <c>100_000</c></para>
Expand Down Expand Up @@ -124,18 +161,17 @@ public Layout? CloudId
/// <summary>
/// Provide callbacks to further configure <see cref="DataStreamChannelOptions{TEvent}"/>
/// </summary>
public Action<DataStreamChannelOptions<NLogEcsDocument>>? ConfigureChannel { get; set; }
public Action<ElasticsearchChannelOptionsBase<NLogEcsDocument>>? ConfigureChannel { get; set; }

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener => _channel?.DiagnosticsListener;

/// <inheritdoc />
protected override void InitializeTarget()
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexFormat = IndexFormat?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffsetHours = IndexOffsetHours?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var indexOffset = string.IsNullOrEmpty(indexOffsetHours) ? default(TimeSpan?) : TimeSpan.FromHours(int.Parse(indexOffsetHours));

var connectionPool = CreateNodePool();
var config = new TransportConfiguration(connectionPool, productRegistration: ElasticsearchProductRegistration.Default);
Expand All @@ -144,11 +180,18 @@ protected override void InitializeTarget()
config = SetAuthenticationOnTransport(config);

var transport = new DistributedTransport<TransportConfiguration>(config);
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
if (!string.IsNullOrEmpty(indexFormat))
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
_channel = CreateIndexChannel(transport, indexFormat, indexOffset, IndexOperation);
}
else
{
_channel = CreateDataStreamChannel(transport);
}
}

private void SetupChannelOptions(ElasticsearchChannelOptionsBase<NLogEcsDocument> channelOptions)
{
if (InboundBufferMaxSize > 0)
channelOptions.BufferOptions.InboundBufferMaxSize = InboundBufferMaxSize;
if (OutboundBufferMaxSize > 0)
Expand All @@ -160,10 +203,43 @@ protected override void InitializeTarget()
if (ExportMaxRetries >= 0)
channelOptions.BufferOptions.ExportMaxRetries = ExportMaxRetries;
ConfigureChannel?.Invoke(channelOptions);
}

private EcsDataStreamChannel<NLogEcsDocument> CreateDataStreamChannel(DistributedTransport<TransportConfiguration> transport)
{
var ilmPolicy = IlmPolicy?.Render(LogEventInfo.CreateNullEvent());
var dataStreamType = DataStreamType?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamSet = DataStreamSet?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var dataStreamNamespace = DataStreamNamespace?.Render(LogEventInfo.CreateNullEvent()) ?? string.Empty;
var channelOptions = new DataStreamChannelOptions<NLogEcsDocument>(transport)
{
DataStream = new DataStreamName(dataStreamType, dataStreamSet, dataStreamNamespace),
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
};
SetupChannelOptions(channelOptions);
var channel = new EcsDataStreamChannel<NLogEcsDocument>(channelOptions, new[] { new InternalLoggerCallbackListener<NLogEcsDocument>() });
channel.BootstrapElasticsearch(BootstrapMethod, ilmPolicy);
_channel = channel;
return channel;
}

private EcsIndexChannel<NLogEcsDocument> CreateIndexChannel(DistributedTransport<TransportConfiguration> transport, string indexFormat, TimeSpan? indexOffset, OperationMode indexOperation)
{
var indexChannelOptions = new IndexChannelOptions<NLogEcsDocument>(transport)
{
IndexFormat = indexFormat,
IndexOffset = indexOffset,
WriteEvent = async (stream, ctx, logEvent) => await logEvent.SerializeAsync(stream, ctx).ConfigureAwait(false),
TimestampLookup = l => l.Timestamp,
OperationMode = indexOperation,
};

if (_hasIndexEventId)
{
indexChannelOptions.BulkOperationIdLookup = (logEvent) => (logEvent.Event?.Id)!;
}

SetupChannelOptions(indexChannelOptions);
return new EcsIndexChannel<NLogEcsDocument>(indexChannelOptions);
}

/// <inheritdoc />
Expand Down
9 changes: 8 additions & 1 deletion src/Elastic.NLog.Targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var logger = LogManager.GetCurrentClassLogger();
- Cloud - Pool seeded with CloudId
- _NodeUris_ - URIs of the Elasticsearch nodes in the connection pool (comma delimited)
- _CloudId_ - When using NodePoolType = Cloud
- _BootstrapMethod_ - Whether to configure / bootstrap the destination, which requires user has management capabilities (None, Silent, Failure). Default = None

* **Export Authentication**
- _ApiKey_ - When using NodePoolType = Cloud and authentication via API key.
Expand All @@ -67,10 +68,16 @@ var logger = LogManager.GetCurrentClassLogger();
- _ExportMaxRetries_ - Max number of times to retry an export. Default = 3

* **Export DataStream**
- _DataStreamType_ - Generic type describing the data. Defaults = 'logs'
- _DataStreamType_ - Generic type describing the data. Default = 'logs'
- _DataStreamSet_ - Describes the data ingested and its structure. Default = 'dotnet'
- _DataStreamNamespace_ - User-configurable arbitrary grouping. Default = 'default'

* **Export Index**
- _IndexFormat_ - Format string for the Elastic search index (Ex. `dotnet-{0:yyyy.MM.dd}` or blank means disabled). Default = ''
- _IndexOffsetHours_ - Time offset to use for the index (Ex. `0` for UTC or blank means system local). Default = ''
- _IndexOperation_ - Operation header for each bulk operation (Auto, Index, Create). Default = Auto
- _IndexEventId_ - Optional override of the per document `_id`

Notice that export depends on in-memory queue, that is lost on application-crash / -exit.
If higher gurantee of delivery is required, then consider using [Elastic.CommonSchema.NLog](https://www.nuget.org/packages/Elastic.CommonSchema.NLog)
together with NLog FileTarget and use [filebeat](https://www.elastic.co/beats/filebeat) to ship these logs.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Diagnostics;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.CommonSchema;
using Elastic.Ingest.Elasticsearch;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace NLog.Targets.Elastic.IntegrationTests
{
public class LoggingToIndexIngestionTests : TestBase
{
public LoggingToIndexIngestionTests(LoggingCluster cluster, ITestOutputHelper output) : base(cluster, output) { }

[Fact]
public async Task EnsureDocumentsEndUpInIndex()
{
var indexPrefix = "catalog-data-";
var indexFormat = indexPrefix + "{0:yyyy.MM.dd}";

using var _ = CreateLogger(out var logger, out var provider, out var @namespace, out var waitHandle, out var listener, (cfg) =>
{
cfg.IndexFormat = indexFormat;
cfg.DataStreamType = "x";
cfg.DataStreamSet = "dotnet";
var nodesUris = string.Join(",", Client.ElasticsearchClientSettings.NodePool.Nodes.Select(n => n.Uri.ToString()).ToArray());
cfg.NodeUris = nodesUris;
cfg.NodePoolType = ElasticPoolType.Static;
});

var date = DateTimeOffset.Now;
var indexName = string.Format(indexFormat, date);

var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName));
index.Indices.Should().BeNullOrEmpty();

logger.Error("an error occurred!");

if (!waitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException);

listener.PublishSuccess.Should().BeTrue("{0}", listener);
listener.ObservedException.Should().BeNull();

var refreshResult = await Client.Indices.RefreshAsync(indexName);
refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
var searchResult = await Client.SearchAsync<EcsDocument>(s => s.Indices(indexName));
searchResult.Total.Should().Be(1);

var storedDocument = searchResult.Documents.First();
storedDocument.Message.Should().Be("an error occurred!");

var hit = searchResult.Hits.First();
hit.Index.Should().Be(indexName);
}
}
}
Loading