-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdrbt.py
103 lines (83 loc) · 2.65 KB
/
drbt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import re
import os
import json
import asyncio
import websockets
from collections.abc import MutableMapping
from datetime import datetime, timedelta
from influx import get_influx_client, get_influx_write_api
# Module-level setup: builds the list of ticker channels to subscribe to and
# the JSON-RPC subscription request that is sent once the socket is open.

# Tomorrow's local date. Not referenced by anything else in this block; kept
# so the module-level name `dt` stays available.
dt = datetime.now() + timedelta(days=1)

# Expiry code embedded in every subscribed instrument name, e.g. 21JUL22.
# NOTE(review): when EXPIRY is unset this is None and the channel names then
# contain the literal text "None" - confirm whether that should be fatal.
expiry = os.getenv('EXPIRY')

# The STRIKE environment variable is intentionally no longer parsed here:
# int(os.getenv('STRIKE')) raised TypeError at import time whenever STRIKE was
# unset, and its value was overwritten by the sweep below before any use.
#################################################################
# One -C and one -P ticker channel for every strike from 10000 up to (but not
# including) 50000, in steps of 1000.
chans = []
for strike in range(10000, 50000, 1000):
    chans.append(f'ticker.BTC-{expiry}-{strike}-C.100ms')
    chans.append(f'ticker.BTC-{expiry}-{strike}-P.100ms')

# JSON-RPC 2.0 request subscribing to every channel in `chans`.
msgSub = {
    "jsonrpc": "2.0",
    "id": 3600,
    "method": "public/subscribe",
    "params": {
        "channels": chans,
    },
}
def flatten_dict(d: MutableMapping, parent_key: str = '', sep: str = '.') -> MutableMapping:
    """Collapse a nested mapping into a single-level dict.

    Each nested key is joined to its ancestors with *sep*, so
    ``{'a': {'b': 1}}`` becomes ``{'a.b': 1}``. Values that are not
    ``MutableMapping`` instances are copied through unchanged; an empty
    nested mapping contributes no entries.

    Args:
        d: Mapping to flatten.
        parent_key: Prefix prepended (followed by *sep*) to every key; an
            empty prefix leaves top-level keys as they are.
        sep: Separator placed between the parts of a flattened key.

    Returns:
        A new dict of flattened keys to leaf values, in depth-first
        traversal order.
    """
    flattened = {}
    for key, value in d.items():
        # Only prefix when there is a parent, so top-level keys stay bare.
        path = parent_key + sep + key if parent_key else key
        if not isinstance(value, MutableMapping):
            flattened[path] = value
            continue
        # Recurse with the accumulated path as the new prefix.
        flattened.update(flatten_dict(value, path, sep=sep))
    return flattened
def create_data_point(data):
    """Build an "options" point from one ticker payload.

    The payload is flattened with ``flatten_dict``. Every string value is
    attached to the point as a tag, and every value whose type is exactly
    ``int`` or ``float`` is attached as a field; values of any other type
    (including ``bool``) are skipped. An integer ``strike_price`` field,
    parsed out of ``instrument_name``, is added last.

    Args:
        data: Nested mapping that contains at least ``instrument_name``.

    Returns:
        The point object produced by ``get_influx_client().Point`` with the
        tags and fields applied.

    Raises:
        KeyError: If the flattened payload has no ``instrument_name``.
        AttributeError: If ``instrument_name`` does not match the pattern.
        ValueError: If the captured strike text is not an integer.
    """
    point = get_influx_client().Point("options").tag("version", "dev1.1")
    flattened = flatten_dict(data)
    for name, value in flattened.items():
        if isinstance(value, str):
            point = point.tag(name, value)
        elif type(value) in (int, float):
            # Exact type match on purpose: bool is a subclass of int and must
            # not be written as a numeric field.
            point = point.field(name, value)
    # add strike price: the third dash-separated part of the instrument name,
    # i.e. the text just before the trailing -C / -P.
    match = re.search('.*?-.*?-(.*)-[CP]', flattened['instrument_name'])
    return point.field('strike_price', int(match.group(1)))
def write_data(j):
    """Write one decoded websocket message as a data point.

    Only messages that carry ``params`` with a ``channel`` listed in
    ``chans`` and a ``data`` payload are written. Anything else (for example
    a reply that has no ``params`` at all) is ignored and the function
    returns without side effects.

    Args:
        j: Decoded JSON message (a dict).

    Returns:
        None.
    """
    # .get() rather than indexing: a message without these keys must be
    # skipped quietly, not raise KeyError before the None checks can run.
    params = j.get('params')
    if params is None:
        return
    channel = params.get('channel')
    if channel is None or channel not in chans:
        return
    data = params.get('data')
    if data is None:
        return
    point = create_data_point(data)
    res = get_influx_write_api().write(
        bucket='deribit-1', org='Orbs', record=point)
    if res:
        print(res)
    # The point is already written at this stage, so a payload that lacks
    # underlying_price prints None instead of raising.
    print(data['instrument_name'], data.get('underlying_price'))
async def call_api(msg):
    """Send *msg* to the Deribit websocket API and process every reply.

    Opens a connection to ``wss://www.deribit.com/ws/api/v2``, sends *msg*
    once, then hands each received message to ``write_data`` after JSON
    decoding. A failure while decoding or writing a single message is
    printed and the loop carries on with the next message.

    Args:
        msg: The already-serialised request to send.
    """
    async with websockets.connect('wss://www.deribit.com/ws/api/v2') as websocket:
        await websocket.send(msg)
        # Iterate the connection instead of polling `websocket.open`: that
        # attribute is not available on the connection objects of current
        # websockets releases, and iteration stops by itself once the
        # connection has been closed normally.
        async for response in websocket:
            try:
                # Decoding sits inside the try so that one malformed frame
                # cannot end the whole stream.
                write_data(json.loads(response))
            except Exception as e:
                print('Exception:', e)
if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs the coroutine to completion
    # and closes the loop afterwards. Calling asyncio.get_event_loop() outside
    # a running loop is deprecated.
    asyncio.run(call_api(json.dumps(msgSub)))