
Multiple worker executors #4869

Merged: 11 commits into dask:main on Jun 4, 2021
Conversation

@madsbk (Contributor) commented Jun 2, 2021

This is the first iteration of implementing multiple worker executors, as discussed in #4655.

The idea is to send task annotations to the workers and let each worker keep a dict of executors mapping name -> ThreadPoolExecutor. The default value of this dict is:

        self.executors = {
            "offload": utils._offload_executor,
            "actor": ThreadPoolExecutor(1, thread_name_prefix="Dask-Actor-Threads"),
            "default": ThreadPoolExecutor(self.nthreads, thread_name_prefix="Dask-Default-Threads'"),
        }

If nothing is specified through the executor annotation, the worker will use the "default" executor.

NB: this does not limit the number of concurrent tasks running on a worker. For now, use the existing resource management for that.
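
For intuition, executor selection on the worker side boils down to a dict lookup with a "default" fallback. A minimal sketch (the helper name and the annotations argument are illustrative, not the actual attribute names in this PR):

def pick_executor(worker, annotations):
    # Hypothetical sketch: look up the executor named by the task's
    # "executor" annotation, falling back to the worker's "default" pool.
    name = annotations.get("executor", "default")
    if name not in worker.executors:
        raise ValueError(f"unknown executor annotation: {name!r}")
    return worker.executors[name]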

Example of running CPU tasks in the default ThreadPoolExecutor and GPU tasks in a dedicated ThreadPoolExecutor:

import asyncio
import threading
import dask
from dask.distributed import Client, Scheduler, Worker
from distributed.threadpoolexecutor import ThreadPoolExecutor

def get_thread_name(prefix):
    return prefix + threading.current_thread().name

async def main():
    async with Scheduler() as s:
        async with Worker(
            s.address,
            nthreads=5,
            executor={
                # extra executors are added alongside the built-in "default" one
                "GPU": ThreadPoolExecutor(1, thread_name_prefix="Dask-GPU-Threads")
            },
            resources={"GPU": 1, "CPU": 4},
        ) as w:
            async with Client(s.address, asynchronous=True) as c:
                with dask.annotate(resources={"CPU": 1}, executor="default"):
                    print(await c.submit(get_thread_name, "CPU-"))  # runs in the default pool
                with dask.annotate(resources={"GPU": 1}, executor="GPU"):
                    print(await c.submit(get_thread_name, "GPU-"))  # runs in the "GPU" pool

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

Output

CPU-Dask-Default-Threads-29802-2
GPU-Dask-GPU-Threads-29802-3

@madsbk force-pushed the multiple_executors branch from a04c133 to 758fa9e on June 2, 2021 09:03
@sjperkins (Member) commented Jun 2, 2021

I'm excited by the potential introduced by this PR :-)

@madsbk changed the title from "Multiple executors" to "[WIP] Multiple worker executors" on Jun 2, 2021
@mrocklin (Member) commented Jun 2, 2021

In general this looks good to me. I like the clean generalization.

If you wanted to maintain backwards compatibility, and also maybe slightly reduce the cost of updating tests (which seems minimal anyway) then we might consider adding the following property

class Worker:
    @property
    def executor(self):
        return self.executors["default"]
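
For example, existing code that accesses the old attribute would then keep working unchanged (illustrative usage, assuming w is a running Worker):

# The old single-executor attribute simply aliases the "default" entry.
assert w.executor is w.executors["default"]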

@sjperkins (Member):

I suspect this also closes #4560

@madsbk (Contributor, Author) commented Jun 2, 2021

In general this looks good to me. I like the clean generalization.

If you wanted to maintain backwards compatibility, and also maybe slightly reduce the cost of updating tests (which seems minimal anyway) then we might consider adding the following property

class Worker:
    @property
    def executor(self):
        return self.executors["default"]

I did this initially but decided against it to limit the API maintenance burden. But of course, if people are using Worker.executor we should add it as a property. Do you know of anyone else who is using it?

@madsbk marked this pull request as ready for review on June 2, 2021 14:25
@mrocklin (Member) commented Jun 2, 2021

Do you know of anyone else who is using it?

I know of groups that I wouldn't be surprised to learn are using it. I think that providing the property is a good idea for backwards compatibility. I don't think that we lose anything from it.

@madsbk (Contributor, Author) commented Jun 2, 2021

I know of groups that I wouldn't be surprised to learn are using it. I think that providing the property is a good idea for backwards compatibility. I don't think that we lose anything from it.

Alright, added the property and a test

@madsbk changed the title from "[WIP] Multiple worker executors" to "Multiple worker executors" on Jun 2, 2021
@mrocklin (Member) commented Jun 2, 2021

This seems great to me. +1

I don't know enough about the current state of testing to know if the current failure is relevant, but it looks familiar to me, so I suspect that it's fine.

For future work I think that we might want to start thinking about how to expose this through configuration options. Maybe we optionally accept strings instead of Executor objects, and if it is a string we use distributed.utils.import_term. Then users can use things like dask-spec to create a Worker with custom executor objects from the command line.
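
A minimal sketch of what that string handling might look like (hypothetical helper, not part of this PR):

from distributed.utils import import_term

def normalize_executor(spec):
    # Hypothetical helper: allow import-path strings in configuration while
    # passing already-constructed executor objects through untouched.
    if isinstance(spec, str):
        cls = import_term(spec)  # e.g. "concurrent.futures.ProcessPoolExecutor"
        return cls()
    return spec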


@madsbk force-pushed the multiple_executors branch from 8647726 to cd48a6c on June 3, 2021 08:05
@madsbk force-pushed the multiple_executors branch from cd48a6c to d0f3841 on June 3, 2021 08:06
@madsbk (Contributor, Author) commented Jun 3, 2021

@jrbourbeau do you recognize the CI errors? I cannot reproduce them locally :/

@quasiben (Member) commented Jun 3, 2021

test_thread_time and test_AllProgress_lost_key are both known to be flaky tests: xref #4790

@madsbk (Contributor, Author) commented Jun 3, 2021

test_thread_time and test_AllProgress_lost_key are both known to be flaky tests: xref #4790

In that case, this PR is ready for review :)

@quasiben (Member) commented Jun 3, 2021

I kicked the CI to rerun the tests

@jrbourbeau (Member) left a comment:

Thanks for your work on this @madsbk! This looks really nice, the diff is much smaller than I was expecting : )

if executor is utils._offload_executor:
    continue  # Never shutdown the offload executor
if isinstance(executor, ThreadPoolExecutor):
    executor._work_queue.queue.clear()
Member:

I'm curious why we're doing this step manually (I realize this predates this PR -- this isn't meant to be a blocker for merging the changes here)

Contributor (Author):

It is because even with shutdown(wait=False), the ThreadPoolExecutor will still run pending tasks. By calling executor._work_queue.queue.clear(), we cancel them before shutting down.
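
A rough illustration of that behavior, using distributed's ThreadPoolExecutor as in the code above (the sleep workload is just for demonstration):

import time
from distributed.threadpoolexecutor import ThreadPoolExecutor

ex = ThreadPoolExecutor(1)
futures = [ex.submit(time.sleep, 1) for _ in range(5)]
ex._work_queue.queue.clear()  # drop the tasks that have not started yet
ex.shutdown(wait=False)       # only the task already running completes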

Member:

Ah, I see. Thanks for clarifying! FWIW it looks like a cancel_futures= keyword was added to Executor.shutdown in Python 3.9. Just pointing this out as something we can use in the future once 3.9 is our minimum Python version
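
For reference, the standard-library form on Python 3.9+ would be roughly the following (illustrative; whether distributed's forked ThreadPoolExecutor exposes the same keyword is a separate question):

executor.shutdown(wait=False, cancel_futures=True)  # cancels pending, not already-running, tasks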

    executor._work_queue.queue.clear()
    executor.shutdown(wait=executor_wait, timeout=timeout)
else:
    executor.shutdown(wait=False)
Member:

Should we also pass wait=executor_wait and timeout=timeout keywords here like we do above? For example, I might have a custom ProcessPoolExecutor I'm using and want to have wait=True.

Contributor (Author):

Added wait=executor_wait, but timeout= isn't a valid argument to concurrent.futures.Executor.shutdown(); it is only introduced by distributed's ThreadPoolExecutor.
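
So the fallback branch shown above presumably becomes something like this (sketch of the described change):

else:
    # Generic executors (e.g. a custom ProcessPoolExecutor) only accept wait=;
    # the timeout= keyword above is specific to distributed's ThreadPoolExecutor.
    executor.shutdown(wait=executor_wait)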

@jrbourbeau (Member) left a comment:

Thanks @madsbk! This is in

(the CI failures here are known flaky tests)

@jrbourbeau merged commit 0a54d95 into dask:main on Jun 4, 2021
@madsbk deleted the multiple_executors branch on June 7, 2021 09:52
douglasdavis pushed a commit to douglasdavis/distributed that referenced this pull request on Jun 8, 2021