Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: WebSocket Client for Live Data Streaming #2201

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Yahoo! finance API is intended for personal use only.**
- `Ticker`: single ticker data
- `Tickers`: multiple tickers' data
- `download`: download market data for multiple tickers
- `WebSocket` and `AsyncWebSocket`: live streaming data
- `Market`: get infomation about a market
- `Search`: quotes and news from search
- `Sector` and `Industry`: sector and industry information
Expand Down
14 changes: 14 additions & 0 deletions doc/source/reference/examples/live_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio
import yfinance as yf

# define your message callback
def message_handler(message):
print("Received message:", message)

async def main():
client = yf.AsyncWebSocket()
await client.subscribe(["AAPL", "BTC-USD", "ETH-USD"]) # Subscribe to symbols
await client.unsubscribe("ETH-USD") # Unsubscribe from symbols
await client.listen()

asyncio.run(main())
9 changes: 9 additions & 0 deletions doc/source/reference/examples/live_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import yfinance as yf

# define your message callback
def message_handler(message):
print("Received message:", message)

client = yf.WebSocket()
client.subscribe(["AAPL", "BTC-USD"]) # Subscribe to symbols
client.listen(message_handler)
3 changes: 3 additions & 0 deletions doc/source/reference/examples/ticker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@

# analysis
dat.analyst_price_targets

# websocket
dat.live()
5 changes: 4 additions & 1 deletion doc/source/reference/examples/tickers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@
# access each ticker using (example)
tickers.tickers['MSFT'].info
tickers.tickers['AAPL'].history(period="1mo")
tickers.tickers['GOOG'].actions
tickers.tickers['GOOG'].actions

# websocket
tickers.live()
3 changes: 3 additions & 0 deletions doc/source/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ The following are the publicly available classes, and functions exposed by the `
- :attr:`Tickers <yfinance.Tickers>`: Class for handling multiple tickers.
- :attr:`MarketSummary <yfinance.MarketSummary>`: Class for accessing market summary.
- :attr:`Search <yfinance.Search>`: Class for accessing search results.
- :class:`WebSocket <yfinance.WebSocket>`: Class for synchronously streaming live market data.
- :class:`AsyncWebSocket <yfinance.AsyncWebSocket>`: Class for asynchronously streaming live market data.
- :attr:`Sector <yfinance.Sector>`: Domain class for accessing sector information.
- :attr:`Industry <yfinance.Industry>`: Domain class for accessing industry information.
- :attr:`download <yfinance.download>`: Function to download market data for multiple tickers.
Expand All @@ -37,6 +39,7 @@ The following are the publicly available classes, and functions exposed by the `
yfinance.analysis
yfinance.marketsummary
yfinance.search
yfinance.websocket
yfinance.sector_industry
yfinance.screener
yfinance.functions
Expand Down
36 changes: 36 additions & 0 deletions doc/source/reference/yfinance.websocket.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
=====================
WebSocket
=====================

.. currentmodule:: yfinance

The `WebSocket` module allows you to stream live price data from Yahoo Finance using both synchronous and asynchronous clients.

Classes
------------

.. autosummary::
:toctree: api/

WebSocket
AsyncWebSocket

Synchronous WebSocket
----------------------

The `WebSocket` class provides a synchronous interface for subscribing to price updates.

Sample Code:

.. literalinclude:: examples/live_sync.py
:language: python

Asynchronous WebSocket
-----------------------

The `AsyncWebSocket` class provides an asynchronous interface for subscribing to price updates.

Sample Code:

.. literalinclude:: examples/live_async.py
:language: python
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ html5lib>=1.1
peewee>=3.16.2
requests_cache>=1.0
requests_ratelimiter>=0.3.1
scipy>=1.6.3
scipy>=1.6.3
protobuf>=5.29.2
websockets>=14.1
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,17 @@
'requests>=2.31', 'multitasking>=0.0.7',
'lxml>=4.9.1', 'platformdirs>=2.0.0', 'pytz>=2022.5',
'frozendict>=2.3.4', 'peewee>=3.16.2',
'beautifulsoup4>=4.11.1', 'html5lib>=1.1'],
'beautifulsoup4>=4.11.1', 'html5lib>=1.1',
'protobuf>=5.29.2', 'websockets>=14.1'],
extras_require={
'nospam': ['requests_cache>=1.0', 'requests_ratelimiter>=0.3.1'],
'repair': ['scipy>=1.6.3'],
},
# Include protobuf files for websocket support
package_data={
'yfinance': ['pricing.proto', 'pricing_pb2.py'],
},
include_package_data=True,
# Note: Pandas.read_html() needs html5lib & beautifulsoup4
entry_points={
'console_scripts': [
Expand Down
30 changes: 30 additions & 0 deletions tests/test_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import unittest
from unittest.mock import Mock

from yfinance.live import AsyncWebSocket


class TestAsyncWebSocket(unittest.TestCase):
def test_decode_message_valid(self):
message = ("CgdCVEMtVVNEFYoMuUcYwLCVgIplIgNVU0QqA0NDQzApOAFFPWrEP0iAgOrxvANVx/25R12csrRHZYD8skR9/"
"7i0R7ABgIDq8bwD2AEE4AGAgOrxvAPoAYCA6vG8A/IBA0JUQ4ECAAAAwPrjckGJAgAA2P5ZT3tC")

ws = AsyncWebSocket(Mock())
decoded = ws._decode_message(message)

expected = {'id': 'BTC-USD', 'price': 94745.08, 'time': '1736509140000', 'currency': 'USD', 'exchange': 'CCC',
'quote_type': 41, 'market_hours': 1, 'change_percent': 1.5344921, 'day_volume': '59712028672',
'day_high': 95227.555, 'day_low': 92517.22, 'change': 1431.8906, 'open_price': 92529.99,
'last_size': '59712028672', 'price_hint': '2', 'vol_24hr': '59712028672',
'vol_all_currencies': '59712028672', 'from_currency': 'BTC', 'circulating_supply': 19808172.0,
'market_cap': 1876726640000.0}

self.assertEqual(expected, decoded)

def test_decode_message_invalid(self):
websocket = AsyncWebSocket(Mock())
base64_message = "invalid_base64_string"
decoded = websocket._decode_message(base64_message)
assert "error" in decoded
assert "raw_base64" in decoded
self.assertEqual(base64_message, decoded["raw_base64"])
3 changes: 2 additions & 1 deletion yfinance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .ticker import Ticker
from .tickers import Tickers
from .multi import download
from .live import WebSocket, AsyncWebSocket
from .utils import enable_debug_mode
from .cache import set_tz_cache_location
from .domain.sector import Sector
Expand All @@ -39,6 +40,6 @@
import warnings
warnings.filterwarnings('default', category=DeprecationWarning, module='^yfinance')

__all__ = ['download', 'Market', 'Search', 'Ticker', 'Tickers', 'enable_debug_mode', 'set_tz_cache_location', 'Sector', 'Industry']
__all__ = ['download', 'Market', 'Search', 'Ticker', 'Tickers', 'enable_debug_mode', 'set_tz_cache_location', 'Sector', 'Industry', 'WebSocket', 'AsyncWebSocket']
# screener stuff:
__all__ += ['EquityQuery', 'FundQuery', 'screen', 'PREDEFINED_SCREENER_QUERIES']
11 changes: 11 additions & 0 deletions yfinance/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from . import utils, cache
from .data import YfData
from .exceptions import YFEarningsDateMissing, YFRateLimitError
from .live import WebSocket
from .scrapers.analysis import Analysis
from .scrapers.fundamentals import Fundamentals
from .scrapers.holders import Holders
Expand Down Expand Up @@ -76,6 +77,9 @@ def __init__(self, ticker, session=None, proxy=None):

self._fast_info = None

self._message_handler = None
self.ws = None

@utils.log_indent_decorator
def history(self, *args, **kwargs) -> pd.DataFrame:
return self._lazy_load_price_history().history(*args, **kwargs)
Expand Down Expand Up @@ -690,3 +694,10 @@ def get_funds_data(self, proxy=None) -> Optional[FundsData]:
self._funds_data = FundsData(self._data, self.ticker)

return self._funds_data

def live(self, message_handler, verbose=True):
self._message_handler = message_handler

self.ws = WebSocket(verbose=verbose)
self.ws.subscribe(self.ticker)
self.ws.listen(self._message_handler)
178 changes: 178 additions & 0 deletions yfinance/live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import asyncio
import base64
import json
from typing import List, Optional, Callable
import websockets

from yfinance import utils
from yfinance.pricing_pb2 import PricingData
from google.protobuf.json_format import MessageToDict


class AsyncWebSocket:
"""
Asynchronous WebSocket client for streaming real time pricing data.
"""

def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2", verbose=True):
"""
Initialize the AsyncWebSocket client.

Args:
url (str): The WebSocket server URL. Defaults to Yahoo Finance's WebSocket URL.
verbose (bool): Flag to enable or disable print statements. Defaults to True.
"""
self.url = url
self.verbose = verbose
self.logger = utils.get_yf_logger()
self._ws = None
self._subscriptions = set()
self._message_handler = None # Callable to handle messages

async def _connect(self):
if self._ws is None:
self._ws = await websockets.connect(self.url)
self.logger.info("Connected to WebSocket.")
if self.verbose:
print("Connected to WebSocket.")

async def subscribe(self, symbols: str | List[str]):
"""
Subscribe to a stock symbol or a list of stock symbols.

Args:
symbols (str | List[str]): Stock symbol(s) to subscribe to.
"""
await self._connect()

if isinstance(symbols, str):
symbols = [symbols]

self._subscriptions.update(symbols)

message = {"subscribe": list(self._subscriptions)}
await self._ws.send(json.dumps(message))

self.logger.info(f"Subscribed to symbols: {symbols}")
if self.verbose:
print(f"Subscribed to symbols: {symbols}")

async def unsubscribe(self, symbols: str | List[str]):
"""
Unsubscribe from a stock symbol or a list of stock symbols.

Args:
symbols (str | List[str]): Stock symbol(s) to unsubscribe from.
"""
await self._connect()

if isinstance(symbols, str):
symbols = [symbols]

self._subscriptions.difference_update(symbols)

message = {"unsubscribe": list(self._subscriptions)}
await self._ws.send(json.dumps(message))

self.logger.info(f"Unsubscribed from symbols: {symbols}")
if self.verbose:
print(f"Unsubscribed from symbols: {symbols}")

async def listen(self, message_handler: Optional[Callable[[dict], None]] = None):
"""
Start listening to messages from the WebSocket server.

Args:
message_handler (Optional[Callable[[dict], None]]): Optional function to handle received messages.
"""
await self._connect()
self._message_handler = message_handler

self.logger.info("Listening for messages...")
if self.verbose:
print("Listening for messages...")

try:
async for message in self._ws:
message_json = json.loads(message)
encoded_data = message_json.get("message", "")
decoded_message = self._decode_message(encoded_data)
if self._message_handler:
self._message_handler(decoded_message)
else:
print(decoded_message)
except Exception as e:
self.logger.error("Error while listening to messages: %s", e, exc_info=True)
if self.verbose:
print("Error while listening to messages: %s", e)

def _decode_message(self, base64_message: str) -> dict:
try:
decoded_bytes = base64.b64decode(base64_message)
pricing_data = PricingData()
pricing_data.ParseFromString(decoded_bytes)
return MessageToDict(pricing_data, preserving_proto_field_name=True)
except Exception as e:
self.logger.error("Failed to decode message: %s", e, exc_info=True)
print("Failed to decode message: %s", e)
return {
'error': str(e),
'raw_base64': base64_message
}

async def close(self):
"""Close the WebSocket connection."""
if self._ws is not None and not self._ws.closed:
await self._ws.close()
self.logger.info("WebSocket connection closed.")
if self.verbose:
print("WebSocket connection closed.")


class WebSocket:
"""
Synchronous WebSocket client for streaming real time pricing data.
"""

def __init__(self, url: str = "wss://streamer.finance.yahoo.com/?version=2", verbose=True):
"""
Initialize the WebSocket client.

Args:
url (str): The WebSocket server URL. Defaults to Yahoo Finance's WebSocket URL.
verbose (bool): Flag to enable or disable print statements. Defaults to True.
"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.client = AsyncWebSocket(url, verbose)

def subscribe(self, symbols: str | List[str]):
"""
Subscribe to a stock symbol or a list of stock symbols.

Args:
symbols (str | List[str]): Stock symbol(s) to subscribe to.
"""
self.loop.run_until_complete(self.client.subscribe(symbols))

def unsubscribe(self, symbols: str | List[str]):
"""
Unsubscribe from a stock symbol or a list of stock symbols.

Args:
symbols (str | List[str]): Stock symbol(s) to unsubscribe from.
"""
self.loop.run_until_complete(self.client.unsubscribe(symbols))

def listen(self, message_handler: Optional[Callable[[dict], None]] = None):
"""
Start listening to messages from the WebSocket server.

Args:
message_handler (Optional[Callable[[dict], None]]): Optional function to handle received messages.
"""
self.loop.run_until_complete(self.client.listen(message_handler))

def close(self):
"""Close the WebSocket connection."""
self.loop.run_until_complete(self.client.close())
Loading
Loading