Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-8718 Allow disabling default cards; Allow card render failures #309

Merged
merged 15 commits into from
Sep 30, 2024
5 changes: 3 additions & 2 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _create_workflow_yaml(
).items():
workflow["metadata"]["labels"][key] = value

KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow)
KubeflowPipelines._update_cards_artifact_configs(workflow)

if self._exit_handler_created:
# replace entrypoint content with the exit handler handler content
Expand Down Expand Up @@ -422,14 +422,15 @@ def _create_workflow_yaml(
return workflow

@staticmethod
def _add_archive_section_to_cards_artifacts(workflow: dict):
def _update_cards_artifact_configs(workflow: dict):
# Add "archive" none section to "-cards" artifacts because by default
# they are tarred and hence not viewable in the Argo UI
for template in workflow["spec"]["templates"]:
if "outputs" in template and "artifacts" in template["outputs"]:
for artifact in template["outputs"]["artifacts"]:
if "-card" in artifact["name"]:
artifact["archive"] = {"none": {}}
artifact["optional"] = True

@staticmethod
def _config_map(workflow_name: str, max_run_concurrency: int):
Expand Down
15 changes: 15 additions & 0 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ def cli_decorator(func: Callable):
"If not set, the default iam role associated with the pod will be used",
show_default=True,
)
@click.option(
"--add-default-cards",
"add_default_cards",
default=True,
help="Whether to add default card to all workflow steps",
show_default=True,
)
cloudw marked this conversation as resolved.
Show resolved Hide resolved
@functools.wraps(func)
def wrapper_common_options(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -323,6 +330,7 @@ def run(
sqs_role_arn_on_error=None,
argo_wait=False,
wait_for_completion_timeout=None,
add_default_cards=True,
**kwargs,
):
"""
Expand Down Expand Up @@ -353,6 +361,7 @@ def run(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_cards=add_default_cards,
)

if yaml_only:
Expand Down Expand Up @@ -516,6 +525,7 @@ def create(
recurring_run_enable=None,
recurring_run_cron=None,
recurring_run_concurrency=None,
add_default_cards=True,
**kwargs,
):
"""
Expand Down Expand Up @@ -549,6 +559,7 @@ def create(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_cards=add_default_cards,
)

if yaml_only:
Expand Down Expand Up @@ -669,6 +680,7 @@ def make_flow(
notify_on_success,
sqs_url_on_error,
sqs_role_arn_on_error,
add_default_cards,
):
"""
Analogous to step_functions_cli.py
Expand All @@ -682,6 +694,9 @@ def make_flow(

# Attach AIP decorator to the flow
decorators._attach_decorators(obj.flow, [AIPInternalDecorator.name])
if add_default_cards:
decorators._attach_decorators(obj.flow, ["card:id=default"])

decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
)
Expand Down
42 changes: 24 additions & 18 deletions metaflow/plugins/aip/aip_metaflow_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ def _write_card_artifacts(
for card in sorted_cards:
iter_name = "" if i == 0 else i
file_name = f"/tmp/outputs/cards/card{iter_name}.html"
with open(file_name, "w") as card_file:
card_file.write(card.get())
try:
with open(file_name, "w") as card_file:
card_file.write(card.get())
except Exception:
logging.error(f"Failed to write card {i} of type {card.type}")
raise
i = i + 1


Expand Down Expand Up @@ -129,12 +133,12 @@ def _step_cli(
"--max-value-size=0",
input_paths,
]
cmd: str = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
param_cmd: str = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
cloudw marked this conversation as resolved.
Show resolved Hide resolved
" ".join(exists),
export_params,
" ".join(params),
)
cmds.append(cmd)
cmds.append(param_cmd)

top_level: List[str] = [
"--quiet",
Expand Down Expand Up @@ -165,7 +169,6 @@ def _step_cli(

step: List[str] = [
"--with=aip",
"--with 'card:id=default'",
"step",
step_name,
"--run-id %s" % run_id,
Expand Down Expand Up @@ -352,7 +355,7 @@ def aip_metaflow_step(
flow_name,
is_interruptible,
)
cmd: str = cmd_template.format(
step_cmd: str = cmd_template.format(
cloudw marked this conversation as resolved.
Show resolved Hide resolved
run_id=metaflow_run_id,
passed_in_split_indexes=passed_in_split_indexes,
)
Expand Down Expand Up @@ -385,16 +388,15 @@ def aip_metaflow_step(
env["METAFLOW_PARAMETERS"] = flow_parameters_json

# TODO: Map username to KFP specific user/profile/namespace
# Running Metaflow
# KFP orchestrator -> running MF runtime (runs user code, handles state)
# Running Metaflow runtime (runs user code, handles state)
with Popen(
cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env
step_cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env
) as process:
pass

if process.returncode != 0:
logging.info(f"---- Following command returned: {process.returncode}")
logging.info(cmd.replace(" && ", "\n"))
logging.info(step_cmd.replace(" && ", "\n"))
logging.info("----")
raise Exception("Returned: %s" % process.returncode)

Expand Down Expand Up @@ -435,14 +437,18 @@ def aip_metaflow_step(
with open(output_file, "w") as f:
f.write(str(values[idx]))

# get card and write to output file
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)
# Write card manifest (html) to Argo output artifact path.
try:
cloudw marked this conversation as resolved.
Show resolved Hide resolved
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)
except Exception as e:
# Workflow should still succeed even if cards fail to render
logging.error(f"Failed to write card artifacts: {e}")


if __name__ == "__main__":
Expand Down
Loading