hyperpeer-py is the Python module for implementing media servers or backend peers in applications based on Hyperpeer. This module provides a class called Peer, which manages both the connection with the signaling server and the peer-to-peer communication via WebRTC with remote peers. It also provides an Enum class called PeerState that defines the possible states of a Peer instance.
- Built on top of asyncio, Python’s standard asynchronous I/O framework.
- Based on the popular modules aiortc and websockets.
Peer(self, serverAddress, peer_type='media-server', id=None, key=None, media_source=None, media_sink=None, frame_generator=None, frame_consumer=None, frame_rate=30, ssl_context=None, datachannel_options=None, media_source_format=None)
The Peer class represents the local peer in a WebRTC application based on Hyperpeer. It manages both the Websocket connection with the signaling server and the peer-to-peer communication via WebRTC with remote peers.
Attributes
- id (string): id of the instance.
- readyState (PeerState): State of the peer instance. It may have one of the values specified in the class PeerState.
- disconnection_event (asyncio.Event): Notifies that the current peer connection is closing.
Arguments
- server_address (str): URL of the Hyperpeer signaling server. It should include the protocol prefix ws:// or wss://, which specifies the WebSocket protocol to use.
- peer_type (str): Peer type. It can be used by other peers to know the role of the peer in the current application.
- id (str): Peer unique identification string. Must be unique among all connected peers. If it is None, the server will assign a random string.
- key (str): Peer validation string. It may be used by the server to verify the peer.
- media_source (str): Path or URL of the media source or file.
- media_source_format (str): Specific format of the media source. Defaults to autodetect.
- media_sink (str): Path or filename to write with incoming video.
- frame_generator (generator function): Generator function that produces video frames as NumPy arrays with sRGB format with 24 bits per pixel (8 bits for each color). It should use the yield statement to generate arrays with elements of type uint8 and with shape (vertical-resolution, horizontal-resolution, 3).
- frame_consumer (function): Function used to consume incoming video frames as NumPy arrays with sRGB format with 24 bits per pixel (8 bits for each color). It should receive an argument called frame, which will be a NumPy array with elements of type uint8 and with shape (vertical-resolution, horizontal-resolution, 3).
- frame_rate (int): Streaming frame rate.
- ssl_context (ssl.SSLContext): Object used to manage SSL settings and certificates in the connection with the signaling server when using wss. See the ssl documentation for more details.
- datachannel_options (dict): Dictionary with the following keys: label, maxPacketLifeTime, maxRetransmits, ordered, and protocol. See the documentation of RTCPeerConnection.createDataChannel() method of the WebRTC API for more details.
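As a rough illustration of the datachannel_options dictionary, the sketch below builds the two most common configurations. The helper name make_datachannel_options is hypothetical and not part of hyperpeer. The key meanings are taken from the WebRTC createDataChannel() options, where leaving both maxPacketLifeTime and maxRetransmits unset gives reliable delivery.

```python
def make_datachannel_options(label="data_channel", reliable=True):
    """Build a dict with the keys listed for datachannel_options.

    Hypothetical helper, not part of hyperpeer.
    """
    options = {
        "label": label,
        "maxPacketLifeTime": None,  # no time limit on retransmissions
        "maxRetransmits": None,     # no limit on the number of retransmissions
        "ordered": True,            # deliver messages in the order they were sent
        "protocol": "",
    }
    if not reliable:
        # Favor throughput over delivery guarantees:
        # never retransmit and allow out-of-order delivery.
        options["maxRetransmits"] = 0
        options["ordered"] = False
    return options
```

Passing reliable=False yields the same values as the data channel settings used in the example of this section.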
Example
from hyperpeer import Peer, PeerState
import asyncio
import ssl
import numpy

# Function used to generate video frames. It simply produces random images.
def video_frame_generator():
    while True:
        frame = numpy.random.rand(720, 1280, 3)
        frame = numpy.uint8(frame * 100)
        yield frame

# Frame counter
received_frames = 0

# Function used for consuming incoming video frames. It simply counts frames.
def video_frame_consumer(frame):
    global received_frames
    received_frames += 1

# Function used to consume incoming data. It simply prints messages.
def on_data(data):
    print('Remote message:')
    print(data)

# Data channel settings. It sets the values for maximum throughput
# (unreliable, unordered delivery, as with UDP).
datachannel_options = {
    'label': 'data_channel',
    'maxPacketLifeTime': None,
    'maxRetransmits': 0,
    'ordered': False,
    'protocol': ''
}

# SSL settings for the wss connection. The default context is an assumption;
# a server with a self-signed certificate needs a context that trusts it.
ssl_context = ssl.create_default_context()

# Instantiate peer
peer = Peer('wss://localhost:8080', peer_type='media-server', id='server1', frame_generator=video_frame_generator, frame_consumer=video_frame_consumer, ssl_context=ssl_context, datachannel_options=datachannel_options)

# Coroutine used to produce and send data to the remote peer. It simply sends the value of the frame counter 10 times per second.
async def sender():
    while peer.readyState == PeerState.CONNECTED:
        data = {'received_frames': received_frames}
        await peer.send(data)
        await asyncio.sleep(0.1)

# Main loop
async def main():
    global received_frames
    # Open server connection
    await peer.open()
    # Add data handler
    peer.add_data_handler(on_data)
    # List connected peers
    peers = await peer.get_peers()
    print(peers)  # [{'id': 'server1', 'type': 'media-server', 'busy': False}, ... ]
    try:
        while True:
            received_frames = 0
            # Wait for incoming connections
            remote_peer_id = await peer.listen_connections()
            # Accept incoming connection
            await peer.accept_connection()
            # Send data while connected
            await sender()
            # If still disconnecting, wait to be online before starting over
            while peer.readyState != PeerState.ONLINE:
                await asyncio.sleep(1)
    except Exception as err:
        print(err)
        raise
    finally:
        # Close connection before leaving
        await peer.close()

# Run main loop
asyncio.run(main())
Peer.open(self)
(Coroutine) Open the connection with the Hyperpeer signaling server.
It returns when the Websocket connection with the signaling server is established.
Peer.close(self)
(Coroutine) Close the connection with the signaling server and with any remote peer.
It returns when both WebRTC peer connection and Websocket server connection are closed.
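Because every open() should be matched by a close(), the pairing can be wrapped in an async context manager. This is only a sketch: the name opened is hypothetical, and the helper relies on nothing more than the open() and close() coroutines described above.

```python
import contextlib


@contextlib.asynccontextmanager
async def opened(peer):
    """Open `peer` on entry and always close it on exit (hypothetical helper)."""
    await peer.open()
    try:
        yield peer
    finally:
        # Runs even if the body raises, so the connections are not leaked.
        await peer.close()


# Possible usage, assuming a signaling server at this address:
#
#     async with opened(Peer('wss://localhost:8080')) as peer:
#         print(await peer.get_peers())
```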
Peer.get_peers(self)
(Coroutine) Returns the list of peers currently connected to the signaling server.
Returns
- list: List of peers currently connected to the signaling server. Each peer is represented by a dictionary with the following keys: id (str), type (str), busy (bool).
Raises
- Exception: If peer.readyState is not PeerState.ONLINE.
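The list returned by get_peers() is made of plain dictionaries, so it can be filtered with ordinary Python. The sketch below keeps only the peers that are not busy; the function name available_peers and the 'client' peer type in the sample data are illustrative assumptions.

```python
def available_peers(peers, peer_type=None):
    """Return the entries that are not busy, optionally restricted to one type."""
    return [
        p for p in peers
        if not p["busy"] and (peer_type is None or p["type"] == peer_type)
    ]


# Sample data shaped like the result of get_peers() (values are made up).
sample = [
    {"id": "server1", "type": "media-server", "busy": False},
    {"id": "client7", "type": "client", "busy": True},
    {"id": "client9", "type": "client", "busy": False},
]
print([p["id"] for p in available_peers(sample, peer_type="client")])  # ['client9']
```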
Peer.connect_to(self, remote_peer_id)
(Coroutine) Request a peer-to-peer connection with a remote peer.
Arguments
- remote_peer_id (str): id of the remote peer to connect to.
Raises
- Exception: If peer.readyState is not PeerState.ONLINE.
- Exception: If a peer with id equal to remote_peer_id does not exist.
- Exception: If the remote peer is busy.
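Two of the errors above can usually be avoided by checking the peer list before requesting the connection. The following is a minimal sketch under that assumption; connect_to_first_available is a hypothetical helper, and connect_to() can still raise if the remote peer leaves or becomes busy between the two calls.

```python
async def connect_to_first_available(peer, peer_type):
    """Connect to the first idle remote peer of `peer_type`.

    Returns the id of the chosen peer, or None if no candidate was found.
    Hypothetical helper built on get_peers() and connect_to().
    """
    for candidate in await peer.get_peers():
        if candidate["id"] == peer.id:
            continue  # the list may include the local peer itself
        if candidate["busy"] or candidate["type"] != peer_type:
            continue
        await peer.connect_to(candidate["id"])
        return candidate["id"]
    return None
```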
Peer.listen_connections(self)
(Coroutine) Wait for incoming connections.
It returns when a connection request is received, setting peer.readyState to PeerState.CONNECTING.
Raises
- Exception: If peer.readyState is not PeerState.ONLINE.
Peer.accept_connection(self)
(Coroutine) Accept an incoming connection from a remote peer. You should call the Peer.listen_connections method first.
Raises
- Exception: If peer.readyState is not PeerState.CONNECTING.
Peer.send(self, data)
(Coroutine) Send a message to the connected remote peer using the established WebRTC data channel.
Arguments
- data (object): Data to send. It should be a string, number, list, or dictionary in order to be JSON serialized.
Raises
- Exception: If peer.readyState is not PeerState.CONNECTED.
Peer.recv(self)
(Coroutine) Wait until a message from the remote peer is received.
Returns
- object: Data received.
Raises
- Exception: If peer.readyState is not PeerState.CONNECTED.
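send() and recv() can be combined into a simple request/response exchange. The sketch below is one possible way to do it: request is a hypothetical helper, and it assumes that the next message received is the reply to the payload just sent, which only holds if the application protocol guarantees it.

```python
import asyncio


async def request(peer, payload, timeout=5.0):
    """Send `payload` and return the next message, or None if none arrives in time."""
    await peer.send(payload)
    try:
        # recv() waits for a message; wait_for() bounds how long we are willing to wait.
        return await asyncio.wait_for(peer.recv(), timeout)
    except asyncio.TimeoutError:
        return None
```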
Peer.add_data_handler(self, handler)
Adds a function to the list of handlers to call whenever data is received.
Arguments
- handler (function): A function that will be called with an argument called 'data'.
Peer.remove_data_handler(self, handler)
Removes a function from the list of data handlers.
Arguments
- handler (function): The function that will be removed.
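A single data handler can route messages to different callbacks. The sketch below assumes that messages are dictionaries carrying a 'type' key, which is an application convention and not something hyperpeer requires; make_dispatcher is a hypothetical helper.

```python
def make_dispatcher(routes, default=None):
    """Return a data handler that routes dict messages by their 'type' key.

    `routes` maps a message type to a callback; `default` handles anything else.
    """
    def handler(data):
        callback = None
        if isinstance(data, dict):
            callback = routes.get(data.get("type"))
        callback = callback or default
        if callback is not None:
            callback(data)
    return handler


# Possible usage with a Peer instance:
#
#     handler = make_dispatcher({"chat": print})
#     peer.add_data_handler(handler)
#     ...
#     peer.remove_data_handler(handler)  # pass the same function object
```

Keeping a reference to the handler matters because remove_data_handler takes the function to remove as its argument.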
Peer.disconnect(self, error=None)
(Coroutine) Terminate the WebRTC peer-to-peer connection with the remote peer.
PeerState(self, /, *args, **kwargs)
Enum class that represents the possible states of a Peer instance.
Attributes
- STARTING (enum): connecting to signaling server.
- ONLINE (enum): connected to signaling server but not paired to any peer.
- LISTENING (enum): waiting for incoming connections.
- CONNECTING (enum): pairing and establishing a WebRTC connection with peer.
- CONNECTED (enum): WebRTC peer connection and data channel are ready.
- DISCONNECTING (enum): closing peer connection.
- CLOSING (enum): disconnecting from signaling server.
- CLOSED (enum): disconnected from signaling server and no longer usable.
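Since most Peer methods require a specific state, it can be handy to wait until the peer reaches it. The sketch below polls readyState; wait_for_state is a hypothetical helper and polling is an assumption, chosen because this section documents no event for state changes other than disconnection_event.

```python
import asyncio


async def wait_for_state(peer, target, poll_interval=0.1, timeout=None):
    """Poll peer.readyState until it equals `target`.

    Returns True once the state is reached, or False if `timeout` seconds
    elapse first. With timeout=None it waits indefinitely.
    """
    async def _poll():
        while peer.readyState != target:
            await asyncio.sleep(poll_interval)

    try:
        await asyncio.wait_for(_poll(), timeout)
    except asyncio.TimeoutError:
        return False
    return True


# Possible usage after a disconnection:
#
#     if await wait_for_state(peer, PeerState.ONLINE, timeout=10):
#         await peer.listen_connections()
```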