Skip to content

Commit

Permalink
Merge pull request #284 from zillow/tz/AIP-8067-2m-retry
Browse files Browse the repository at this point in the history
AIP-8067 set retry "2m" not 2
  • Loading branch information
talebzeghmi authored Feb 8, 2024
2 parents d00ae83 + f5cdff9 commit c83a345
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
5 changes: 4 additions & 1 deletion metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,10 @@ def _get_minutes_between_retries(node: DAGNode) -> Optional[str]:
retry_deco = [deco for deco in node.decorators if deco.name == "retry"]
if retry_deco:
val = retry_deco[0].attributes.get("minutes_between_retries")
return f"{val}m" if isinstance(val, numbers.Number) else val
is_number = isinstance(val, numbers.Number) or (
isinstance(val, str) and val.isdecimal()
)
return f"{val}m" if is_number else val
return None

@staticmethod
Expand Down
13 changes: 8 additions & 5 deletions metaflow/plugins/aip/tests/flows/resilient_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from metaflow.exception import MetaflowExceptionWrapper


# Back-off passed to every @retry in this flow as minutes_between_retries;
# 0 makes failed steps retry immediately so the integration test stays fast.
MINUTES_BETWEEN_RETRIES = 0


def validate_checkpoint_dir(checkpoint_dir: str):
key = f"{current.run_id}/checkpoints/{current.step_name}/{current.task_id}"
assert checkpoint_dir.endswith(key)
Expand All @@ -27,7 +30,7 @@ def validate_checkpoint_dir(checkpoint_dir: str):
class ResilientFlow(FlowSpec):
retry_log = "Retry count = {retry_count}"

@retry
@retry(minutes_between_retries=MINUTES_BETWEEN_RETRIES)
@step
def start(self):
self.start_retry_count = current.retry_count
Expand Down Expand Up @@ -79,7 +82,7 @@ def download_kubectl(self):
print(str(output))
subprocess.check_output("chmod u+x ./kubectl", shell=True)

@retry
@retry(minutes_between_retries=MINUTES_BETWEEN_RETRIES)
@step
def user_failure(self):
if self.start_retry_count < 1:
Expand All @@ -93,7 +96,7 @@ def user_failure(self):
print("let's succeed")
self.next(self.no_retry)

@retry(times=0)
@retry(times=0, minutes_between_retries=MINUTES_BETWEEN_RETRIES)
@step
def no_retry(self):
print("Testing logging for retries")
Expand All @@ -113,14 +116,14 @@ def no_retry(self):
self.next(self.compute)

@catch(var="compute_failed")
@retry(times=0, minutes_between_retries=MINUTES_BETWEEN_RETRIES)
@step
def compute(self):
    """Fail on purpose with ZeroDivisionError; @catch stores the wrapped
    exception in self.compute_failed for the next step to inspect."""
    numerator = 1
    denominator = 0
    self.x = numerator / denominator  # always raises ZeroDivisionError
    self.next(self.platform_exception)

@catch(var="platform_exception_failed")
@retry(times=1)
@retry(times=1, minutes_between_retries=MINUTES_BETWEEN_RETRIES)
@step
def platform_exception(self):
assert type(self.compute_failed) == MetaflowExceptionWrapper
Expand Down
36 changes: 30 additions & 6 deletions metaflow/plugins/aip/tests/run_integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,21 @@
"nested_foreach_with_branching.py",
]

# Shared CLI fragment: attaches @retry to every step with zero minutes between
# retries, so failing integration-test steps retry without waiting.
WITH_RETRY = "--with retry:minutes_between_retries=0"


def test_s3_sensor_flow(pytestconfig) -> None:
# ensure the s3_sensor waits for some time before the key exists
file_name: str = f"s3-sensor-file-{uuid.uuid1()}.txt"
file_name_for_formatter_test: str = f"s3-sensor-file-{uuid.uuid1()}.txt"

s3_sensor_flow_cmd: str = (
f"{_python()} flows/s3_sensor_flow.py --datastore=s3 --with retry aip run "
f"{_python()} flows/s3_sensor_flow.py --datastore=s3 {WITH_RETRY} aip run "
f"--file_name {file_name} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
s3_sensor_with_formatter_flow_cmd: str = (
f"{_python()} flows/s3_sensor_with_formatter_flow.py --datastore=s3 --with retry aip run "
f"{_python()} flows/s3_sensor_with_formatter_flow.py --datastore=s3 {WITH_RETRY} aip run "
f"--file_name_for_formatter_test {file_name_for_formatter_test} --notify "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
Expand Down Expand Up @@ -109,7 +111,7 @@ def test_s3_sensor_flow(pytestconfig) -> None:
)

validate_s3_sensor_flow_cmd: str = (
f"{_python()} flows/validate_s3_sensor_flows.py --datastore=s3 --with retry aip run "
f"{_python()} flows/validate_s3_sensor_flows.py --datastore=s3 {WITH_RETRY} aip run "
f"--file_name {file_name} --file_name_for_formatter_test {file_name_for_formatter_test} "
f"--s3_sensor_argo_workflow_name {s3_sensor_argo_workflow_name} --s3_sensor_with_formatter_argo_workflow_name {s3_sensor_with_formatter_argo_workflow_name} "
f"--argo-wait "
Expand Down Expand Up @@ -184,7 +186,7 @@ def test_error_and_opsgenie_alert(pytestconfig) -> None:
# Test logging of raise_error_flow
check_error_handling_flow_cmd: str = (
f"{_python()} flows/check_error_handling_flow.py "
f"--datastore=s3 --with retry aip run "
f"--datastore=s3 {WITH_RETRY} aip run "
f"--argo-wait --workflow-timeout 1800 "
f"--experiment metaflow_test --tag test_t1 "
f"--error_flow_id={error_flow_id} "
Expand All @@ -207,7 +209,7 @@ def test_flows(pytestconfig, flow_file_path: str) -> None:
full_path: str = os.path.join("flows", flow_file_path)

test_cmd: str = (
f"{_python()} {full_path} --datastore=s3 --with retry aip run "
f"{_python()} {full_path} --datastore=s3 {WITH_RETRY} aip run "
f"--argo-wait --workflow-timeout 1800 "
f"--max-parallelism 3 --experiment metaflow_test --tag test_t1 "
f"--sys-tag test_sys_t1:sys_tag_value "
Expand Down Expand Up @@ -336,6 +338,28 @@ def test_kfp_pod_default(pytestconfig) -> None:
)


def test_retry_strategy(pytestconfig) -> None:
    """Compile flow_triggering_flow with @retry and inspect the Argo retryStrategy.

    AIP-8067(talebz): regression test to ensure the backoff duration is the
    string "2m" and not the bare number "2".
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        pipeline_path: str = os.path.join(tmp_dir, "flow_triggering_flow.yaml")
        tag = pytestconfig.getoption("pipeline_tag")

        compile_cmd: str = (
            f" {_python()} flows/flow_triggering_flow.py --no-pylint --with retry aip run"
            f" --no-s3-code-package --yaml-only --pipeline-path {pipeline_path} "
            f"--tag {tag} --notify"
        )
        compiled = get_compiled_yaml(compile_cmd, pipeline_path)

        for template in compiled["spec"]["templates"]:
            # Only container templates carry a retryStrategy worth checking.
            if not template.get("container"):
                continue
            strategy = template["retryStrategy"]
            expected_limit = (
                7 if template["name"] == "notify-email-exit-handler" else 3
            )
            assert strategy["limit"] == expected_limit
            assert strategy["retryPolicy"] == "Always"
            assert strategy["backoff"]["duration"] == "2m"
            assert strategy["backoff"]["factor"] == 3


def test_user_defined_exit_handler(pytestconfig) -> None:
with tempfile.TemporaryDirectory() as yaml_tmp_dir:
yaml_file_path: str = os.path.join(yaml_tmp_dir, "s3_sensor_flow.yaml")
Expand Down Expand Up @@ -424,7 +448,7 @@ def test_toleration_and_affinity_compile_only(pytestconfig) -> None:
)

compile_to_yaml_cmd: str = (
f"{_python()} flows/toleration_and_affinity_flow.py --datastore=s3 --with retry aip run"
f"{_python()} flows/toleration_and_affinity_flow.py --datastore=s3 {WITH_RETRY} aip run"
f" --no-s3-code-package --yaml-only --pipeline-path {yaml_file_path} "
f"--tag {pytestconfig.getoption('pipeline_tag')} "
)
Expand Down

0 comments on commit c83a345

Please sign in to comment.