Does Django Channels enforce single-threaded per connection execution? #2133

Open
jmurua14 opened this issue Feb 6, 2025 · 6 comments

@jmurua14

jmurua14 commented Feb 6, 2025

I have created an application that receives audio every second over a WebSocket connection, and I would like to process the audio chunks in parallel. However, the consumer's receive function seems to be single-threaded: even though I send audio every second, processing each chunk takes more than 2 seconds, and this delays everything. I am using the AsyncWebsocketConsumer class. In the code below I have simulated the audio processing with a 2-second sleep so that you can try it out. I am running Django with Daphne (daphne -b 0.0.0.0 -p 8000 proiektua.asgi:application).

consumers.py

import json
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Room, Message
from channels.db import database_sync_to_async
import io
import time
import asyncio
from asgiref.sync import sync_to_async

class ChatConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f"chat_{self.room_name}"
        
        query_params = parse_qs(self.scope["query_string"].decode())
        self.role = query_params.get("role", [""])[0]  # "sender" or "receiver"
        if self.role == "sender":
            print("SENDER")
        elif self.role == "receiver":
            print("RECEIVER")

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        # Accept the WebSocket connection
        await self.accept()

        self.aurreko_denb = time.time()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data= None, bytes_data= None):
        
        if bytes_data:
            print(f"Time between receive: {time.time() - self.aurreko_denb}")
            self.aurreko_denb = time.time()
            asyncio.create_task(self.process_audio_and_send2(bytes_data))


    async def process_audio_and_send2(self, bytes_data):
        """ Runs the audio processing without blocking receive """
        transcript = await sync_to_async(self.process_audio)(bytes_data)
        
        # Send the transcription back
        await self.send(text_data=transcript)

    def process_audio(self, audio_data):
        """ Process audio synchronously (e.g., speech-to-text) """
        time.sleep(2)  # Simulate a slow, blocking process (replace with actual STT)
        return "Transcribed text here"

Logs:

127.0.0.1:43476 - - [06/Feb/2025:09:34:51] "WSCONNECT /ws/chat/1/" - -
Time between receive: 1.0896313190460205
Time between receive: 2.002781391143799
Time between receive: 2.003610134124756
Time between receive: 2.0032858848571777
Time between receive: 2.0033938884735107
Time between receive: 2.0036416053771973
Time between receive: 2.0029494762420654
Time between receive: 2.003593921661377
Time between receive: 2.0041894912719727
127.0.0.1:43476 - - [06/Feb/2025:09:35:08] "WSDISCONNECT /ws/chat/1/" - -

I have tried asyncio.create_task, but it doesn't work the way I want. I would appreciate any help. Thanks in advance.

@carltongibson
Member

Hi.

Individual ASGI events are dispatched serially — see #1933 for a potential change there — but you're (correctly!) using create_task to move the long running work out of the consumer. You want to check that your receive call is continuing after the create_task call.

You're also using sync_to_async, which will dispatch tasks serially on a single thread (by default). Maybe that's where you're seeing the bottleneck. (But that shouldn't be blocking receive… from completing and progressing to the next event.) 🤔
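
To illustrate the point about a single worker thread, here is a minimal sketch that uses only the standard library. It does not use asgiref: the one-worker executor is an assumed stand-in for the default (thread-sensitive) behaviour of sync_to_async, and slow_job is a hypothetical placeholder for the audio processing. With asgiref itself, passing thread_sensitive=False to sync_to_async is the option to look at, although whether that is safe depends on what the synchronous code touches.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(seconds):
    """Hypothetical stand-in for blocking audio processing."""
    time.sleep(seconds)
    return "done"


async def run_jobs(executor, count, seconds):
    """Submit `count` blocking jobs at once and return the total wall time."""
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    await asyncio.gather(
        *(loop.run_in_executor(executor, slow_job, seconds) for _ in range(count))
    )
    return time.perf_counter() - start


def compare(count=3, seconds=0.2):
    """Return (one-worker time, pooled time) for the same workload."""
    with ThreadPoolExecutor(max_workers=1) as single, \
            ThreadPoolExecutor(max_workers=count) as pool:
        # One worker: jobs queue up behind each other.
        serial = asyncio.run(run_jobs(single, count, seconds))
        # Several workers: the blocking sleeps overlap.
        parallel = asyncio.run(run_jobs(pool, count, seconds))
    return serial, parallel


if __name__ == "__main__":
    serial, parallel = compare()
    print(f"one worker thread: {serial:.2f}s, thread pool: {parallel:.2f}s")
```

With one worker, the total time grows with the number of jobs. With a pool, it stays close to the duration of a single job.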

@jmurua14
Author

jmurua14 commented Feb 6, 2025

What I want is to handle the receive calls concurrently. By this I mean that the time between receive calls should be approximately one second, but the consumer seems able to handle only one receive call at a time. Is it somehow possible to bring the time between receive calls down to one second?

@carltongibson
Member

As long as your receive handler completes, the next event should be handled as it comes in, yes.
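
The behaviour described above can be sketched with plain asyncio. This is not the Channels API: FakeConsumer, process, drain and simulate are hypothetical names, and the loop in simulate only imitates events being handed to the handler one at a time. The handler schedules the slow work with create_task, keeps a reference to the task, and returns straight away, so the gap between calls tracks the send interval rather than the processing time.

```python
import asyncio
import time


class FakeConsumer:
    """Hypothetical stand-in for a consumer; not the Channels API."""

    def __init__(self):
        self.gaps = []       # time between consecutive receive() calls
        self.results = []    # output of the background work
        self._last = None
        self._tasks = set()  # strong references so tasks are not garbage-collected

    async def receive(self, chunk):
        now = time.perf_counter()
        if self._last is not None:
            self.gaps.append(now - self._last)
        self._last = now
        # Schedule the slow work and return immediately.
        task = asyncio.create_task(self.process(chunk))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def process(self, chunk):
        # Blocking work runs in the default thread pool (Python 3.9+).
        await asyncio.to_thread(time.sleep, 0.3)
        self.results.append(f"processed {len(chunk)} bytes")

    async def drain(self):
        """Wait for all background tasks still in flight."""
        await asyncio.gather(*self._tasks)


async def simulate(events=4, interval=0.1):
    consumer = FakeConsumer()
    for _ in range(events):
        await consumer.receive(b"audio")  # events are handled one at a time
        await asyncio.sleep(interval)     # next event arrives after `interval`
    await consumer.drain()
    return consumer


if __name__ == "__main__":
    c = asyncio.run(simulate())
    print([round(g, 2) for g in c.gaps], len(c.results))
```

If the gaps in a real consumer follow the processing time instead, something between the event source and the handler is waiting on the slow work.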

You could override dispatch to check that your receive call is completing.

    async def dispatch(self, message):
        super().dipatch(message)
        # Log the time here... 

@jmurua14
Author

jmurua14 commented Feb 7, 2025

I don't understand what you meant in your last answer. When I try the code you proposed, I get the error AttributeError: 'super' object has no attribute 'dipatch'. Where should I add the function above? I am adding it after the AsyncWebsocketConsumer class. Can you explain what is going on?
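
The AttributeError comes from the spelling of the method name: it is dispatch, not dipatch. Because dispatch is a coroutine, the call to the parent also needs an await. The override belongs inside the consumer subclass (ChatConsumer here), next to connect and receive. A minimal sketch follows, assuming a hypothetical BaseConsumer stand-in so that it runs without Channels installed; in a real project the base class would be AsyncWebsocketConsumer, and the timings list is only an illustrative way to record durations.

```python
import asyncio
import time


class BaseConsumer:
    """Hypothetical stand-in for AsyncWebsocketConsumer."""

    async def dispatch(self, message):
        # Pretend to route the message to its handler.
        await asyncio.sleep(0.01)


class ChatConsumer(BaseConsumer):
    def __init__(self):
        self.timings = []  # (message type, seconds spent in dispatch)

    async def dispatch(self, message):
        start = time.perf_counter()
        # Correct spelling, and awaited so the handler actually runs.
        await super().dispatch(message)
        self.timings.append((message["type"], time.perf_counter() - start))


if __name__ == "__main__":
    consumer = ChatConsumer()
    asyncio.run(consumer.dispatch({"type": "websocket.receive"}))
    print(consumer.timings)
```

If the recorded time for each websocket.receive message is close to zero, the receive handler is completing promptly and the delay lies elsewhere.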

@Maryam3107

I will try to fix this issue. Kindly assign it to me.

@jmurua14
Author

Have you been able to solve the issue, or have you made any progress on it?
