-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dagster-aws] add ECS Task executor #26098
Conversation
0b99a6a
to
56bd783
Compare
4b01ac0
to
e8e006e
Compare
python_modules/libraries/dagster-docker/dagster_docker/docker_executor.py
Outdated
Show resolved
Hide resolved
7a80ddd
to
1384e4f
Compare
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
3a8f5c0
to
1c5705a
Compare
Hey @gibsondan @alangenfeld, asking for an initial review here. I have tested the ECS Executor in my personal AWS account. Adding some tests involving mocks too. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broadly makes sense to me - i think the major outstanding question is how much to delegate to the run launcher, the prototype I wrote originally always looked at the current ECS task but that was before you discovered that we also needed to re-compute the run_task_kwargs potentially. I think we want to pull those from the dagster.yaml if they are set.
Do we want to make this experimental at first? or no?
"run_task_kwargs": Field( | ||
Permissive({}), | ||
is_required=False, | ||
description=( | ||
"Additional arguments to include while running the task. See" | ||
" https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task" | ||
" for the available parameters. The overrides and taskDefinition arguments will" | ||
" always be set by the run launcher." | ||
), | ||
), | ||
"cpu": Field(IntSource, is_required=False), | ||
"memory": Field(IntSource, is_required=False), | ||
"ephemeral_storage": Field(IntSource, is_required=False), | ||
"task_overrides": Field( | ||
Permissive({}), | ||
is_required=False, | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recognizing that this was all on my original PR - there's some overlap between these parameters right? All of them can be set on run_task_kwargs. I think it makes sense to put things like cpu/memory as top-level fields since they are fields that most people will want to change
What happens if you set 'overrides' on the run_task_kwargs param though? should we not allow that? should we merge them? how do we decide which one wins if you set both "task_overrides.cpu" and "cpu"? What if you set task_overrides.containerOverrides?
the snake case vs. camel case thing is kind of a bummer too (dagster is generally snake case but the boto apis expect camelCase, so you end up with grossness like task_overrides.containerOverrides), but I don't see an obvious way around that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if you set 'overrides' on the run_task_kwargs param though? should we not allow that? should we merge them? how do we decide which one wins if you set both "task_overrides.cpu" and "cpu"? What if you set task_overrides.containerOverrides?
I think the way it's being handled right now is correct - "deeper" settings take precedence over more general fields. I fixed a small logical error in this code, otherwise it was fine.
Current priorities are:
ecs/task_override
tag (which is a json string)- specific tags such as
etc/cpu
,etc/memory
- executor config
task_overrides
field - executor config fields like
cpu
,memory
I think this just has to be documented, otherwise I don't any problems with this logic.
What if you set task_overrides.containerOverrides
I made sure to merge them correctly
the snake case vs. camel case thing is kind of a bummer too
Well, what can we do
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
task_kwargs = get_task_kwargs_from_current_task( | ||
self.ec2, | ||
current_task_metadata.cluster, | ||
current_task, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmmmm I think using the same task definition as the run / calling task makes sense - but in many cases (notably dagster+ hybrid and serverless agents both) the run task kwargs are explicitly passed in via the dagster.yaml rather than derived from the current task. Can we delegate to the run launcher to determine these instead so that that logic always matches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That might require some changes to the run launcher logic to cleanly pull out the bit of logic that you would need - but that might actually be ok? What bad things if any would happen if this logic was changed to
task_kwargs = self.run_launcher._run_task_kwargs(...)
? (potentially the protected method would need to be made public)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for the record: run_launcher._run_task_kwargs
will still be using the current task if dagster.yaml
doesn't have the required configuration. But that's fine since we won't have a better configuration source anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this again... why is using the run launcher better than using the current task?
- When Dagster is deployed with the ECS run launcher, the current task will always be created by the Run Launcher, and thus should contain configuration coming from the run launcher anyway.
- Not relying on the ECS run launcher will open up the possibility of using the ECS executor with the default run launcher (but when Dagster itself is deployed on ECS)
- The run launcher config is not available during executor initialization (unlike the current task), because it needs the dagster run in order to call
ECSRunLauncher._run_task_kwargs
. This leads to more convoluted executor logic, because we will have to manage some state to save these kwargs during thelaunch_step
method execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
talked offline a bit - i think the plan is to pull the part of the run launcher out where it goes from task definition to run task kwargs (Which does not require a run) and use that here. That way we don't have to call the get_task_kwargs_from_current_task function in the executor.
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
1c5705a
to
5ac9d78
Compare
@@ -178,6 +178,7 @@ def execute(self, plan_context: PlanOrchestrationContext, execution_plan: Execut | |||
instance_concurrency_context=instance_concurrency_context, | |||
) as active_execution: | |||
running_steps: Dict[str, ExecutionStep] = {} | |||
step_worker_handles: Dict[str, Optional[str]] = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i could imagine different step handles parameterizing this differnetly, but starting with a string is probably fine
"run_task_kwargs": Field( | ||
Permissive({}), | ||
is_required=False, | ||
description=( | ||
"Additional arguments to include while running the task. See" | ||
" https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task" | ||
" for the available parameters. The overrides and taskDefinition arguments will" | ||
" always be set by the run launcher." | ||
), | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there some way to make it clearer that you only need to set this if you want them to be different than what is set on the run launcher?
task_kwargs = get_task_kwargs_from_current_task( | ||
self.ec2, | ||
current_task_metadata.cluster, | ||
current_task, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
talked offline a bit - i think the plan is to pull the part of the run launcher out where it goes from task definition to run task kwargs (Which does not require a run) and use that here. That way we don't have to call the get_task_kwargs_from_current_task function in the executor.
a3268ba
to
5e91e55
Compare
@gibsondan hey Daniel, I'm now pulling base configuration from the run launcher. |
python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few important inlines before landing, particularly the part about determining the run launcher kwargs in the constructor rather than within launch_step
requirements=multiple_process_executor_requirements(), | ||
) | ||
def ecs_executor(init_context: InitExecutorContext) -> Executor: | ||
"""Executor which launches steps as ECS tasks.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need more of a docblock to show up in the API docs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate? Do you just mean this needs more docs? Like a codeblock with usage example?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just wondering how it will appear here https://docs.dagster.io/_apidocs/libraries/dagster-aws#ecs
Here's how the k8s executor shows up: https://docs.dagster.io/_apidocs/libraries/dagster-k8s#dagster_k8s.k8s_job_executor
current_task_metadata = get_current_ecs_task_metadata() | ||
current_task = get_current_ecs_task( | ||
self.ecs, current_task_metadata.task_arn, current_task_metadata.cluster | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another option would to fetch the cluster and task ARN from tags on the run like we do in the run launcher here, instead of using get_current_ecs_task_metadata https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py#L610-L617
This would probably have to be done later though since we do not have the run object at executor initialization time - i'm not sure if we have an ideal hook for this in the executor since you would want to do it once per run at initialization time, but not in the constructor
Not blocking at this point and something we could add later too, but something to consider
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah looks like we might need a well-defined hook for state mutations which use the run object like this one
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
task_arn = step_worker_handle | ||
cluster_arn = self._cluster_arn | ||
|
||
tasks = self.ecs.describe_tasks(tasks=[task_arn], cluster=cluster_arn).get("tasks") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not for this PR but i wonder if we should consider building in a way to batch this when there are multiple running steps at once
...aries/dagster-aws/dagster_aws_tests/ecs_tests/launcher_tests/executor_tests/test_executor.py
Show resolved
Hide resolved
3d4bf7d
to
3d4f650
Compare
requirements=multiple_process_executor_requirements(), | ||
) | ||
def ecs_executor(init_context: InitExecutorContext) -> Executor: | ||
"""Executor which launches steps as ECS tasks.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just wondering how it will appear here https://docs.dagster.io/_apidocs/libraries/dagster-aws#ecs
Here's how the k8s executor shows up: https://docs.dagster.io/_apidocs/libraries/dagster-k8s#dagster_k8s.k8s_job_executor
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
self._run_task_kwargs = { | ||
**run_launcher_kwargs, | ||
**run_task_kwargs, | ||
"taskDefinition": current_task["taskDefinitionArn"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the user has container_context.task_definition_arn set I think we want to use that here instead
I also think this order might not be quite right? This should be first in the list so that you can override it via config if you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the user has container_context.task_definition_arn set I think we want to use that here instead
This will come from run_launcher_kwargs
right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't know the container context yet, it is run-dependant - so it's not actually possible for it to have it here yet. We would need to check it and set it later.
the relevant code from the run launcher is here: https://github.com/dagster-io/dagster/blob/master/python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py#L661-L664
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
python_modules/libraries/dagster-aws/dagster_aws/ecs/executor.py
Outdated
Show resolved
Hide resolved
job_origin = check.not_none(context.job_code_origin) | ||
return job_origin.repository_origin.container_image | ||
@staticmethod | ||
def get_image_for_run(run: DagsterRun) -> Optional[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say yes for slightly better readability
python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Outdated
Show resolved
Hide resolved
15a6977
to
15bc4c2
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 64c9d3d. |
Deploy preview for dagster-docs-beta ready! Preview available at https://dagster-docs-beta-jc613dv2c-elementl.vercel.app Direct link to changed pages: |
cae363d
to
d5526d4
Compare
Deploy preview for dagster-docs ready! Preview available at https://dagster-docs-k9c1ywlvq-elementl.vercel.app Direct link to changed pages: |
d5526d4
to
64c9d3d
Compare
…ncher and can use the same task definition of the task in which it is launched, but still allows customizing of memory,cpu, ephemeral storage, and whatever else you can override via run_task arguments.
64c9d3d
to
dfddf32
Compare
Thank you @gibsondan for your careful review! |
Summary & Motivation
This PR ads a new ECS Tasks Executor.
Currently a
ECSRunLauncher
is required in order to use the newecs_executor
and most of the ECS config comes from the run launcher. The executor can override a few important fields (such as resources and container overrides).Resolve #9671
How I Tested These Changes
Changelog
[dagster-aws] new
ecs_executor
which executes Dagster steps via AWS ECS tasks. Initially, we expect it to be used in conjunction withECSRunLauncher
.