Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external-assets] ObserveResult #17824

Merged
merged 1 commit into from
Feb 5, 2024

Conversation

smackesey
Copy link
Collaborator

@smackesey smackesey commented Nov 8, 2023

Summary & Motivation

Add ObserveResult counterpart to MaterializeResult and a common base class AssetResult. The base class was added because ObserveResult and MaterializeResult are currently exactly the same data structure, and much of the existing machinery used for auto-generating materializations to be used to generate observations. ObserveResult is converted to Output at the same point that MaterializeResult is converted to Output.

The initial plan here was to include in this PR a change to the source asset observation function implementation to use ObserveResult. However, this is not possible until we find a solution to the "partition-scoped metadata/data version" question (see https://github.com/dagster-io/internal/discussions/7529), because ObserveResult goes through Output (which does not currently support partition-scoped data versions), and observable source assets can return partition-scoped data versions.

Therefore this just adds ObserveResult and the capability to create an observable source asset equivalent from an external asset. There is some workaround logic for auto-converted source-assets that prevents auto-generation of AssetObservation.

How I Tested These Changes

New unit tests.

@smackesey
Copy link
Collaborator Author

smackesey commented Nov 8, 2023

Current dependencies on/for this PR:

This stack of pull requests is managed by Graphite.

@smackesey smackesey changed the base branch from master to sean/metadata-by-partition November 20, 2023 16:56
@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from b48ebba to a7fb194 Compare November 20, 2023 16:56
@smackesey smackesey changed the base branch from sean/metadata-by-partition to master November 22, 2023 13:18
@smackesey smackesey force-pushed the sean/external-assets-observe-result branch 2 times, most recently from ad61e04 to 84ab166 Compare November 22, 2023 14:14
if source_asset.observe_fn is None
else {}
)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated, but this block was duplicated

corresponding AssetMaterialization event.
data_version (Optional[DataVersion]): The data version of the asset that was observed.
"""

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_results and data_version were missing from MaterializeResult docstring

@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from 84ab166 to 5503f7f Compare November 22, 2023 14:55
@smackesey smackesey marked this pull request as ready for review November 22, 2023 15:28
@smackesey smackesey requested a review from schrockn November 22, 2023 15:28
@schrockn
Copy link
Member

This PR really begs the question of what is the difference between an observation and a materialization? We have parallel objects in multiple layers (ObserveResult and MaterializeResult; AssetMaterialization and AssetObservation) and in fact you have a codepath which relies on them having the same __init__ arguments.

I wonder if there is at a minimum an internal factor here that can reconcile the codepaths better and also keep observations and materialization more in sync

Perhaps the right mental model is that a materialization contains an instance of an observation.

@smackesey
Copy link
Collaborator Author

This PR really begs the question of what is the difference between an observation and a materialization?

Yes-- I was thinking about this while writing the PR. Here is my mental model:

  • A Dagster deployment models some set of data entities ("assets").
  • Every asset modeled has an asset key as identifier. The set of assets modeled by a Dagster deployment is the union of all asset keys that have either: (a) a corresponding AssetsDefinition/SourceAsset; or (b) at least one event in the log.
  • There are "materializable assets" (i.e. standard SDAs) and "external assets". External assets here include both assets with a corresponding definition and those with no definition but at least one event in the log. The meaning of AssetMaterialization and AssetObservation events differs for the two kinds of assets.
  • For materializable assets:
    • The asset state specifies either one or zero values (zero if it has no AssetMaterialization on record) at any given time.
    • The most recent AssetMaterialization event represents the computation of the asset's current value.
    • AssetObservation events provide additional information about the value represented by the closest preceding materialization.
  • For external assets:
    • The asset is assumed to always correspond to a value, even if there is no AssetMaterialization on record.
    • AssetMaterializations represent computations of the asset's value, but there is no guarantee that all such computations have corresponding AssetMaterialization events. It is not necessary for there to be any AssetMaterialization event on record.
    • Consequently, we can't draw any relationship between AssetObservation and AssetMaterialization events like we can with materializable assets. All we can say about an AssetObservation is that it corresponds to the value at the time the observation was conducted-- but whether it corresponds to the current value is unknown.

So the ontology is fuzzy around external assets. If I were designing this de novo I would require all modeled assets to have an AssetMaterialization, and then the meaning of AssetObservation would be clear in that it always contains information about the value represented by the preceding materialization.

In the current state, I'm not sure.

I wonder if there is at a minimum an internal factor here that can reconcile the codepaths better and also keep observations and materialization more in sync

I thought about factoring out a common AssetExecutionResult class for MaterializeResult and ObserveResult, but that would seem like it should apply to AssetCheckResult as well-- but this doesn't have a data_version. So I thought it clearer to not do this.

Perhaps the right mental model is that a materialization contains an instance of an observation.

Seems reasonable but it is kind of a confusing interpretation to apply to existing event logs.

@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from 5503f7f to 33d754c Compare January 29, 2024 22:39
Copy link

Deploy preview for dagster-university ready!

✅ Preview
https://dagster-university-2vymrotg3-elementl.vercel.app
https://sean-external-assets-observe-result.dagster-university.dagster-docs.io

Built with commit 33d754c.
This pull request is being automatically deployed with vercel-action

Copy link

Deploy preview for dagit-storybook ready!

✅ Preview
https://dagit-storybook-9gta3x4oc-elementl.vercel.app
https://sean-external-assets-observe-result.components-storybook.dagster-docs.io

Built with commit 33d754c.
This pull request is being automatically deployed with vercel-action

@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from 33d754c to fc5f310 Compare February 1, 2024 17:21
@smackesey smackesey changed the base branch from master to sean/bump-ruff February 1, 2024 17:21
This was referenced Feb 1, 2024
@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from fc5f310 to 375c613 Compare February 1, 2024 17:45
f" asset_key, options are: {context.per_invocation_properties.assets_def.keys}"
)


def _output_name_for_result_obj(
event: MaterializeResult,
event: Union[MaterializeResult, ObserveResult],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think we should add a marker interface AssetResult that both MaterializeResult and ObserveResult implement for code paths like this

Copy link
Collaborator Author

@smackesey smackesey Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One reason I didn't do something like this is because there is also AssetCheckResult which would not be included so is kind of confusing, but I'm open to it-- do you think that's preferable to a common base class?

OK I see below comment on base class now

@@ -1,7 +1,7 @@
from typing import NamedTuple, Optional, Sequence

import dagster._check as check
from dagster._annotations import PublicAttr
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.data_version import DataVersion

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: lines 15 to 15]

looking at this let's add the base class. I think `MaterializeResult` and `ObserveResult` can just trivially inherit
class AssetResult(NamedTuple(...)):
   # all the stuff

class MaterializeResult(AssetResult):
  pass

class ObserveResult(AssetResult):
  pass
```<!--__GRAPHITE_HTML_TAG_START__--><span class='graphite__hidden'><br/><br/>See this comment inline on <a href="https://app.graphite.dev/github/pr/dagster-io/dagster/17824?utm_source=unchanged-line-comment">Graphite</a>.</span>

<!--__GRAPHITE_HTML_TAG_END__-->

@@ -168,10 +168,12 @@ def _filter_expected_output_defs(
result_tuple = (
(result,) if not isinstance(result, tuple) or is_named_tuple_instance(result) else result
)
materialize_results = [x for x in result_tuple if isinstance(x, MaterializeResult)]
materialize_or_observe_results = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah asset_results is better

# This is a temporary workaround to prevent duplicate observation events from external
# observable assets that were auto-converted from source assets. These assets yield
# observation events through the context in their body, and will continue to do so until we
# can convert them to using ObserveResult, which requires a solution to partition-scoped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add linear tasks for these follow ups?

@smackesey smackesey force-pushed the sean/external-assets-observe-result branch 2 times, most recently from 542b6a7 to 80b7df4 Compare February 2, 2024 16:04
@smackesey smackesey changed the base branch from sean/bump-ruff to master February 2, 2024 16:04
@smackesey smackesey force-pushed the sean/external-assets-observe-result branch 3 times, most recently from dc76212 to 1625c87 Compare February 2, 2024 19:44
@smackesey smackesey requested a review from schrockn February 2, 2024 19:59
@erinkcochran87 erinkcochran87 removed their request for review February 2, 2024 20:24
@@ -359,7 +359,7 @@ def dagster_internal_init(
is_subset=is_subset,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change in this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it slipped in, removed

if execution_type == AssetExecutionType.MATERIALIZATION
else ()
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would strongly prefer to have this structured so that there is no elif to demonstrate to the code reader that under no circumstances will these blocks be silently skipped.. The check against UNEXECUTABLE in elif is unnecessary given the invariant at top of function.

If you still want to check against unexecutable down here, do so as an invariant.

else:
   check.invariant(execution_type != AssetExecutionType.UNEXECUTABLE)
   yield from (...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, changed to else

Copy link
Member

@schrockn schrockn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this looks good. Please heed final comment about the elif in core execution.

Also @sryza definitely want your signoff on this change.

Copy link
Contributor

@sryza sryza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this to the api docs, right?


@experimental
class ObserveResult(AssetResult):
"""An object representing a successful observation of an asset. These can be returned from
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments aren't accurate, right? (I know that in the future they may become accurate if we choose to go the execution_type route.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments are accurate, with the caveat that you need a special metadata key/value pair (setting execution type to OBSERVATION) for it to work. I added some clarification in the docstring.

@@ -330,7 +330,9 @@
make_values_resource as make_values_resource,
resource as resource,
)
from dagster._core.definitions.result import MaterializeResult as MaterializeResult
from dagster._core.definitions.result import (
MaterializeResult as MaterializeResult,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ObserveResult be added here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to leave it private for the immediate future

@smackesey smackesey force-pushed the sean/external-assets-observe-result branch from 1625c87 to 1670c1f Compare February 5, 2024 14:33
@smackesey
Copy link
Collaborator Author

We should add this to the api docs, right?

When we make it public

@smackesey smackesey merged commit 7072555 into master Feb 5, 2024
1 check passed
@smackesey smackesey deleted the sean/external-assets-observe-result branch February 5, 2024 15:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants