Skip to content

Commit

Permalink
[single-implicit-asset-job] allow different partitions_defs on `build…
Browse files Browse the repository at this point in the history
…_asset_job` (#23290)

## Summary & Motivation

`build_asset_job` builds a `JobDefinition` from a set of assets.
Previously, it would error if the assets targeted by the job had
different `PartitionsDefinition`s. This change adds a flag that
basically suppresses this error.

In downstream PR #23293, which
makes a single implicit asset job, this flag is used by the code paths
that build implicit asset jobs. Farther into the future, when we're
ready to allow users to make jobs with assets with different
`PartitionsDefinition`s, we'll remove this flag entirely and never raise
this error.


## How I Tested These Changes
  • Loading branch information
sryza authored Aug 1, 2024
1 parent 2ec20fb commit d44c3c2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
40 changes: 27 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/asset_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def build_asset_job_lambda() -> JobDefinition:
resource_defs=resource_defs,
executor_def=executor_def,
partitions_def=partitions_def,
allow_different_partitions_defs=False,
)
job_def.validate_resource_requirements_satisfied()

Expand All @@ -110,6 +111,7 @@ def build_asset_job_lambda() -> JobDefinition:
asset_graph=asset_graph,
executor_def=executor_def,
resource_defs=resource_defs,
allow_different_partitions_defs=False,
)
job_def.validate_resource_requirements_satisfied()
if logger_defs and not job_def.has_specified_loggers:
Expand Down Expand Up @@ -149,6 +151,7 @@ def get_base_asset_jobs(
def build_asset_job(
name: str,
asset_graph: AssetGraph,
allow_different_partitions_defs: bool,
resource_defs: Optional[Mapping[str, object]] = None,
description: Optional[str] = None,
config: Optional[
Expand Down Expand Up @@ -202,7 +205,10 @@ def asset2(asset1):
resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs)
wrapped_resource_defs = wrap_resources_for_execution(resource_defs)
partitions_def = _infer_and_validate_common_partitions_def(
asset_graph, asset_graph.executable_asset_keys, partitions_def
asset_graph,
asset_graph.executable_asset_keys,
required_partitions_def=partitions_def,
allow_different_partitions_defs=allow_different_partitions_defs,
)

deps, assets_defs_by_node_handle = build_node_deps(asset_graph)
Expand Down Expand Up @@ -271,7 +277,9 @@ def asset2(asset1):


def get_asset_graph_for_job(
parent_asset_graph: AssetGraph, selection: AssetSelection
parent_asset_graph: AssetGraph,
selection: AssetSelection,
allow_different_partitions_defs: bool = False,
) -> AssetGraph:
"""Subset an AssetGraph to create an AssetGraph representing an asset job.
Expand All @@ -295,7 +303,11 @@ def get_asset_graph_for_job(
f" Invalid selected keys: {invalid_keys}",
)

_infer_and_validate_common_partitions_def(parent_asset_graph, selected_keys)
_infer_and_validate_common_partitions_def(
parent_asset_graph,
selected_keys,
allow_different_partitions_defs=allow_different_partitions_defs,
)

selected_check_keys = selection.resolve_checks(parent_asset_graph)

Expand Down Expand Up @@ -404,6 +416,7 @@ def _subset_assets_defs(
def _infer_and_validate_common_partitions_def(
asset_graph: AssetGraph,
asset_keys: Iterable[AssetKey],
allow_different_partitions_defs: bool,
required_partitions_def: Optional[PartitionsDefinition] = None,
) -> Optional[PartitionsDefinition]:
keys_by_partitions_def = defaultdict(set)
Expand All @@ -418,18 +431,19 @@ def _infer_and_validate_common_partitions_def(
)
keys_by_partitions_def[partitions_def].add(key)

if len(keys_by_partitions_def) > 1:
keys_by_partitions_def_str = "\n".join(
f"{partitions_def}: {asset_keys}"
for partitions_def, asset_keys in keys_by_partitions_def.items()
)
raise DagsterInvalidDefinitionError(
f"Selected assets must have the same partitions definitions, but the"
f" selected assets have different partitions definitions: \n{keys_by_partitions_def_str}"
)
elif len(keys_by_partitions_def) == 1:
if len(keys_by_partitions_def) == 1:
return next(iter(keys_by_partitions_def.keys()))
else:
if len(keys_by_partitions_def) > 1 and not allow_different_partitions_defs:
keys_by_partitions_def_str = "\n".join(
f"{partitions_def}: {asset_keys}"
for partitions_def, asset_keys in keys_by_partitions_def.items()
)
raise DagsterInvalidDefinitionError(
f"Selected assets must have the same partitions definitions, but the"
f" selected assets have different partitions definitions: \n{keys_by_partitions_def_str}"
)

return None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ def __init__(
self._config_mapping = config
elif isinstance(config, PartitionedConfig):
self._partitioned_config = config
if asset_layer:
for asset_key in asset_layer.asset_keys_by_node_output_handle.values():
asset_partitions_def = asset_layer.get(asset_key).partitions_def
check.invariant(
asset_partitions_def is None
or asset_partitions_def == config.partitions_def,
"Can't supply a PartitionedConfig for 'config' with a different PartitionsDefinition"
f" than supplied for a target asset 'partitions_def'. Asset: {asset_key.to_user_string()}",
)

elif isinstance(config, dict):
self._run_config = config
# Using config mapping here is a trick to make it so that the preset will be used even
Expand Down Expand Up @@ -806,7 +816,9 @@ def _get_job_def_for_asset_selection(
*selection_data.asset_check_selection
)

job_asset_graph = get_asset_graph_for_job(self.asset_layer.asset_graph, selection)
job_asset_graph = get_asset_graph_for_job(
self.asset_layer.asset_graph, selection, allow_different_partitions_defs=True
)

return build_asset_job(
name=self.name,
Expand All @@ -817,6 +829,7 @@ def _get_job_def_for_asset_selection(
tags=self.tags,
config=self.config_mapping or self.partitioned_config,
_asset_selection_data=selection_data,
allow_different_partitions_defs=True,
)

def _get_job_def_for_op_selection(self, op_selection: Iterable[str]) -> "JobDefinition":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def resolve(
hooks=self.hooks,
op_retry_policy=self.op_retry_policy,
resource_defs=resource_defs,
allow_different_partitions_defs=False,
)


Expand Down

0 comments on commit d44c3c2

Please sign in to comment.