From c4d23583ff4d02ccbca0534b954cfb890bcd4480 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Tue, 30 Jan 2024 14:09:48 -0500 Subject: [PATCH] fix asset checks + step launchers (#19443) https://dagsterlabs.slack.com/archives/C058314NT5H/p1705693260012779 Populate step launched instances with ASSET_CHECK_EVALUATION_PLANNED events. The alternative would be to raise the restriction that ASSET_CHECK_EVALUATION events are preceded by planned events, either globally or just for step launcher instances. I think this approach is workable. --- .../dagster/_core/execution/context/system.py | 4 +- .../_core/execution/plan/external_step.py | 24 +++++++- .../test_external_step.py | 61 +++++++++++++++++-- 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 60235f365eeb2..25acd4c272b72 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -931,9 +931,9 @@ def is_in_graph_asset(self) -> bool: @property def is_asset_check_step(self) -> bool: """Whether this step corresponds to an asset check.""" - node_handle = self.node_handle return ( - self.job_def.asset_layer.asset_checks_defs_by_node_handle.get(node_handle) is not None + self.job_def.asset_layer.asset_checks_defs_by_node_handle.get(self.node_handle) + is not None ) def set_data_version(self, asset_key: AssetKey, data_version: "DataVersion") -> None: diff --git a/python_modules/dagster/dagster/_core/execution/plan/external_step.py b/python_modules/dagster/dagster/_core/execution/plan/external_step.py index 9e6cf35860487..5e35196167a3a 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/external_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/external_step.py @@ -1,3 +1,4 @@ +import logging import os import pickle import shutil @@ -13,7 +14,7 @@ from dagster._core.definitions.resource_definition import dagster_maintained_resource, resource from dagster._core.definitions.step_launcher import StepLauncher, StepRunRef from dagster._core.errors import raise_execution_interrupts -from dagster._core.events import DagsterEvent +from dagster._core.events import AssetCheckEvaluationPlanned, DagsterEvent, DagsterEventType from dagster._core.events.log import EventLogEntry from dagster._core.execution.api import create_execution_plan from dagster._core.execution.context.system import StepExecutionContext @@ -253,6 +254,27 @@ def run_step_from_ref( partitions_def_name=partitions_def.name, partition_keys=[step_context.partition_key] ) + # Note: this patches ASSET_CHECK_EVALUATION_PLANNED events into the mini event log present + # on external steps. This is necessary because we test that ASSET_CHECK_EVALUATION events + # have corresponding ASSET_CHECK_EVALUATION_PLANNED events. + for output in step_context.step.step_outputs: + asset_check_key = check.not_none(output.properties).asset_check_key + if asset_check_key: + event = DagsterEvent( + event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, + job_name=step_context.job_name, + message=( + f"{step_context.job_name} intends to execute asset check {asset_check_key.name} on" + f" asset {asset_check_key.asset_key.to_string()}" + ), + event_specific_data=AssetCheckEvaluationPlanned( + asset_check_key.asset_key, + check_name=asset_check_key.name, + ), + step_key=step_context.step.key, + ) + instance.report_dagster_event(event, step_run_ref.run_id, logging.DEBUG) + # The step should be forced to run locally with respect to the remote process that this step # context is being deserialized in return dagster_event_sequence_for_step(step_context, force_local_execution=True) diff --git a/python_modules/dagster/dagster_tests/execution_tests/execution_plan_tests/test_external_step.py b/python_modules/dagster/dagster_tests/execution_tests/execution_plan_tests/test_external_step.py index 5a939383c4b90..dce7173d7e84d 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/execution_plan_tests/test_external_step.py +++ b/python_modules/dagster/dagster_tests/execution_tests/execution_plan_tests/test_external_step.py @@ -8,12 +8,14 @@ import pytest from dagster import ( + AssetCheckResult, AssetKey, AssetsDefinition, DynamicOut, DynamicOutput, Failure, Field, + Output, ResourceDefinition, RetryPolicy, RetryRequested, @@ -29,6 +31,8 @@ resource, with_resources, ) +from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, CacheableAssetsDefinition, @@ -69,7 +73,7 @@ def make_run_config(scratch_dir: str, resource_set: str): else: step_launcher_resource_keys = ["second_step_launcher"] return deep_merge_dicts( - RUN_CONFIG_BASE, + RUN_CONFIG_BASE if resource_set != "no_base" else {}, { "resources": merge_dicts( {"io_manager": {"config": {"base_dir": scratch_dir}}}, @@ -301,14 +305,42 @@ def sleepy_job(): return sleepy_job -def initialize_step_context(scratch_dir: str, instance: DagsterInstance) -> IStepContext: +def define_asset_check_job(): + @asset( + check_specs=[ + AssetCheckSpec( + asset="asset1", + name="check1", + ) + ], + resource_defs={ + "second_step_launcher": local_external_step_launcher, + "io_manager": fs_io_manager, + }, + ) + def asset1(): + yield Output(1) + yield AssetCheckResult(passed=True) + + return define_asset_job(name="asset_check_job", selection=[asset1]).resolve( + asset_graph=AssetGraph.from_assets([asset1]) + ) + + +def initialize_step_context( + scratch_dir: str, + instance: DagsterInstance, + job_def_fn=define_basic_job_external, + resource_set="external", + step_name="return_two", +) -> IStepContext: run = DagsterRun( job_name="foo_job", run_id=str(uuid.uuid4()), - run_config=make_run_config(scratch_dir, "external"), + run_config=make_run_config(scratch_dir, resource_set), ) - recon_job = reconstructable(define_basic_job_external) + recon_job = reconstructable(job_def_fn) plan = create_execution_plan(recon_job, run.run_config) @@ -325,7 +357,7 @@ def initialize_step_context(scratch_dir: str, instance: DagsterInstance) -> ISte job_context = initialization_manager.get_context() step_context = job_context.for_step( - plan.get_step_by_key("return_two"), # type: ignore + plan.get_step_by_key(step_name), # type: ignore KnownExecutionState(), ) return step_context @@ -363,6 +395,25 @@ def test_local_external_step_launcher(): assert DagsterEventType.STEP_FAILURE not in event_types +def test_asset_check_step_launcher(): + with tempfile.TemporaryDirectory() as tmpdir: + with DagsterInstance.ephemeral() as instance: + step_context = initialize_step_context( + tmpdir, + instance, + job_def_fn=define_asset_check_job, + resource_set="no_base", + step_name="asset1", + ) + + step_launcher = LocalExternalStepLauncher(tmpdir) + events = list(step_launcher.launch_step(step_context)) + event_types = [event.event_type for event in events] + assert DagsterEventType.STEP_START in event_types + assert DagsterEventType.STEP_SUCCESS in event_types + assert DagsterEventType.STEP_FAILURE not in event_types + + @pytest.mark.parametrize("resource_set", ["external", "internal_and_external"]) def test_job(resource_set): if resource_set == "external":