Skip to content

Commit

Permalink
[embedded-elt][sling] e2e test coverage Sling translator calls in Ass…
Browse files Browse the repository at this point in the history
…etSpec (#20545)
  • Loading branch information
cmpadden authored Mar 19, 2024
1 parent f35fe4f commit f3640de
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,14 @@ def my_assets(context, sling: SlingResource):
AssetSpec(
key=dagster_sling_translator.get_asset_key(stream),
deps=dagster_sling_translator.get_deps_asset_key(stream),
group_name=dagster_sling_translator.get_group_name(stream),
description=dagster_sling_translator.get_description(stream),
code_version=code_version,
metadata=dagster_sling_translator.get_metadata(stream),
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
stream
),
code_version=code_version,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@ streams:
deps:
- foo_one
- foo_two
group: group_2
freshness_policy:
maximum_lag_minutes: 0
cron_schedule: "5 4 * * *"
cron_schedule_timezone: UTC
public."Transactions":
mode: incremental # overwrite default mode
primary_key: id
update_key: last_updated_at
meta:
dagster:
description: Example Description!
auto_materialize_policy: true

public.all_users:
sql: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import sqlite3

import pytest
from dagster import AssetKey, file_relative_path
from dagster import (
AssetKey,
FreshnessPolicy,
JsonMetadataValue,
file_relative_path,
)
from dagster._core.definitions.materialize import materialize
from dagster_embedded_elt.sling import (
SlingReplicationParam,
Expand Down Expand Up @@ -97,3 +102,97 @@ def my_other_assets(): ...
def my_third_sling_assets(): ...

assert my_third_sling_assets.op.name == "custom_name"


def test_base_with_meta_config_translator():
@sling_assets(
replication_config=file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)
)
def my_sling_assets(): ...

assert my_sling_assets.keys == {
AssetKey(["target", "public", "all_users"]),
AssetKey(["target", "public", "accounts"]),
AssetKey(["target", "public", "transactions"]),
AssetKey(["target", "departments"]),
}

assert my_sling_assets.asset_deps == {
AssetKey(["target", "public", "accounts"]): {AssetKey(["public", "accounts"])},
AssetKey(["target", "departments"]): {AssetKey(["foo_one"]), AssetKey(["foo_two"])},
AssetKey(["target", "public", "transactions"]): {AssetKey(["public", "transactions"])},
AssetKey(["target", "public", "all_users"]): {AssetKey(["public", "all_users"])},
}

assert my_sling_assets.descriptions_by_key == {
AssetKey(["target", "public", "transactions"]): "Example Description!",
AssetKey(
["target", "public", "all_users"]
): 'select all_user_id, name \nfrom public."all_Users"\n',
}

assert my_sling_assets.metadata_by_key == {
AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)},
AssetKey(["target", "departments"]): {
"stream_config": JsonMetadataValue(
data={
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
}
)
},
AssetKey(["target", "public", "transactions"]): {
"stream_config": JsonMetadataValue(
data={
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
}
)
},
AssetKey(["target", "public", "all_users"]): {
"stream_config": JsonMetadataValue(
data={
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
}
)
},
}

assert my_sling_assets.group_names_by_key == {
AssetKey(["target", "public", "all_users"]): "default",
AssetKey(["target", "public", "accounts"]): "default",
AssetKey(["target", "public", "transactions"]): "default",
AssetKey(["target", "departments"]): "group_2",
}

assert my_sling_assets.freshness_policies_by_key == {
AssetKey(["target", "departments"]): FreshnessPolicy(
maximum_lag_minutes=0.0, cron_schedule="5 4 * * *", cron_schedule_timezone="UTC"
)
}

assert (
AssetKey(["target", "public", "transactions"])
in my_sling_assets.auto_materialize_policies_by_key
)

0 comments on commit f3640de

Please sign in to comment.