From 62a174bcd5400000c5257566a69f20d5affef662 Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Thu, 22 Aug 2024 11:57:08 -0700 Subject: [PATCH] cp --- .../content/integrations/embedded-elt/dlt.mdx | 10 +++---- .../embedded_elt/dlt_partitions.py | 10 +++---- .../dlt/dlt_computation.py | 29 ++++--------------- .../dagster_embedded_elt/dlt/resource.py | 20 ++++++++++++- .../dlt_tests/test_asset_decorator.py | 6 ++-- 5 files changed, 35 insertions(+), 40 deletions(-) diff --git a/docs/content/integrations/embedded-elt/dlt.mdx b/docs/content/integrations/embedded-elt/dlt.mdx index 7573863a99c18..2c831dadc4a67 100644 --- a/docs/content/integrations/embedded-elt/dlt.mdx +++ b/docs/content/integrations/embedded-elt/dlt.mdx @@ -322,13 +322,11 @@ That said, here is an example of using static named partitions from a dlt source from typing import Iterable, Optional import dlt -from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets +from dagster_embedded_elt.dlt import DagsterDltResource from dagster_embedded_elt.dlt.computation import ComputationContext from dagster_embedded_elt.dlt.dlt_computation import RunDlt -from dagster import AssetExecutionContext, StaticPartitionsDefinition -from dagster._core.definitions.asset_check_result import AssetCheckResult -from dagster._core.definitions.result import AssetResult +from dagster import MaterializeResult, StaticPartitionsDefinition color_partitions = StaticPartitionsDefinition(["red", "green", "blue"]) @@ -353,9 +351,9 @@ dlt_pipeline = dlt.pipeline( class PartitionedRunDlt(RunDlt): - def stream(self, context: ComputationContext) -> Iterable: + def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]: color = context.partition_key - yield from DagsterDltResource().run( + yield from DagsterDltResource().stream( context=context, dlt_source=example_dlt_source(color=color) ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_partitions.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_partitions.py index 82fad51b9484c..34872f9ae3f65 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_partitions.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/dlt_partitions.py @@ -1,13 +1,11 @@ from typing import Iterable, Optional import dlt -from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets +from dagster_embedded_elt.dlt import DagsterDltResource from dagster_embedded_elt.dlt.computation import ComputationContext from dagster_embedded_elt.dlt.dlt_computation import RunDlt -from dagster import AssetExecutionContext, StaticPartitionsDefinition -from dagster._core.definitions.asset_check_result import AssetCheckResult -from dagster._core.definitions.result import AssetResult +from dagster import MaterializeResult, StaticPartitionsDefinition color_partitions = StaticPartitionsDefinition(["red", "green", "blue"]) @@ -32,9 +30,9 @@ def load_colors(): class PartitionedRunDlt(RunDlt): - def stream(self, context: ComputationContext) -> Iterable: + def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]: color = context.partition_key - yield from DagsterDltResource().run( + yield from DagsterDltResource().stream( context=context, dlt_source=example_dlt_source(color=color) ) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py index 9d225bc413f2a..ce283231a80e6 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/dlt_computation.py @@ -1,19 +1,10 @@ -from typing import Iterable, Optional, Union - -from dagster import ( - AssetKey, - AssetSpec, - MaterializeResult, - _check as check, -) -from dagster._core.definitions.asset_check_result import AssetCheckResult -from dagster._core.definitions.result import AssetResult +from typing import Iterable, Optional + +from dagster import AssetKey, AssetSpec, MaterializeResult from dlt.extract.resource import DltResource from dlt.extract.source import DltSource from dlt.pipeline.pipeline import Pipeline -from dagster_embedded_elt.dlt.translator import DagsterDltTranslator - from .computation import Computation, ComputationContext, Specs, SpecsArg from .constants import META_KEY_PIPELINE, META_KEY_SOURCE, META_KEY_TRANSLATOR from .resource import DagsterDltResource @@ -127,16 +118,6 @@ def __init__( self.dlt_pipeline = dlt_pipeline super().__init__(name=name, specs=specs or self.default_specs(dlt_source, dlt_pipeline)) - def stream(self, context: ComputationContext) -> Iterable[Union[AssetResult, AssetCheckResult]]: + def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]: dlt = DagsterDltResource() - for result in dlt.run( - context=context.to_asset_execution_context(), - dlt_source=self.dlt_source, - # provide dummy instance of this to avoid spurious exception - dagster_dlt_translator=DagsterDltTranslator(), - ): - yield check.inst( - result, - MaterializeResult, - "Only MaterializeResult is supported since dlt is in an asset computation", - ) + yield from dlt.stream(context, self.dlt_source, self.dlt_pipeline) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/resource.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/resource.py index c3c0462bbfc5d..ebda44aa107e4 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/resource.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/dlt/resource.py @@ -112,10 +112,28 @@ def extract_resource_metadata( return base_metadata + # computation-friendly api + def stream( + self, + context: Union[OpExecutionContext, AssetExecutionContext, ComputationContext], + dlt_source: Optional[DltSource] = None, + dlt_pipeline: Optional[Pipeline] = None, + **kwargs, + ) -> Iterator[MaterializeResult]: + for r in self.run( + context=context.to_asset_execution_context() + if isinstance(context, ComputationContext) + else context, + dlt_source=dlt_source, + dlt_pipeline=dlt_pipeline, + **kwargs, + ): + yield check.inst(r, MaterializeResult) + @public def run( self, - context: Union[OpExecutionContext, AssetExecutionContext, ComputationContext], + context: Union[OpExecutionContext, AssetExecutionContext], dlt_source: Optional[DltSource] = None, dlt_pipeline: Optional[Pipeline] = None, dagster_dlt_translator: Optional[DagsterDltTranslator] = None, diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py index ca2bb28c50031..9a52561a60d38 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/dlt_tests/test_asset_decorator.py @@ -125,9 +125,9 @@ def test_get_materialize_policy(dlt_pipeline: Pipeline): dlt_source=pipeline(), dlt_pipeline=dlt_pipeline, specs=RunDlt.default_specs(dlt_source=pipeline(), dlt_pipeline=dlt_pipeline).replace( - automation_condition=AutoMaterializePolicy.eager().with_rules( - AutoMaterializeRule.materialize_on_cron("0 1 * * *") - ).to_automation_condition() + automation_condition=AutoMaterializePolicy.eager() + .with_rules(AutoMaterializeRule.materialize_on_cron("0 1 * * *")) + .to_automation_condition() ), ).assets_def