-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
handle "unexecutable" assets in job building and cross process representation #16637
base: master
Are you sure you want to change the base?
handle "unexecutable" assets in job building and cross process representation #16637
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
f25fe58
to
47435a4
Compare
tue night progress:
|
ce171b1
to
8ef4522
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit 71b697f. |
8ef4522
to
28340ae
Compare
Deploy preview for dagit-storybook ready! ✅ Preview Built with commit 71b697f. |
28340ae
to
954a961
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mostly makes sense. I think this introduces a bunch of nomenclature issues that we can address for now with super verbose but clear names. See comments inline.
len(self._external_asset_node.job_names) >= 1, | ||
"Asset must be part of at least one job", | ||
len(self._external_asset_node.job_names) >= 1 | ||
or not self._external_asset_node.is_executable, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So IMO we should not have helper bools here and instead test against execution type directly. My argument here is that this is on its face confusing as a source assets is also not executable. A code reader with less context might find this very confusing.
I think or self._external_asset_node.execution_type == AssetExecutionType::UNEXECUTABLE
is more explicit and clear than not self._external_asset_node.is_executable
in this way
@@ -581,7 +582,7 @@ def resolve_assetObservations( | |||
] | |||
|
|||
def resolve_configField(self, _graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]: | |||
if self.is_source_asset(): | |||
if self.is_source_or_unexecutable_asset(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar spirit I think is_source_or_defined_in_unexecutable_assets_def
is more clear is_source_or_unexecutable_asset
. I think in weird dual state/transitional states in code bases being super clear is worth it and the expense of seemingly super verbose internal code paths.
def is_unexecutable_source(self): | ||
"""Returns True if this definition represents unexecutable source assets. | ||
Assumption: either all or none contained assets are unexecutable source assets. | ||
""" | ||
for key in self.keys: | ||
if not self.is_unexecutable_source_asset(key): | ||
return False | ||
return True | ||
|
||
def is_unexecutable_source_asset(self, asset_key: AssetKey) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am now confused as the precise difference between is_unexecutable_source
and is_unexecutable_source_asset
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}, | ||
**{ | ||
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: ( | ||
AssetExecutionType.UNEXECUTABLE_SOURCE.value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm worth talking about what this represents. Given this repurposing the original notion of AssetVarietal
or AssetType
might make more sense,
@@ -193,7 +193,7 @@ def build_caching_repository_data_from_list( | |||
|
|||
if assets_defs or source_assets or asset_checks_defs: | |||
for job_def in get_base_asset_jobs( | |||
assets=assets_defs, | |||
assets_defs=assets_defs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
@@ -1341,6 +1343,11 @@ def external_repository_data_from_def( | |||
asset_graph = external_asset_graph_from_defs( | |||
jobs, | |||
source_assets_by_key=repository_def.source_assets_by_key, | |||
unexecutable_assets={ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ unexecutable_assets/unexecutable_assets_defs/
I think using "assets_defs" very important in code paths that co-exist with SourceAsset
instances
b426c70
to
b1222d0
Compare
b1222d0
to
aa652da
Compare
# build nodes for remaining asset keys that were referred in other assets but not defined | ||
asset_keys_without_definitions = ( | ||
all_upstream_asset_keys.difference(node_defs_by_asset_key.keys()) | ||
.difference(source_assets_by_key.keys()) | ||
.difference(unexecutable_keys) | ||
) | ||
for asset_key in asset_keys_without_definitions: | ||
asset_nodes.append( | ||
ExternalAssetNode( | ||
asset_key=asset_key, | ||
dependencies=list(deps[asset_key].values()), | ||
depended_by=list(dep_by[asset_key].values()), | ||
job_names=[], | ||
group_name=group_name_by_asset_key.get(asset_key), | ||
code_version=code_version_by_asset_key.get(asset_key), | ||
) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this part of the building (moved from above) dates back to the addition of ExternalAssetNode
https://github.com/dagster-io/dagster/pull/4742/files
not sure what conditions it fires with the source asset resolution happening above
aa652da
to
372453a
Compare
cf7b1b5
to
73878a1
Compare
73878a1
to
71b697f
Compare
Deploy preview for dagster-university ready! ✅ Preview Built with commit 71b697f. |
71b697f
to
6e332f0
Compare
6e332f0
to
1edbf01
Compare
building forward from
#16575
#16617
This propagates understanding of "unexecutable" assets in to the job building and the inter process representation.
SourceAssets
when we build jobs, allowing them to replaceSourceAsset
at the definitions levelHow I Tested These Changes
pytest python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py
in via
dagster-webserver -f
and usinginstance.report_runless_asset_event
to update materializations for associated assets . Verify asset graph renders as the nodes and their most recent materializations