Skip to content

Commit

Permalink
Merge branch 'master' into nikki/docs/guides-structure
Browse files Browse the repository at this point in the history
  • Loading branch information
neverett committed Dec 20, 2024
2 parents f65a6cb + 49c5480 commit 7775cab
Show file tree
Hide file tree
Showing 15 changed files with 588 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ By default, if you launch a backfill that covers `N` partitions, Dagster will la
Dagster supports backfills that execute as a single run that covers a range of partitions, such as executing a backfill as a single Snowflake query. After the run completes, Dagster will track that all the partitions have been filled.

<Note>
Single-run backfills only work for backfills that target assets directly, i.e.
those launched from the asset graph or asset page. Backfills launched from the
Job page will not respect the backfill policies of assets included in the job.
Single-run backfills only work if they are launched from the asset graph or
asset page, or if the assets are part of an asset job that shares the same
backfill policy across all included assets.
</Note>

To get this behavior, you need to:
Expand Down
2 changes: 1 addition & 1 deletion docs/content/concepts/webserver/graphql.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The GraphQL API is served from the webserver. To start the server, run the follo
dagster dev
```

The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <https://localhost:3000/graphql>.
The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <http://localhost:3000/graphql>.

### Using the GraphQL playground

Expand Down
6 changes: 6 additions & 0 deletions docs/docs-beta/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ There are some features in the previous docs that require changes to be made to

### Images

#### Location

Old images are in the [/docs/next/public/images](https://github.com/dagster-io/dagster/tree/master/docs/next/public/images) directory. You will need to copy them to [/docs/docs-beta/static/images](https://github.com/dagster-io/dagster/tree/master/docs/docs-beta/static/images).

#### Formatting

Before:

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def to_spec(
key=key,
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
deps=[*self._spec.deps, *deps],
partitions_def=partitions_def,
partitions_def=partitions_def if partitions_def is not None else ...,
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import DataProvenance, DataVersion
from dagster._core.definitions.dependency import Node, NodeHandle
from dagster._core.definitions.events import AssetKey, UserEvent
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey, UserEvent
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.partition import PartitionsDefinition
Expand Down Expand Up @@ -459,15 +459,91 @@ def add_output_metadata(
mapping_key: Optional[str] = None,
) -> None:
return self.op_execution_context.add_output_metadata(
metadata=metadata, output_name=output_name, mapping_key=mapping_key
metadata=metadata,
output_name=output_name,
mapping_key=mapping_key,
)

@public
def add_asset_metadata(
    self,
    metadata: Mapping[str, Any],
    asset_key: Optional[CoercibleToAssetKey] = None,
    partition_key: Optional[str] = None,
) -> None:
    """Add metadata to an asset materialization event. This metadata will be
    available in the Dagster UI.

    Args:
        metadata (Mapping[str, Any]): The metadata to add to the asset
            materialization event.
        asset_key (Optional[CoercibleToAssetKey]): The asset key to add metadata to.
            Does not need to be provided if only one asset is currently being
            materialized.
        partition_key (Optional[str]): The partition key to add metadata to, if
            applicable. Should not be provided on non-partitioned assets. If not
            provided on a partitioned asset, the metadata will be added to all
            partitions of the asset currently being materialized.

    Examples:
        Adding metadata to the asset materialization event for a single asset:

        .. code-block:: python

            import dagster as dg

            @dg.asset
            def my_asset(context):
                # Add metadata
                context.add_asset_metadata({"key": "value"})

        Adding metadata to the asset materialization event for a particular partition of a partitioned asset:

        .. code-block:: python

            import dagster as dg

            @dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"]))
            def my_asset(context):
                # Adds metadata to all partitions currently being materialized, since no
                # partition is specified.
                context.add_asset_metadata({"key": "value"})

                for partition_key in context.partition_keys:
                    # Add metadata only to the event for partition "a"
                    if partition_key == "a":
                        context.add_asset_metadata({"key": "value"}, partition_key=partition_key)

        Adding metadata to the asset materialization event for a particular asset in a multi-asset.

        .. code-block:: python

            import dagster as dg

            @dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")])
            def my_multi_asset(context):
                # Add metadata to the materialization event for "asset1"
                context.add_asset_metadata({"key": "value"}, asset_key="asset1")

                # THIS line will fail since asset key is not specified:
                context.add_asset_metadata({"key": "value"})
    """
    # Pure delegation: the step execution context owns metadata accumulation.
    self._step_execution_context.add_asset_metadata(
        metadata=metadata,
        asset_key=asset_key,
        partition_key=partition_key,
    )

@_copy_docs_from_op_execution_context
def get_output_metadata(
    self, output_name: str, mapping_key: Optional[str] = None
) -> Optional[Mapping[str, Any]]:
    # Thin delegation; the docstring is copied from OpExecutionContext by the
    # decorator, so none is written here.
    op_context = self.op_execution_context
    return op_context.get_output_metadata(output_name, mapping_key=mapping_key)

#### asset check related
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,20 @@ def partition_key(self) -> str:
return self._partition_key
check.failed("Tried to access partition_key for a non-partitioned run")

@property
def partition_keys(self) -> Sequence[str]:
    """Sequence[str]: All partition keys in the partition key range of the current run.

    Raises:
        DagsterInvariantViolationError: If the backing assets definition has no
            partitions definition (i.e. the run is not partitioned).
    """
    # NOTE(review): the range is read before the partitions_def guard below; for a
    # non-partitioned run, partition_key_range may itself raise first — confirm
    # which error surfaces to users.
    key_range = self.partition_key_range
    partitions_def = self.assets_def.partitions_def
    if partitions_def is None:
        raise DagsterInvariantViolationError(
            "Cannot access partition_keys for a non-partitioned run"
        )

    # dynamic_partitions_store is needed so dynamic partitions definitions can
    # resolve their keys against the instance.
    return partitions_def.get_partition_keys_in_range(
        key_range,
        dynamic_partitions_store=self.instance,
    )

@property
def partition_key_range(self) -> PartitionKeyRange:
"""The range of partition keys for the current run.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Any, Mapping, Optional, Union

from dagster._core.definitions.asset_key import AssetKey
from dagster._record import record
from dagster._utils.merger import merge_dicts


@record
class OutputMetadataHandle:
    """Hashable key identifying a single op output for metadata bookkeeping."""

    output_name: str
    # presumably the dynamic-output mapping key; None for non-mapped outputs — confirm.
    mapping_key: Optional[str]


@record
class AssetMetadataHandle:
    """Hashable key identifying an asset (optionally a single partition of it)
    for metadata bookkeeping."""

    asset_key: AssetKey
    # None means the metadata applies to the asset as a whole rather than one partition.
    partition_key: Optional[str]


@record
class OutputMetadataAccumulator:
    """Immutable accumulator of metadata logged against outputs and assets.

    Each entry maps a handle (output-based or asset-based) to the merged
    metadata recorded for it so far. All mutation-style methods return a new
    accumulator; the receiver is never modified.
    """

    per_output_metadata: Mapping[
        Union[OutputMetadataHandle, AssetMetadataHandle], Mapping[str, Any]
    ]

    @staticmethod
    def empty() -> "OutputMetadataAccumulator":
        """Return an accumulator with no metadata recorded."""
        return OutputMetadataAccumulator(per_output_metadata={})

    def get_output_metadata(
        self, output_name: str, mapping_key: Optional[str]
    ) -> Mapping[str, Any]:
        """Return the metadata recorded for the given output, or {} if none."""
        return self._lookup(
            OutputMetadataHandle(output_name=output_name, mapping_key=mapping_key)
        )

    def get_asset_metadata(
        self, asset_key: AssetKey, partition_key: Optional[str]
    ) -> Mapping[str, Any]:
        """Return the metadata recorded for the given asset/partition, or {} if none."""
        return self._lookup(
            AssetMetadataHandle(asset_key=asset_key, partition_key=partition_key)
        )

    def _lookup(
        self, handle: Union[OutputMetadataHandle, AssetMetadataHandle]
    ) -> Mapping[str, Any]:
        # Shared read path for both handle flavors.
        return self.per_output_metadata.get(handle, {})

    def with_additional_output_metadata(
        self,
        output_name: str,
        mapping_key: Optional[str],
        metadata: Mapping[str, Any],
    ) -> "OutputMetadataAccumulator":
        """Return a new accumulator with *metadata* merged onto the given output."""
        return self._with_metadata(
            handle=OutputMetadataHandle(
                output_name=output_name, mapping_key=mapping_key
            ),
            metadata=metadata,
        )

    def with_additional_asset_metadata(
        self,
        asset_key: AssetKey,
        partition_key: Optional[str],
        metadata: Mapping[str, Any],
    ) -> "OutputMetadataAccumulator":
        """Return a new accumulator with *metadata* merged onto the given asset/partition."""
        return self._with_metadata(
            handle=AssetMetadataHandle(
                asset_key=asset_key, partition_key=partition_key
            ),
            metadata=metadata,
        )

    def _with_metadata(
        self,
        handle: Union[OutputMetadataHandle, AssetMetadataHandle],
        metadata: Mapping[str, Any],
    ) -> "OutputMetadataAccumulator":
        # Merge the new entries over whatever was already recorded for this
        # handle (new values win), then rebuild the mapping with only that
        # entry replaced.
        existing = self.per_output_metadata.get(handle, {})
        updated_entry = merge_dicts(existing, metadata)
        return OutputMetadataAccumulator(
            per_output_metadata=merge_dicts(
                self.per_output_metadata, {handle: updated_entry}
            )
        )
Loading

0 comments on commit 7775cab

Please sign in to comment.