Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Aug 22, 2024
1 parent f035e78 commit 62a174b
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 40 deletions.
10 changes: 4 additions & 6 deletions docs/content/integrations/embedded-elt/dlt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,11 @@ That said, here is an example of using static named partitions from a dlt source
from typing import Iterable, Optional
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster_embedded_elt.dlt import DagsterDltResource
from dagster_embedded_elt.dlt.computation import ComputationContext
from dagster_embedded_elt.dlt.dlt_computation import RunDlt
from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.result import AssetResult
from dagster import MaterializeResult, StaticPartitionsDefinition
color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])
Expand All @@ -353,9 +351,9 @@ dlt_pipeline = dlt.pipeline(
class PartitionedRunDlt(RunDlt):
def stream(self, context: ComputationContext) -> Iterable:
def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]:
color = context.partition_key
yield from DagsterDltResource().run(
yield from DagsterDltResource().stream(
context=context, dlt_source=example_dlt_source(color=color)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from typing import Iterable, Optional

import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster_embedded_elt.dlt import DagsterDltResource
from dagster_embedded_elt.dlt.computation import ComputationContext
from dagster_embedded_elt.dlt.dlt_computation import RunDlt

from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.result import AssetResult
from dagster import MaterializeResult, StaticPartitionsDefinition

color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])

Expand All @@ -32,9 +30,9 @@ def load_colors():


class PartitionedRunDlt(RunDlt):
def stream(self, context: ComputationContext) -> Iterable:
def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]:
color = context.partition_key
yield from DagsterDltResource().run(
yield from DagsterDltResource().stream(
context=context, dlt_source=example_dlt_source(color=color)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
from typing import Iterable, Optional, Union

from dagster import (
AssetKey,
AssetSpec,
MaterializeResult,
_check as check,
)
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.result import AssetResult
from typing import Iterable, Optional

from dagster import AssetKey, AssetSpec, MaterializeResult
from dlt.extract.resource import DltResource
from dlt.extract.source import DltSource
from dlt.pipeline.pipeline import Pipeline

from dagster_embedded_elt.dlt.translator import DagsterDltTranslator

from .computation import Computation, ComputationContext, Specs, SpecsArg
from .constants import META_KEY_PIPELINE, META_KEY_SOURCE, META_KEY_TRANSLATOR
from .resource import DagsterDltResource
Expand Down Expand Up @@ -127,16 +118,6 @@ def __init__(
self.dlt_pipeline = dlt_pipeline
super().__init__(name=name, specs=specs or self.default_specs(dlt_source, dlt_pipeline))

def stream(self, context: ComputationContext) -> Iterable[Union[AssetResult, AssetCheckResult]]:
def stream(self, context: ComputationContext) -> Iterable[MaterializeResult]:
dlt = DagsterDltResource()
for result in dlt.run(
context=context.to_asset_execution_context(),
dlt_source=self.dlt_source,
# provide dummy instance of this to avoid spurious exception
dagster_dlt_translator=DagsterDltTranslator(),
):
yield check.inst(
result,
MaterializeResult,
"Only MaterializeResult is supported since dlt is in an asset computation",
)
yield from dlt.stream(context, self.dlt_source, self.dlt_pipeline)
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,28 @@ def extract_resource_metadata(

return base_metadata

# computation-friendly api
def stream(
self,
context: Union[OpExecutionContext, AssetExecutionContext, ComputationContext],
dlt_source: Optional[DltSource] = None,
dlt_pipeline: Optional[Pipeline] = None,
**kwargs,
) -> Iterator[MaterializeResult]:
for r in self.run(
context=context.to_asset_execution_context()
if isinstance(context, ComputationContext)
else context,
dlt_source=dlt_source,
dlt_pipeline=dlt_pipeline,
**kwargs,
):
yield check.inst(r, MaterializeResult)

@public
def run(
self,
context: Union[OpExecutionContext, AssetExecutionContext, ComputationContext],
context: Union[OpExecutionContext, AssetExecutionContext],
dlt_source: Optional[DltSource] = None,
dlt_pipeline: Optional[Pipeline] = None,
dagster_dlt_translator: Optional[DagsterDltTranslator] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def test_get_materialize_policy(dlt_pipeline: Pipeline):
dlt_source=pipeline(),
dlt_pipeline=dlt_pipeline,
specs=RunDlt.default_specs(dlt_source=pipeline(), dlt_pipeline=dlt_pipeline).replace(
automation_condition=AutoMaterializePolicy.eager().with_rules(
AutoMaterializeRule.materialize_on_cron("0 1 * * *")
).to_automation_condition()
automation_condition=AutoMaterializePolicy.eager()
.with_rules(AutoMaterializeRule.materialize_on_cron("0 1 * * *"))
.to_automation_condition()
),
).assets_def

Expand Down

0 comments on commit 62a174b

Please sign in to comment.