-
-
Notifications
You must be signed in to change notification settings - Fork 139
/
Copy pathcryptostore.py
173 lines (157 loc) · 7.91 KB
/
cryptostore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
'''
Copyright (C) 2018-2024 Bryant Moscon - [email protected]
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from datetime import datetime, timezone
import os

from cryptofeed import FeedHandler
from cryptofeed.raw_data_collection import AsyncFileCallback
from cryptofeed.exchanges import EXCHANGE_MAP
from cryptofeed.feed import Feed
from cryptofeed.defines import L2_BOOK, TICKER, TRADES, FUNDING, CANDLES, OPEN_INTEREST, LIQUIDATIONS
from cryptofeed.backends.redis import BookRedis, TradeRedis, TickerRedis, FundingRedis, CandlesRedis, OpenInterestRedis, LiquidationsRedis
from cryptofeed.backends.redis import BookStream, TradeStream, TickerStream, FundingStream, CandlesStream, OpenInterestStream, LiquidationsStream
from cryptofeed.backends.mongo import BookMongo, TradeMongo, TickerMongo, FundingMongo, CandlesMongo, OpenInterestMongo, LiquidationsMongo
from cryptofeed.backends.postgres import BookPostgres, TradePostgres, TickerPostgres, FundingPostgres, CandlesPostgres, OpenInterestPostgres, LiquidationsPostgres
from cryptofeed.backends.socket import BookSocket, TradeSocket, TickerSocket, FundingSocket, CandlesSocket, OpenInterestSocket, LiquidationsSocket
from cryptofeed.backends.influxdb import BookInflux, TradeInflux, TickerInflux, FundingInflux, CandlesInflux, OpenInterestInflux, LiquidationsInflux
from cryptofeed.backends.quest import BookQuest, TradeQuest, TickerQuest, FundingQuest, CandlesQuest, OpenInterestQuest, LiquidationsQuest
async def tty(obj, receipt_ts):
    '''
    Debugging callback: print `obj` to stdout prefixed with its receipt time.

    Args:
        obj: the object to print; it is rendered through an f-string.
        receipt_ts: receipt time as a POSIX timestamp in seconds, shown in UTC.
    '''
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. An aware
    # UTC datetime renders the same wall-clock text with this format string.
    rts = datetime.fromtimestamp(receipt_ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    print(f"{rts} - {obj}")
def _env_flag(name):
    '''
    Read a boolean flag from the environment variable `name`.

    An unset or empty variable is False. Any other value must start with
    't' or 'f' (case-insensitive), e.g. "true" or "False".

    Raises:
        ValueError: if the value starts with any other character.
    '''
    raw = os.environ.get(name)
    if not raw:
        return False
    lowered = raw.lower()
    if lowered.startswith('t'):
        return True
    if lowered.startswith('f'):
        return False
    raise ValueError(f'Invalid value specified for {name}')


def _env_list(name, label):
    '''
    Read the comma separated environment variable `name` as a list of strings.

    Whitespace around each entry is stripped and empty entries are dropped,
    so "a, b," yields ['a', 'b'].

    Raises:
        ValueError: "<label> must be specified" when the variable is unset
            or contains no entries.
    '''
    entries = [entry.strip() for entry in os.environ.get(name, '').split(',')]
    entries = [entry for entry in entries if entry]
    if not entries:
        raise ValueError(f"{label} must be specified")
    return entries


def load_config() -> Feed:
    '''
    Build the exchange feed described by the process environment.

    Environment variables read:
        EXCHANGE: key into EXCHANGE_MAP selecting the feed class.
        SYMBOLS, CHANNELS: comma separated lists (both required).
        CONFIG: passed through to the feed as `config`.
        BACKEND: one of REDIS, REDISSTREAM, MONGO, POSTGRES, TCP, UDP, UDS,
            INFLUX, QUEST or TTY.
        SNAPSHOT_ONLY, SNAPSHOT_INTERVAL: options for the L2 book callback
            (defaults: false and 1000).
        HOST, PORT: backend address. HOST defaults to 127.0.0.1; an unset
            PORT falls back to a per-backend default where one is defined.
        CANDLE_INTERVAL: passed to the feed, defaults to '1m'.
        DATABASE: used by MONGO and POSTGRES.
        USER, PASSWORD: used by POSTGRES.
        ORG, BUCKET, TOKEN: used by INFLUX.

    Returns:
        The feed instance created from EXCHANGE_MAP[EXCHANGE], carrying one
        callback per requested channel that the backend supports.

    Raises:
        ValueError: if SYMBOLS or CHANNELS is missing or empty, if
            SNAPSHOT_ONLY, SNAPSHOT_INTERVAL or PORT cannot be parsed, or if
            BACKEND or EXCHANGE is not a known value.
    '''
    exchange = os.environ.get('EXCHANGE')
    symbols = _env_list('SYMBOLS', 'Symbols')
    channels = _env_list('CHANNELS', 'Channels')
    config = os.environ.get('CONFIG')
    backend = os.environ.get('BACKEND')
    snap_only = _env_flag('SNAPSHOT_ONLY')
    snap_interval = int(os.environ.get('SNAPSHOT_INTERVAL', 1000))
    host = os.environ.get('HOST', '127.0.0.1')
    port = os.environ.get('PORT')
    # An empty PORT is treated like an unset one.
    port = int(port) if port else None
    candle_interval = os.environ.get('CANDLE_INTERVAL', '1m')
    database = os.environ.get('DATABASE')
    user = os.environ.get('USER')
    password = os.environ.get('PASSWORD')
    org = os.environ.get('ORG')
    bucket = os.environ.get('BUCKET')
    token = os.environ.get('TOKEN')

    allowed_backends = ['REDIS', 'REDISSTREAM', 'MONGO', 'POSTGRES', 'TCP', 'UDP', 'UDS', 'INFLUX', 'QUEST', 'TTY']
    if backend not in allowed_backends:
        raise ValueError(f"Invalid backend '{backend}' specified - must be in {allowed_backends}")
    # Fail with a readable message before building any callback, instead of
    # a bare KeyError from the EXCHANGE_MAP lookup at the very end.
    if exchange not in EXCHANGE_MAP:
        raise ValueError(f"Invalid exchange '{exchange}' specified")

    # Every `classes` tuple below lists its callbacks in this channel order.
    channel_order = (L2_BOOK, TRADES, TICKER, FUNDING, CANDLES, OPEN_INTEREST, LIQUIDATIONS)
    # Extra keyword arguments given only to the L2 book callback.
    book_opts = {'snapshot_interval': snap_interval, 'snapshots_only': snap_only}
    args = ()
    kwargs = {}

    if backend == 'REDIS':
        classes = (BookRedis, TradeRedis, TickerRedis, FundingRedis, CandlesRedis, OpenInterestRedis, LiquidationsRedis)
        kwargs = {'host': host, 'port': port or 6379}
    elif backend == 'REDISSTREAM':
        classes = (BookStream, TradeStream, TickerStream, FundingStream, CandlesStream, OpenInterestStream, LiquidationsStream)
        kwargs = {'host': host, 'port': port or 6379}
    elif backend == 'MONGO':
        classes = (BookMongo, TradeMongo, TickerMongo, FundingMongo, CandlesMongo, OpenInterestMongo, LiquidationsMongo)
        args = (database,)
        # 27017 is MongoDB's default port.
        kwargs = {'host': host, 'port': port or 27017}
    elif backend == 'POSTGRES':
        classes = (BookPostgres, TradePostgres, TickerPostgres, FundingPostgres, CandlesPostgres, OpenInterestPostgres, LiquidationsPostgres)
        kwargs = {'db': database, 'host': host, 'port': port or 5432, 'user': user, 'pw': password}
    elif backend in ('TCP', 'UDP', 'UDS'):
        classes = (BookSocket, TradeSocket, TickerSocket, FundingSocket, CandlesSocket, OpenInterestSocket, LiquidationsSocket)
        args = (host,)
        # No default port for the socket backends; None is passed when unset.
        kwargs = {'port': port}
    elif backend == 'INFLUX':
        classes = (BookInflux, TradeInflux, TickerInflux, FundingInflux, CandlesInflux, OpenInterestInflux, LiquidationsInflux)
        args = (host, org, bucket, token)
    elif backend == 'QUEST':
        classes = (BookQuest, TradeQuest, TickerQuest, FundingQuest, CandlesQuest, OpenInterestQuest, LiquidationsQuest)
        kwargs = {'host': host, 'port': port or 9009}
        # NOTE(review): the snapshot options are not forwarded to BookQuest,
        # as before - confirm whether BookQuest accepts them.
        book_opts = {}
    else:
        # TTY: every channel shares the same printing coroutine.
        classes = None

    # Only callbacks for the requested channels are created.
    cbs = {}
    if classes is None:
        cbs = {chan: tty for chan in channel_order if chan in channels}
    else:
        for chan, cls in zip(channel_order, classes):
            if chan not in channels:
                continue
            extra = book_opts if chan == L2_BOOK else {}
            cbs[chan] = cls(*args, **extra, **kwargs)

    return EXCHANGE_MAP[exchange](candle_interval=candle_interval, symbols=symbols, channels=channels, config=config, callbacks=cbs)
def main():
    '''
    Script entry point: create a FeedHandler, attach the feed built by
    load_config() and run it.

    The SAVE_RAW environment variable switches raw data collection: a value
    starting with 't' (case-insensitive) writes raw data through
    AsyncFileCallback("./raw_data"), a value starting with 'f' (or an
    unset/empty variable) disables it.

    Raises:
        ValueError: if SAVE_RAW is set to any other value.
    '''
    raw_setting = os.environ.get('SAVE_RAW', False)
    collect_raw = False
    if raw_setting:
        leading = raw_setting.lower()[:1]
        if leading == 't':
            collect_raw = True
        elif leading != 'f':
            raise ValueError('Invalid value specified for SAVE_RAW')

    collector = AsyncFileCallback("./raw_data") if collect_raw else None
    handler = FeedHandler(raw_data_collection=collector)
    feed = load_config()
    handler.add_feed(feed)
    handler.run()
# Start the collector only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()