Skip to content

Commit d254280

Browse files
committed
lots and lots of bug fixes, in particular fixes up weird asyncio thread blocks calling RPC from zmq
1 parent e5b47fb commit d254280

File tree

9 files changed

+266
-452
lines changed

9 files changed

+266
-452
lines changed

async_cli.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
invoke_context_wrapper,
7272
monkeypatch_click_echo,
7373
portfolio_helper,
74-
resolve_conid_to_security_definition,
74+
resolve_symbol,
7575
setup_cli,
7676
strategy_helper
7777
)
@@ -80,7 +80,15 @@
8080
from trader.cli.repl_input import ReplInput
8181
from trader.common.dataclass_cache import DataClassEvent, UpdateEvent
8282
from trader.common.exceptions import TraderConnectionException, TraderException
83-
from trader.common.helpers import contract_from_dict, DictHelper, rich_dict, rich_json, rich_list, rich_table
83+
from trader.common.helpers import (
84+
contract_from_dict,
85+
DictHelper,
86+
ListHelper,
87+
rich_dict,
88+
rich_json,
89+
rich_list,
90+
rich_table
91+
)
8492
from trader.common.logging_helper import LogLevels, set_log_level, setup_logging
8593
from trader.common.reactivex import SuccessFail
8694
from trader.container import Container as TraderContainer
@@ -121,15 +129,18 @@ def __init__(self, width, height):
121129

122130
def filter_plot(self, conId: int):
123131
self.y_values = []
124-
self.rich_canvas = Group(*self.decoder.decode('waiting for data...'))
132+
self.rich_canvas = Group(*self.decoder.decode('waiting for data for conId: {} ...'.format(conId)))
125133
self.filter = conId
126134
self.dt = dt.datetime.now()
127135

128136
def make_plot(self, x, y, width, height):
137+
def format_yticks(y_value):
138+
return f"{y_value:.2f}"
129139
plt.clf()
130140
plt.grid(1, 1)
131141
plt.title(self.filter)
132142
plt.xticks([0.0, len(self.y_values) - 1], [self.dt.strftime('%H:%M:%S'), dt.datetime.now().strftime('%H:%M:%S')])
143+
plt.yticks(ticks=self.y_values, labels=[format_yticks(y_value) for y_value in self.y_values])
133144
plt.plot(
134145
list(range(0, len(self.y_values))),
135146
self.y_values,
@@ -141,7 +152,7 @@ def make_plot(self, x, y, width, height):
141152
def ticker(self, ticker: Ticker):
142153
if ticker.contract and ticker.contract.conId == self.filter:
143154
last = ticker.last if ticker.last >= 0.0 else ticker.close
144-
self.y_values.append(last)
155+
self.y_values.append(round(last, 2))
145156
canvas = self.make_plot(range(0, len(self.y_values)), self.y_values, self.width, self.height)
146157
self.rich_canvas = Group(*self.decoder.decode(canvas))
147158

@@ -398,6 +409,7 @@ def __init__(
398409
# Binding('ctrl+u', 'dialog', 'Dialog'),
399410
Binding('ctrl+p', 'plot', 'Plot'),
400411
Binding('ctrl+b', 'book', 'Book'),
412+
Binding('ctrl+l', 'listen', 'Listen'),
401413
Binding('ctrl+s', 'strategy', 'Strategies'),
402414
Binding('ctrl+q', 'quit', 'Quit'),
403415
]
@@ -409,6 +421,7 @@ def compose(self) -> ComposeResult:
409421
self.data_table: DataTable = DataTable()
410422
self.portfolio_table: DataTable = DataTable()
411423
self.book_table: DataTable = DataTable()
424+
self.listen_table: DataTable = DataTable()
412425
self.strategy_table: DataTable = DataTable()
413426

414427
self.repl_log = TextLog(id='text-box', highlight=False, wrap=True)
@@ -420,7 +433,7 @@ def compose(self) -> ComposeResult:
420433
)
421434

422435
self.plot = PlotextMixin(self.top.container_size.width, self.top.container_size.height)
423-
self.plot_static = Static(self.plot, expand=True, markup=False)
436+
self.plot_static = Static(self.plot, expand=True, markup=False, id='plot-static')
424437

425438
self.right_box = Container(
426439
Container(self.portfolio_table),
@@ -430,6 +443,7 @@ def compose(self) -> ComposeResult:
430443
self.left_box = Container(
431444
Container(self.repl_log, id='repl-log'),
432445
Container(self.book_table, id='book-table', classes='hidden'),
446+
Container(self.listen_table, id='listen-table', classes='hidden'),
433447
Container(self.strategy_table, id='strategy-table', classes='hidden'),
434448
id='bottom-left'
435449
)
@@ -521,6 +535,10 @@ def action_book(self) -> None:
521535
self.render_book()
522536
self.switch_widget('#bottom-left', 'book-table')
523537

538+
def action_listen(self) -> None:
539+
# self.render_listen()
540+
self.switch_widget('#bottom-left', 'listen-table')
541+
524542
def action_strategy(self) -> None:
525543
self.render_strategies()
526544
self.switch_widget('#bottom-left', 'strategy-table')
@@ -551,7 +569,7 @@ def start_plot(self, conid: Union[int, str]) -> None:
551569
if type(conid) == str and str(conid).isnumeric():
552570
conid = int(conid)
553571

554-
definition = resolve_conid_to_security_definition(int(conid))
572+
definition = ListHelper.first(resolve_symbol(int(conid)))
555573
if definition:
556574
contract = Universe.to_contract(definition)
557575

trader/cli/commands.py

Lines changed: 52 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@
1818
from trader.cli.cli_renderer import CliRenderer
1919
from trader.cli.command_line import common_options, default_config
2020
from trader.common.exceptions import TraderConnectionException, TraderException
21-
from trader.common.helpers import contract_from_dict, DictHelper
21+
from trader.common.helpers import contract_from_dict, DictHelper, ListHelper
2222
from trader.common.logging_helper import LogLevels, set_log_level, setup_logging
2323
from trader.common.reactivex import SuccessFail
2424
from trader.container import Container as TraderContainer
2525
from trader.data.data_access import DictData, PortfolioSummary, TickData, TickStorage
2626
from trader.data.universe import SecurityDefinition, Universe, UniverseAccessor
2727
from trader.listeners.ibreactive import IBAIORx, WhatToShow
28-
from trader.messaging.clientserver import RPCClient
28+
from trader.messaging.clientserver import consume, RPCClient
2929
from trader.messaging.trader_service_api import TraderServiceApi
3030
from trader.objects import BarSize, TradeLogSimple
3131
from trader.trading.strategy import Strategy, StrategyConfig, StrategyState
32-
from typing import Any, Dict, List, Optional, Tuple, Union
32+
from typing import Any, Callable, cast, Coroutine, Dict, List, Optional, Tuple, TypeVar, Union
3333

3434
import asyncio
3535
import click
@@ -65,7 +65,7 @@
6565
zmq_server_address=container.config()['zmq_rpc_server_address'],
6666
zmq_server_port=container.config()['zmq_rpc_server_port'],
6767
error_table=error_table,
68-
timeout=10,
68+
timeout=5,
6969
)
7070

7171
renderer = CliRenderer()
@@ -90,7 +90,7 @@ def closure(message, color=None, nl=True, err=False, **styles):
9090

9191

9292
def connect():
93-
if not remoted_client.connected:
93+
if not remoted_client.is_setup:
9494
asyncio.get_event_loop().run_until_complete(remoted_client.connect())
9595

9696

@@ -116,67 +116,68 @@ def setup_cli(cli_renderer: CliRenderer):
116116

117117
return remoted_client, cli_client_id
118118

119-
def resolve_conid_to_security_definition_db(
120-
conid: int,
119+
def resolve_symbol_arctic(
120+
symbol: Union[int, str],
121121
arctic_server_address: str,
122122
arctic_universe_library: str,
123123
) -> Optional[SecurityDefinition]:
124124
accessor = UniverseAccessor(arctic_server_address, arctic_universe_library)
125-
universe_security_definition = accessor.resolve_conid(conid)
126-
if universe_security_definition:
127-
return universe_security_definition[1]
128-
else:
129-
return None
125+
return ListHelper.first(accessor.resolve_symbol(symbol))
130126

131-
def resolve_conid_to_security_definition(
132-
conid: int,
133-
) -> Optional[SecurityDefinition]:
134-
result = remoted_client.rpc(
135-
return_type=list[tuple[Universe, SecurityDefinition]]
136-
).resolve_symbol_to_security_definitions(conid)
137-
if result:
138-
return result[0][1]
139-
else:
140-
return None
141127

142-
def resolve_symbol_to_security_definitions(
128+
def resolve_symbol(
143129
symbol: Union[str, int],
144-
) -> List[Tuple[Universe, SecurityDefinition]]:
145-
return remoted_client.rpc(
146-
return_type=list[tuple[Universe, SecurityDefinition]]
147-
).resolve_symbol_to_security_definitions(symbol)
130+
exchange: str = '',
131+
universe: str = '',
132+
) -> List[SecurityDefinition]:
133+
return consume(remoted_client.rpc(return_type=list[SecurityDefinition]).resolve_symbol(symbol, exchange, universe))
134+
148135

149136
def __resolve(
150137
symbol: Union[str, int],
151138
arctic_server_address: str,
152139
arctic_universe_library: str,
153-
primary_exchange: Optional[str] = ''
140+
exchange: str = '',
141+
universe: str = '',
154142
) -> List[Dict[str, Any]]:
155-
if not primary_exchange: primary_exchange = ''
156-
accessor = UniverseAccessor(arctic_server_address, arctic_universe_library)
157-
universe_definitions = accessor.resolve_symbol(symbol)
143+
# it's best to call the trader_runtime resolve method, as it can talk to Interactive Brokers if
144+
# the symbol is not found in any available universes
145+
temp_results: List[Tuple[str, SecurityDefinition]] = []
146+
147+
if remoted_client.is_setup:
148+
temp_results.extend(
149+
consume(
150+
remoted_client.rpc(return_type=list[tuple[str, SecurityDefinition]]).resolve_universe(symbol, exchange, universe)
151+
)
152+
)
153+
else:
154+
accessor = UniverseAccessor(arctic_server_address, arctic_universe_library)
155+
temp_results.extend(accessor.resolve_universe_name(symbol=symbol, exchange=exchange, universe=universe))
158156

159157
results: List[Dict] = []
160-
for universe, definition in universe_definitions:
158+
for universe_name, definition in temp_results:
161159
results.append({
162-
'universe': universe.name,
160+
'universe': universe_name,
163161
'conId': definition.conId,
164162
'symbol': definition.symbol,
163+
'secType': definition.secType,
165164
'exchange': definition.exchange,
166165
'primaryExchange': definition.primaryExchange,
167166
'currency': definition.currency,
168167
'longName': definition.longName,
169168
'category': definition.category,
170169
'minTick': definition.minTick,
170+
'bondType': definition.bondType,
171+
'description': definition.description,
171172
})
172-
return [r for r in results if primary_exchange in r['primaryExchange']]
173+
return results
173174

174175

175176
def __resolve_contract(
176177
symbol: Union[str, int],
177178
arctic_server_address: str,
178179
arctic_universe_library: str,
179-
primary_exchange: Optional[str] = ''
180+
primary_exchange: str = ''
180181
) -> List[Contract]:
181182
results = []
182183
descriptions = __resolve(symbol, arctic_server_address, arctic_universe_library, primary_exchange)
@@ -599,16 +600,17 @@ def mapper(portfolio: PortfolioSummary) -> List:
599600
)
600601

601602
df = pd.DataFrame(data=xs, columns=[
602-
'account', 'conId', 'localSymbol', 'dailyPNL', 'unrealizedPNL', 'realizedPNL', 'marketPrice', 'currency',
603+
'account', 'conId', 'localSymbol', 'dailyPNL', 'unrealizedPNL', 'marketPrice', 'realizedPNL', 'currency',
603604
'position', 'marketValue', 'averageCost',
604605
])
605606

606607
return df.sort_values(by='dailyPNL', ascending=False)
607608

608609
def strategy_helper() -> pd.DataFrame:
609610
connect()
610-
strategy_list: SuccessFail[List[StrategyConfig]] = remoted_client.rpc(
611-
return_type=SuccessFail[List[StrategyConfig]]).get_strategies()
611+
strategy_list: SuccessFail[List[StrategyConfig]] = consume(
612+
remoted_client.rpc(return_type=SuccessFail[List[StrategyConfig]]).get_strategies()
613+
)
612614
if strategy_list.is_success() and strategy_list.obj:
613615
result = []
614616
for s in strategy_list.obj:
@@ -788,7 +790,8 @@ def clear():
788790

789791
@cli.command(no_args_is_help=True)
790792
@click.option('--symbol', required=True, help='symbol to resolve to conId')
791-
@click.option('--primary_exchange', required=False, default='NASDAQ', help='exchange for symbol [not required]')
793+
@click.option('--exchange', required=False, help='exchange for symbol [not required]')
794+
@click.option('--universe', required=False, help='universe to check for symbol [not required]')
792795
@click.option('--ib', required=False, default=False, is_flag=True, help='force resolution from IB')
793796
@click.option('--sec_type', required=False, default='STK', help='IB security type [STK is default]')
794797
@click.option('--currency', required=False, default='USD', help='IB security currency')
@@ -798,7 +801,8 @@ def resolve(
798801
symbol: str,
799802
arctic_server_address: str,
800803
arctic_universe_library: str,
801-
primary_exchange: str,
804+
exchange: str,
805+
universe: str,
802806
ib: bool,
803807
sec_type: str,
804808
currency: str,
@@ -810,15 +814,15 @@ def resolve(
810814
contract = asyncio.get_event_loop().run_until_complete(client.get_conid(
811815
symbols=symbol,
812816
secType=sec_type,
813-
primaryExchange=primary_exchange,
817+
primaryExchange=exchange,
814818
currency=currency
815819
))
816820
if contract and type(contract) is list:
817821
renderer.rich_list(contract)
818822
elif contract and type(contract) is Contract:
819823
renderer.rich_dict(contract.__dict__)
820824
else:
821-
results = __resolve(symbol, arctic_server_address, arctic_universe_library, primary_exchange)
825+
results = __resolve(symbol, arctic_server_address, arctic_universe_library, exchange, universe)
822826
if len(results) > 0:
823827
renderer.rich_table(results, False)
824828
else:
@@ -851,7 +855,7 @@ def snapshot(
851855
)
852856

853857
# awaitable = remoted_client.rpc(return_type=Ticker).get_snapshot(contract, delayed)
854-
ticker = remoted_client.rpc(return_type=Ticker).get_snapshot(contract, delayed)
858+
ticker = consume(remoted_client.rpc(return_type=Ticker).get_snapshot(contract, delayed))
855859
snap = {
856860
'symbol': ticker.contract.symbol if ticker.contract else '',
857861
'exchange': ticker.contract.exchange if ticker.contract else '',
@@ -987,21 +991,6 @@ def options(
987991
plot_chain(symbol, list_dates, date, True, risk_free_rate)
988992

989993

990-
# @main.group()
991-
# def loadtest():
992-
# pass
993-
994-
995-
# @loadtest.command('start')
996-
# def load_test_start():
997-
# remoted_client.rpc().start_load_test()
998-
999-
1000-
# @loadtest.command('stop')
1001-
# def load_test_stop():
1002-
# remoted_client.rpc().stop_load_test()
1003-
1004-
1005994
# CLI_BOOK
1006995
@cli.group()
1007996
def book():
@@ -1081,7 +1070,7 @@ def strategy_enable(
10811070
name: str,
10821071
paper: bool,
10831072
):
1084-
success_fail = remoted_client.rpc().enable_strategy(name, paper)
1073+
success_fail = consume(remoted_client.rpc().enable_strategy(name, paper))
10851074
if success_fail.is_success():
10861075
renderer.rich_dict({'state': success_fail.obj})
10871076
else:
@@ -1093,7 +1082,7 @@ def strategy_enable(
10931082
def strategy_disable(
10941083
name: str,
10951084
):
1096-
success_fail = remoted_client.rpc().disable_strategy(name)
1085+
success_fail = consume(remoted_client.rpc().disable_strategy(name))
10971086
if success_fail.is_success():
10981087
renderer.rich_dict({'state': success_fail.obj})
10991088
else:
@@ -1168,8 +1157,7 @@ def __trade_helper(
11681157
if limit and limit <= 0.0:
11691158
raise ValueError('limit price can be less than or equal to 0.0: {}'.format(limit))
11701159

1171-
universe_definitions = resolve_symbol_to_security_definitions(symbol)
1172-
definitions = list({t[1] for t in universe_definitions})
1160+
definitions = resolve_symbol(symbol)
11731161

11741162
if len(definitions) == 0:
11751163
click.echo('no contract found for symbol {}'.format(symbol))
@@ -1184,7 +1172,7 @@ def __trade_helper(
11841172
contract = Universe.to_contract(security)
11851173

11861174
action = 'BUY' if buy else 'SELL'
1187-
trade: SuccessFail[Trade] = remoted_client.rpc(return_type=SuccessFail[Trade]).place_order_simple(
1175+
trade: SuccessFail[Trade] = consume(remoted_client.rpc(return_type=SuccessFail[Trade]).place_order_simple(
11881176
contract=contract,
11891177
action=action,
11901178
equity_amount=equity_amount,
@@ -1193,7 +1181,7 @@ def __trade_helper(
11931181
market_order=market,
11941182
stop_loss_percentage=stop_loss_percentage,
11951183
debug=debug,
1196-
)
1184+
))
11971185

11981186
output_dict = {
11991187
'result': trade.success_fail,
@@ -1400,5 +1388,3 @@ def error_out():
14001388
arctic_universe_library=arctic_universe_library,
14011389
args=args,
14021390
)
1403-
1404-

0 commit comments

Comments
 (0)