Skip to content

Commit

Permalink
cp
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Aug 22, 2024
1 parent e8ce462 commit f035e78
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 45 deletions.
86 changes: 45 additions & 41 deletions docs/content/integrations/embedded-elt/dlt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,8 @@ The <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" />
For example, to change how the name of the asset is derived, you can override the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_asset_key" /> method, or if you would like to change the key of the upstream source asset, you can override the <PyObject module="dagster_embedded_elt.dlt" object="DagsterDltTranslator" method="get_deps_assets_keys" /> method.
```python file=/integrations/embedded_elt/dlt_dagster_translator.py
from collections.abc import Iterable
import dlt
from dagster_embedded_elt.dlt import (
DagsterDltResource,
DagsterDltTranslator,
dlt_assets,
)
from dagster import AssetExecutionContext, AssetKey
from dagster_embedded_elt.dlt.dlt_computation import RunDlt
@dlt.source
Expand All @@ -256,29 +248,26 @@ def example_dlt_source():
return example_resource
class CustomDagsterDltTranslator(DagsterDltTranslator):
def get_asset_key(self, resource: DagsterDltResource) -> AssetKey:
"""Overrides asset key to be the dlt resource name."""
return AssetKey(f"{resource.name}")
def get_deps_asset_keys(self, resource: DagsterDltResource) -> Iterable[AssetKey]:
"""Overrides upstream asset key to be a single source asset."""
return [AssetKey("common_upstream_dlt_dependency")]
@dlt_assets(
dlt_source = example_dlt_source()
dlt_pipeline = dlt.pipeline(
pipeline_name="example_pipeline_name",
dataset_name="example_dataset_name",
destination="snowflake",
progress="log",
)
source_asset_key = "common_upstream_dlt_dependency"
RunDlt(
name="example_dlt_assets",
dlt_source=example_dlt_source(),
dlt_pipeline=dlt.pipeline(
pipeline_name="example_pipeline_name",
dataset_name="example_dataset_name",
destination="snowflake",
progress="log",
),
dagster_dlt_translator=CustomDagsterDltTranslator(),
dlt_source=dlt_source,
dlt_pipeline=dlt_pipeline,
specs=[
RunDlt.default_spec(dlt_source, dlt_pipeline, dlt_resource)._replace(
key=dlt_resource.name, # overrides asset key to be resource name
deps=[source_asset_key], # overrides upstream to be single source asset
)
for dlt_resource in dlt_source.resources.values()
],
)
def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
yield from dlt.run(context=context)
```
In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
Expand Down Expand Up @@ -330,12 +319,16 @@ While still an experimental feature, it is possible to use partitions within you
That said, here is an example of using static named partitions from a dlt source.
```python file=/integrations/embedded_elt/dlt_partitions.py
from typing import Optional
from typing import Iterable, Optional
import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster_embedded_elt.dlt.computation import ComputationContext
from dagster_embedded_elt.dlt.dlt_computation import RunDlt
from dagster import AssetExecutionContext, StaticPartitionsDefinition
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.result import AssetResult
color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])
Expand All @@ -351,19 +344,30 @@ def example_dlt_source(color: Optional[str] = None):
...
@dlt_assets(
dlt_source=example_dlt_source(),
dlt_source = example_dlt_source()
dlt_pipeline = dlt.pipeline(
pipeline_name="example_pipeline_name",
dataset_name="example_dataset_name",
destination="snowflake",
)
class PartitionedRunDlt(RunDlt):
def stream(self, context: ComputationContext) -> Iterable:
color = context.partition_key
yield from DagsterDltResource().run(
context=context, dlt_source=example_dlt_source(color=color)
)
PartitionedRunDlt(
name="example_dlt_assets",
dlt_pipeline=dlt.pipeline(
pipeline_name="example_pipeline_name",
dataset_name="example_dataset_name",
destination="snowflake",
dlt_source=dlt_source,
dlt_pipeline=dlt_pipeline,
specs=RunDlt.default_specs(dlt_source, dlt_pipeline).replace(
partitions_def=color_partitions
),
partitions_def=color_partitions,
)
def compute(context: AssetExecutionContext, dlt: DagsterDltResource):
color = context.partition_key
yield from dlt.run(context=context, dlt_source=example_dlt_source(color=color))
```
## What's next?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing_extensions import TypeAlias


# Helper class to do group operations to a collection of specs
class Specs:
def __init__(self, specs: Sequence[Union[AssetSpec, AssetCheckSpec]]):
self.specs = list(
Expand Down Expand Up @@ -54,6 +55,8 @@ def to_asset_specs(self) -> List[AssetSpec]:
return [spec for spec in self.specs if isinstance(spec, AssetSpec)]


# This might appear like overkill right now, but this would be our opportunity
# to have a sane context api
class ComputationContext:
def __init__(self, context: AssetExecutionContext):
self._ae_context = context
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
META_KEY_SOURCE = "dagster_dlt/source"
META_KEY_SOURCE_RESOURCE = "dagster_dlt/source_resource"
META_KEY_PIPELINE = "dagster_dlt/pipeline"
META_KEY_TRANSLATOR = "dagster_dlt/translator"
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def stream(self, context: ComputationContext) -> Iterable[Union[AssetResult, Ass
for result in dlt.run(
context=context.to_asset_execution_context(),
dlt_source=self.dlt_source,
# provide dummy instance of this
# provide dummy instance of this to avoid spurious exception
dagster_dlt_translator=DagsterDltTranslator(),
):
yield check.inst(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ def test_get_materialize_policy(dlt_pipeline: Pipeline):
dlt_source=pipeline(),
dlt_pipeline=dlt_pipeline,
specs=RunDlt.default_specs(dlt_source=pipeline(), dlt_pipeline=dlt_pipeline).replace(
auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
automation_condition=AutoMaterializePolicy.eager().with_rules(
AutoMaterializeRule.materialize_on_cron("0 1 * * *")
)
).to_automation_condition()
),
).assets_def

Expand Down

0 comments on commit f035e78

Please sign in to comment.