Skip to content

Commit

Permalink
Describe External Assets; rename create_unexecutable_observable_asset…
Browse files Browse the repository at this point in the history
…s_def to external_assets_from_specs (#16754)

## Summary & Motivation

This PR renames `create_unexecutable_observable_assets_def` to
`external_assets_from_specs`. I also want this PR to serve as the final
discussion on naming this feature and documents this capability. The
verbiage in the docblock:

```
Create an external assets definition from a sequence of asset specs.

An external asset is an asset that is not materialized by Dagster, but is tracked in the
asset graph and asset catalog.

A common use case for external assets is modeling data produced by an process not
under Dagster's control. For example a daily drop of a file from a third party in s3.

In most systems these are described as sources. This includes Dagster, which includes
:py:class:`SourceAsset`, which will be supplanted by external assets in the near-term
future, as external assets are a superset of the functionality
of Source Assets.

External assets can act as sources, but that is not their only use.

In particular, external assets have themselves have lineage-specified through the
``deps`` argument of :py:class:`AssetSpec`- and can depend on other external assets.
External assets are not allowed to depend on non-external assets.

The user can emit `AssetMaterialization`, `AssetObservation`, and `AssetCheckEvaluations`
events attached external assets.  And Dagster now has the ability to have "runless"
events to enable many use cases that were previously not possible.  Runless events
are events generated outside the context of a particular run (for example, in a
sensor or by an script), allowing for greater flexibility in event generation.
This can be done in a few ways:

Note to reviewers that this in an in-progress doc block and the below will have links and examples.

1) DagsterInstance exposes `report_runless_event` that can be used to generate events for
    external assets directly on an instance. See docs.
2) Sensors can build these events and return them using :py:class:`SensorResult`. A use
    case for this is using a sensor to continously monitor the metadata exhaust from
    an external system and inserting events that
    reflect that exhaust. See docs.
3) Dagster Cloud exposes a REST API for ingesting runless events. Users can copy and
    paste a curl command in the their external computations (such as Airflow operator)
    to register metadata associated with those computations See docs.
4) Dagster ops can generate these events directly and yield them or by calling
    ``log_event`` on :py:class:`OpExecutionContext`.  Use cases for this include
    querying metadata in an external system that is too expensive to do so in a sensor. Or
    for adapting pure op-based Dagster code to take advantage of asset-oriented lineage,
    observability, and data quality features, without having to port them wholesale
    to `@asset`- and `@multi_asset`-based code.

This feature set allows users to use Dagster as an observability, lineage, and
data quality tool for assets that are not materialized by Dagster. In addition to
traditional use cases like sources, this feature can model entire lineage graphs of
assets that are scheduled and materialized by other tools and workflow engines. This
allows users to use Dagster as a cross-cutting observability tool without migrating
their entire data platform to a single orchestration engine.

External assets do not have all the features of normal assets: they cannot be
materialized ad hoc by Dagster (this is diabled in the UI); cannot be backfilled; cannot
be scheduled using auto-materialize policies; and opt out of other features around
direct materialization, both now and in the future. External assets also provide fewer
guarantees around the correctness of information of their information in the asset
catalog. In other words, in exchange for the flexibility Dagster provides less guardrails
for external assets than assets that are materialized by Dagster, and there is an increased
chance that they will insert non-sensical information into the asset catalog, potentially
eroding trust.
```

Suggesting alternative lanuage in this docblock is the best way to talk
about an alternative name IMO.

## How I Tested These Changes
  • Loading branch information
schrockn authored Sep 27, 2023
1 parent 9a7ebe0 commit 56219ae
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48266,56 +48266,6 @@
"scalar_kind": null,
"type_param_keys": null
},
"Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a": {
"__class__": "ConfigTypeSnap",
"description": null,
"enum_values": null,
"fields": [
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"config\": {\"retries\": {\"enabled\": {}}}}",
"description": "Configure how steps are executed within a run.",
"is_required": false,
"name": "execution",
"type_key": "Shape.09d73f0755bf4752d3f121837669c8660dcf451e"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{}",
"description": "Configure how loggers emit messages within a run.",
"is_required": false,
"name": "loggers",
"type_key": "Shape.e895d95ee6d0eff1b884c76f44a2ab7089f0c49b"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"an_asset\": {\"config\": {}}, \"executable_asset\": {}}",
"description": "Configure runtime parameters for ops or assets.",
"is_required": false,
"name": "ops",
"type_key": "Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"io_manager\": {}}",
"description": "Configure how shared resources are implemented within a run.",
"is_required": false,
"name": "resources",
"type_key": "Shape.1578133c1c71e8e3c9cf3ad46c216eb51b48c778"
}
],
"given_name": null,
"key": "Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a",
"kind": {
"__enum__": "ConfigTypeKind.STRICT_SHAPE"
},
"scalar_kind": null,
"type_param_keys": null
},
"Shape.081354663b9d4b8fbfd1cb8e358763912953913f": {
"__class__": "ConfigTypeSnap",
"description": null,
Expand Down Expand Up @@ -48518,7 +48468,7 @@
"scalar_kind": null,
"type_param_keys": null
},
"Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6": {
"Shape.b31f9017806ff4b8385cd662d325a47a819a2815": {
"__class__": "ConfigTypeSnap",
"description": null,
"enum_values": null,
Expand All @@ -48529,7 +48479,7 @@
"default_value_as_json_str": "{\"config\": {}}",
"description": null,
"is_required": false,
"name": "an_asset",
"name": "_external_assets_def",
"type_key": "Shape.62edccaf30696e25335ae92685bdc41e204e30e6"
},
{
Expand All @@ -48543,7 +48493,57 @@
}
],
"given_name": null,
"key": "Shape.b8d5bc3385ac93557bbf436b3886579c7e541df6",
"key": "Shape.b31f9017806ff4b8385cd662d325a47a819a2815",
"kind": {
"__enum__": "ConfigTypeKind.STRICT_SHAPE"
},
"scalar_kind": null,
"type_param_keys": null
},
"Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b": {
"__class__": "ConfigTypeSnap",
"description": null,
"enum_values": null,
"fields": [
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"config\": {\"retries\": {\"enabled\": {}}}}",
"description": "Configure how steps are executed within a run.",
"is_required": false,
"name": "execution",
"type_key": "Shape.09d73f0755bf4752d3f121837669c8660dcf451e"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{}",
"description": "Configure how loggers emit messages within a run.",
"is_required": false,
"name": "loggers",
"type_key": "Shape.e895d95ee6d0eff1b884c76f44a2ab7089f0c49b"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"_external_assets_def\": {\"config\": {}}, \"executable_asset\": {}}",
"description": "Configure runtime parameters for ops or assets.",
"is_required": false,
"name": "ops",
"type_key": "Shape.b31f9017806ff4b8385cd662d325a47a819a2815"
},
{
"__class__": "ConfigFieldSnap",
"default_provided": true,
"default_value_as_json_str": "{\"io_manager\": {}}",
"description": "Configure how shared resources are implemented within a run.",
"is_required": false,
"name": "resources",
"type_key": "Shape.1578133c1c71e8e3c9cf3ad46c216eb51b48c778"
}
],
"given_name": null,
"key": "Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b",
"kind": {
"__enum__": "ConfigTypeKind.STRICT_SHAPE"
},
Expand Down Expand Up @@ -48699,8 +48699,8 @@
"__class__": "SolidInvocationSnap",
"input_dep_snaps": [],
"is_dynamic_mapped": false,
"solid_def_name": "an_asset",
"solid_name": "an_asset",
"solid_def_name": "_external_assets_def",
"solid_name": "_external_assets_def",
"tags": {}
},
{
Expand Down Expand Up @@ -48753,7 +48753,7 @@
"name": "io_manager"
}
],
"root_config_key": "Shape.006812248eeed5f01a7a8b7c8aa6d541d96b375a"
"root_config_key": "Shape.b7f072a80a1fdee0bfc5e256ea343cd7e3a8818b"
}
],
"name": "executable_test_job",
Expand All @@ -48774,7 +48774,7 @@
},
"description": null,
"input_def_snaps": [],
"name": "an_asset",
"name": "_external_assets_def",
"output_def_snaps": [
{
"__class__": "OutputDefSnap",
Expand Down Expand Up @@ -48822,7 +48822,7 @@
'''
# ---
# name: test_all_snapshot_ids[47]
'43ee457f7b442944a111b7d729f827ab9618a4ae'
'7f7ef891f97ea4a6a6c2222533b79da418717189'
# ---
# name: test_all_snapshot_ids[48]
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@
dict({
'repositoryOrError': dict({
'usedSolids': list([
dict({
'__typename': 'UsedSolid',
'definition': dict({
'name': '_external_assets_def',
}),
'invocations': list([
dict({
'pipeline': dict({
'name': 'executable_test_job',
}),
'solidHandle': dict({
'handleID': '_external_assets_def',
}),
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
Expand Down Expand Up @@ -130,22 +146,6 @@
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
'name': 'an_asset',
}),
'invocations': list([
dict({
'pipeline': dict({
'name': 'executable_test_job',
}),
'solidHandle': dict({
'handleID': 'an_asset',
}),
}),
]),
}),
dict({
'__typename': 'UsedSolid',
'definition': dict({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import Failure
from dagster._core.definitions.executor_definition import in_process_executor
from dagster._core.definitions.external_asset import external_assets_from_specs
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import MetadataValue
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.observable_asset import create_unexecutable_observable_assets_def
from dagster._core.definitions.partition import PartitionedConfig
from dagster._core.definitions.reconstruct import ReconstructableRepository
from dagster._core.definitions.sensor_definition import RunRequest, SkipReason
Expand Down Expand Up @@ -1383,7 +1383,7 @@ def executable_asset() -> None:
pass


unexecutable_asset = create_unexecutable_observable_assets_def([AssetSpec("unexecutable_asset")])
unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")])))

executable_test_job = build_assets_job(
name="executable_test_job", assets=[executable_asset, unexecutable_asset]
Expand Down
158 changes: 158 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from typing import List, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext


def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinition]:
"""Create an external assets definition from a sequence of asset specs.
An external asset is an asset that is not materialized by Dagster, but is tracked in the
asset graph and asset catalog.
A common use case for external assets is modeling data produced by an process not
under Dagster's control. For example a daily drop of a file from a third party in s3.
In most systems these are described as sources. This includes Dagster, which includes
:py:class:`SourceAsset`, which will be supplanted by external assets in the near-term
future, as external assets are a superset of the functionality
of Source Assets.
External assets can act as sources, but that is not their only use.
In particular, external assets have themselves have lineage-specified through the
``deps`` argument of :py:class:`AssetSpec`- and can depend on other external assets.
External assets are not allowed to depend on non-external assets.
The user can emit `AssetMaterialization`, `AssetObservation`, and `AssetCheckEvaluations`
events attached external assets. And Dagster now has the ability to have "runless"
events to enable many use cases that were previously not possible. Runless events
are events generated outside the context of a particular run (for example, in a
sensor or by an script), allowing for greater flexibility in event generation.
This can be done in a few ways:
Note to reviewers that this in an in-progress doc block and the below will have links and examples.
1) DagsterInstance exposes `report_runless_event` that can be used to generate events for
external assets directly on an instance. See docs.
2) Sensors can build these events and return them using :py:class:`SensorResult`. A use
case for this is using a sensor to continously monitor the metadata exhaust from
an external system and inserting events that
reflect that exhaust. See docs.
3) Dagster Cloud exposes a REST API for ingesting runless events. Users can copy and
paste a curl command in the their external computations (such as Airflow operator)
to register metadata associated with those computations See docs.
4) Dagster ops can generate these events directly and yield them or by calling
``log_event`` on :py:class:`OpExecutionContext`. Use cases for this include
querying metadata in an external system that is too expensive to do so in a sensor. Or
for adapting pure op-based Dagster code to take advantage of asset-oriented lineage,
observability, and data quality features, without having to port them wholesale
to `@asset`- and `@multi_asset`-based code.
This feature set allows users to use Dagster as an observability, lineage, and
data quality tool for assets that are not materialized by Dagster. In addition to
traditional use cases like sources, this feature can model entire lineage graphs of
assets that are scheduled and materialized by other tools and workflow engines. This
allows users to use Dagster as a cross-cutting observability tool without migrating
their entire data platform to a single orchestration engine.
External assets do not have all the features of normal assets: they cannot be
materialized ad hoc by Dagster (this is diabled in the UI); cannot be backfilled; cannot
be scheduled using auto-materialize policies; and opt out of other features around
direct materialization, both now and in the future. External assets also provide fewer
guarantees around the correctness of information of their information in the asset
catalog. In other words, in exchange for the flexibility Dagster provides less guardrails
for external assets than assets that are materialized by Dagster, and there is an increased
chance that they will insert non-sensical information into the asset catalog, potentially
eroding trust.
Args:
specs (Sequence[AssetSpec]): The specs for the assets.
"""
assets_defs = []
for spec in specs:
check.invariant(
spec.auto_materialize_policy is None,
"auto_materialize_policy must be None since it is ignored",
)
check.invariant(spec.code_version is None, "code_version must be None since it is ignored")
check.invariant(
spec.freshness_policy is None, "freshness_policy must be None since it is ignored"
)
check.invariant(
spec.skippable is False,
"skippable must be False since it is ignored and False is the default",
)

@multi_asset(
specs=[
AssetSpec(
key=spec.key,
description=spec.description,
group_name=spec.group_name,
metadata={
**(spec.metadata or {}),
**{
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: (
AssetExecutionType.UNEXECUTABLE.value
)
},
},
deps=spec.deps,
)
]
)
def _external_assets_def(context: AssetExecutionContext) -> None:
raise DagsterInvariantViolationError(
"You have attempted to execute an unexecutable asset"
f" {context.asset_key.to_user_string}."
)

assets_defs.append(_external_assets_def)

return assets_defs


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.observe_fn is None,
"Observable source assets not supported yet: observe_fn should be None",
)
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Observable source assets not supported yet: auto_observe_interval_minutes should be None",
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value},
},
"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _external_assets_def() -> None:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

check.invariant(isinstance(_external_assets_def, AssetsDefinition))
assert isinstance(_external_assets_def, AssetsDefinition) # appese pyright

return _external_assets_def
Loading

0 comments on commit 56219ae

Please sign in to comment.