Skip to content

Commit

Permalink
Feature: support OpenInterest (#20)
Browse files Browse the repository at this point in the history
* feat: OpenInterestProcessorManager

* remove: unsupported OpenInterest exception

* feat: init ScheduleNextRun of openInterestManager
refactor: PolygonOpenInterestProcessorManager

* feat: new ProcessFeed() to validate subscription process

* test:feat: PolygonOpenInterestProcessorManager

* remove: commented code

* test:refactor: PolygonOpenInterestProcessorManagerTests

* refactor: use new Tick constructor
feat: use new GetSubscribedSymbols by TickType
feat: wrap in try/catch RunProcessOpenInterest

* refactor: change timer delay instead of create new one

* feat: Validate MarketOpen for each symbols
refactor: naming of variables
test:feat: with different Times for OpenInterestManager

* refactor: move validation on OpenInterest in History request

* feat: skip subscription on OpenInterest in SubscriptionManager

* test:refactor: actual parameters of Option subscription + OpenInterest

* clean: spacing

* test:refactor: rename test

* refactor: lock _dataAggregator in ProcessOpenInterest
refactor: reuse UtcNow variable

* feat: use lock to all of _dataAggregator.Update()
  • Loading branch information
Romazes authored Sep 17, 2024
1 parent 426e686 commit 1ef1b36
Show file tree
Hide file tree
Showing 10 changed files with 484 additions and 23 deletions.
38 changes: 38 additions & 0 deletions QuantConnect.Polygon.Tests/PolygonDataProviderBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,44 @@ private void ProcessFeed(IEnumerator<BaseData> enumerator, Action<BaseData> call
});
}

protected Task ProcessFeed(
IEnumerator<BaseData> enumerator,
CancellationToken cancellationToken,
int cancellationTokenDelayMilliseconds = 100,
Action<BaseData> callback = null,
Action throwExceptionCallback = null)
{
return Task.Factory.StartNew(() =>
{
try
{
while (enumerator.MoveNext() && !cancellationToken.IsCancellationRequested)
{
BaseData tick = enumerator.Current;

if (tick != null)
{
callback?.Invoke(tick);
}

cancellationToken.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(cancellationTokenDelayMilliseconds));
}
}
catch (Exception ex)
{
Log.Debug($"{nameof(PolygonOpenInterestProcessorManagerTests)}.{nameof(ProcessFeed)}.Exception: {ex.Message}");
throw;
}
}, cancellationToken).ContinueWith(task =>
{
if (throwExceptionCallback != null)
{
throwExceptionCallback();
}
Log.Debug("The throwExceptionCallback is null.");
}, TaskContinuationOptions.OnlyOnFaulted);
}

protected SubscriptionDataConfig GetSubscriptionDataConfig<T>(Symbol symbol, Resolution resolution)
{
return new SubscriptionDataConfig(
Expand Down
12 changes: 7 additions & 5 deletions QuantConnect.Polygon.Tests/PolygonDataProviderOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
Expand Down Expand Up @@ -145,11 +145,12 @@ protected override List<SubscriptionDataConfig> GetConfigs(Resolution resolution
.Select(strike =>
{
var symbol = Symbol.CreateOption(Symbols.SPY, Market.USA, OptionStyle.American, OptionRight.Call, strike,
new DateTime(2024, 01, 05));
new DateTime(2024, 09, 20));
return new[]
{
GetSubscriptionDataConfig<TradeBar>(symbol, resolution),
GetSubscriptionDataConfig<QuoteBar>(symbol, resolution)
GetSubscriptionDataConfig<QuoteBar>(symbol, resolution),
GetSubscriptionDataConfig<OpenInterest>(symbol, resolution)
};
})
.SelectMany(x => x);
Expand All @@ -158,11 +159,12 @@ protected override List<SubscriptionDataConfig> GetConfigs(Resolution resolution
.Select(strike =>
{
var symbol = Symbol.CreateOption(Symbols.SPX, "SPXW", Market.USA, OptionStyle.American, OptionRight.Call, strike,
new DateTime(2024, 01, 05));
new DateTime(2024, 09, 20));
return new[]
{
GetSubscriptionDataConfig<TradeBar>(symbol, resolution),
GetSubscriptionDataConfig<QuoteBar>(symbol, resolution)
GetSubscriptionDataConfig<QuoteBar>(symbol, resolution),
GetSubscriptionDataConfig<OpenInterest>(symbol, resolution)
};
})
.SelectMany(x => x);
Expand Down
140 changes: 140 additions & 0 deletions QuantConnect.Polygon.Tests/PolygonOpenInterestProcessorManagerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using NUnit.Framework;
using System.Threading;
using QuantConnect.Data;
using QuantConnect.Tests;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Configuration;
using System.Collections.Concurrent;
using QuantConnect.Lean.Engine.DataFeeds;

namespace QuantConnect.Lean.DataSource.Polygon.Tests
{
public class PolygonOpenInterestProcessorManagerTests : PolygonDataProviderBaseTests
{
private readonly PolygonRestApiClient _restApiClient = new(Config.Get("polygon-api-key"));

private readonly PolygonSymbolMapper symbolMapper = new();

private readonly PolygonAggregationManager dataAggregator = new();

private readonly ManualTimeProvider _timeProviderInstance = new();

private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = new()
{
SubscribeImpl = (symbols, _) => { return true; },
UnsubscribeImpl = (symbols, _) => { return true; }
};

private object _locker = new();

[TestCase("2024-09-16T09:30:59", true, Description = "Market: After Opening")]
[TestCase("2024-09-16T15:28:59", true, Description = "Market: Before Closing")]
[TestCase("2024-09-16T16:28:59", false, Description = "Market: Closed")]
public void GetOpenInterestInDifferentTimeExchangeTime(string mockDateTime, bool isShouldReturnData)
{
var waitOneDelay = isShouldReturnData ? TimeSpan.FromSeconds(30) : TimeSpan.FromSeconds(5);
var resetEvent = new AutoResetEvent(false);
var cancellationTokenSource = new CancellationTokenSource();
var optionContractsConfigs = GetConfigs();

var symbolOpenInterest = new ConcurrentDictionary<Symbol, decimal>();
Action<BaseData> callback = (baseData) =>
{
if (baseData == null)
{
return;
}

lock (_locker)
{
symbolOpenInterest[baseData.Symbol] = baseData.Value;

if (symbolOpenInterest.Count > 5)
{
resetEvent.Set();
}
}
};

foreach (var config in optionContractsConfigs)
{
_subscriptionManager.Subscribe(config);
ProcessFeed(
Subscribe(dataAggregator, config, (sender, args) => { }),
cancellationTokenSource.Token,
callback: callback
);
}

var mockDateTimeAfterOpenExchange = DateTime.Parse(mockDateTime).ConvertTo(TimeZones.NewYork, TimeZones.Utc);
_timeProviderInstance.SetCurrentTimeUtc(mockDateTimeAfterOpenExchange);
var processor = new PolygonOpenInterestProcessorManager(_timeProviderInstance, _restApiClient, symbolMapper, _subscriptionManager, dataAggregator, GetTickTime);
processor.ScheduleNextRun();
resetEvent.WaitOne(waitOneDelay, cancellationTokenSource.Token);

if (isShouldReturnData)
{
Assert.Greater(symbolOpenInterest.Count, 0);
foreach (var (symbol, openInterest) in symbolOpenInterest)
{
Assert.Greater(openInterest, 0);
}
}
else
{
Assert.Zero(symbolOpenInterest.Count);
}

cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
processor.Dispose();
symbolOpenInterest.Clear();
}

protected override List<SubscriptionDataConfig> GetConfigs(Resolution resolution = Resolution.Second)
{
var configs = new List<SubscriptionDataConfig>();

var expiryContractDate = new DateTime(2024, 09, 20);
var strikesAAPL = new decimal[] { 100m, 105m, 110m, 115m, 120m, 125m, 130m, 135m, 140m, 145m };

foreach (var strike in strikesAAPL)
{
var optionContract = Symbol.CreateOption(Symbols.AAPL, Market.USA, OptionStyle.American, OptionRight.Call, strike, expiryContractDate);
configs.Add(GetSubscriptionDataConfig<OpenInterest>(optionContract, resolution));
}

var strikesSPY = new decimal[] { 300m, 320m, 360m, 365m, 380m, 400m, 415m, 420m, 430m, 435m };

foreach (var strike in strikesSPY)
{
var optionContract = Symbol.CreateOption(Symbols.SPY, Market.USA, OptionStyle.American, OptionRight.Call, strike, expiryContractDate);
configs.Add(GetSubscriptionDataConfig<OpenInterest>(optionContract, resolution));
}

return configs;
}

private DateTime GetTickTime(Symbol symbol, DateTime utcTime) => utcTime.ConvertFromUtc(TimeZones.NewYork);

private IEnumerator<BaseData> Subscribe(PolygonAggregationManager dataAggregator, SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
=> dataAggregator.Add(dataConfig, newDataAvailableHandler);
}
}
5 changes: 0 additions & 5 deletions QuantConnect.Polygon/PolygonAggregationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ public void SetUsingAggregates(bool useAggregates)
/// </summary>
protected override IDataConsolidator GetConsolidator(SubscriptionDataConfig config)
{
if (config.TickType == TickType.OpenInterest)
{
throw new ArgumentException($"Unsupported subscription tick type {config.TickType}");
}

if (_usingAggregates)
{
// Starter plan only supports streaming aggregated data.
Expand Down
28 changes: 15 additions & 13 deletions QuantConnect.Polygon/PolygonDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ private void Initialize(string apiKey, int maxSubscriptionsPerWebSocket, bool st
maxSubscriptionsPerWebSocket,
(securityType) => new PolygonWebSocketClientWrapper(_apiKey, _symbolMapper, securityType, OnMessage));
}
var openInterestManager = new PolygonOpenInterestProcessorManager(TimeProvider, RestApiClient, _symbolMapper, _subscriptionManager, _dataAggregator, GetTickTime);
openInterestManager.ScheduleNextRun();
}

#region IDataQueueHandler implementation
Expand Down Expand Up @@ -296,7 +298,10 @@ private void ProcessAggregate(AggregateMessage aggregate)
var period = TimeSpan.FromMilliseconds(aggregate.EndingTickTimestamp - aggregate.StartingTickTimestamp);
var bar = new TradeBar(time, symbol, aggregate.Open, aggregate.High, aggregate.Low, aggregate.Close, aggregate.Volume, period);

_dataAggregator.Update(bar);
lock (_dataAggregator)
{
_dataAggregator.Update(bar);
}
}

/// <summary>
Expand All @@ -308,7 +313,10 @@ private void ProcessTrade(TradeMessage trade)
var time = GetTickTime(symbol, trade.Timestamp);
// TODO: Map trade.Conditions to Lean sale conditions
var tick = new Tick(time, symbol, string.Empty, GetExchangeCode(trade.ExchangeID), trade.Size, trade.Price);
_dataAggregator.Update(tick);
lock (_dataAggregator)
{
_dataAggregator.Update(tick);
}
}

/// <summary>
Expand All @@ -322,7 +330,10 @@ private void ProcessQuote(QuoteMessage quote)
// Note: Polygon's quotes have bid/ask exchange IDs, but Lean only has one exchange per tick. We'll use the bid exchange.
var tick = new Tick(time, symbol, string.Empty, GetExchangeCode(quote.BidExchangeID),
quote.BidSize, quote.BidPrice, quote.AskSize, quote.AskPrice);
_dataAggregator.Update(tick);
lock (_dataAggregator)
{
_dataAggregator.Update(tick);
}
}

/// <summary>
Expand Down Expand Up @@ -428,18 +439,9 @@ private bool IsSupported(SecurityType securityType, Type dataType, TickType tick
return false;
}

if (tickType == TickType.OpenInterest)
{
if (!_unsupportedTickTypeMessagedLogged)
{
_unsupportedTickTypeMessagedLogged = true;
Log.Trace($"PolygonDataProvider.IsSupported(): Unsupported tick type: {tickType}");
}
return false;
}

if (!dataType.IsAssignableFrom(typeof(TradeBar)) &&
!dataType.IsAssignableFrom(typeof(QuoteBar)) &&
!dataType.IsAssignableFrom(typeof(OpenInterest)) &&
!dataType.IsAssignableFrom(typeof(Tick)))
{
if (!_unsupportedDataTypeMessageLogged)
Expand Down
10 changes: 10 additions & 0 deletions QuantConnect.Polygon/PolygonHistoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ public override void Initialize(HistoryProviderInitializeParameters parameters)
return null;
}

if (request.TickType == TickType.OpenInterest)
{
if (!_unsupportedTickTypeMessagedLogged)
{
_unsupportedTickTypeMessagedLogged = true;
Log.Trace($"PolygonDataProvider.GetHistory(): Unsupported tick type: {TickType.OpenInterest}");
}
return null;
}

// Quote data can only be fetched from Polygon from their Quote Tick endpoint,
// which would be too slow for anything above second resolution or long time spans.
if (request.TickType == TickType.Quote && request.Resolution > Resolution.Second)
Expand Down
Loading

0 comments on commit 1ef1b36

Please sign in to comment.