From b51f1ddbef02a1aa334d5b347c3e782b1a5a3e8c Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 20 Dec 2024 10:47:17 -0500 Subject: [PATCH 1/7] Fix direct invocation for partition ranges (#26631) ## Summary & Motivation We changed the partition_keys implementation on the main execution context class, which made it incompatible with DirectExecutionContext ## How I Tested These Changes Added test failed before, passes now ## Changelog NOCHANGELOG --- .../dagster/_core/execution/context/invocation.py | 14 ++++++++++++++ .../dagster_tests/core_tests/test_op_invocation.py | 6 ++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index 7f16e5cfaab35..076055c6d64e3 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -507,6 +507,20 @@ def partition_key(self) -> str: return self._partition_key check.failed("Tried to access partition_key for a non-partitioned run") + @property + def partition_keys(self) -> Sequence[str]: + key_range = self.partition_key_range + partitions_def = self.assets_def.partitions_def + if partitions_def is None: + raise DagsterInvariantViolationError( + "Cannot access partition_keys for a non-partitioned run" + ) + + return partitions_def.get_partition_keys_in_range( + key_range, + dynamic_partitions_store=self.instance, + ) + @property def partition_key_range(self) -> PartitionKeyRange: """The range of partition keys for the current run. diff --git a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py index e9cd27da3d2bb..2444d2292ff96 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_op_invocation.py @@ -1303,8 +1303,10 @@ def test_partition_range_asset_invocation(): @asset(partitions_def=partitions_def) def foo(context: AssetExecutionContext): - keys = partitions_def.get_partition_keys_in_range(context.partition_key_range) - return {k: True for k in keys} + keys1 = partitions_def.get_partition_keys_in_range(context.partition_key_range) + keys2 = context.partition_keys + assert keys1 == keys2 + return {k: True for k in keys1} context = build_asset_context( partition_key_range=PartitionKeyRange("2023-01-01", "2023-01-02"), From fd27b7c9a698637b0f14596dd8527d146280647a Mon Sep 17 00:00:00 2001 From: Christopher DeCarolis Date: Fri, 20 Dec 2024 07:47:30 -0800 Subject: [PATCH 2/7] [partition-metadata] add_output_metadata refactor (#26561) ## Summary & Motivation I was finding that complex series of ternaries really hard to parse. Most of the complexity is the result of using the same dictionary to represent metadata per output name, and metadata per output name and mapping key. Instead, have a top-level accumulator class and add an "OutputMetadataHandle", so that every metadata dictionary has the same scope level. We keep the same checks in place as before, but the logic is much simpler now. ## How I Tested These Changes Existing tests --- .../context/asset_execution_context.py | 84 +++++++++++- .../execution/context/metadata_logging.py | 84 ++++++++++++ .../dagster/_core/execution/context/system.py | 106 +++++++++++---- .../_core/execution/plan/execute_step.py | 21 ++- .../asset_defs_tests/test_assets.py | 123 ++++++++++++++++++ .../test_partitioned_assets.py | 112 ++++++++++++++-- 6 files changed, 485 insertions(+), 45 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/execution/context/metadata_logging.py diff --git a/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py b/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py index e86999fd017b4..e74f6fef46775 100644 --- a/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py +++ b/python_modules/dagster/dagster/_core/execution/context/asset_execution_context.py @@ -6,7 +6,7 @@ from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.data_version import DataProvenance, DataVersion from dagster._core.definitions.dependency import Node, NodeHandle -from dagster._core.definitions.events import AssetKey, UserEvent +from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey, UserEvent from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.op_definition import OpDefinition from dagster._core.definitions.partition import PartitionsDefinition @@ -459,15 +459,91 @@ def add_output_metadata( mapping_key: Optional[str] = None, ) -> None: return self.op_execution_context.add_output_metadata( - metadata=metadata, output_name=output_name, mapping_key=mapping_key + metadata=metadata, + output_name=output_name, + mapping_key=mapping_key, + ) + + @public + def add_asset_metadata( + self, + metadata: Mapping[str, Any], + asset_key: Optional[CoercibleToAssetKey] = None, + partition_key: Optional[str] = None, + ) -> None: + """Add metadata to an asset materialization event. This metadata will be + available in the Dagster UI. + + Args: + metadata (Mapping[str, Any]): The metadata to add to the asset + materialization event. + asset_key (Optional[CoercibleToAssetKey]): The asset key to add metadata to. + Does not need to be provided if only one asset is currently being + materialized. + partition_key (Optional[str]): The partition key to add metadata to, if + applicable. Should not be provided on non-partitioned assets. If not + provided on a partitioned asset, the metadata will be added to all + partitions of the asset currently being materialized. + + Examples: + Adding metadata to the asset materialization event for a single asset: + + .. code-block:: python + + import dagster as dg + + @dg.asset + def my_asset(context): + # Add metadata + context.add_asset_metadata({"key": "value"}) + + Adding metadata to the asset materialization event for a particular partition of a partitioned asset: + + .. code-block:: python + + import dagster as dg + + @dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"])) + def my_asset(context): + # Adds metadata to all partitions currently being materialized, since no + # partition is specified. + context.add_asset_metadata({"key": "value"}) + + for partition_key in context.partition_keys: + # Add metadata only to the event for partition "a" + if partition_key == "a": + context.add_asset_metadata({"key": "value"}, partition_key=partition_key) + + Adding metadata to the asset materialization event for a particular asset in a multi-asset. + + .. code-block:: python + + import dagster as dg + + @dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")]) + def my_multi_asset(context): + # Add metadata to the materialization event for "asset1" + context.add_asset_metadata({"key": "value"}, asset_key="asset1") + + # THIS line will fail since asset key is not specified: + context.add_asset_metadata({"key": "value"}) + + """ + self._step_execution_context.add_asset_metadata( + metadata=metadata, + asset_key=asset_key, + partition_key=partition_key, ) @_copy_docs_from_op_execution_context def get_output_metadata( - self, output_name: str, mapping_key: Optional[str] = None + self, + output_name: str, + mapping_key: Optional[str] = None, ) -> Optional[Mapping[str, Any]]: return self.op_execution_context.get_output_metadata( - output_name=output_name, mapping_key=mapping_key + output_name=output_name, + mapping_key=mapping_key, ) #### asset check related diff --git a/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py b/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py new file mode 100644 index 0000000000000..20a69574f7f73 --- /dev/null +++ b/python_modules/dagster/dagster/_core/execution/context/metadata_logging.py @@ -0,0 +1,84 @@ +from typing import Any, Mapping, Optional, Union + +from dagster._core.definitions.asset_key import AssetKey +from dagster._record import record +from dagster._utils.merger import merge_dicts + + +@record +class OutputMetadataHandle: + output_name: str + mapping_key: Optional[str] + + +@record +class AssetMetadataHandle: + asset_key: AssetKey + partition_key: Optional[str] + + +@record +class OutputMetadataAccumulator: + per_output_metadata: Mapping[ + Union[OutputMetadataHandle, AssetMetadataHandle], Mapping[str, Any] + ] + + @staticmethod + def empty() -> "OutputMetadataAccumulator": + return OutputMetadataAccumulator(per_output_metadata={}) + + def get_output_metadata( + self, output_name: str, mapping_key: Optional[str] + ) -> Mapping[str, Any]: + handle = OutputMetadataHandle( + output_name=output_name, + mapping_key=mapping_key, + ) + return self.per_output_metadata.get(handle, {}) + + def get_asset_metadata( + self, asset_key: AssetKey, partition_key: Optional[str] + ) -> Mapping[str, Any]: + handle = AssetMetadataHandle( + asset_key=asset_key, + partition_key=partition_key, + ) + return self.per_output_metadata.get(handle, {}) + + def with_additional_output_metadata( + self, + output_name: str, + mapping_key: Optional[str], + metadata: Mapping[str, Any], + ) -> "OutputMetadataAccumulator": + return self._with_metadata( + handle=OutputMetadataHandle( + output_name=output_name, + mapping_key=mapping_key, + ), + metadata=metadata, + ) + + def _with_metadata( + self, handle: Union[OutputMetadataHandle, AssetMetadataHandle], metadata: Mapping[str, Any] + ) -> "OutputMetadataAccumulator": + return OutputMetadataAccumulator( + per_output_metadata=merge_dicts( + self.per_output_metadata, + {handle: merge_dicts(self.per_output_metadata.get(handle, {}), metadata)}, + ) + ) + + def with_additional_asset_metadata( + self, + asset_key: AssetKey, + partition_key: Optional[str], + metadata: Mapping[str, Any], + ) -> "OutputMetadataAccumulator": + return self._with_metadata( + handle=AssetMetadataHandle( + asset_key=asset_key, + partition_key=partition_key, + ), + metadata=metadata, + ) diff --git a/python_modules/dagster/dagster/_core/execution/context/system.py b/python_modules/dagster/dagster/_core/execution/context/system.py index 94dc23b47e59d..a88722f3289ac 100644 --- a/python_modules/dagster/dagster/_core/execution/context/system.py +++ b/python_modules/dagster/dagster/_core/execution/context/system.py @@ -23,8 +23,9 @@ import dagster._check as check from dagster._annotations import public +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.dependency import OpNode -from dagster._core.definitions.events import AssetKey, AssetLineageInfo +from dagster._core.definitions.events import AssetKey, AssetLineageInfo, CoercibleToAssetKey from dagster._core.definitions.hook_definition import HookDefinition from dagster._core.definitions.job_base import IJob from dagster._core.definitions.job_definition import JobDefinition @@ -52,6 +53,7 @@ InputAssetVersionInfo, ) from dagster._core.execution.context.input import InputContext +from dagster._core.execution.context.metadata_logging import OutputMetadataAccumulator from dagster._core.execution.context.output import OutputContext, get_output_context from dagster._core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle from dagster._core.execution.plan.outputs import StepOutputHandle @@ -467,7 +469,7 @@ def __init__( self._step_output_capture = {} self._step_output_metadata_capture = {} - self._output_metadata: Dict[str, Any] = {} + self._metadata_accumulator = OutputMetadataAccumulator.empty() self._seen_outputs: Dict[str, Union[str, Set[str]]] = {} self._data_version_cache = DataVersionCache(self) @@ -688,7 +690,7 @@ def add_output_metadata( output_name = output_def.name elif output_name is None: raise DagsterInvariantViolationError( - "Attempted to log metadata without providing output_name, but multiple outputs" + "Attempted to add metadata without providing output_name, but multiple outputs" " exist. Please provide an output_name to the invocation of" " `context.add_output_metadata`." ) @@ -706,33 +708,79 @@ def add_output_metadata( f" metadata for {output_desc} which has already been yielded. Metadata must be" " logged before the output is yielded." ) - if output_def.is_dynamic and not mapping_key: + if output_def.is_dynamic: + if not mapping_key: + raise DagsterInvariantViolationError( + f"In {self.op_def.node_type_str} '{self.op.name}', Attempted to add metadata" + f" for dynamic output '{output_def.name}' without providing a mapping key. When" + " logging metadata for a dynamic output, it is necessary to provide a mapping key." + ) + self._metadata_accumulator = self._metadata_accumulator.with_additional_output_metadata( + output_name=output_name, + metadata=metadata, + mapping_key=mapping_key, + ) + + def add_asset_metadata( + self, + metadata: Mapping[str, Any], + asset_key: Optional[CoercibleToAssetKey] = None, + partition_key: Optional[str] = None, + ) -> None: + if not self.assets_def: + raise DagsterInvariantViolationError( + "Attempted to add metadata for a non-asset computation. Only assets should be calling this function." + ) + if len(self.assets_def.keys) == 0: raise DagsterInvariantViolationError( - f"In {self.op_def.node_type_str} '{self.op.name}', attempted to log metadata" - f" for dynamic output '{output_def.name}' without providing a mapping key. When" - " logging metadata for a dynamic output, it is necessary to provide a mapping key." + "Attempted to add metadata without providing asset_key, but no asset_keys" + " are being materialized. `context.add_asset_metadata` should only be called" + " when materializing assets." ) + if asset_key is None and len(self.assets_def.keys) > 1: + raise DagsterInvariantViolationError( + "Attempted to add metadata without providing asset_key, but multiple asset_keys" + " can potentially be materialized. Please provide an asset_key to the invocation of" + " `context.add_asset_metadata`." + ) + asset_key = AssetKey.from_coercible(asset_key) if asset_key else self.assets_def.key + if asset_key not in self.assets_def.keys: + raise DagsterInvariantViolationError( + f"Attempted to add metadata for asset key '{asset_key}' that is not being materialized." + ) + if partition_key: + if not self.assets_def.partitions_def: + raise DagsterInvariantViolationError( + f"Attempted to add metadata for partition key '{partition_key}' without a partitions definition." + ) - if mapping_key: - if output_name not in self._output_metadata: - self._output_metadata[output_name] = {} - if mapping_key in self._output_metadata[output_name]: - self._output_metadata[output_name][mapping_key].update(metadata) - else: - self._output_metadata[output_name][mapping_key] = metadata - else: - if output_name in self._output_metadata: - self._output_metadata[output_name].update(metadata) - else: - self._output_metadata[output_name] = metadata + targeted_partitions = self.assets_def.partitions_def.get_partition_keys_in_range( + partition_key_range=self.partition_key_range + ) + if partition_key not in targeted_partitions: + raise DagsterInvariantViolationError( + f"Attempted to add metadata for partition key '{partition_key}' that is not being targeted." + ) + + self._metadata_accumulator = self._metadata_accumulator.with_additional_asset_metadata( + asset_key=asset_key, + metadata=metadata, + partition_key=partition_key, + ) def get_output_metadata( - self, output_name: str, mapping_key: Optional[str] = None + self, + output_name: str, + mapping_key: Optional[str] = None, + ) -> Optional[Mapping[str, Any]]: + return self._metadata_accumulator.get_output_metadata(output_name, mapping_key) + + def get_asset_metadata( + self, + asset_key: AssetKey, + partition_key: Optional[str] = None, ) -> Optional[Mapping[str, Any]]: - metadata = self._output_metadata.get(output_name) - if mapping_key and metadata: - return metadata.get(mapping_key) - return metadata + return self._metadata_accumulator.get_asset_metadata(asset_key, partition_key) def _get_source_run_id_from_logs(self, step_output_handle: StepOutputHandle) -> Optional[str]: # walk through event logs to find the right run_id based on the run lineage @@ -901,16 +949,18 @@ def run_partitions_def(self) -> Optional[PartitionsDefinition]: # or no partitions_def. Get the partitions_def from one of the assets that has one. return self.asset_partitions_def + @cached_property + def assets_def(self) -> Optional[AssetsDefinition]: + return self.job_def.asset_layer.assets_def_for_node(self.node_handle) + @cached_property def asset_partitions_def(self) -> Optional[PartitionsDefinition]: """If the current step is executing a partitioned asset, returns the PartitionsDefinition for that asset. If there are one or more partitioned assets executing in the step, they're expected to all have the same PartitionsDefinition. """ - asset_layer = self.job_def.asset_layer - assets_def = asset_layer.assets_def_for_node(self.node_handle) if asset_layer else None - if assets_def is not None: - for asset_key in assets_def.keys: + if self.assets_def is not None: + for asset_key in self.assets_def.keys: partitions_def = self.job_def.asset_layer.get(asset_key).partitions_def if partitions_def is not None: return partitions_def diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 0858a47da4d5e..1f2db35809687 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -564,7 +564,8 @@ def _get_output_asset_events( step_context: StepExecutionContext, execution_type: AssetExecutionType, ) -> Iterator[Union[AssetMaterialization, AssetObservation]]: - all_metadata = {**output.metadata, **io_manager_metadata} + # Metadata scoped to all events for this asset. + key_scoped_metadata = {**output.metadata, **io_manager_metadata} # Clear any cached record associated with this asset, since we are about to generate a new # materialization. @@ -623,9 +624,21 @@ def _get_output_asset_events( else: check.failed(f"Unexpected asset execution type {execution_type}") + unpartitioned_asset_metadata = step_context.get_asset_metadata(asset_key=asset_key) + all_unpartitioned_asset_metadata = { + **key_scoped_metadata, + **(unpartitioned_asset_metadata or {}), + } if asset_partitions: for partition in asset_partitions: with disable_dagster_warnings(): + partition_scoped_metadata = step_context.get_asset_metadata( + asset_key=asset_key, partition_key=partition + ) + all_metadata_for_partitioned_event = { + **all_unpartitioned_asset_metadata, + **(partition_scoped_metadata or {}), + } all_tags.update( get_tags_from_multi_partition_key(partition) if isinstance(partition, MultiPartitionKey) @@ -635,12 +648,14 @@ def _get_output_asset_events( yield event_class( asset_key=asset_key, partition=partition, - metadata=all_metadata, + metadata=all_metadata_for_partitioned_event, tags=all_tags, ) else: with disable_dagster_warnings(): - yield event_class(asset_key=asset_key, metadata=all_metadata, tags=all_tags) + yield event_class( + asset_key=asset_key, metadata=all_unpartitioned_asset_metadata, tags=all_tags + ) def _get_code_version(asset_key: AssetKey, step_context: StepExecutionContext) -> str: diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index db853ff7499ea..629788cd582c5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -25,6 +25,7 @@ Out, Output, ResourceDefinition, + _check as check, build_asset_context, define_asset_job, fs_io_manager, @@ -46,6 +47,7 @@ from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.decorators.asset_decorator import graph_asset from dagster._core.definitions.events import AssetMaterialization +from dagster._core.definitions.metadata.metadata_value import TextMetadataValue from dagster._core.definitions.result import MaterializeResult from dagster._core.errors import ( DagsterInvalidDefinitionError, @@ -53,6 +55,7 @@ DagsterInvalidPropertyError, DagsterInvariantViolationError, ) +from dagster._core.event_api import EventRecordsFilter from dagster._core.instance import DagsterInstance from dagster._core.storage.mem_io_manager import InMemoryIOManager from dagster._core.test_utils import instance_for_test @@ -2408,3 +2411,123 @@ def test_asset_dep_backcompat() -> None: @asset(deps=AssetKey("oops")) # type: ignore # good job type checker def _(): ... + + +def test_unpartitioned_asset_metadata(): + @asset + def unpartitioned_asset(context: AssetExecutionContext) -> None: + context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"}) + context.add_output_metadata(metadata={"output_unpartitioned": "yay"}) + + context.add_asset_metadata( + asset_key="unpartitioned_asset", metadata={"asset_key_specified": "yay"} + ) + context.add_output_metadata( + output_name=context.assets_def.get_output_name_for_asset_key(context.assets_def.key), + metadata={"output_name_specified": "yay"}, + ) + + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce") + + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce") + + with pytest.raises(DagsterInvariantViolationError): + context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce") + + with instance_for_test() as instance: + result = materialize(assets=[unpartitioned_asset], instance=instance) + assert result.success + + asset_materializations = result.asset_materializations_for_node("unpartitioned_asset") + assert len(asset_materializations) == 1 + assert asset_materializations[0].metadata == { + "asset_unpartitioned": TextMetadataValue("yay"), + "output_unpartitioned": TextMetadataValue("yay"), + "asset_key_specified": TextMetadataValue("yay"), + "output_name_specified": TextMetadataValue("yay"), + } + + output_log = instance.get_event_records( + event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT) + )[0] + assert check.not_none( + output_log.event_log_entry.dagster_event + ).step_output_data.metadata == { + "output_unpartitioned": TextMetadataValue("yay"), + "output_name_specified": TextMetadataValue("yay"), + } + + +def test_unpartitioned_multiasset_metadata(): + @multi_asset(specs=[AssetSpec("a"), AssetSpec("b")]) + def compute(context: AssetExecutionContext): + # Won't work because no asset key + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}) + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce") + + # Won't work because wrong asset key + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce") + + # Won't work because no output name + with pytest.raises(DagsterInvariantViolationError): + context.add_output_metadata(metadata={"wont_work": "yay"}) + + # Won't work because wrong output name + with pytest.raises(DagsterInvariantViolationError): + context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce") + + # Won't work because partition key + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata( + metadata={"wont_work": "yay"}, asset_key="a", partition_key="nonce" + ) + + context.add_asset_metadata(asset_key="a", metadata={"asset_key_specified": "yay"}) + context.add_asset_metadata(asset_key="b", metadata={"asset_key_specified": "yay"}) + context.add_output_metadata( + output_name=context.assets_def.get_output_name_for_asset_key(AssetKey("a")), + metadata={"output_name_specified": "yay"}, + ) + context.add_asset_metadata(metadata={"additional_a_metadata": "yay"}, asset_key="a") + + context.add_output_metadata( + output_name=context.assets_def.get_output_name_for_asset_key(AssetKey("a")), + metadata={"additional_a_output_metadata": "yay"}, + ) + + with instance_for_test() as instance: + result = materialize(assets=[compute], instance=instance) + assert result.success + + asset_materializations = result.asset_materializations_for_node("compute") + assert len(asset_materializations) == 2 + a_mat = next(mat for mat in asset_materializations if mat.asset_key == AssetKey("a")) + assert set(a_mat.metadata.keys()) == { + "asset_key_specified", + "output_name_specified", + "additional_a_metadata", + "additional_a_output_metadata", + } + b_mat = next(mat for mat in asset_materializations if mat.asset_key == AssetKey("b")) + assert set(b_mat.metadata.keys()) == {"asset_key_specified"} + + output_logs = instance.get_event_records( + event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT) + ) + output_event = next( + check.not_none(log.event_log_entry.dagster_event).step_output_data.metadata + for log in output_logs + if check.not_none( + log.event_log_entry.dagster_event + ).step_output_data.step_output_handle.output_name + == "a" + ) + assert set(output_event.keys()) == { + "output_name_specified", + "additional_a_output_metadata", + } diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 398d9dc796167..bb8ad4520d944 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -36,9 +36,13 @@ from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.events import AssetKey from dagster._core.definitions.materialize import materialize_to_memory +from dagster._core.definitions.metadata.metadata_value import IntMetadataValue, TextMetadataValue from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.time_window_partitions import TimeWindow from dagster._core.errors import DagsterInvariantViolationError +from dagster._core.event_api import EventRecordsFilter +from dagster._core.events import DagsterEventType +from dagster._core.instance_for_test import instance_for_test from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, ASSET_PARTITION_RANGE_START_TAG, @@ -687,11 +691,11 @@ def myconfig(start, _end): ) -def test_partition_range_single_run(): +def test_partition_range_single_run() -> None: partitions_def = DailyPartitionsDefinition(start_date="2020-01-01") @asset(partitions_def=partitions_def) - def upstream_asset(context) -> None: + def upstream_asset(context: AssetExecutionContext) -> None: key_range = PartitionKeyRange(start="2020-01-01", end="2020-01-03") assert context.has_partition_key_range assert context.partition_key_range == key_range @@ -700,6 +704,10 @@ def upstream_asset(context) -> None: partitions_def.time_window_for_partition_key(key_range.end).end, ) assert context.partition_keys == partitions_def.get_partition_keys_in_range(key_range) + context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"}) + context.add_output_metadata(metadata={"output_unpartitioned": "yay"}) + for i, key in enumerate(context.partition_keys): + context.add_asset_metadata(partition_key=key, metadata={"index": i}) @asset(partitions_def=partitions_def, deps=["upstream_asset"]) def downstream_asset(context: AssetExecutionContext) -> None: @@ -709,6 +717,7 @@ def downstream_asset(context: AssetExecutionContext) -> None: assert context.partition_key_range == PartitionKeyRange( start="2020-01-01", end="2020-01-03" ) + context.add_output_metadata(metadata={"unscoped": "yay"}) the_job = define_asset_job("job").resolve( asset_graph=AssetGraph.from_assets([upstream_asset, downstream_asset]) @@ -721,14 +730,27 @@ def downstream_asset(context: AssetExecutionContext) -> None: } ) - assert { - materialization.partition - for materialization in result.asset_materializations_for_node("upstream_asset") - } == {"2020-01-01", "2020-01-02", "2020-01-03"} - assert { - materialization.partition - for materialization in result.asset_materializations_for_node("downstream_asset") - } == {"2020-01-01", "2020-01-02", "2020-01-03"} + upstream_assets = result.asset_materializations_for_node("upstream_asset") + downstream_assets = result.asset_materializations_for_node("downstream_asset") + assert {materialization.partition for materialization in upstream_assets} == { + "2020-01-01", + "2020-01-02", + "2020-01-03", + } + assert {materialization.partition for materialization in downstream_assets} == { + "2020-01-01", + "2020-01-02", + "2020-01-03", + } + + for i, mat in enumerate(upstream_assets): + assert mat.metadata == { + "asset_unpartitioned": TextMetadataValue("yay"), + "output_unpartitioned": TextMetadataValue("yay"), + "index": IntMetadataValue(i), + } + for mat in downstream_assets: + assert mat.metadata == {"unscoped": TextMetadataValue("yay")} def test_multipartition_range_single_run(): @@ -908,3 +930,73 @@ def assets4(): ... partitions_def=partitions_def, ) def assets5(): ... + + +def test_partitioned_asset_metadata(): + @asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"])) + def partitioned_asset(context: AssetExecutionContext) -> None: + context.add_asset_metadata(metadata={"asset_unpartitioned": "yay"}) + context.add_output_metadata(metadata={"output_unpartitioned": "yay"}) + + context.add_asset_metadata( + asset_key="partitioned_asset", metadata={"asset_key_specified": "yay"} + ) + context.add_output_metadata( + output_name=context.assets_def.get_output_name_for_asset_key(context.assets_def.key), + metadata={"output_name_specified": "yay"}, + ) + + for key in context.partition_keys: + context.add_asset_metadata(partition_key=key, metadata={f"partition_key_{key}": "yay"}) + + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, partition_key="nonce") + + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata(metadata={"wont_work": "yay"}, asset_key="nonce") + + with pytest.raises(DagsterInvariantViolationError): + context.add_output_metadata(metadata={"wont_work": "yay"}, output_name="nonce") + + # partition key is valid but not currently being targeted. + with pytest.raises(DagsterInvariantViolationError): + context.add_asset_metadata( + metadata={"wont_work": "yay"}, asset_key="partitioned_asset", partition_key="c" + ) + + with instance_for_test() as instance: + result = materialize( + assets=[partitioned_asset], + instance=instance, + tags={ASSET_PARTITION_RANGE_START_TAG: "a", ASSET_PARTITION_RANGE_END_TAG: "b"}, + ) + assert result.success + + asset_materializations = result.asset_materializations_for_node("partitioned_asset") + assert len(asset_materializations) == 2 + a_mat = next(mat for mat in asset_materializations if mat.partition == "a") + b_mat = next(mat for mat in asset_materializations if mat.partition == "b") + assert a_mat.metadata == { + "asset_unpartitioned": TextMetadataValue("yay"), + "output_unpartitioned": TextMetadataValue("yay"), + "asset_key_specified": TextMetadataValue("yay"), + "output_name_specified": TextMetadataValue("yay"), + "partition_key_a": TextMetadataValue("yay"), + } + assert b_mat.metadata == { + "asset_unpartitioned": TextMetadataValue("yay"), + "output_unpartitioned": TextMetadataValue("yay"), + "asset_key_specified": TextMetadataValue("yay"), + "output_name_specified": TextMetadataValue("yay"), + "partition_key_b": TextMetadataValue("yay"), + } + + output_log = instance.event_log_storage.get_event_records( + event_records_filter=EventRecordsFilter(event_type=DagsterEventType.STEP_OUTPUT) + )[0] + assert check.not_none( + output_log.event_log_entry.dagster_event + ).step_output_data.metadata == { + "output_unpartitioned": TextMetadataValue("yay"), + "output_name_specified": TextMetadataValue("yay"), + } From 917b7afb9b44893182a5e9d124cf6d0ff51f2f8e Mon Sep 17 00:00:00 2001 From: Rhiyo Date: Sat, 21 Dec 2024 03:05:07 +1100 Subject: [PATCH 3/7] Update backfills docs with more up to date info on single runs (#23724) ## Summary & Motivation I noticed the docs of the backfill page for single runs are out of date with 1.8 based on the changes here: https://github.com/dagster-io/dagster/pull/21259 Starting this PR to correct that - the note on the backfill was incorrent in its current form. - potentially should add an example when using with jobs - add an example with multipartitions (Static + Date Range) It seems if you do try and run a job with multiple different backfill polices it will default to the policy with least max_partitions_per_run, failing that it will default to using multi_run(max_partitions_per_run=1) --------- Co-authored-by: Colton Padden --- .../concepts/partitions-schedules-sensors/backfills.mdx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx index ca2fe6346ce28..4c62003af4d52 100644 --- a/docs/content/concepts/partitions-schedules-sensors/backfills.mdx +++ b/docs/content/concepts/partitions-schedules-sensors/backfills.mdx @@ -42,9 +42,9 @@ By default, if you launch a backfill that covers `N` partitions, Dagster will la Dagster supports backfills that execute as a single run that covers a range of partitions, such as executing a backfill as a single Snowflake query. After the run completes, Dagster will track that all the partitions have been filled. - Single-run backfills only work for backfills that target assets directly, i.e. - those launched from the asset graph or asset page. Backfills launched from the - Job page will not respect the backfill policies of assets included in the job. + Single-run backfills only work if they are launched from the asset graph or + asset page, or if the assets are part of an asset job that shares the same + backfill policy across all included assets. To get this behavior, you need to: From 5831a6ab38d96bb51c7f05d891efa06a4edbe066 Mon Sep 17 00:00:00 2001 From: dwisdom-tk <98771400+dwisdom-tk@users.noreply.github.com> Date: Fri, 20 Dec 2024 08:13:18 -0800 Subject: [PATCH 4/7] [docs] change HTTPS to HTTP in GraphQL API example (#17373) ## Summary & Motivation If I start up the example project locally with `dagster dev` and then try to connect to the GraphQL endpoint with`gql` using the link in the example, I get an `SSL_WRONG_VERSION` error. I also get this error if I use `dagster_graphql.DagsterGraphQLClient` with `use_https=True`. ``` requests.exceptions.SSLError: HTTPSConnectionPool(host='127.0.0.1', port=3000): Max retries exceeded with url: /graphql (Caused by SSLError(SSLError(1, '[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:992)'))) ``` I figured out that the error means I'm trying to connect to something that doesn't use HTTPS. Here's an example of someone explaining the error: https://github.com/psf/requests/issues/5943#issuecomment-925930558 ## How I Tested These Changes If I change the link to instead use HTTP, `gql` successfully connects and queries the endpoint. `dagster_graphql.DagsterGraphQLClient` works as expected if I set `use_https=False`. --- docs/content/concepts/webserver/graphql.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/concepts/webserver/graphql.mdx b/docs/content/concepts/webserver/graphql.mdx index a5f765e33275a..094aa61bbc502 100644 --- a/docs/content/concepts/webserver/graphql.mdx +++ b/docs/content/concepts/webserver/graphql.mdx @@ -28,7 +28,7 @@ The GraphQL API is served from the webserver. To start the server, run the follo dagster dev ``` -The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at . +The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at . ### Using the GraphQL playground From c79ff2e12c7457c3342d3243d5a5d9d6ccadf19b Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 20 Dec 2024 11:28:43 -0500 Subject: [PATCH 5/7] [dagster-dbt][refactor] Use AssetOut.from_spec() to create AssetOuts for dbt_assets decorator (#26613) ## Summary & Motivation As title -- prepares us for a world in which the DagsterDbtTranslator just returns an AssetSpec ## How I Tested These Changes ## Changelog NOCHANGELOG --- .../dagster-dbt/dagster_dbt/asset_utils.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 68fdaea3e2629..8fa78c1a77812 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -25,6 +25,7 @@ AssetOut, AssetsDefinition, AssetSelection, + AssetSpec, AutoMaterializePolicy, AutomationCondition, DagsterInvalidDefinitionError, @@ -807,12 +808,9 @@ def build_dbt_multi_asset_args( project=project, ) - outs[output_name] = AssetOut( + spec = AssetSpec( key=asset_key, - dagster_type=Nothing, - io_manager_key=io_manager_key, description=dagster_dbt_translator.get_description(dbt_resource_props), - is_required=False, metadata=metadata, owners=dagster_dbt_translator.get_owners( { @@ -832,6 +830,14 @@ def build_dbt_multi_asset_args( dbt_resource_props ), ) + if io_manager_key: + spec = spec.with_io_manager_key(io_manager_key) + + outs[output_name] = AssetOut.from_spec( + spec=spec, + dagster_type=Nothing, + is_required=False, + ) test_unique_ids = [ child_unique_id From 1c4a630aeaf4bd36ca98a15efda04ecc86345ced Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Fri, 20 Dec 2024 11:32:44 -0500 Subject: [PATCH 6/7] [dagster-dbt] Add get_partitions_def() method to DagsterDbtTranslator (#26625) ## Summary & Motivation As title. This lets individual dbt models have distinct partitions definitions ## How I Tested These Changes ## Changelog NOCHANGELOG --- .../dagster/_core/definitions/asset_out.py | 2 +- .../dagster-dbt/dagster_dbt/asset_utils.py | 4 +-- .../dagster_dbt/dagster_dbt_translator.py | 35 +++++++++++++++++++ .../core/test_asset_decorator.py | 29 +++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py index 8d81cf665bcba..f563344e073eb 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_out.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py @@ -228,7 +228,7 @@ def to_spec( key=key, tags={**additional_tags, **self.tags} if self.tags else additional_tags, deps=[*self._spec.deps, *deps], - partitions_def=partitions_def, + partitions_def=partitions_def if partitions_def is not None else ..., ) @public diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py index 8fa78c1a77812..fa8905edb4e18 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py @@ -829,14 +829,14 @@ def build_dbt_multi_asset_args( automation_condition=dagster_dbt_translator.get_automation_condition( dbt_resource_props ), + partitions_def=dagster_dbt_translator.get_partitions_def(dbt_resource_props), ) - if io_manager_key: - spec = spec.with_io_manager_key(io_manager_key) outs[output_name] = AssetOut.from_spec( spec=spec, dagster_type=Nothing, is_required=False, + io_manager_key=io_manager_key, ) test_unique_ids = [ diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py index a879588e274d5..adead4ac14639 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py @@ -10,6 +10,7 @@ _check as check, ) from dagster._annotations import experimental, public +from dagster._core.definitions.partition import PartitionsDefinition from dagster._utils.tags import is_valid_tag_key from dagster_dbt.asset_utils import ( @@ -520,6 +521,40 @@ def get_automation_condition(self, dbt_resource_props: Mapping[str, Any]) -> Opt auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None ) + def get_partitions_def( + self, dbt_resource_props: Mapping[str, Any] + ) -> Optional[PartitionsDefinition]: + """[INTERNAL] A function that takes a dictionary representing properties of a dbt resource, and + returns the Dagster :py:class:`dagster.PartitionsDefinition` for that resource. + + This method can be overridden to provide a custom PartitionsDefinition for a dbt resource. + + Args: + dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. + + Returns: + Optional[PartitionsDefinition]: A Dagster partitions definition. + + Examples: + Set a custom AutomationCondition for dbt resources with a specific tag: + + .. code-block:: python + + from typing import Any, Mapping + + from dagster import DailyPartitionsDefinition + from dagster_dbt import DagsterDbtTranslator + + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_partitions_def(self, dbt_resource_props: Mapping[str, Any]) -> Optional[PartitionsDefinition]: + if "my_custom_tag" in dbt_resource_props.get("tags", []): + return DailyPartitionsDefinition(start_date="2022-01-01") + else: + return None + """ + return None + @dataclass class DbtManifestWrapper: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py index 1d5e7998a46a8..87a8028e622ac 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_asset_decorator.py @@ -789,6 +789,35 @@ def my_dbt_assets(): ... ) +def test_with_varying_partitions_defs(test_jaffle_shop_manifest: Dict[str, Any]) -> None: + daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01") + override_keys = {AssetKey("customers"), AssetKey("orders")} + + class CustomDagsterDbtTranslator(DagsterDbtTranslator): + def get_partitions_def( + self, dbt_resource_props: Mapping[str, Any] + ) -> Optional[PartitionsDefinition]: + asset_key = super().get_asset_key(dbt_resource_props) + if asset_key in override_keys: + return daily_partitions + else: + return None + + @dbt_assets( + manifest=test_jaffle_shop_manifest, dagster_dbt_translator=CustomDagsterDbtTranslator() + ) + def my_dbt_assets(): ... + + assert set(my_dbt_assets.keys) > override_keys + + for spec in my_dbt_assets.specs: + partitions_def = spec.partitions_def + if spec.key in override_keys: + assert partitions_def == daily_partitions, spec.key + else: + assert partitions_def is None, spec.key + + def test_dbt_meta_auto_materialize_policy(test_meta_config_manifest: Dict[str, Any]) -> None: expected_auto_materialize_policy = AutoMaterializePolicy.eager() expected_specs_by_key = { From 49c548026339367b08496ee7f5931207cc095f86 Mon Sep 17 00:00:00 2001 From: Nikki Everett Date: Fri, 20 Dec 2024 10:49:08 -0600 Subject: [PATCH 7/7] Update new docs CONTRIBUTING with image directory links (#26637) ## Summary & Motivation Link to old and new docs image directories to help folks migrating content. ## How I Tested These Changes ## Changelog > Insert changelog entry or delete this section. Signed-off-by: nikki everett --- docs/docs-beta/CONTRIBUTING.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/docs-beta/CONTRIBUTING.md b/docs/docs-beta/CONTRIBUTING.md index 3d446d91f5e60..e855a1e52f8d6 100644 --- a/docs/docs-beta/CONTRIBUTING.md +++ b/docs/docs-beta/CONTRIBUTING.md @@ -6,6 +6,12 @@ There are some features in the previous docs that require changes to be made to ### Images +#### Location + +Old images are in the [/docs/next/public/images](https://github.com/dagster-io/dagster/tree/master/docs/next/public/images) directory. You will need to copy them to [/docs/docs-beta/static/images](https://github.com/dagster-io/dagster/tree/master/docs/docs-beta/static/images). + +#### Formatting + Before: ```