From f035e780e2a2c7d076167eef8c9dbff6a507ef79 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Thu, 22 Aug 2024 11:33:42 -0700 Subject: [PATCH] cp --- .../content/integrations/embedded-elt/dlt.mdx | 86 ++++++++++--------- .../dagster_embedded_elt/dlt/computation.py | 3 + .../dagster_embedded_elt/dlt/constants.py | 1 - .../dlt/dlt_computation.py | 2 +- .../dlt_tests/test_asset_decorator.py | 4 +- 5 files changed, 51 insertions(+), 45 deletions(-) diff --git a/docs/content/integrations/embedded-elt/dlt.mdx b/docs/content/integrations/embedded-elt/dlt.mdx index eb90a2779c010..7573863a99c18 100644 --- a/docs/content/integrations/embedded-elt/dlt.mdx +++ b/docs/content/integrations/embedded-elt/dlt.mdx @@ -237,16 +237,8 @@ The For example, to change how the name of the asset is derived, you can override the method, or if you would like to change the key of the upstream source asset, you can override the method. ```python file=/integrations/embedded_elt/dlt_dagster_translator.py -from collections.abc import Iterable - import dlt -from dagster_embedded_elt.dlt import ( - DagsterDltResource, - DagsterDltTranslator, - dlt_assets, -) - -from dagster import AssetExecutionContext, AssetKey +from dagster_embedded_elt.dlt.dlt_computation import RunDlt @dlt.source @@ -256,29 +248,26 @@ def example_dlt_source(): return example_resource -class CustomDagsterDltTranslator(DagsterDltTranslator): - def get_asset_key(self, resource: DagsterDltResource) -> AssetKey: - """Overrides asset key to be the dlt resource name.""" - return AssetKey(f"{resource.name}") - - def get_deps_asset_keys(self, resource: DagsterDltResource) -> Iterable[AssetKey]: - """Overrides upstream asset key to be a single source asset.""" - return [AssetKey("common_upstream_dlt_dependency")] - - -@dlt_assets( +dlt_source = example_dlt_source() +dlt_pipeline = dlt.pipeline( + pipeline_name="example_pipeline_name", + dataset_name="example_dataset_name", + destination="snowflake", + progress="log", +) +source_asset_key = "common_upstream_dlt_dependency" +RunDlt( name="example_dlt_assets", - dlt_source=example_dlt_source(), - dlt_pipeline=dlt.pipeline( - pipeline_name="example_pipeline_name", - dataset_name="example_dataset_name", - destination="snowflake", - progress="log", - ), - dagster_dlt_translator=CustomDagsterDltTranslator(), + dlt_source=dlt_source, + dlt_pipeline=dlt_pipeline, + specs=[ + RunDlt.default_spec(dlt_source, dlt_pipeline, dlt_resource)._replace( + key=dlt_resource.name, # overrides asset key to be resource name + deps=[source_asset_key], # overrides upstream to be single source asset + ) + for dlt_resource in dlt_source.resources.values() + ], ) -def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource): - yield from dlt.run(context=context) ``` In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets. @@ -330,12 +319,16 @@ While still an experimental feature, it is possible to use partitions within you That said, here is an example of using static named partitions from a dlt source. ```python file=/integrations/embedded_elt/dlt_partitions.py -from typing import Optional +from typing import Iterable, Optional import dlt from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets +from dagster_embedded_elt.dlt.computation import ComputationContext +from dagster_embedded_elt.dlt.dlt_computation import RunDlt from dagster import AssetExecutionContext, StaticPartitionsDefinition +from dagster._core.definitions.asset_check_result import AssetCheckResult +from dagster._core.definitions.result import AssetResult color_partitions = StaticPartitionsDefinition(["red", "green", "blue"]) @@ -351,19 +344,30 @@ def example_dlt_source(color: Optional[str] = None): ... -@dlt_assets( - dlt_source=example_dlt_source(), +dlt_source = example_dlt_source() +dlt_pipeline = dlt.pipeline( + pipeline_name="example_pipeline_name", + dataset_name="example_dataset_name", + destination="snowflake", +) + + +class PartitionedRunDlt(RunDlt): + def stream(self, context: ComputationContext) -> Iterable: + color = context.partition_key + yield from DagsterDltResource().run( + context=context, dlt_source=example_dlt_source(color=color) + ) + + +PartitionedRunDlt( name="example_dlt_assets", - dlt_pipeline=dlt.pipeline( - pipeline_name="example_pipeline_name", - dataset_name="example_dataset_name", - destination="snowflake", + dlt_source=dlt_source, + dlt_pipeline=dlt_pipeline, + specs=RunDlt.default_specs(dlt_source, dlt_pipeline).replace( + partitions_def=color_partitions ), - partitions_def=color_partitions, ) -def compute(context: AssetExecutionContext, dlt: DagsterDltResource): - color = context.partition_key - yield from dlt.run(context=context, dlt_source=example_dlt_source(color=color)) ``` ## What's next? diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/computation.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/computation.py index 35d998aad209d..f31d516388124 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/computation.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/computation.py @@ -18,6 +18,7 @@ from typing_extensions import TypeAlias +# Helper class to do group operations to a collection of specs class Specs: def __init__(self, specs: Sequence[Union[AssetSpec, AssetCheckSpec]]): self.specs = list( @@ -54,6 +55,8 @@ def to_asset_specs(self) -> List[AssetSpec]: return [spec for spec in self.specs if isinstance(spec, AssetSpec)] +# This might appear like overkill right now, but this would be our opportunity +# to have a sane context api class ComputationContext: def __init__(self, context: AssetExecutionContext): self._ae_context = context diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/constants.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/constants.py index 4ad4c9c59449f..1355ea29eadbf 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/constants.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/constants.py @@ -1,4 +1,3 @@ META_KEY_SOURCE = "dagster_dlt/source" -META_KEY_SOURCE_RESOURCE = "dagster_dlt/source_resource" META_KEY_PIPELINE = "dagster_dlt/pipeline" META_KEY_TRANSLATOR = "dagster_dlt/translator" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py index cfbaebc6d59b2..9d225bc413f2a 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py @@ -132,7 +132,7 @@ def stream(self, context: ComputationContext) -> Iterable[Union[AssetResult, Ass for result in dlt.run( context=context.to_asset_execution_context(), dlt_source=self.dlt_source, - # provide dummy instance of this + # provide dummy instance of this to avoid spurious exception dagster_dlt_translator=DagsterDltTranslator(), ): yield check.inst( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py index ef1d1882e7894..ca2bb28c50031 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py @@ -125,9 +125,9 @@ def test_get_materialize_policy(dlt_pipeline: Pipeline): dlt_source=pipeline(), dlt_pipeline=dlt_pipeline, specs=RunDlt.default_specs(dlt_source=pipeline(), dlt_pipeline=dlt_pipeline).replace( - auto_materialize_policy=AutoMaterializePolicy.eager().with_rules( + automation_condition=AutoMaterializePolicy.eager().with_rules( AutoMaterializeRule.materialize_on_cron("0 1 * * *") - ) + ).to_automation_condition() ), ).assets_def