Feature/data 2175 kill timeout spark #45
Conversation
LGTM, it would be nice to add more context to the PR description. E.g.: some documentation about what is best practice for handling timeouts in Airflow, and how you arrived at this implementation.
Also it would be nice to handle the case where we cancel/clear the job in the UI, so that this also kills the running Spark job.
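Airflow exposes a hook for exactly this: BaseOperator.on_kill is called when a running task instance is stopped externally (e.g. cleared or marked failed from the UI or CLI). A minimal sketch of how the operator could wire that up, assuming the PR's kill_spark_job helper; the class name and the other attribute/method names are illustrative, not the actual code:

from airflow.models import BaseOperator


class SparkSubmitViaSSMOperator(BaseOperator):
    # Illustrative skeleton, not the PR's actual class.

    def execute(self, context):
        # Hypothetical helpers: submit the job, then keep enough state
        # (master instance id, YARN application id) to be able to kill it later.
        self._emr_master_instance_id = self._resolve_master_instance()
        self._application_id = self._submit_spark_job()
        self._wait_for_completion()

    def on_kill(self):
        # Airflow calls on_kill when the task is stopped from the UI or CLI,
        # so the YARN application is terminated instead of running orphaned.
        if getattr(self, "_application_id", None):
            self.kill_spark_job(self._emr_master_instance_id, self._application_id)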
@@ -91,6 +91,7 @@ def _create_operator(self, **kwargs):
    job_args=_parse_args(self._template_parameters),
    spark_args=_parse_spark_args(self._task.spark_args),
    spark_conf_args=_parse_spark_args(self._task.spark_conf_args, '=', 'conf '),
+   spark_app_name=self._task.spark_conf_args.get("spark.app.name", ""),
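Presumably spark_app_name is what lets the operator look up the YARN application id it later has to kill. A hedged sketch of such a lookup over SSM Run Command; the boto3 calls (send_command, get_command_invocation) and the AWS-RunShellScript document are real, everything else is an assumption about how the PR does it:

import time
from typing import Optional

import boto3


def get_application_id(emr_master_instance_id: str, spark_app_name: str) -> Optional[str]:
    # Sketch: resolve a running YARN application id by its Spark app name.
    ssm = boto3.client("ssm")
    list_command = (
        "yarn application -list -appStates RUNNING 2>/dev/null"
        f" | grep '{spark_app_name}' | awk '{{print $1}}'"
    )
    response = ssm.send_command(
        InstanceIds=[emr_master_instance_id],
        DocumentName="AWS-RunShellScript",
        Parameters={"commands": [list_command]},
    )
    command_id = response["Command"]["CommandId"]
    time.sleep(2)  # crude; real code should poll until the invocation finishes
    result = ssm.get_command_invocation(
        CommandId=command_id, InstanceId=emr_master_instance_id
    )
    output = result["StandardOutputContent"].strip()
    return output or None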
🎉 🎆
    Parameters={"commands": [kill_command]}
)
raise AirflowException(
    f"Spark job exceeded the execution timeout of {self._execution_timeout} seconds and was terminated.")
Can this only happen if it's a timeout? What if we cancel the job in the Airflow UI?
true!!
)
if application_id:
    self.kill_spark_job(emr_master_instance_id, application_id)
raise AirflowException("Task timed out and the Spark job was terminated.")
We are already raising an exception in kill_spark_job, so I'm not even sure the code will ever get here.
ah true! > <
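One way to resolve this, sketched under the assumption that kill_spark_job currently raises AirflowException itself: move the raising out of the helper so each failure path raises exactly once, and the helper stays reusable from on_kill too. _send_ssm_command is a hypothetical wrapper around boto3's ssm.send_command:

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class SparkSubmitViaSSMOperator(BaseOperator):  # continuing the earlier sketch
    def kill_spark_job(self, emr_master_instance_id, application_id):
        # Only send the kill command; let the caller decide whether to raise.
        self._send_ssm_command(
            emr_master_instance_id, f"yarn application -kill {application_id}"
        )

    def _handle_timeout(self, emr_master_instance_id, application_id):
        if application_id:
            self.kill_spark_job(emr_master_instance_id, application_id)
        # This raise is now reachable: it is the single failure point.
        raise AirflowException("Task timed out and the Spark job was terminated.")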
LGTM. @claudiazi Do you know why there were so many code style changes? It made the review a bit harder, because it was not clear what was an actual change and what was not.
@siklosid sorry!! I ran Black on the code 🙈
@claudiazi I think Black is part of the pre-commit hook. Does this mean the code was not Black-formatted before? Or was it formatted with different settings?
@siklosid It seems that the pre-commit hook was not working at all. 🤔
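For reference, a Black hook in .pre-commit-config.yaml typically looks like the snippet below (the rev pin is illustrative, not this repo's). Also note that pre-commit hooks only fire in clones where pre-commit install has been run, which would explain a hook silently never executing:

# Illustrative .pre-commit-config.yaml entry; pin rev to the version the repo uses.
repos:
  - repo: https://github.com/psf/black
    rev: 24.4.2
    hooks:
      - id: black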
Feature:
- kill_spark_job: handles the termination of Spark jobs by sending the yarn kill command via SSM.
- on_kill: terminates the Spark job by calling the kill_spark_job function when the Airflow task is marked as failed, or manually killed via the Airflow UI or CLI.
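As context for the timeout side of the feature: execution_timeout is a standard BaseOperator argument, and exceeding it raises AirflowTaskTimeout inside the running task, which is what gives the operator a chance to clean up; whether the PR reuses that argument or tracks its own timeout value is not visible from this excerpt. A hypothetical usage sketch, with the operator name and its specific arguments as illustrative stand-ins:

from datetime import timedelta

# Hypothetical task definition; EmrSparkOperator and its arguments stand in
# for the operator this PR modifies.
spark_task = EmrSparkOperator(
    task_id="nightly_aggregation",
    spark_conf_args={"spark.app.name": "nightly_aggregation"},
    execution_timeout=timedelta(hours=2),  # real BaseOperator parameter
)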
Tested in datastg: