-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a new AssetExecutionType (Observable) and use it to omit materializations for observable source asset wrapping #16621
Add a new AssetExecutionType (Observable) and use it to omit materializations for observable source asset wrapping #16621
Conversation
061cda4
to
d01b7bc
Compare
c57f687
to
78ff1b8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine - think we need these metadata bits to be able to set the right bools on external asset node
asset_layer = step_context.job_def.asset_layer | ||
execution_type = ( | ||
asset_layer.assets_def_for_asset(asset_key).asset_execution_type_for_asset(asset_key) | ||
if asset_layer.has_assets_def_for_asset(asset_key) | ||
else AssetExecutionType.MATERIALIZATION | ||
) | ||
|
||
check.invariant( | ||
execution_type != AssetExecutionType.UNEXECUTABLE, | ||
"There should never be unexecutable assets here", | ||
) | ||
if execution_type == AssetExecutionType.MATERIALIZATION: | ||
for materialization in _get_output_asset_materializations( | ||
asset_key, | ||
partitions, | ||
output, | ||
output_def, | ||
manager_metadata, | ||
step_context, | ||
): | ||
yield DagsterEvent.asset_materialization(step_context, materialization) | ||
elif execution_type == AssetExecutionType.OBSERVATION: | ||
# if this is an observation execution type, yield no materializations | ||
pass | ||
else: | ||
check.failed(f"Unexpected asset execution type {execution_type}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm not sure if theres a way to make this feel cleaner, take another second to stare at it and see if you can come up with something
7ef9fad
to
a790171
Compare
78ff1b8
to
02b6c10
Compare
yield from ( | ||
( | ||
DagsterEvent.asset_materialization(step_context, materialization) | ||
for materialization in _get_output_asset_materializations( | ||
asset_key, | ||
partitions, | ||
output, | ||
output_def, | ||
manager_metadata, | ||
step_context, | ||
) | ||
) | ||
if execution_type == AssetExecutionType.MATERIALIZATION | ||
else () | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alangenfeld I think this is nicer put arguably a little overly clever as it's a little obscured that it is making a lazily computed geneator 🤷
02b6c10
to
9bb377c
Compare
9bb377c
to
b03b357
Compare
a790171
to
b5aeae1
Compare
b03b357
to
ee9c797
Compare
b5aeae1
to
73e8bf2
Compare
b7d2e1b
to
e66d383
Compare
9798d5d
to
1ecc61e
Compare
e66d383
to
d8a6069
Compare
@alangenfeld just confirming that I am planning on landing this after #16620 goes in |
## Summary & Motivation This makes an observable source asset wrappable in an AssetsDefinition. This is an imperfect intermediate state, as as spurious asset materialization is produced as a result of execution (verified in test case but fixed in #16621) but this verifies that the core execution machinery works. Makes the following code possible: ```python @observable_source_asset def an_observable_source_asset() -> DataVersion: return DataVersion("foo") defs = Definitions(assets=[ create_external_asset_from_source_asset(an_observable_source_asset) ]) ``` This `@observable_source_asset` now plugs into the rest of our infrastructure: 1) You can execute it from `dagster-webserver` using the "Materialize" button, which will be renamed to "Execute". 2) You can schedule this from schedules and sensors. AMP support has to wait because auto observation is hard-coded in the AMP logic. ## How I Tested These Changes BK
d8a6069
to
3211da4
Compare
…izations for observable source asset wrapping more bulletproof codepath defend against non-existent asset_defs better better still f-string
3211da4
to
c429cc0
Compare
Deploy preview for dagit-core-storybook ready! ✅ Preview Built with commit c429cc0. |
Summary & Motivation
This adds a new execution type (Observation) which does not automatically produce a materialization on execution. This allows us to drive observation from a common execution pathway, rather than a completely parallel system.
How I Tested These Changes
BK