Skip to content

Commit

Permalink
Store traded securities' subscriptions in results (#7567)
Browse files Browse the repository at this point in the history
* Store traded securities subscription data configs to disk

* Delegate traded subscriptions storage to the data monitor

* Add unit tests

* Address peer review

* Minor changes
  • Loading branch information
jhonabreul authored Nov 13, 2023
1 parent 5c22631 commit 145ead7
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Algorithm.Python/QuantConnect.Algorithm.Python.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<Content Include="CustomPartialFillModelAlgorithm.py" />
<Content Include="AddRiskManagementAlgorithm.py" />
<Content Include="AddUniverseSelectionModelAlgorithm.py" />
<Compile Include="IndicatorWithRenkoBarsRegressionAlgorithm.py" />
<Content Include="IndicatorWithRenkoBarsRegressionAlgorithm.py" />
<Content Include="Alphas\ContingentClaimsAnalysisDefaultPredictionAlpha.py" />
<Content Include="Alphas\GasAndCrudeOilEnergyCorrelationAlpha.py" />
<Content Include="Alphas\GlobalEquityMeanReversionIBSAlpha.py" />
Expand Down
117 changes: 98 additions & 19 deletions Common/Data/DataMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Newtonsoft.Json;
using QuantConnect.Configuration;
using QuantConnect.Data.Serialization;
using QuantConnect.Interfaces;
using QuantConnect.Orders;
using QuantConnect.Util;

namespace QuantConnect.Data
Expand All @@ -31,8 +34,8 @@ public class DataMonitor : IDataMonitor
{
private bool _exited;

private TextWriter _succeededDataRequestsWriter;
private TextWriter _failedDataRequestsWriter;
private Lazy<TextWriter> _succeededDataRequestsWriter;
private Lazy<TextWriter> _failedDataRequestsWriter;

private long _succeededDataRequestsCount;
private long _failedDataRequestsCount;
Expand All @@ -53,16 +56,34 @@ public class DataMonitor : IDataMonitor

private readonly object _threadLock = new();

private SubscriptionManager _subscriptionManager;
private readonly Dictionary<Symbol, List<SubscriptionDataConfig>> _tradedSecuritiesSubscriptions = new();

/// <summary>
/// Time provider instance used to get the current time (for testing purposes)
/// </summary>
protected ITimeProvider TimeProvider { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="DataMonitor"/> class
/// </summary>
public DataMonitor()
{
TimeProvider = new RealTimeProvider();
_resultsDestinationFolder = Config.Get("results-destination-folder", Directory.GetCurrentDirectory());
_succeededDataRequestsFileName = GetFilePath("succeeded-data-requests.txt");
_failedDataRequestsFileName = GetFilePath("failed-data-requests.txt");
}

/// <summary>
/// Set the <see cref="SubscriptionManager"/> instance to use to select the traded subscriptions
/// </summary>
/// <param name="subscriptionManager">The subscription manager to use</param>
public void SetSubscriptionManager(SubscriptionManager subscriptionManager)
{
_subscriptionManager = subscriptionManager;
}

/// <summary>
/// Event handler for the <see cref="IDataProvider.NewDataRequest"/> event
/// </summary>
Expand All @@ -87,7 +108,7 @@ public void OnNewDataRequest(object sender, DataProviderNewDataRequestEventArgs

if (e.Succeded)
{
WriteLineToFile(_succeededDataRequestsWriter, path, _succeededDataRequestsFileName);
WriteLineToFile(_succeededDataRequestsWriter.Value, path, _succeededDataRequestsFileName);
Interlocked.Increment(ref _succeededDataRequestsCount);
if (isUniverseData)
{
Expand All @@ -96,7 +117,7 @@ public void OnNewDataRequest(object sender, DataProviderNewDataRequestEventArgs
}
else
{
WriteLineToFile(_failedDataRequestsWriter, path, _failedDataRequestsFileName);
WriteLineToFile(_failedDataRequestsWriter.Value, path, _failedDataRequestsFileName);
Interlocked.Increment(ref _failedDataRequestsCount);
if (isUniverseData)
{
Expand All @@ -110,26 +131,64 @@ public void OnNewDataRequest(object sender, DataProviderNewDataRequestEventArgs
}
}

/// <summary>
/// Registers a new order event to select the trade subscriptions
/// </summary>
public void OnOrderEvent(object sender, OrderEvent orderEvent)
{
if (_exited)
{
return;
}

if (!_tradedSecuritiesSubscriptions.ContainsKey(orderEvent.Symbol))
{
_tradedSecuritiesSubscriptions[orderEvent.Symbol] = _subscriptionManager.SubscriptionDataConfigService
.GetSubscriptionDataConfigs(orderEvent.Symbol, includeInternalConfigs: false)
.ToList();
}
}

/// <summary>
/// Terminates the data monitor generating a final report
/// </summary>
public void Exit()
{
if (_exited || _requestRateCalculationThread == null)
if (_exited)
{
return;
}
_exited = true;

_requestRateCalculationThread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
_succeededDataRequestsWriter?.Close();
_failedDataRequestsWriter?.Close();
if (_requestRateCalculationThread != null)
{
_requestRateCalculationThread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);

StoreDataMonitorReport(GenerateReport());
var succeededDataRequestWriterWasCreated = _succeededDataRequestsWriter?.IsValueCreated ?? false;
if (succeededDataRequestWriterWasCreated)
{
_succeededDataRequestsWriter.Value.Close();
}
var failedDataRequestsWriterWasCreated = _failedDataRequestsWriter?.IsValueCreated ?? false;
if (failedDataRequestsWriterWasCreated)
{
_failedDataRequestsWriter.Value.Close();
}

StoreDataMonitorReport(GenerateReport());

if (failedDataRequestsWriterWasCreated)
{
_succeededDataRequestsWriter.Value.DisposeSafely();
}
if (failedDataRequestsWriterWasCreated)
{
_failedDataRequestsWriter.Value.DisposeSafely();
}
_cancellationTokenSource.DisposeSafely();
}

_succeededDataRequestsWriter.DisposeSafely();
_failedDataRequestsWriter.DisposeSafely();
_cancellationTokenSource.DisposeSafely();
StoreTradedSubscriptions();
}

public void Dispose()
Expand Down Expand Up @@ -163,8 +222,8 @@ private void Initialize()
return;
}
// we create the files on demand
_succeededDataRequestsWriter = OpenStream(_succeededDataRequestsFileName);
_failedDataRequestsWriter = OpenStream(_failedDataRequestsFileName);
_succeededDataRequestsWriter = new(() => OpenStream(_succeededDataRequestsFileName));
_failedDataRequestsWriter = new(() => OpenStream(_failedDataRequestsFileName));

_cancellationTokenSource = new CancellationTokenSource();

Expand Down Expand Up @@ -209,13 +268,13 @@ private void ComputeFileRequestFrequency()
{
// First time we calculate the request rate.
// We don't have a previous value to compare to so we just store the current value.
_lastRequestRateCalculationTime = DateTime.UtcNow;
_lastRequestRateCalculationTime = TimeProvider.GetUtcNow();
_prevRequestsCount = requestsCount;
return;
}

var requestsCountDelta = requestsCount - _prevRequestsCount;
var now = DateTime.UtcNow;
var now = TimeProvider.GetUtcNow();
var timeDelta = now - _lastRequestRateCalculationTime;

_requestRates.Add(Math.Round(requestsCountDelta / timeDelta.TotalSeconds));
Expand All @@ -227,7 +286,7 @@ private void ComputeFileRequestFrequency()
/// Stores the data monitor report
/// </summary>
/// <param name="report">The data monitor report to be stored<param>
private void StoreDataMonitorReport(DataMonitorReport report)
protected void StoreDataMonitorReport(DataMonitorReport report)
{
if (report == null)
{
Expand All @@ -239,10 +298,30 @@ private void StoreDataMonitorReport(DataMonitorReport report)
File.WriteAllText(path, data);
}

private string GetFilePath(string filename)
/// <summary>
/// Stores the traded securities subscription configurations
/// </summary>
protected void StoreTradedSubscriptions()
{
var configs = Enumerable.Empty<SerializedSubscriptionDataConfig>();
if (_subscriptionManager != null)
{
configs = _tradedSecuritiesSubscriptions.Values.Select(x => new SerializedSubscriptionDataConfig(x
// Get the highest resolution config for each symbol
.GroupBy(config => config.Resolution)
.OrderBy(grouping => grouping.Key)
.First()));
}

var path = GetFilePath("traded-securities-subscriptions.json");
var data = JsonConvert.SerializeObject(configs, Formatting.None);
File.WriteAllText(path, data);
}

protected string GetFilePath(string filename)
{
var baseFilename = Path.GetFileNameWithoutExtension(filename);
var timestamp = DateTime.UtcNow.ToStringInvariant("yyyyMMddHHmmssfff");
var timestamp = TimeProvider.GetUtcNow().ToStringInvariant("yyyyMMddHHmmssfff");
var extension = Path.GetExtension(filename);
return Path.Combine(_resultsDestinationFolder, $"{baseFilename}-{timestamp}{extension}");
}
Expand Down
133 changes: 133 additions & 0 deletions Common/Data/Serialization/SerializedSubscriptionDataConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System.Collections.Generic;
using System.Linq;

namespace QuantConnect.Data.Serialization
{
/// <summary>
/// Data transfer object used for serializing a <see cref="SubscriptionDataConfig"/>
/// </summary>
public class SerializedSubscriptionDataConfig
{
/// <summary>
/// Easy access to the order symbol associated with this event.
/// </summary>
[JsonProperty("symbol")]
public Symbol Symbol { get; set; }

/// <summary>
/// Security type
/// </summary>
[JsonProperty("security-type"), JsonConverter(typeof(StringEnumConverter), true)]
public SecurityType SecurityType { get; set; }

/// <summary>
/// Subscription resolution
/// </summary>
[JsonProperty("resolution"), JsonConverter(typeof(StringEnumConverter), true)]
public Resolution Resolution { get; set; }

/// <summary>
/// Extended market hours
/// </summary>
[JsonProperty("extended-market-hours")]
public bool ExtendedMarketHours { get; set; }

/// <summary>
/// Data normalization mode
/// </summary>
[JsonProperty("data-normalization-mode"), JsonConverter(typeof(StringEnumConverter), true)]
public DataNormalizationMode DataNormalizationMode { get; set; }

/// <summary>
/// Data mapping mode
/// </summary>
[JsonProperty("data-mapping-mode"), JsonConverter(typeof(StringEnumConverter), true)]
public DataMappingMode DataMappingMode { get; set; }

/// <summary>
/// Contract depth offset
/// </summary>
[JsonProperty("contract-depth-offset")]
public uint ContractDepthOffset { get; set; }

/// <summary>
/// Whether the subscription configuration is for a custom data type
/// </summary>
[JsonProperty("is-custom-data")]
public bool IsCustomData { get; set; }

/// <summary>
/// The subscription data configuration tick type
/// </summary>
[JsonProperty("tick-types", ItemConverterType = typeof(StringEnumConverter))]
public List<TickType> TickTypes { get; set; }

/// <summary>
/// The data type
/// </summary>
[JsonProperty("type")]
public string Type { get; set; }

/// <summary>
/// Empty constructor required for JSON converter.
/// </summary>
protected SerializedSubscriptionDataConfig()
{
}

/// <summary>
/// Creates a new instance based on the provided config
/// </summary>
public SerializedSubscriptionDataConfig(SubscriptionDataConfig config)
{
Symbol = config.Symbol;
SecurityType = config.SecurityType;
Resolution = config.Resolution;
ExtendedMarketHours = config.ExtendedMarketHours;
DataNormalizationMode = config.DataNormalizationMode;
DataMappingMode = config.DataMappingMode;
ContractDepthOffset = config.ContractDepthOffset;
IsCustomData = config.IsCustomData;
TickTypes = new() { config.TickType };
Type = config.Type.ToString();
}

/// <summary>
/// Creates a new instance based on the provided configs for the same symbol
/// </summary>
public SerializedSubscriptionDataConfig(IEnumerable<SubscriptionDataConfig> configs)
: this(configs.First())
{
var tickTypes = configs.Select(config => config.TickType);
if (SubscriptionManager.DefaultDataTypes().TryGetValue(SecurityType, out var dataTypes))
{
// Sort tick types by the order of the default data types.
// Using IndexOf is acceptably here because the number of tick types is quite small.
tickTypes = tickTypes.OrderBy(tickType =>
{
var index = dataTypes.IndexOf(tickType);
return index != -1 ? index : int.MaxValue;
});
}

TickTypes = tickTypes.ToList();
}
}
}
Loading

0 comments on commit 145ead7

Please sign in to comment.