Skip to content

Commit

Permalink
Materialize assets without running checks
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 12, 2023
1 parent 1e045bf commit cef7310
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def pipeline_selector_from_graphql(data: Mapping[str, Any]) -> JobSubsetSelector
AssetCheckHandle.from_graphql_input(asset_check)
for asset_check in asset_check_selection
]
if asset_check_selection
if asset_check_selection is not None
else None
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,3 +759,43 @@ def test_launch_subset_asset_and_included_check(self, graphql_context: Workspace
and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
]
assert len(materializations) == 1

def test_launch_subset_asset_no_check(self, graphql_context: WorkspaceRequestContext):
selector = infer_job_or_pipeline_selector(
graphql_context,
"asset_check_job",
asset_selection=[{"path": ["asset_1"]}],
asset_check_selection=[],
)
result = execute_dagster_graphql(
graphql_context,
LAUNCH_PIPELINE_EXECUTION_MUTATION,
variables={
"executionParams": {
"selector": selector,
"mode": "default",
"stepKeys": None,
}
},
)
print(result.data) # ruff: noqa: T201
assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"

run_id = result.data["launchPipelineExecution"]["run"]["runId"]
run = poll_for_finished_run(graphql_context.instance, run_id)

logs = graphql_context.instance.all_logs(run_id)
print(logs) # ruff: noqa: T201
assert run.is_success

for log in logs:
if log.dagster_event:
assert log.dagster_event.event_type != DagsterEventType.ASSET_CHECK_EVALUATION.value

materializations = [
log
for log in logs
if log.dagster_event
and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
]
assert len(materializations) == 1
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ def __new__(
asset_check_selection: Optional[Iterable[AssetCheckHandle]] = None,
):
asset_selection = set(asset_selection) if asset_selection else None
asset_check_selection = set(asset_check_selection) if asset_check_selection else None
asset_check_selection = (
set(asset_check_selection) if asset_check_selection is not None else None
)
return super(JobSubsetSelector, cls).__new__(
cls,
location_name=check.str_param(location_name, "location_name"),
Expand Down

0 comments on commit cef7310

Please sign in to comment.