
Set ThreadPool as default executor #383

Open
kyboi opened this issue Nov 27, 2024 · 3 comments

Comments


kyboi commented Nov 27, 2024

There are many workflows that require interweaving async and non-async (CPU-intensive) blocking code. These cannot each be split up into separate tasks because locally stored files are involved. The best solution is thus to offload the blocking work to the executor so as not to block the asyncio loop.

If I understand correctly, each worker process starts a ThreadPoolExecutor in which sync tasks are run. Being able to access this thread pool instead of creating another one would be ideal. Currently we work around this with a custom receiver that grabs the thread pool instance and stores the reference in the application state:

from taskiq.receiver import Receiver


class CustomReceiver(Receiver):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Store the executor in the broker's state for global access
        # This allows us to run CPU-heavy code on the workers
        # without blocking the asyncio loop
        self.broker.state.executor = self.executor

But I believe a much better solution would be to simply set the created threadpool as the default executor for the asyncio loop so it can be used without passing the reference around:

with ThreadPoolExecutor(args.max_threadpool_threads) as pool:
    loop = asyncio.get_event_loop()
    loop.set_default_executor(pool)

Tasks could then use it simply by passing None as the executor:

await asyncio.get_running_loop().run_in_executor(None, func)

Alternatively, or at the very minimum, it would help to expose the executor instance through the API.
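A minimal, self-contained sketch of the suggestion above (the `blocking_task` helper and the `custom` thread-name prefix are purely illustrative): after `set_default_executor`, passing `None` routes `run_in_executor` calls through the custom pool.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_task() -> str:
    # Report the worker thread's name so we can see which pool ran us.
    return threading.current_thread().name


async def main() -> str:
    loop = asyncio.get_running_loop()
    pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="custom")
    loop.set_default_executor(pool)
    # Passing None as the executor now routes the call through `pool`.
    return await loop.run_in_executor(None, blocking_task)


thread_name = asyncio.run(main())
print(thread_name)  # e.g. "custom_0"
```

Note that `asyncio.run` shuts down the default executor when the loop closes, so the pool does not need an explicit `shutdown()` here.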


kyboi commented Dec 12, 2024

In fact, it doesn't seem like a good idea to have more than one thread in that thread pool if it is only used for genuinely CPU-intensive sync tasks, provided I/O tasks are run with asyncio.

As far as I can tell, the default of having many threads in a thread pool predates the widespread use of asyncio. If you are only running blocking CPU-bound tasks on threads, the GIL makes more than one thread counter-productive; instead, the --workers option with multiprocessing should be used to match the CPU count.


https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

s3rius (Member) commented Dec 13, 2024

I agree that adding the ability to reuse the pool is a good addition to the API. However, I disagree with the suggestion that the default thread pool size should be set to 1. The Global Interpreter Lock (GIL) only prevents multiple threads from executing Python bytecode simultaneously within the same process. In most cases, when performing calculations, you'll be using libraries like numpy or similar, which are written in C or C++ and operate as native extensions. Since these libraries perform their computations in C, the GIL is not a bottleneck during execution. Here's a small benchmark:

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

def my_sync_task():
    # Pure-Python loop; `total` avoids shadowing the built-in `sum`.
    total = 0
    for _ in range(50_000_000):
        total += 1


async def my_async_task(tp: ThreadPoolExecutor):
    loop = asyncio.get_running_loop()

    task1 = loop.run_in_executor(tp, my_sync_task)
    task2 = loop.run_in_executor(tp, my_sync_task)
    await asyncio.gather(task1, task2)


async def test_run(workers: int, tasks: int):
    with ThreadPoolExecutor(max_workers=workers) as tp:
        start = time.monotonic()
        await asyncio.gather(*[my_async_task(tp) for _ in range(tasks)])
        print(f"Time taken: {time.monotonic() - start}")


async def main():
    await test_run(10, 5)
    await test_run(1, 5)


if __name__ == "__main__":
    asyncio.run(main())

When executing this code, the GIL serializes the Python-level work, so the execution time for both cases is almost the same:

Time taken: 15.220620960004453  # With 10 workers and 10 sync tasks running.
Time taken: 14.688892211997882  # With 1 worker and 10 sync tasks running.

Note

I have an AMD Ryzen 5 7530U @ 4.546GHz, so you might need to adjust some values to get similar execution times.

Now, let's modify the test to replace my_sync_task with something that offloads calculations to a native extension, like numpy:

import numpy as np


def my_sync_task():
    rand = np.random.default_rng()
    a = rand.integers(1, 100, size=(1300, 1200))
    b = rand.integers(1, 100, size=(1200, 1300))
    c = a @ b # Perform a big matrix multiplication

In that case, the GIL only impacts the parts of the library written in Python and the transfer of data between the interpreter and the native extension. Here are the results:

Time taken: 7.917032391997054  # With 10 workers and 10 sync tasks running.
Time taken: 20.60293159300636  # With 1 worker and 10 sync tasks running.

Although it's not as fast as one might expect (due to tasks sharing the CPU and the data transfer overhead between Python and C++), you can see a significant improvement when using multiple workers.
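The same effect can be shown deterministically with a minimal sketch, using `time.sleep` as a stand-in for any native call that releases the GIL (the `gil_releasing_task` name is illustrative): once the GIL is released, threads in the pool genuinely overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def gil_releasing_task() -> None:
    # time.sleep releases the GIL, standing in for a native extension
    # (numpy, tesserocr, ...) that does its heavy lifting outside the
    # interpreter.
    time.sleep(0.2)


start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(4):
        pool.submit(gil_releasing_task)
elapsed = time.monotonic() - start

# Four 0.2 s tasks overlap, finishing in roughly 0.2 s rather than 0.8 s.
print(elapsed < 0.6)  # True
```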

So, I would say that setting the worker count to 1 will generally decrease performance, especially for CPU-bound tasks that rely on native extensions. Even when using Python code, there's often little to no difference between having 1 or n threads in the pool.

But to give TaskIQ more flexibility, I would really like to add the ability to set custom executors.

To do so, I suggest the following:

  1. Add a broker field that holds a reference to an executor (a ThreadPoolExecutor by default).
  2. Add a method like with_executor that can set the executor to something else.

In that case:

  • It will be super easy to get the executor from a broker, e.g. via a field like broker.sync_executor.
  • You can set it explicitly during broker creation, e.g. broker = Broker().with_executor(ThreadPool...).
  • You can set it to None to use the default (global) executor.
  • You can set the global executor during the broker's startup.
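The proposal above could look roughly like this; note this is a hypothetical sketch, not TaskIQ's actual API, and the `Broker`, `sync_executor`, and `with_executor` names are only illustrative.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Optional


class Broker:
    """Illustrative stand-in for a taskiq broker; names are hypothetical."""

    def __init__(self) -> None:
        # Point 1: a field holding the executor, ThreadPoolExecutor by default.
        self.sync_executor: Optional[Executor] = ThreadPoolExecutor()

    def with_executor(self, executor: Optional[Executor]) -> "Broker":
        # Point 2: swap in another executor; None falls back to the
        # event loop's default (global) executor.
        self.sync_executor = executor
        return self


broker = Broker().with_executor(ThreadPoolExecutor(max_workers=1))
print(type(broker.sync_executor).__name__)  # ThreadPoolExecutor
```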

What do you think?


kyboi commented Dec 16, 2024

It is probably best to allow users to change, set, and access the Executor, and to document that as a feature. The default number of workers could then equal the number of CPUs on the system (e.g. via multiprocessing.cpu_count()). This would allow the most flexibility across use cases.
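For context on why the default matters: since Python 3.8, ThreadPoolExecutor's own default is min(32, os.cpu_count() + 4), a size tuned for I/O-bound workloads rather than CPU-bound ones. A small sketch of the CPU-sized alternative suggested above:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# CPython's own default (since 3.8) is min(32, os.cpu_count() + 4),
# tuned for I/O-bound workloads rather than CPU-bound ones.
io_tuned_default = min(32, (os.cpu_count() or 1) + 4)

# The suggestion here: size the pool to the CPU count instead.
cpu_sized_pool = ThreadPoolExecutor(max_workers=os.cpu_count())
cpu_sized_pool.shutdown()
print(io_tuned_default)
```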

> when performing calculations, you'll be using libraries like numpy or similar, which are written in C or C++ and operate in native extensions

This is actually a big assumption, and not really what one would expect from a worker library for Python. In a particular example I am working on, I go through PDFs with pdfium, which does not release the GIL despite wrapping an underlying C library, so it is effectively CPU-intensive work for the main Python process. Within this task, I then call a number of OCR tasks using tesserocr, which does release the GIL. To best utilise the CPUs, I set the main Taskiq ThreadPool to 1 worker and created a separate ThreadPool with as many workers as CPUs, on which I schedule the tesserocr tasks and then await all of them. This means there are only ever at most as many runnable Python threads as CPUs, since the main pool can only run one task, and it waits on results from the other pool before continuing.
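The two-pool pattern described above can be sketched as follows; the `parse_page` and `run_ocr` helpers are hypothetical stand-ins for the pdfium and tesserocr calls, and the real code would run `process_document` itself inside the single-worker main pool.

```python
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor


def parse_page(page: int) -> str:
    # Stand-in for pdfium parsing, which holds the GIL.
    return f"page-{page}"


def run_ocr(text: str) -> str:
    # Stand-in for a tesserocr call, which releases the GIL.
    return text.upper()


async def process_document(pages: int) -> list:
    loop = asyncio.get_running_loop()
    # A dedicated pool sized to the CPU count for the GIL-releasing OCR
    # calls; the main (single-worker) pool just awaits their results.
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as ocr_pool:
        parsed = [parse_page(p) for p in range(pages)]
        futures = [loop.run_in_executor(ocr_pool, run_ocr, t) for t in parsed]
        return await asyncio.gather(*futures)


ocr_results = asyncio.run(process_document(3))
print(ocr_results)  # ['PAGE-0', 'PAGE-1', 'PAGE-2']
```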

If one schedules a bunch of truly CPU-intensive sync tasks (ones that never release the GIL) on the main Taskiq executor, it is very easy to end up with degraded performance as the interpreter keeps context-switching between the threads. This is not obvious from a user's perspective and is a common pitfall. Additionally, depending on how tasks are distributed to workers, one worker can easily pick up multiple sync tasks that it cannot truly run in parallel, instead of them being spread out over other workers that could do the sync work. Of course, none of this applies to pure asyncio tasks.

In light of this, I believe there is still a case for defaulting the sync thread count to 1, or at the very least documenting the trade-offs in detail.
