Commit 6b21827

Merge pull request #270 from macanudo527/pair_converters/refactor_find_historical_bars

Refactor find historical bars

2 parents 3541990 + 395a132, commit 6b21827

5 files changed (+85, -66 lines)

setup.cfg (+3, -4)

@@ -13,11 +13,10 @@ classifiers =
     Intended Audience :: End Users/Desktop
     License :: OSI Approved :: Apache Software License
     Operating System :: OS Independent
-    Programming Language :: Python :: 3.8
-    Programming Language :: Python :: 3.9
     Programming Language :: Python :: 3.10
     Programming Language :: Python :: 3.11
     Programming Language :: Python :: 3.12
+    Programming Language :: Python :: 3.13
     Topic :: Office/Business :: Financial :: Accounting
     Topic :: Utilities
     Typing :: Typed
@@ -27,7 +26,6 @@ project_urls =
     User Documentation = https://github.com/eprbell/dali-rp2/blob/main/README.md
     Contact = https://eprbell.github.io/eprbell/about.html
 
-# vcpy > 4.4.0 is only compatible with Python 3.10+
 [options]
 package_dir =
     = src
@@ -38,6 +36,7 @@ install_requires =
     Historic-Crypto>=0.1.6
     jsonschema>=3.2.0
     pandas
+    pandas-stubs
     prezzemolo>=0.0.4
     progressbar2>=4.2.0
     pyexcel-ezodf>=0.3.4
@@ -46,7 +45,7 @@ install_requires =
     pytz>=2021.3
     requests>=2.26.0
     rp2>=1.7.1
-    vcrpy==4.4.0
+    vcrpy
 
 [options.extras_require]
 dev =

src/dali/abstract_ccxt_pair_converter_plugin.py (+29, -20)

@@ -247,6 +247,7 @@
 DAYS_IN_WEEK: int = 7
 MANY_YEARS_IN_THE_FUTURE: relativedelta = relativedelta(years=100)
 
+
 class AssetPairAndHistoricalPrice(NamedTuple):
     from_asset: str
     to_asset: str
@@ -513,16 +514,23 @@ def find_historical_bar(self, from_asset: str, to_asset: str, timestamp: datetim
     def find_historical_bars(
         self, from_asset: str, to_asset: str, timestamp: datetime, exchange: str, all_bars: bool = False, timespan: str = _MINUTE
     ) -> Optional[List[HistoricalBar]]:
+
+        # Guard clause to pull from cache
+        if all_bars:
+            cached_bundle = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange))
+            if cached_bundle:
+                # If the last bar in the bundle is within the last week, return the bundle
+                if (datetime.now(timezone.utc) - cached_bundle[-1].timestamp).total_seconds() <= _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]:
+                    return cached_bundle
+                # If the last bar in the bundle is older than a week, we need to start from the next millisecond
+                # We will pull the rest later and add to this bundle
+                timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1)
+
+        # Stage 1 Initialization
         result: List[HistoricalBar] = []
-        retry_count: int = 0
         self.__transaction_count += 1
-        if timespan in _TIME_GRANULARITY_SET:
-            if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET:
-                retry_count = _TIME_GRANULARITY_DICT[exchange].index(timespan)
-            else:
-                retry_count = _TIME_GRANULARITY.index(timespan)
-        else:
-            raise RP2ValueError("Internal error: Invalid time span passed to find_historical_bars.")
+        retry_count: int = self._initialize_retry_count(exchange, timespan)
+
         current_exchange: Any = self.__exchanges[exchange]
         ms_timestamp: int = int(timestamp.timestamp() * _MS_IN_SECOND)
         csv_pricing: Any = self.__csv_pricing_dict.get(exchange)
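
The hunk above replaces the mid-function cache handling (removed further down) with a guard clause at the top of find_historical_bars: if a cached bundle's last bar is less than a week old it is returned immediately, and if it is older, only the starting timestamp is advanced so the missing bars can be fetched and appended to the bundle later. Below is a minimal, self-contained sketch of that caching pattern; the Bar type and the get_cached_bundle / fetch_bars helpers are illustrative stand-ins, not dali-rp2 APIs.

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

ONE_WEEK_IN_SECONDS: int = 7 * 24 * 60 * 60


@dataclass(frozen=True)
class Bar:
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float


# Illustrative in-memory cache keyed by (from_asset, to_asset, exchange).
_CACHE: Dict[Tuple[str, str, str], List[Bar]] = {}


def get_cached_bundle(key: Tuple[str, str, str]) -> Optional[List[Bar]]:
    return _CACHE.get(key)


def fetch_bars(key: Tuple[str, str, str], start: datetime) -> List[Bar]:
    # Stand-in for the exchange/API calls the real plugin makes.
    return [Bar(timestamp=datetime.now(timezone.utc), open=1.0, high=2.0, low=0.5, close=1.5)]


def find_bars(from_asset: str, to_asset: str, timestamp: datetime, exchange: str) -> List[Bar]:
    key = (from_asset, to_asset, exchange)

    # Guard clause: reuse the cached bundle when possible.
    cached_bundle = get_cached_bundle(key)
    if cached_bundle:
        age = (datetime.now(timezone.utc) - cached_bundle[-1].timestamp).total_seconds()
        if age <= ONE_WEEK_IN_SECONDS:
            return cached_bundle  # Fresh enough: no new fetch needed.
        # Stale: resume one millisecond after the last cached bar.
        timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1)

    new_bars = fetch_bars(key, timestamp)
    result = (cached_bundle or []) + new_bars  # Prepend the cached bars before re-caching.
    _CACHE[key] = result
    return result

The real method additionally walks a list of time granularities, may return None, and caches via _add_bundle_to_cache; the sketch keeps only the guard-clause control flow.
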
@@ -566,18 +574,6 @@ def find_historical_bars(
 
         within_last_week: bool = False
 
-        # Get bundles of bars if they exist, saving us from making a call to the API
-        if all_bars:
-            cached_bundle: Optional[List[HistoricalBar]] = self._get_bundle_from_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange))
-            if cached_bundle:
-                result.extend(cached_bundle)
-                timestamp = cached_bundle[-1].timestamp + timedelta(milliseconds=1)
-                ms_timestamp = int(timestamp.timestamp() * _MS_IN_SECOND)
-
-        # If the bundle of bars is within the last week, we don't need to pull new optimization data.
-        if result and (datetime.now(timezone.utc) - result[-1].timestamp).total_seconds() > _TIME_GRANULARITY_STRING_TO_SECONDS[_ONE_WEEK]:
-            within_last_week = True
-
         while (retry_count < len(_TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY))) and not within_last_week:
             timeframe: str = _TIME_GRANULARITY_DICT.get(exchange, _TIME_GRANULARITY)[retry_count]
             request_count: int = 0
@@ -718,6 +714,8 @@ def find_historical_bars(
                     )
                 )
             elif all_bars:
+                if cached_bundle:
+                    result = cached_bundle + result
                 self._add_bundle_to_cache(AssetPairAndTimestamp(timestamp, from_asset, to_asset, exchange), result)
                 break  # If historical_data is empty we have hit the end of records and need to return
             else:
@@ -728,6 +726,17 @@
 
         return result
 
+    def _initialize_retry_count(self, exchange: str, timespan: str) -> int:
+        if timespan not in _TIME_GRANULARITY_SET:
+            raise RP2ValueError(f"Internal error: Invalid time span '{timespan}' passed to find_historical_bars.")
+
+        granularity = _TIME_GRANULARITY_DICT[exchange] if exchange in _NONSTANDARD_GRANULARITY_EXCHANGE_SET else _TIME_GRANULARITY
+
+        if timespan not in granularity:
+            raise RP2ValueError(f"Internal error: Time span '{timespan}' is not valid for exchange '{exchange}'.")
+
+        return granularity.index(timespan)
+
     def _add_alternative_markets(self, graph: MappedGraph[str], current_markets: Dict[str, List[str]]) -> None:
         for base_asset, quote_asset in _ALT_MARKET_BY_BASE_DICT.items():
             alt_market = base_asset + quote_asset

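The new _initialize_retry_count helper centralizes the granularity lookup that used to be inlined: the retry loop starts at the index of the requested timespan in the exchange's granularity list (so retries only fall back to coarser bars), and an unknown or unsupported timespan now raises instead of silently starting at index 0. A rough standalone sketch of that lookup, using made-up granularity tables rather than the plugin's real _TIME_GRANULARITY / _TIME_GRANULARITY_DICT values:

from typing import Dict, List

# Illustrative tables; the real plugin defines its own per-exchange lists.
DEFAULT_GRANULARITY: List[str] = ["1m", "5m", "1h", "1d", "1w"]
EXCHANGE_GRANULARITY: Dict[str, List[str]] = {
    "SomeExchange": ["1m", "5m", "15m", "1h", "6h", "1d"],
}
VALID_TIMESPANS = set(DEFAULT_GRANULARITY) | {"15m", "6h"}


def initialize_retry_count(exchange: str, timespan: str) -> int:
    if timespan not in VALID_TIMESPANS:
        raise ValueError(f"Invalid time span '{timespan}'")

    granularity = EXCHANGE_GRANULARITY.get(exchange, DEFAULT_GRANULARITY)
    if timespan not in granularity:
        raise ValueError(f"Time span '{timespan}' is not valid for exchange '{exchange}'")

    # The index is where the retry loop starts; failures then move to coarser bars.
    return granularity.index(timespan)


print(initialize_retry_count("SomeExchange", "6h"))   # 4
print(initialize_retry_count("OtherExchange", "1h"))  # 2

The tests added below exercise the same three paths: a default-table lookup, an exchange-specific lookup, and the error case for an unsupported timespan.
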
src/stubs/ezodf/table.pyi (+3)

@@ -12,8 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import Tuple
+
 class Table:
     name: str
 
     def __init__(self, name: str) -> None: ...
     def nrows(self) -> int: ...
+    def reset(self, size: Tuple[int, int]) -> None: ...

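With reset declared in the stub, mypy can now check calls against Table.reset instead of treating them as untyped. A small, hypothetical illustration; the import path mirrors the stub layout (ezodf.table) and resize_sheet is not a dali-rp2 function:

from typing import Tuple

from ezodf.table import Table  # checked against src/stubs/ezodf/table.pyi


def resize_sheet(sheet: Table, size: Tuple[int, int]) -> None:
    # The stub declares reset(size: Tuple[int, int]) -> None,
    # so a call like sheet.reset("10x5") would be flagged by mypy.
    sheet.reset(size)
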
src/stubs/pandas.pyi (-37)

This file was deleted.

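Dropping the hand-written pandas stub pairs with the new pandas-stubs entry in setup.cfg: pandas type information now comes from the published stub package instead of a local .pyi. A tiny, hypothetical illustration of typed pandas code that pandas-stubs covers (the column name and values are made up):

import pandas as pd


def highest_price(prices: pd.DataFrame) -> float:
    # With pandas-stubs installed, mypy knows the types of DataFrame indexing and max().
    return float(prices["high"].max())


frame = pd.DataFrame({"high": [2.0, 2.5, 1.8]})
print(highest_price(frame))  # 2.5
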
tests/test_abstract_ccxt_pair_converter.py (+50, -5)

@@ -12,14 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional, Set
 
 import pytest
 from prezzemolo.vertex import Vertex
 from rp2.rp2_decimal import RP2Decimal
+from rp2.rp2_error import RP2ValueError
 
 from dali.abstract_ccxt_pair_converter_plugin import (
+    _BINANCE,
+    _COINBASE_PRO,
+    _ONE_HOUR,
+    _SIX_HOUR,
+    _TIME_GRANULARITY,
+    _TIME_GRANULARITY_DICT,
     MARKET_PADDING_IN_WEEKS,
     AbstractCcxtPairConverterPlugin,
 )
@@ -74,10 +81,11 @@ def unoptimized_graph(self, vertex_list: Dict[str, Vertex[str]]) -> MappedGraph[
 
     @pytest.fixture
     def historical_bars(self) -> Dict[str, HistoricalBar]:
+        now_time = datetime.now(timezone.utc)
         return {
             MARKET_START: HistoricalBar(
                 duration=timedelta(weeks=1),
-                timestamp=datetime(2023, 1, 1),
+                timestamp=now_time,
                 open=RP2Decimal("1.0"),
                 high=RP2Decimal("2.0"),
                 low=RP2Decimal("0.5"),
@@ -86,7 +94,7 @@ def historical_bars(self) -> Dict[str, HistoricalBar]:
             ),
             ONE_WEEK_EARLIER: HistoricalBar(
                 duration=timedelta(weeks=1),
-                timestamp=datetime(2023, 1, 1) - timedelta(weeks=1),
+                timestamp=now_time - timedelta(weeks=1),
                 open=RP2Decimal("1.1"),
                 high=RP2Decimal("2.1"),
                 low=RP2Decimal("0.6"),
@@ -116,7 +124,7 @@ def test_retrieve_historical_bars(
         plugin = MockAbstractCcxtPairConverterPlugin(Keyword.HISTORICAL_PRICE_HIGH.value)
         unoptimized_assets: Set[str] = {"A", "B"}
         optimization_candidates: Set[Vertex[str]] = {vertex_list["A"], vertex_list["B"], vertex_list["C"]}
-        week_start_date = datetime(2023, 1, 1)
+        week_start_date = historical_bars[MARKET_START].timestamp
 
         mocker.patch.object(plugin, "_AbstractCcxtPairConverterPlugin__exchange_markets", {TEST_EXCHANGE: TEST_MARKETS})
 
@@ -148,7 +156,7 @@ def find_historical_bars_side_effect(
 
     def test_generate_optimizations(self, historical_bars: Dict[str, HistoricalBar]) -> None:
         plugin = MockAbstractCcxtPairConverterPlugin(Keyword.HISTORICAL_PRICE_HIGH.value)
-        week_start_date = datetime(2023, 1, 1)
+        week_start_date = historical_bars[MARKET_START].timestamp
 
         child_bars = {"A": {"B": [historical_bars[MARKET_START], historical_bars[ONE_WEEK_EARLIER]]}}
 
@@ -193,3 +201,40 @@ def test_refine_and_finalize_optimizations(self) -> None:
         assert refined_optimizations[datetime(2023, 1, 4)]["A"]["C"] == 1.0
         assert refined_optimizations[datetime(2023, 1, 4)]["D"]["F"] == 1.0
         assert "E" not in refined_optimizations[datetime(2023, 1, 4)]["D"]
+
+    def test_initialize_retry_count(self) -> None:
+        plugin = MockAbstractCcxtPairConverterPlugin(Keyword.HISTORICAL_PRICE_HIGH.value)
+
+        assert plugin._initialize_retry_count(_BINANCE, _ONE_HOUR) == _TIME_GRANULARITY.index(_ONE_HOUR)  # pylint: disable=protected-access
+        assert plugin._initialize_retry_count(_COINBASE_PRO, _SIX_HOUR) == _TIME_GRANULARITY_DICT[_COINBASE_PRO].index(  # pylint: disable=protected-access
+            _SIX_HOUR
+        )
+        with pytest.raises(RP2ValueError):
+            # Binance does not support 6 hour granularity
+            assert plugin._initialize_retry_count(_BINANCE, _SIX_HOUR)  # pylint: disable=protected-access
+            assert plugin._initialize_retry_count(_COINBASE_PRO, "invalid")  # pylint: disable=protected-access
+
+    def test_find_historical_bars_guard_clause(self, mocker: Any, historical_bars: Dict[str, HistoricalBar]) -> None:
+        plugin = MockAbstractCcxtPairConverterPlugin(Keyword.HISTORICAL_PRICE_HIGH.value)
+
+        mocker.patch.object(plugin, "_get_bundle_from_cache", return_value=[historical_bars[MARKET_START]])
+
+        bars = plugin.find_historical_bars("A", "B", datetime(2023, 1, 1), TEST_EXCHANGE, True)
+
+        assert bars
+        assert len(bars) == 1
+        assert bars[0] == historical_bars[MARKET_START]
+
+    # To be enabled when _fetch_historical_bars is implemented
+    def disabled_test_find_historical_bars_add_to_cache(self, mocker: Any, historical_bars: Dict[str, HistoricalBar]) -> None:
+        plugin = MockAbstractCcxtPairConverterPlugin(Keyword.HISTORICAL_PRICE_HIGH.value)
+
+        mocker.patch.object(plugin, "_get_bundle_from_cache", return_value=historical_bars[ONE_WEEK_EARLIER])
+        mocker.patch.object(plugin, "_fetch_historical_bars", return_value=[historical_bars[MARKET_START]])  # function that calls the API
+
+        bars = plugin.find_historical_bars("A", "B", datetime(2023, 1, 1), TEST_EXCHANGE, True)
+
+        assert bars
+        assert len(bars) == 2
+        assert bars[0] == historical_bars[ONE_WEEK_EARLIER]
+        assert bars[1] == historical_bars[MARKET_START]
