Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-8718 Allow disabling default cards; Allow card render failures #309

Merged
merged 15 commits into from
Sep 30, 2024
21 changes: 13 additions & 8 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def __init__(
notify_on_success=None,
sqs_url_on_error=None,
sqs_role_arn_on_error=None,
add_default_card=False,
**kwargs,
):
"""
Expand Down Expand Up @@ -204,6 +205,7 @@ def __init__(
self.notify_on_success = notify_on_success
self.sqs_url_on_error = sqs_url_on_error
self.sqs_role_arn_on_error = sqs_role_arn_on_error
self.add_default_card = add_default_card
self._client = None
self._exit_handler_created = False

Expand Down Expand Up @@ -317,7 +319,7 @@ def _create_workflow_yaml(
).items():
workflow["metadata"]["labels"][key] = value

KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow)
KubeflowPipelines._update_cards_artifact_configs(workflow)

if self._exit_handler_created:
# replace entrypoint content with the exit handler handler content
Expand Down Expand Up @@ -422,14 +424,15 @@ def _create_workflow_yaml(
return workflow

@staticmethod
def _add_archive_section_to_cards_artifacts(workflow: dict):
def _update_cards_artifact_configs(workflow: dict):
# Add "archive" none section to "-cards" artifacts because by default
# they are tarred and hence not viewable in the Argo UI
for template in workflow["spec"]["templates"]:
if "outputs" in template and "artifacts" in template["outputs"]:
for artifact in template["outputs"]["artifacts"]:
if "-card" in artifact["name"]:
artifact["archive"] = {"none": {}}
artifact["optional"] = True

@staticmethod
def _config_map(workflow_name: str, max_run_concurrency: int):
Expand Down Expand Up @@ -1407,6 +1410,8 @@ def _create_metaflow_step_op(
preceding_component_inputs: List[str],
preceding_component_outputs_dict: Dict[str, dsl.PipelineParam],
) -> ContainerOp:
card_decos = [deco for deco in node.decorators if deco.name == "card"]
cloudw marked this conversation as resolved.
Show resolved Hide resolved

# TODO (hariharans): https://zbrt.atl.zillow.net/browse/AIP-5406
# (Title: Clean up output formatting of workflow and pod specs in container op)
# double json.dumps() to ensure we have the correct quotation marks
Expand Down Expand Up @@ -1448,6 +1453,10 @@ def _create_metaflow_step_op(
metaflow_execution_cmd += " --is_split_index"
if node.type == "join":
metaflow_execution_cmd += " --is-join-step"
if self.add_default_card:
metaflow_execution_cmd += " --add-default-card"
if not card_decos:
metaflow_execution_cmd += " --skip-card-artifacts"

metaflow_execution_cmd += ' --preceding_component_outputs_dict "'
for key in preceding_component_outputs_dict:
Expand Down Expand Up @@ -1476,13 +1485,9 @@ def _create_metaflow_step_op(
)

file_outputs: Dict[str, str] = {
"card": "/tmp/outputs/cards/card.html",
f"card-{i}": f"/tmp/outputs/cards/card-{i}.html"
for i in range(len(card_decos))
}
i = 1 # the default card would be i == 0
for deco in node.decorators:
if deco.name == "card":
file_outputs[f"card{i}"] = f"/tmp/outputs/cards/card{i}.html"
i = i + 1

if node.type == "foreach":
file_outputs["foreach_splits"] = "/tmp/outputs/foreach_splits/data"
Expand Down
17 changes: 17 additions & 0 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ def cli_decorator(func: Callable):
"If not set, the default iam role associated with the pod will be used",
show_default=True,
)
@click.option(
"--add-default-card",
"add_default_card",
default=from_conf("METAFLOW_AIP_ADD_DEFAULT_CARD", default=True),
type=bool,
help="Whether to add default card to all workflow steps",
show_default=True,
)
cloudw marked this conversation as resolved.
Show resolved Hide resolved
@functools.wraps(func)
def wrapper_common_options(*args, **kwargs):
return func(*args, **kwargs)
Expand Down Expand Up @@ -323,6 +331,7 @@ def run(
sqs_role_arn_on_error=None,
argo_wait=False,
wait_for_completion_timeout=None,
add_default_card=True,
**kwargs,
):
"""
Expand Down Expand Up @@ -353,6 +362,7 @@ def run(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)

if yaml_only:
Expand Down Expand Up @@ -516,6 +526,7 @@ def create(
recurring_run_enable=None,
recurring_run_cron=None,
recurring_run_concurrency=None,
add_default_card=True,
**kwargs,
):
"""
Expand Down Expand Up @@ -549,6 +560,7 @@ def create(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)

if yaml_only:
Expand Down Expand Up @@ -669,6 +681,7 @@ def make_flow(
notify_on_success,
sqs_url_on_error,
sqs_role_arn_on_error,
add_default_card,
):
"""
Analogous to step_functions_cli.py
Expand All @@ -682,6 +695,9 @@ def make_flow(

# Attach AIP decorator to the flow
decorators._attach_decorators(obj.flow, [AIPInternalDecorator.name])
if add_default_card:
decorators._attach_decorators(obj.flow, ["card:id=default"])

decorators._init_step_decorators(
obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger
)
Expand Down Expand Up @@ -725,4 +741,5 @@ def make_flow(
notify_on_success=notify_on_success,
sqs_url_on_error=sqs_url_on_error,
sqs_role_arn_on_error=sqs_role_arn_on_error,
add_default_card=add_default_card,
)
71 changes: 52 additions & 19 deletions metaflow/plugins/aip/aip_metaflow_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
AIP_JOIN_METAFLOW_S3OP_NUM_WORKERS,
)
from metaflow.plugins.cards.card_client import get_cards, Card
from metaflow.plugins.cards.exception import CardNotPresentException
from ... import R, metaflow_version


Expand All @@ -35,23 +36,49 @@ def _write_card_artifacts(
passed_in_split_indexes: str,
run_id: str,
):
"""
Pull card artifacts from datastore and add them to the Argo artifact output path.
Cards should already be uploaded to the datastore as part of the @card decorator.
No exception is thrown: cards are available to view through Metaflow UI or CLI even if this function fails.
"""

task_id_template: str = f"{task_id}.{passed_in_split_indexes}".strip(".")
pathspec = f"{flow_name}/{run_id}/{step_name}/{task_id_template}"

cards: List[Card] = list(get_cards(pathspec))
retry_limit = 3
cards: List[Card] = []
for attempt in range(retry_limit):
try:
cards: List[Card] = list(get_cards(pathspec))
except:
if attempt == retry_limit - 1: # Last attempt failed
logging.exception(
f"Failed to get cards from Metaflow backend for pathspec {pathspec}"
"Please view cards through Metaflow UI, or refer to https://docs.metaflow.org/metaflow/visualizing-results/effortless-task-inspection-with-default-cards#accessing-cards-via-an-api"
)
continue
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we retry calling the metaflow metadata store, we should put a sleep in between retries, even a of 1s or 0.5 would make a difference.

break

if not cards:
return

# sort such that the default card is first
sorted_cards = sorted(
cards, key=lambda card: (card.type != "default", cards.index(card))
)

i = 0
pathlib.Path("/tmp/outputs/cards/").mkdir(parents=True, exist_ok=True)
for card in sorted_cards:
iter_name = "" if i == 0 else i
file_name = f"/tmp/outputs/cards/card{iter_name}.html"
with open(file_name, "w") as card_file:
card_file.write(card.get())
i = i + 1
for index, card in enumerate(sorted_cards):
file_name = f"/tmp/outputs/cards/card-{index}.html"
try:
with open(file_name, "w") as card_file:
card_file.write(card.get())
except Exception:
logging.exception(
f"Failed to write card {index} of type {card.type} to Argo artifact output."
"Please view cards through Metaflow UI, or refer to https://docs.metaflow.org/metaflow/visualizing-results/effortless-task-inspection-with-default-cards#accessing-cards-via-an-api"
cloudw marked this conversation as resolved.
Show resolved Hide resolved
)
raise


def _step_cli(
Expand All @@ -68,6 +95,7 @@ def _step_cli(
max_user_code_retries: int,
workflow_name: str,
script_name: str,
add_default_card: bool,
) -> str:
"""
Analogous to step_functions.py
Expand Down Expand Up @@ -165,7 +193,7 @@ def _step_cli(

step: List[str] = [
"--with=aip",
"--with 'card:id=default'",
"--with 'card:id=default'" if add_default_card else "",
"step",
step_name,
"--run-id %s" % run_id,
Expand Down Expand Up @@ -275,6 +303,8 @@ def _command(
@click.option("--workflow_name")
@click.option("--is-interruptible/--not-interruptible", default=False)
@click.option("--is-join-step", is_flag=True, default=False)
@click.option("--add-default-card", is_flag=True, default=False)
@click.option("--skip-card-artifacts", is_flag=True, default=False)
def aip_metaflow_step(
volume_dir: str,
environment: str,
Expand All @@ -300,6 +330,8 @@ def aip_metaflow_step(
workflow_name: str,
is_interruptible: bool,
is_join_step: bool,
add_default_card: bool,
skip_card_artifacts: bool,
) -> None:
"""
(1) Renders and runs the Metaflow package_commands and Metaflow step
Expand Down Expand Up @@ -331,6 +363,7 @@ def aip_metaflow_step(
user_code_retries,
workflow_name,
script_name,
add_default_card,
)

# expose passed KFP passed in arguments as environment variables to
Expand Down Expand Up @@ -385,8 +418,7 @@ def aip_metaflow_step(
env["METAFLOW_PARAMETERS"] = flow_parameters_json

# TODO: Map username to KFP specific user/profile/namespace
# Running Metaflow
# KFP orchestrator -> running MF runtime (runs user code, handles state)
# Running Metaflow runtime (runs user code, handles state)
with Popen(
cmd, shell=True, universal_newlines=True, executable="/bin/bash", env=env
) as process:
Expand Down Expand Up @@ -435,14 +467,15 @@ def aip_metaflow_step(
with open(output_file, "w") as f:
f.write(str(values[idx]))

# get card and write to output file
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)
# Write card manifest (html) to Argo output artifact path.
if not skip_card_artifacts:
_write_card_artifacts(
flow_name,
step_name,
task_id,
passed_in_split_indexes,
metaflow_run_id,
)


if __name__ == "__main__":
Expand Down
Loading