Skip to content

Commit

Permalink
fix(logger): update database sink
Browse files Browse the repository at this point in the history
  • Loading branch information
GZTimeWalker committed Dec 31, 2023
1 parent d02d28a commit 12ea935
Show file tree
Hide file tree
Showing 6 changed files with 1,484 additions and 33 deletions.
70 changes: 46 additions & 24 deletions src/GZCTF/Extensions/DatabaseSinkExtension.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using Serilog;
using Serilog.Configuration;
using Serilog.Core;
Expand All @@ -12,22 +13,30 @@ public static LoggerConfiguration Database(this LoggerSinkConfiguration loggerCo
loggerConfiguration.Sink(new DatabaseSink(serviceProvider));
}

public class DatabaseSink : ILogEventSink
public class DatabaseSink : ILogEventSink, IDisposable
{
readonly IServiceProvider _serviceProvider;

static DateTimeOffset LastFlushTime = DateTimeOffset.FromUnixTimeSeconds(0);
static readonly List<LogModel> LockedLogBuffer = new();
static readonly List<LogModel> LogBuffer = new();
DateTimeOffset _lastFlushTime = DateTimeOffset.FromUnixTimeSeconds(0);
readonly CancellationTokenSource _tokenSource = new();
readonly ConcurrentQueue<LogModel> _logBuffer = new();

public DatabaseSink(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
Task.Run(() => WriteToDatabase(_tokenSource.Token), _tokenSource.Token);
}

public void Dispose()
{
_tokenSource.Cancel();
GC.SuppressFinalize(this);
}

public void Emit(LogEvent logEvent)
{
if (logEvent.Level < LogEventLevel.Information) return;
if (logEvent.Level < LogEventLevel.Information)
return;

LogModel logModel = new()
{
Expand All @@ -41,28 +50,41 @@ public void Emit(LogEvent logEvent)
Exception = logEvent.Exception?.ToString()
};

lock (LogBuffer)
{
LogBuffer.Add(logModel);
_logBuffer.Enqueue(logModel);
}

var needFlush = DateTimeOffset.Now - LastFlushTime > TimeSpan.FromSeconds(10);
if (!needFlush && LogBuffer.Count < 100) return;
async Task WriteToDatabase(CancellationToken token = default)
{
List<LogModel> lockedLogBuffer = new();

LockedLogBuffer.AddRange(LogBuffer);
LogBuffer.Clear();
try
{
while (!token.IsCancellationRequested)
{
while (_logBuffer.TryDequeue(out LogModel? logModel))
lockedLogBuffer.Add(logModel);

Task.Run(Flush);
}
}
if (lockedLogBuffer.Count > 50 || DateTimeOffset.Now - _lastFlushTime > TimeSpan.FromSeconds(10))
{
await using AsyncServiceScope scope = _serviceProvider.CreateAsyncScope();

async Task Flush()
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
await dbContext.Logs.AddRangeAsync(LockedLogBuffer);
await dbContext.SaveChangesAsync();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
await dbContext.Logs.AddRangeAsync(lockedLogBuffer, token);

LockedLogBuffer.Clear();
LastFlushTime = DateTimeOffset.Now;
try
{
await dbContext.SaveChangesAsync(token);
}
finally
{
lockedLogBuffer.Clear();
_lastFlushTime = DateTimeOffset.Now;
}
}

await Task.Delay(TimeSpan.FromSeconds(1), token);
}
}
catch (TaskCanceledException) { }
}
}
}
Loading

0 comments on commit 12ea935

Please sign in to comment.