-
Notifications
You must be signed in to change notification settings - Fork 606
/
data_fetcher.py
37 lines (27 loc) · 1.08 KB
/
data_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- encoding: UTF-8 -*-
import akshare as ak
import logging
import talib as tl
import concurrent.futures
def fetch(code_name):
stock = code_name[0]
data = ak.stock_zh_a_hist(symbol=stock, period="daily", start_date="20220101", adjust="qfq")
if data is None or data.empty:
logging.debug("股票:"+stock+" 没有数据,略过...")
return
data['p_change'] = tl.ROC(data['收盘'], 1)
return data
def run(stocks):
stocks_data = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
future_to_stock = {executor.submit(fetch, stock): stock for stock in stocks}
for future in concurrent.futures.as_completed(future_to_stock):
stock = future_to_stock[future]
try:
data = future.result()
if data is not None:
data = data.astype({'成交量': 'double'})
stocks_data[stock] = data
except Exception as exc:
print('%s(%r) generated an exception: %s' % (stock[1], stock[0], exc))
return stocks_data