diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 551feecd1ee..2dc3d44ef2f 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -600,9 +600,6 @@ def task_finished( # local file system after the user code has finished execution. # This happens via datastore as a communication bridge. - # TODO: There is no guarantee that task_prestep executes before - # task_finished is invoked. That will result in AttributeError: - # 'KubernetesDecorator' object has no attribute 'metadata' error. if self.metadata.TYPE == "local": # Note that the datastore is *always* Amazon S3 (see # runtime_task_created function). diff --git a/metaflow/task.py b/metaflow/task.py index 6b73302652b..9d526c05863 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -1,6 +1,4 @@ from __future__ import print_function -from io import BytesIO -import math import sys import os import time @@ -8,12 +6,8 @@ from types import MethodType, FunctionType -from metaflow.sidecar import Message, MessageTypes -from metaflow.datastore.exceptions import DataException - from .metaflow_config import MAX_ATTEMPTS from .metadata_provider import MetaDatum -from .mflog import TASK_LOG_SOURCE from .datastore import Inputs, TaskDataStoreSet from .exception import ( MetaflowInternalError, @@ -616,35 +610,62 @@ def run_step( "graph_info": self.flow._graph_info, } ) - + task_pre_step_deco_errors = set() for deco in decorators: - deco.task_pre_step( - step_name, - output, - self.metadata, - run_id, - task_id, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, - self.ubf_context, - inputs, + try: + deco.task_pre_step( + step_name, + output, + self.metadata, + run_id, + task_id, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + self.ubf_context, + inputs, + ) + except Exception as ex: + task_pre_step_deco_errors.add((deco.name, str(ex))) + if task_pre_step_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_pre_step* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_pre_step_deco_errors + ] + ) ) + task_decorate_deco_errors = set() for deco in decorators: # decorators can actually decorate the step function, # or they can replace it altogether. This functionality # is used e.g. by catch_decorator which switches to a # fallback code if the user code has failed too many # times. - step_func = deco.task_decorate( - step_func, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, - self.ubf_context, + try: + step_func = deco.task_decorate( + step_func, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + self.ubf_context, + ) + except Exception as ex: + task_decorate_deco_errors.add((deco.name, str(ex))) + if task_decorate_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_decorate* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_decorate_deco_errors + ] + ) ) if join_type: @@ -652,13 +673,27 @@ def run_step( else: self._exec_step_function(step_func) + task_post_step_deco_errors = set() for deco in decorators: - deco.task_post_step( - step_name, - self.flow, - self.flow._graph, - retry_count, - max_user_code_retries, + try: + deco.task_post_step( + step_name, + self.flow, + self.flow._graph, + retry_count, + max_user_code_retries, + ) + except Exception as ex: + task_post_step_deco_errors.add((deco.name, str(ex))) + if task_post_step_deco_errors: + raise MetaflowInternalError( + f"Exceptions were encountered during **task_post_step* with the *{step_name}* step decorator:\n" + + "\n".join( + [ + f"decorator: {name}\nerror: {err}" + for name, err in task_post_step_deco_errors + ] + ) ) self.flow._task_ok = True