Merge pull request #309 from zillow/yunw/allow-card-disabling
AIP-8718 Allow disabling default cards; Allow card render failures
cloudw authored Sep 30, 2024
2 parents 025acf0 + f551d6d commit ffc807a
Showing 3 changed files with 84 additions and 27 deletions.
21 changes: 13 additions & 8 deletions metaflow/plugins/aip/aip.py
@@ -173,6 +173,7 @@ def __init__(
notify_on_success=None,
sqs_url_on_error=None,
sqs_role_arn_on_error=None,
add_default_card=False,
**kwargs,
):
"""
@@ -204,6 +205,7 @@ def __init__(
self.notify_on_success = notify_on_success
self.sqs_url_on_error = sqs_url_on_error
self.sqs_role_arn_on_error = sqs_role_arn_on_error
self.add_default_card = add_default_card
self._client = None
self._exit_handler_created = False

@@ -317,7 +319,7 @@ def _create_workflow_yaml(
).items():
workflow["metadata"]["labels"][key] = value

KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow)
KubeflowPipelines._update_cards_artifact_configs(workflow)

if self._exit_handler_created:
# replace entrypoint content with the exit handler content
@@ -422,14 +424,15 @@ def _create_workflow_yaml(
return workflow

@staticmethod
def _add_archive_section_to_cards_artifacts(workflow: dict):
def _update_cards_artifact_configs(workflow: dict):
# Add "archive" none section to "-cards" artifacts because by default
# they are tarred and hence not viewable in the Argo UI
for template in workflow["spec"]["templates"]:
if "outputs" in template and "artifacts" in template["outputs"]:
for artifact in template["outputs"]["artifacts"]:
if "-card" in artifact["name"]:
artifact["archive"] = {"none": {}}
artifact["optional"] = True

@staticmethod
def _config_map(workflow_name: str, max_run_concurrency: int):
@@ -1407,6 +1410,8 @@ def _create_metaflow_step_op(
preceding_component_inputs: List[str],
preceding_component_outputs_dict: Dict[str, dsl.PipelineParam],
) -> ContainerOp:
card_decos = [deco for deco in node.decorators if deco.name == "card"]

# TODO (hariharans): https://zbrt.atl.zillow.net/browse/AIP-5406
# (Title: Clean up output formatting of workflow and pod specs in container op)
# double json.dumps() to ensure we have the correct quotation marks
@@ -1448,6 +1453,10 @@ def _create_metaflow_step_op(
metaflow_execution_cmd += " --is_split_index"
if node.type == "join":
metaflow_execution_cmd += " --is-join-step"
if self.add_default_card:
metaflow_execution_cmd += " --add-default-card"
if not card_decos:
metaflow_execution_cmd += " --skip-card-artifacts"

metaflow_execution_cmd += ' --preceding_component_outputs_dict "'
for key in preceding_component_outputs_dict:
@@ -1476,13 +1485,9 @@ def _create_metaflow_step_op(
)

file_outputs: Dict[str, str] = {
"card": "/tmp/outputs/cards/card.html",
f"card-{i}": f"/tmp/outputs/cards/card-{i}.html"
for i in range(len(card_decos))
}
i = 1 # the default card would be i == 0
for deco in node.decorators:
if deco.name == "card":
file_outputs[f"card{i}"] = f"/tmp/outputs/cards/card{i}.html"
i = i + 1

if node.type == "foreach":
file_outputs["foreach_splits"] = "/tmp/outputs/foreach_splits/data"
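To make the renamed _update_cards_artifact_configs concrete: it now touches two keys on every "-card" output artifact in the compiled Argo workflow, and the companion file_outputs change above emits one card-{i} output per @card decorator so the artifact paths line up with the files _write_card_artifacts writes. Below is a minimal, self-contained sketch of the transformation; the workflow dict and artifact name are hypothetical stand-ins, not values taken from this PR.

# Sketch of what _update_cards_artifact_configs does to the compiled workflow dict.
# The template and artifact names here are made up for illustration.
workflow = {
    "spec": {
        "templates": [
            {
                "name": "start",
                "outputs": {
                    "artifacts": [
                        {"name": "start-card-0", "path": "/tmp/outputs/cards/card-0.html"}
                    ]
                },
            }
        ]
    }
}

for template in workflow["spec"]["templates"]:
    if "outputs" in template and "artifacts" in template["outputs"]:
        for artifact in template["outputs"]["artifacts"]:
            if "-card" in artifact["name"]:
                artifact["archive"] = {"none": {}}  # keep the HTML un-tarred so the Argo UI can render it
                artifact["optional"] = True  # a missing card file no longer fails the step

assert workflow["spec"]["templates"][0]["outputs"]["artifacts"][0]["optional"] is True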
17 changes: 17 additions & 0 deletions metaflow/plugins/aip/aip_cli.py
@@ -260,6 +260,14 @@ def cli_decorator(func: Callable):
"If not set, the default iam role associated with the pod will be used",
show_default=True,
)
@click.option(
"--add-default-card",
"add_default_card",
default=from_conf("METAFLOW_AIP_ADD_DEFAULT_CARD", default=True),
type=bool,
help="Whether to add default card to all workflow steps",
show_default=True,
)
@functools.wraps(func)
def wrapper_common_options(*args, **kwargs):
return func(*args, **kwargs)
@@ -323,6 +331,7 @@ def run(
sqs_role_arn_on_error=None,
argo_wait=False,
wait_for_completion_timeout=None,
add_default_card=True,
**kwargs,
):
"""
@@ -353,6 +362,7 @@ def run(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)

if yaml_only:
@@ -516,6 +526,7 @@ def create(
recurring_run_enable=None,
recurring_run_cron=None,
recurring_run_concurrency=None,
add_default_card=True,
**kwargs,
):
"""
@@ -549,6 +560,7 @@ def create(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)

if yaml_only:
@@ -669,6 +681,7 @@ def make_flow(
notify_on_success,
sqs_url_on_error,
sqs_role_arn_on_error,
add_default_card,
):
"""
Analogous to step_functions_cli.py
@@ -682,6 +695,9 @@ def make_flow(

# Attach AIP decorator to the flow
decorators._attach_decorators(obj.flow, [AIPInternalDecorator.name])
if add_default_card:
decorators._attach_decorators(obj.flow, ["card:id=default"])

decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
)
@@ -725,4 +741,5 @@ def make_flow(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)
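The make_flow change is the flow-level half of the feature: when add_default_card is true (the default, read from METAFLOW_AIP_ADD_DEFAULT_CARD via from_conf), a card:id=default decorator is attached to every step before the pipeline is compiled; passing --add-default-card false to the aip run/create commands, or setting that config to false, skips the attachment so only explicit @card decorators remain. A rough sketch of the equivalence, assuming the stock Metaflow @card decorator (from metaflow import card); this flow is illustrative and not part of the PR.

# Attaching "card:id=default" at the flow level behaves roughly like writing the
# decorator on every step by hand, as below.
from metaflow import FlowSpec, card, step


class HelloCardFlow(FlowSpec):
    @card(id="default")  # what --add-default-card effectively adds to each step
    @step
    def start(self):
        self.next(self.end)

    @card(id="default")
    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloCardFlow()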
73 changes: 54 additions & 19 deletions metaflow/plugins/aip/aip_metaflow_step.py
@@ -2,6 +2,7 @@
import logging
import os
import pathlib
import time
from subprocess import Popen
from typing import Dict, List

@@ -25,6 +26,7 @@
AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS,
)
from metaflow.plugins.cards.card_client import get_cards, Card
from metaflow.plugins.cards.exception import CardNotPresentException
from ... import R, metaflow_version


@@ -35,23 +37,50 @@ def _write_card_artifacts(
passed_in_split_indexes: str,
run_id: str,
):
"""
Pull card artifacts from datastore and add them to the Argo artifact output path.
Cards should already be uploaded to the datastore as part of the @card decorator.
No exception is thrown: cards are available to view through Metaflow UI or CLI even if this function fails.
"""

task_id_template: str = f"{task_id}.{passed_in_split_indexes}".strip(".")
pathspec = f"{flow_name}/{run_id}/{step_name}/{task_id_template}"
workaround_instruction = "Please view cards through Metaflow UI, or refer to https://docs.metaflow.org/metaflow/visualizing-results/effortless-task-inspection-with-default-cards#accessing-cards-via-an-api"

retry_limit = 3
cards: List[Card] = []
for attempt in range(retry_limit):
try:
if attempt > 0:
time.sleep(1) # Spacing out retries to the backend / datastore
cards: List[Card] = list(get_cards(pathspec))
except:
if attempt == retry_limit - 1: # Last attempt failed
logging.exception(
f"Failed to get cards from Metaflow backend for pathspec {pathspec}. {workaround_instruction}"
)
continue
break

if not cards:
return

cards: List[Card] = list(get_cards(pathspec))
# sort such that the default card is first
sorted_cards = sorted(
cards, key=lambda card: (card.type != "default", cards.index(card))
)

i = 0
pathlib.Path("/tmp/outputs/cards/").mkdir(parents=True, exist_ok=True)
for card in sorted_cards:
iter_name = "" if i == 0 else i
file_name = f"/tmp/outputs/cards/card{iter_name}.html"
with open(file_name, "w") as card_file:
card_file.write(card.get())
i = i + 1
for index, card in enumerate(sorted_cards):
file_name = f"/tmp/outputs/cards/card-{index}.html"
try:
with open(file_name, "w") as card_file:
card_file.write(card.get())
except Exception:
logging.exception(
f"Failed to write card {index} of type {card.type} to Argo artifact output. {workaround_instruction}"
)
raise
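The sort key in _write_card_artifacts is worth unpacking: the first tuple element puts the default card first, and the second falls back to the card's original position so the remaining cards keep their relative order. A standalone illustration with plain strings standing in for Card objects (not code from the PR):

# The first tuple element is False only for the default card, so it sorts first;
# cards.index(...) preserves the original relative order of everything else.
cards = ["html", "default", "blank"]  # stand-ins for Card.type values
ordered = sorted(cards, key=lambda c: (c != "default", cards.index(c)))
assert ordered == ["default", "html", "blank"]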


def _step_cli(
Expand All @@ -68,6 +97,7 @@ def _step_cli(
max_user_code_retries: int,
workflow_name: str,
script_name: str,
add_default_card: bool,
) -> str:
"""
Analogous to step_functions.py
@@ -165,7 +195,7 @@

step: List[str] = [
"--with=aip",
"--with 'card:id=default'",
"--with 'card:id=default'" if add_default_card else "",
"step",
step_name,
"--run-id %s" % run_id,
@@ -275,6 +305,8 @@ def _command(
@click.option("--workflow_name")
@click.option("--is-interruptible/--not-interruptible", default=False)
@click.option("--is-join-step", is_flag=True, default=False)
@click.option("--add-default-card", is_flag=True, default=False)
@click.option("--skip-card-artifacts", is_flag=True, default=False)
def aip_metaflow_step(
volume_dir: str,
environment: str,
@@ -300,6 +332,8 @@
workflow_name: str,
is_interruptible: bool,
is_join_step: bool,
add_default_card: bool,
skip_card_artifacts: bool,
) -> None:
"""
(1) Renders and runs the Metaflow package_commands and Metaflow step
@@ -331,6 +365,7 @@ def aip_metaflow_step(
user_code_retries,
workflow_name,
script_name,
add_default_card,
)

# expose passed KFP passed in arguments as environment variables to
@@ -385,8 +420,7 @@ def aip_metaflow_step(
env["METAFLOW_PARAMETERS"] = flow_parameters_json

# TODO: Map username to KFP specific user/profile/namespace
# Running Metaflow
# KFP orchestrator -> running MF runtime (runs user code, handles state)
# Running Metaflow runtime (runs user code, handles state)
with Popen(
cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env
) as process:
@@ -435,14 +469,15 @@ def aip_metaflow_step(
with open(output_file, "w") as f:
f.write(str(values[idx]))

# get card and write to output file
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)
# Write card manifest (html) to Argo output artifact path.
if not skip_card_artifacts:
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)


if __name__ == "__main__":
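Tying the two step-level flags together: _create_metaflow_step_op in aip.py appends --add-default-card when default cards are enabled for the pipeline and --skip-card-artifacts when a step has no @card decorators, and aip_metaflow_step consumes both through the click flags declared above. The helper below is a hypothetical sketch of that decision logic, not code from the PR.

# Mirrors the two independent checks added in _create_metaflow_step_op; values are illustrative.
def step_flags(add_default_card, card_decos):
    flags = ""
    if add_default_card:
        flags += " --add-default-card"  # _step_cli will then add --with 'card:id=default'
    if not card_decos:
        flags += " --skip-card-artifacts"  # the step will not copy card HTML into Argo outputs
    return flags


print(step_flags(add_default_card=True, card_decos=["card:id=default"]))  # " --add-default-card"
print(step_flags(add_default_card=False, card_decos=[]))  # " --skip-card-artifacts"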