diff --git a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx
index ca2fe6346ce28..4c62003af4d52 100644
--- a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx
+++ b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx
@@ -42,9 +42,9 @@ By default, if you launch a backfill that covers `N` partitions, Dagster will la
Dagster supports backfills that execute as a single run that covers a range of partitions, such as executing a backfill as a single Snowflake query. After the run completes, Dagster will track that all the partitions have been filled.
- Single-run backfills only work for backfills that target assets directly, i.e.
- those launched from the asset graph or asset page. Backfills launched from the
- Job page will not respect the backfill policies of assets included in the job.
+ Single-run backfills only work if they are launched from the asset graph or
+ asset page, or if the assets are part of an asset job that shares the same
+ backfill policy across all included assets.
To get this behavior, you need to:
diff --git a/docs/content/concepts/webserver/graphql.mdx b/docs/content/concepts/webserver/graphql.mdx
index a5f765e33275a..094aa61bbc502 100644
--- a/docs/content/concepts/webserver/graphql.mdx
+++ b/docs/content/concepts/webserver/graphql.mdx
@@ -28,7 +28,7 @@ The GraphQL API is served from the webserver. To start the server, run the follo
dagster dev
```
-The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <http://localhost:3000/graphql>.
+The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <http://localhost:3000/graphql>.
### Using the GraphQL playground
diff --git a/docs/docs-beta/CONTRIBUTING.md b/docs/docs-beta/CONTRIBUTING.md
index 3d446d91f5e60..e855a1e52f8d6 100644
--- a/docs/docs-beta/CONTRIBUTING.md
+++ b/docs/docs-beta/CONTRIBUTING.md
@@ -6,6 +6,12 @@ There are some features in the previous docs that require changes to be made to
### Images
+#### Location
+
+Old images are in the [/docs/next/public/images](https://github.com/dagster-io/dagster/tree/master/docs/next/public/images) directory. You will need to copy them to [/docs/docs-beta/static/images](https://github.com/dagster-io/dagster/tree/master/docs/docs-beta/static/images).
+
+#### Formatting
+
Before:
```
diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py
index 8d81cf665bcba..f563344e073eb 100644
--- a/python_modules/dagster/dagster/_core/definitions/asset_out.py
+++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py
@@ -228,7 +228,7 @@ def to_spec(
key=key,
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
deps=[*self._spec.deps, *deps],
- partitions_def=partitions_def,
+ partitions_def=partitions_def if partitions_def is not None else ...,
)
@public
diff --git a/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py b/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py
index e86999fd017b4..e74f6fef46775 100644
--- a/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py
+++ b/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py
@@ -6,7 +6,7 @@
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import DataProvenance, DataVersion
from dagster._core.definitions.dependency import Node, NodeHandle
-from dagster._core.definitions.events import AssetKey, UserEvent
+from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey, UserEvent
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.partition import PartitionsDefinition
@@ -459,15 +459,91 @@ def add_output_metadata(
mapping_key: Optional[str] = None,
) -> None:
return self.op_execution_context.add_output_metadata(
- metadata=metadata, output_name=output_name, mapping_key=mapping_key
+ metadata=metadata,
+ output_name=output_name,
+ mapping_key=mapping_key,
+ )
+
+ @public
+ def add_asset_metadata(
+ self,
+ metadata: Mapping[str, Any],
+ asset_key: Optional[CoercibleToAssetKey] = None,
+ partition_key: Optional[str] = None,
+ ) -> None:
+ """Add metadata to an asset materialization event. This metadata will be
+ available in the Dagster UI.
+
+ Args:
+ metadata (Mapping[str, Any]): The metadata to add to the asset
+ materialization event.
+ asset_key (Optional[CoercibleToAssetKey]): The asset key to add metadata to.
+ Does not need to be provided if only one asset is currently being
+ materialized.
+ partition_key (Optional[str]): The partition key to add metadata to, if
+ applicable. Should not be provided on non-partitioned assets. If not
+ provided on a partitioned asset, the metadata will be added to all
+ partitions of the asset currently being materialized.
+
+ Examples:
+ Adding metadata to the asset materialization event for a single asset:
+
+ .. code-block:: python
+
+ import dagster as dg
+
+ @dg.asset
+ def my_asset(context):
+ # Add metadata
+ context.add_asset_metadata({"key": "value"})
+
+ Adding metadata to the asset materialization event for a particular partition of a partitioned asset:
+
+ .. code-block:: python
+
+ import dagster as dg
+
+ @dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"]))
+ def my_asset(context):
+ # Adds metadata to all partitions currently being materialized, since no
+ # partition is specified.
+ context.add_asset_metadata({"key": "value"})
+
+ for partition_key in context.partition_keys:
+ # Add metadata only to the event for partition "a"
+ if partition_key == "a":
+ context.add_asset_metadata({"key": "value"}, partition_key=partition_key)
+
+ Adding metadata to the asset materialization event for a particular asset in a multi-asset.
+
+ .. code-block:: python
+
+ import dagster as dg
+
+ @dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")])
+ def my_multi_asset(context):
+ # Add metadata to the materialization event for "asset1"
+ context.add_asset_metadata({"key": "value"}, asset_key="asset1")
+
+ # THIS line will fail since asset key is not specified:
+ context.add_asset_metadata({"key": "value"})
+
+ """
+ self._step_execution_context.add_asset_metadata(
+ metadata=metadata,
+ asset_key=asset_key,
+ partition_key=partition_key,
)
@_copy_docs_from_op_execution_context
def get_output_metadata(
- self, output_name: str, mapping_key: Optional[str] = None
+ self,
+ output_name: str,
+ mapping_key: Optional[str] = None,
) -> Optional[Mapping[str, Any]]:
return self.op_execution_context.get_output_metadata(
- output_name=output_name, mapping_key=mapping_key
+ output_name=output_name,
+ mapping_key=mapping_key,
)
#### asset check related
diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py
index 7f16e5cfaab35..076055c6d64e3 100644
--- a/python_modules/dagster/dagster/_core/execution/context/invocation.py
+++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py
@@ -507,6 +507,20 @@ def partition_key(self) -> str:
return self._partition_key
check.failed("Tried to access partition_key for a non-partitioned run")
+ @property
+ def partition_keys(self) -> Sequence[str]:
+ key_range = self.partition_key_range
+ partitions_def = self.assets_def.partitions_def
+ if partitions_def is None:
+ raise DagsterInvariantViolationError(
+ "Cannot access partition_keys for a non-partitioned run"
+ )
+
+ return partitions_def.get_partition_keys_in_range(
+ key_range,
+ dynamic_partitions_store=self.instance,
+ )
+
@property
def partition_key_range(self) -> PartitionKeyRange:
"""The range of partition keys for the current run.
diff --git a/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py b/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py
new file mode 100644
index 0000000000000..20a69574f7f73
--- /dev/null
+++ b/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py
@@ -0,0 +1,84 @@
+from typing import Any, Mapping, Optional, Union
+
+from dagster._core.definitions.asset_key import AssetKey
+from dagster._record import record
+from dagster._utils.merger import merge_dicts
+
+
+@record
+class OutputMetadataHandle:
+ output_name: str
+ mapping_key: Optional[str]
+
+
+@record
+class AssetMetadataHandle:
+ asset_key: AssetKey
+ partition_key: Optional[str]
+
+
+@record
+class OutputMetadataAccumulator:
+ per_output_metadata: Mapping[
+ Union[OutputMetadataHandle, AssetMetadataHandle], Mapping[str, Any]
+ ]
+
+ @staticmethod
+ def empty() -> "OutputMetadataAccumulator":
+ return OutputMetadataAccumulator(per_output_metadata={})
+
+ def get_output_metadata(
+ self, output_name: str, mapping_key: Optional[str]
+ ) -> Mapping[str, Any]:
+ handle = OutputMetadataHandle(
+ output_name=output_name,
+ mapping_key=mapping_key,
+ )
+ return self.per_output_metadata.get(handle, {})
+
+ def get_asset_metadata(
+ self, asset_key: AssetKey, partition_key: Optional[str]
+ ) -> Mapping[str, Any]:
+ handle = AssetMetadataHandle(
+ asset_key=asset_key,
+ partition_key=partition_key,
+ )
+ return self.per_output_metadata.get(handle, {})
+
+ def with_additional_output_metadata(
+ self,
+ output_name: str,
+ mapping_key: Optional[str],
+ metadata: Mapping[str, Any],
+ ) -> "OutputMetadataAccumulator":
+ return self._with_metadata(
+ handle=OutputMetadataHandle(
+ output_name=output_name,
+ mapping_key=mapping_key,
+ ),
+ metadata=metadata,
+ )
+
+ def _with_metadata(
+ self, handle: Union[OutputMetadataHandle, AssetMetadataHandle], metadata: Mapping[str, Any]
+ ) -> "OutputMetadataAccumulator":
+ return OutputMetadataAccumulator(
+ per_output_metadata=merge_dicts(
+ self.per_output_metadata,
+ {handle: merge_dicts(self.per_output_metadata.get(handle, {}), metadata)},
+ )
+ )
+
+ def with_additional_asset_metadata(
+ self,
+ asset_key: AssetKey,
+ partition_key: Optional[str],
+ metadata: Mapping[str, Any],
+ ) -> "OutputMetadataAccumulator":
+ return self._with_metadata(
+ handle=AssetMetadataHandle(
+ asset_key=asset_key,
+ partition_key=partition_key,
+ ),
+ metadata=metadata,
+ )
diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py
index 94dc23b47e59d..a88722f3289ac 100644
--- a/python_modules/dagster/dagster/_core/execution/context/system.py
+++ b/python_modules/dagster/dagster/_core/execution/context/system.py
@@ -23,8 +23,9 @@
import dagster._check as check
from dagster._annotations import public
+from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.dependency import OpNode
-from dagster._core.definitions.events import AssetKey, AssetLineageInfo
+from dagster._core.definitions.events import AssetKey, AssetLineageInfo, CoercibleToAssetKey
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.job_base import IJob
from dagster._core.definitions.job_definition import JobDefinition
@@ -52,6 +53,7 @@
InputAssetVersionInfo,
)
from dagster._core.execution.context.input import InputContext
+from dagster._core.execution.context.metadata_logging import OutputMetadataAccumulator
from dagster._core.execution.context.output import OutputContext, get_output_context
from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle
from dagster._core.execution.plan.outputs import StepOutputHandle
@@ -467,7 +469,7 @@ def __init__(
self._step_output_capture = {}
self._step_output_metadata_capture = {}
- self._output_metadata: Dict[str, Any] = {}
+ self._metadata_accumulator = OutputMetadataAccumulator.empty()
self._seen_outputs: Dict[str, Union[str, Set[str]]] = {}
self._data_version_cache = DataVersionCache(self)
@@ -688,7 +690,7 @@ def add_output_metadata(
output_name = output_def.name
elif output_name is None:
raise DagsterInvariantViolationError(
- "Attempted to log metadata without providing output_name, but multiple outputs"
+ "Attempted to add metadata without providing output_name, but multiple outputs"
" exist. Please provide an output_name to the invocation of"
" `context.add_output_metadata`."
)
@@ -706,33 +708,79 @@ def add_output_metadata(
f" metadata for {output_desc} which has already been yielded. Metadata must be"
" logged before the output is yielded."
)
- if output_def.is_dynamic and not mapping_key:
+ if output_def.is_dynamic:
+ if not mapping_key:
+ raise DagsterInvariantViolationError(
+ f"In {self.op_def.node_type_str} '{self.op.name}', Attempted to add metadata"
+ f" for dynamic output '{output_def.name}' without providing a mapping key. When"
+ " logging metadata for a dynamic output, it is necessary to provide a mapping key."
+ )
+ self._metadata_accumulator = self._metadata_accumulator.with_additional_output_metadata(
+ output_name=output_name,
+ metadata=metadata,
+ mapping_key=mapping_key,
+ )
+
+ def add_asset_metadata(
+ self,
+ metadata: Mapping[str, Any],
+ asset_key: Optional[CoercibleToAssetKey] = None,
+ partition_key: Optional[str] = None,
+ ) -> None:
+ if not self.assets_def:
+ raise DagsterInvariantViolationError(
+ "Attempted to add metadata for a non-asset computation. Only assets should be calling this function."
+ )
+ if len(self.assets_def.keys) == 0:
raise DagsterInvariantViolationError(
- f"In {self.op_def.node_type_str} '{self.op.name}', attempted to log metadata"
- f" for dynamic output '{output_def.name}' without providing a mapping key. When"
- " logging metadata for a dynamic output, it is necessary to provide a mapping key."
+ "Attempted to add metadata without providing asset_key, but no asset_keys"
+ " are being materialized. `context.add_asset_metadata` should only be called"
+ " when materializing assets."
)
+ if asset_key is None and len(self.assets_def.keys) > 1:
+ raise DagsterInvariantViolationError(
+ "Attempted to add metadata without providing asset_key, but multiple asset_keys"
+ " can potentially be materialized. Please provide an asset_key to the invocation of"
+ " `context.add_asset_metadata`."
+ )
+ asset_key = AssetKey.from_coercible(asset_key) if asset_key else self.assets_def.key
+ if asset_key not in self.assets_def.keys:
+ raise DagsterInvariantViolationError(
+ f"Attempted to add metadata for asset key '{asset_key}' that is not being materialized."
+ )
+ if partition_key:
+ if not self.assets_def.partitions_def:
+ raise DagsterInvariantViolationError(
+ f"Attempted to add metadata for partition key '{partition_key}' without a partitions definition."
+ )
- if mapping_key:
- if output_name not in self._output_metadata:
- self._output_metadata[output_name] = {}
- if mapping_key in self._output_metadata[output_name]:
- self._output_metadata[output_name][mapping_key].update(metadata)
- else:
- self._output_metadata[output_name][mapping_key] = metadata
- else:
- if output_name in self._output_metadata:
- self._output_metadata[output_name].update(metadata)
- else:
- self._output_metadata[output_name] = metadata
+ targeted_partitions = self.assets_def.partitions_def.get_partition_keys_in_range(
+ partition_key_range=self.partition_key_range
+ )
+ if partition_key not in targeted_partitions:
+ raise DagsterInvariantViolationError(
+ f"Attempted to add metadata for partition key '{partition_key}' that is not being targeted."
+ )
+
+ self._metadata_accumulator = self._metadata_accumulator.with_additional_asset_metadata(
+ asset_key=asset_key,
+ metadata=metadata,
+ partition_key=partition_key,
+ )
def get_output_metadata(
- self, output_name: str, mapping_key: Optional[str] = None
+ self,
+ output_name: str,
+ mapping_key: Optional[str] = None,
+ ) -> Optional[Mapping[str, Any]]:
+ return self._metadata_accumulator.get_output_metadata(output_name, mapping_key)
+
+ def get_asset_metadata(
+ self,
+ asset_key: AssetKey,
+ partition_key: Optional[str] = None,
) -> Optional[Mapping[str, Any]]:
- metadata = self._output_metadata.get(output_name)
- if mapping_key and metadata:
- return metadata.get(mapping_key)
- return metadata
+ return self._metadata_accumulator.get_asset_metadata(asset_key, partition_key)
def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) -> Optional[str]:
# walk through event logs to find the right run_id based on the run lineage
@@ -901,16 +949,18 @@ def run_partitions_def(self) -> Optional[PartitionsDefinition]:
# or no partitions_def. Get the partitions_def from one of the assets that has one.
return self.asset_partitions_def
+ @cached_property
+ def assets_def(self) -> Optional[AssetsDefinition]:
+ return self.job_def.asset_layer.assets_def_for_node(self.node_handle)
+
@cached_property
def asset_partitions_def(self) -> Optional[PartitionsDefinition]:
"""If the current step is executing a partitioned asset, returns the PartitionsDefinition
for that asset. If there are one or more partitioned assets executing in the step, they're
expected to all have the same PartitionsDefinition.
"""
- asset_layer = self.job_def.asset_layer
- assets_def = asset_layer.assets_def_for_node(self.node_handle) if asset_layer else None
- if assets_def is not None:
- for asset_key in assets_def.keys:
+ if self.assets_def is not None:
+ for asset_key in self.assets_def.keys:
partitions_def = self.job_def.asset_layer.get(asset_key).partitions_def
if partitions_def is not None:
return partitions_def
diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py
index 0858a47da4d5e..1f2db35809687 100644
--- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py
+++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py
@@ -564,7 +564,8 @@ def _get_output_asset_events(
step_context: StepExecutionContext,
execution_type: AssetExecutionType,
) -> Iterator[Union[AssetMaterialization, AssetObservation]]:
- all_metadata = {**output.metadata, **io_manager_metadata}
+ # Metadata scoped to all events for this asset.
+ key_scoped_metadata = {**output.metadata, **io_manager_metadata}
# Clear any cached record associated with this asset, since we are about to generate a new
# materialization.
@@ -623,9 +624,21 @@ def _get_output_asset_events(
else:
check.failed(f"Unexpected asset execution type {execution_type}")
+ unpartitioned_asset_metadata = step_context.get_asset_metadata(asset_key=asset_key)
+ all_unpartitioned_asset_metadata = {
+ **key_scoped_metadata,
+ **(unpartitioned_asset_metadata or {}),
+ }
if asset_partitions:
for partition in asset_partitions:
with disable_dagster_warnings():
+ partition_scoped_metadata = step_context.get_asset_metadata(
+ asset_key=asset_key, partition_key=partition
+ )
+ all_metadata_for_partitioned_event = {
+ **all_unpartitioned_asset_metadata,
+ **(partition_scoped_metadata or {}),
+ }
all_tags.update(
get_tags_from_multi_partition_key(partition)
if isinstance(partition, MultiPartitionKey)
@@ -635,12 +648,14 @@ def _get_output_asset_events(
yield event_class(
asset_key=asset_key,
partition=partition,
- metadata=all_metadata,
+ metadata=all_metadata_for_partitioned_event,
tags=all_tags,
)
else:
with disable_dagster_warnings():
- yield event_class(asset_key=asset_key, metadata=all_metadata, tags=all_tags)
+ yield event_class(
+ asset_key=asset_key, metadata=all_unpartitioned_asset_metadata, tags=all_tags
+ )
def _get_code_version(asset_key: AssetKey, step_context: StepExecutionContext) -> str:
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py
index db853ff7499ea..629788cd582c5 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py
@@ -25,6 +25,7 @@
Out,
Output,
ResourceDefinition,
+ _check as check,
build_asset_context,
define_asset_job,
fs_io_manager,
@@ -46,6 +47,7 @@
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.decorators.asset_decorator import graph_asset
from dagster._core.definitions.events import AssetMaterialization
+from dagster._core.definitions.metadata.metadata_value import TextMetadataValue
from dagster._core.definitions.result import MaterializeResult
from dagster._core.errors import (
DagsterInvalidDefinitionError,
@@ -53,6 +55,7 @@
DagsterInvalidPropertyError,
DagsterInvariantViolationError,
)
+from dagster._core.event_api import EventRecordsFilter
from dagster._core.instance import DagsterInstance
from dagster._core.storage.mem_io_manager import InMemoryIOManager
from dagster._core.test_utils import instance_for_test
@@ -2408,3 +2411,123 @@ def test_asset_dep_backcompat() -> None:
@asset(deps=AssetKey("oops")) # type: ignore # good job type checker
def _(): ...
+
+
+def test_unpartitioned_asset_metadata():
+ @asset
+ def unpartitioned_asset(context: AssetExecutionContext) -> None:
+ context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"})
+ context.add_output_metadata(metadata={"output_unpartitioned": "yay"})
+
+ context.add_asset_metadata(
+ asset_key="unpartitioned_asset", metadata={"asset_key_specified": "yay"}
+ )
+ context.add_output_metadata(
+ output_name=context.assets_def.get_output_name_for_asset_key(context.assets_def.key),
+ metadata={"output_name_specified": "yay"},
+ )
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce")
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce")
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce")
+
+ with instance_for_test() as instance:
+ result = materialize(assets=[unpartitioned_asset], instance=instance)
+ assert result.success
+
+ asset_materializations = result.asset_materializations_for_node("unpartitioned_asset")
+ assert len(asset_materializations) == 1
+ assert asset_materializations[0].metadata == {
+ "asset_unpartitioned": TextMetadataValue("yay"),
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "asset_key_specified": TextMetadataValue("yay"),
+ "output_name_specified": TextMetadataValue("yay"),
+ }
+
+ output_log = instance.get_event_records(
+ event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT)
+ )[0]
+ assert check.not_none(
+ output_log.event_log_entry.dagster_event
+ ).step_output_data.metadata == {
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "output_name_specified": TextMetadataValue("yay"),
+ }
+
+
+def test_unpartitioned_multiasset_metadata():
+ @multi_asset(specs=[AssetSpec("a"), AssetSpec("b")])
+ def compute(context: AssetExecutionContext):
+ # Won't work because no asset key
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"})
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce")
+
+ # Won't work because wrong asset key
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce")
+
+ # Won't work because no output name
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_output_metadata(metadata={"wont_work": "yay"})
+
+ # Won't work because wrong output name
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce")
+
+ # Won't work because partition key
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(
+ metadata={"wont_work": "yay"}, asset_key="a", partition_key="nonce"
+ )
+
+ context.add_asset_metadata(asset_key="a", metadata={"asset_key_specified": "yay"})
+ context.add_asset_metadata(asset_key="b", metadata={"asset_key_specified": "yay"})
+ context.add_output_metadata(
+ output_name=context.assets_def.get_output_name_for_asset_key(AssetKey("a")),
+ metadata={"output_name_specified": "yay"},
+ )
+ context.add_asset_metadata(metadata={"additional_a_metadata": "yay"}, asset_key="a")
+
+ context.add_output_metadata(
+ output_name=context.assets_def.get_output_name_for_asset_key(AssetKey("a")),
+ metadata={"additional_a_output_metadata": "yay"},
+ )
+
+ with instance_for_test() as instance:
+ result = materialize(assets=[compute], instance=instance)
+ assert result.success
+
+ asset_materializations = result.asset_materializations_for_node("compute")
+ assert len(asset_materializations) == 2
+ a_mat = next(mat for mat in asset_materializations if mat.asset_key == AssetKey("a"))
+ assert set(a_mat.metadata.keys()) == {
+ "asset_key_specified",
+ "output_name_specified",
+ "additional_a_metadata",
+ "additional_a_output_metadata",
+ }
+ b_mat = next(mat for mat in asset_materializations if mat.asset_key == AssetKey("b"))
+ assert set(b_mat.metadata.keys()) == {"asset_key_specified"}
+
+ output_logs = instance.get_event_records(
+ event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT)
+ )
+ output_event = next(
+ check.not_none(log.event_log_entry.dagster_event).step_output_data.metadata
+ for log in output_logs
+ if check.not_none(
+ log.event_log_entry.dagster_event
+ ).step_output_data.step_output_handle.output_name
+ == "a"
+ )
+ assert set(output_event.keys()) == {
+ "output_name_specified",
+ "additional_a_output_metadata",
+ }
diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py
index 398d9dc796167..bb8ad4520d944 100644
--- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py
+++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py
@@ -36,9 +36,13 @@
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.materialize import materialize_to_memory
+from dagster._core.definitions.metadata.metadata_value import IntMetadataValue, TextMetadataValue
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.errors import DagsterInvariantViolationError
+from dagster._core.event_api import EventRecordsFilter
+from dagster._core.events import DagsterEventType
+from dagster._core.instance_for_test import instance_for_test
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
@@ -687,11 +691,11 @@ def myconfig(start, _end):
)
-def test_partition_range_single_run():
+def test_partition_range_single_run() -> None:
partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")
@asset(partitions_def=partitions_def)
- def upstream_asset(context) -> None:
+ def upstream_asset(context: AssetExecutionContext) -> None:
key_range = PartitionKeyRange(start="2020-01-01", end="2020-01-03")
assert context.has_partition_key_range
assert context.partition_key_range == key_range
@@ -700,6 +704,10 @@ def upstream_asset(context) -> None:
partitions_def.time_window_for_partition_key(key_range.end).end,
)
assert context.partition_keys == partitions_def.get_partition_keys_in_range(key_range)
+ context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"})
+ context.add_output_metadata(metadata={"output_unpartitioned": "yay"})
+ for i, key in enumerate(context.partition_keys):
+ context.add_asset_metadata(partition_key=key, metadata={"index": i})
@asset(partitions_def=partitions_def, deps=["upstream_asset"])
def downstream_asset(context: AssetExecutionContext) -> None:
@@ -709,6 +717,7 @@ def downstream_asset(context: AssetExecutionContext) -> None:
assert context.partition_key_range == PartitionKeyRange(
start="2020-01-01", end="2020-01-03"
)
+ context.add_output_metadata(metadata={"unscoped": "yay"})
the_job = define_asset_job("job").resolve(
asset_graph=AssetGraph.from_assets([upstream_asset, downstream_asset])
@@ -721,14 +730,27 @@ def downstream_asset(context: AssetExecutionContext) -> None:
}
)
- assert {
- materialization.partition
- for materialization in result.asset_materializations_for_node("upstream_asset")
- } == {"2020-01-01", "2020-01-02", "2020-01-03"}
- assert {
- materialization.partition
- for materialization in result.asset_materializations_for_node("downstream_asset")
- } == {"2020-01-01", "2020-01-02", "2020-01-03"}
+ upstream_assets = result.asset_materializations_for_node("upstream_asset")
+ downstream_assets = result.asset_materializations_for_node("downstream_asset")
+ assert {materialization.partition for materialization in upstream_assets} == {
+ "2020-01-01",
+ "2020-01-02",
+ "2020-01-03",
+ }
+ assert {materialization.partition for materialization in downstream_assets} == {
+ "2020-01-01",
+ "2020-01-02",
+ "2020-01-03",
+ }
+
+ for i, mat in enumerate(upstream_assets):
+ assert mat.metadata == {
+ "asset_unpartitioned": TextMetadataValue("yay"),
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "index": IntMetadataValue(i),
+ }
+ for mat in downstream_assets:
+ assert mat.metadata == {"unscoped": TextMetadataValue("yay")}
def test_multipartition_range_single_run():
@@ -908,3 +930,73 @@ def assets4(): ...
partitions_def=partitions_def,
)
def assets5(): ...
+
+
+def test_partitioned_asset_metadata():
+ @asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
+ def partitioned_asset(context: AssetExecutionContext) -> None:
+ context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"})
+ context.add_output_metadata(metadata={"output_unpartitioned": "yay"})
+
+ context.add_asset_metadata(
+ asset_key="partitioned_asset", metadata={"asset_key_specified": "yay"}
+ )
+ context.add_output_metadata(
+ output_name=context.assets_def.get_output_name_for_asset_key(context.assets_def.key),
+ metadata={"output_name_specified": "yay"},
+ )
+
+ for key in context.partition_keys:
+ context.add_asset_metadata(partition_key=key, metadata={f"partition_key_{key}": "yay"})
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce")
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce")
+
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce")
+
+ # partition key is valid but not currently being targeted.
+ with pytest.raises(DagsterInvariantViolationError):
+ context.add_asset_metadata(
+ metadata={"wont_work": "yay"}, asset_key="partitioned_asset", partition_key="c"
+ )
+
+ with instance_for_test() as instance:
+ result = materialize(
+ assets=[partitioned_asset],
+ instance=instance,
+ tags={ASSET_PARTITION_RANGE_START_TAG: "a", ASSET_PARTITION_RANGE_END_TAG: "b"},
+ )
+ assert result.success
+
+ asset_materializations = result.asset_materializations_for_node("partitioned_asset")
+ assert len(asset_materializations) == 2
+ a_mat = next(mat for mat in asset_materializations if mat.partition == "a")
+ b_mat = next(mat for mat in asset_materializations if mat.partition == "b")
+ assert a_mat.metadata == {
+ "asset_unpartitioned": TextMetadataValue("yay"),
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "asset_key_specified": TextMetadataValue("yay"),
+ "output_name_specified": TextMetadataValue("yay"),
+ "partition_key_a": TextMetadataValue("yay"),
+ }
+ assert b_mat.metadata == {
+ "asset_unpartitioned": TextMetadataValue("yay"),
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "asset_key_specified": TextMetadataValue("yay"),
+ "output_name_specified": TextMetadataValue("yay"),
+ "partition_key_b": TextMetadataValue("yay"),
+ }
+
+ output_log = instance.event_log_storage.get_event_records(
+ event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT)
+ )[0]
+ assert check.not_none(
+ output_log.event_log_entry.dagster_event
+ ).step_output_data.metadata == {
+ "output_unpartitioned": TextMetadataValue("yay"),
+ "output_name_specified": TextMetadataValue("yay"),
+ }
diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py
index e9cd27da3d2bb..2444d2292ff96 100644
--- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py
+++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py
@@ -1303,8 +1303,10 @@ def test_partition_range_asset_invocation():
@asset(partitions_def=partitions_def)
def foo(context: AssetExecutionContext):
- keys = partitions_def.get_partition_keys_in_range(context.partition_key_range)
- return {k: True for k in keys}
+ keys1 = partitions_def.get_partition_keys_in_range(context.partition_key_range)
+ keys2 = context.partition_keys
+ assert keys1 == keys2
+ return {k: True for k in keys1}
context = build_asset_context(
partition_key_range=PartitionKeyRange("2023-01-01", "2023-01-02"),
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
index 68fdaea3e2629..fa8905edb4e18 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py
@@ -25,6 +25,7 @@
AssetOut,
AssetsDefinition,
AssetSelection,
+ AssetSpec,
AutoMaterializePolicy,
AutomationCondition,
DagsterInvalidDefinitionError,
@@ -807,12 +808,9 @@ def build_dbt_multi_asset_args(
project=project,
)
- outs[output_name] = AssetOut(
+ spec = AssetSpec(
key=asset_key,
- dagster_type=Nothing,
- io_manager_key=io_manager_key,
description=dagster_dbt_translator.get_description(dbt_resource_props),
- is_required=False,
metadata=metadata,
owners=dagster_dbt_translator.get_owners(
{
@@ -831,6 +829,14 @@ def build_dbt_multi_asset_args(
automation_condition=dagster_dbt_translator.get_automation_condition(
dbt_resource_props
),
+ partitions_def=dagster_dbt_translator.get_partitions_def(dbt_resource_props),
+ )
+
+ outs[output_name] = AssetOut.from_spec(
+ spec=spec,
+ dagster_type=Nothing,
+ is_required=False,
+ io_manager_key=io_manager_key,
)
test_unique_ids = [
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py
index a879588e274d5..adead4ac14639 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py
@@ -10,6 +10,7 @@
_check as check,
)
from dagster._annotations import experimental, public
+from dagster._core.definitions.partition import PartitionsDefinition
from dagster._utils.tags import is_valid_tag_key
from dagster_dbt.asset_utils import (
@@ -520,6 +521,40 @@ def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Opt
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)
+ def get_partitions_def(
+ self, dbt_resource_props: Mapping[str, Any]
+ ) -> Optional[PartitionsDefinition]:
+ """[INTERNAL] A function that takes a dictionary representing properties of a dbt resource, and
+ returns the Dagster :py:class:`dagster.PartitionsDefinition` for that resource.
+
+ This method can be overridden to provide a custom PartitionsDefinition for a dbt resource.
+
+ Args:
+ dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource.
+
+ Returns:
+ Optional[PartitionsDefinition]: A Dagster partitions definition.
+
+ Examples:
+            Set a custom PartitionsDefinition for dbt resources with a specific tag:
+
+ .. code-block:: python
+
+                from typing import Any, Mapping, Optional
+
+                from dagster import DailyPartitionsDefinition, PartitionsDefinition
+ from dagster_dbt import DagsterDbtTranslator
+
+
+ class CustomDagsterDbtTranslator(DagsterDbtTranslator):
+ def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]:
+ if "my_custom_tag" in dbt_resource_props.get("tags", []):
+ return DailyPartitionsDefinition(start_date="2022-01-01")
+ else:
+ return None
+ """
+ return None
+
@dataclass
class DbtManifestWrapper:
diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py
index 1d5e7998a46a8..87a8028e622ac 100644
--- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py
+++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py
@@ -789,6 +789,35 @@ def my_dbt_assets(): ...
)
+def test_with_varying_partitions_defs(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
+ daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
+ override_keys = {AssetKey("customers"), AssetKey("orders")}
+
+ class CustomDagsterDbtTranslator(DagsterDbtTranslator):
+ def get_partitions_def(
+ self, dbt_resource_props: Mapping[str, Any]
+ ) -> Optional[PartitionsDefinition]:
+ asset_key = super().get_asset_key(dbt_resource_props)
+ if asset_key in override_keys:
+ return daily_partitions
+ else:
+ return None
+
+ @dbt_assets(
+ manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator()
+ )
+ def my_dbt_assets(): ...
+
+ assert set(my_dbt_assets.keys) > override_keys
+
+ for spec in my_dbt_assets.specs:
+ partitions_def = spec.partitions_def
+ if spec.key in override_keys:
+ assert partitions_def == daily_partitions, spec.key
+ else:
+ assert partitions_def is None, spec.key
+
+
def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None:
expected_auto_materialize_policy = AutoMaterializePolicy.eager()
expected_specs_by_key = {