Skip to content

Commit

Permalink
[dagster-embedded-elt] Fix Updated At parameter's incorrect type (#17362
Browse files Browse the repository at this point in the history
)

Fixes an issue where the incorrect type was set for the `updated_at` parameter.
Fixes #17348
  • Loading branch information
PedramNavid authored Nov 7, 2023
1 parent adb6d66 commit cba181b
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def build_sling_asset(
target_object: str,
mode: SlingMode = SlingMode.FULL_REFRESH,
primary_key: Optional[Union[str, List[str]]] = None,
update_key: Optional[Union[str, List[str]]] = None,
update_key: Optional[str] = None,
source_options: Optional[Dict[str, Any]] = None,
target_options: Optional[Dict[str, Any]] = None,
sling_resource_key: str = "sling",
Expand All @@ -33,7 +33,7 @@ def build_sling_asset(
target_object (str): The target object to sync to. This can be a table, or a path.
mode (SlingMode, optional): The sync mode to use when syncing. Defaults to SlingMode.FULL_REFRESH.
primary_key (Optional[Union[str, List[str]]], optional): The optional primary key to use when syncing.
update_key (Optional[Union[str, List[str]]], optional): The optional update key to use when syncing.
update_key (Optional[str], optional): The optional update key to use when syncing.
source_options (Optional[Dict[str, Any]], optional): Any optional Sling source options to use when syncing.
target_options (Optional[Dict[str, Any]], optional): Any optional target options to use when syncing.
sling_resource_key (str, optional): The resource key for the SlingResource. Defaults to "sling".
Expand Down Expand Up @@ -69,9 +69,6 @@ def build_sling_asset(
if primary_key is not None and not isinstance(primary_key, list):
primary_key = [primary_key]

if update_key is not None and not isinstance(update_key, list):
update_key = [update_key]

@multi_asset(
name=asset_spec.key.to_python_identifier(),
compute_kind="sling",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def _sync(
target_object: str,
mode: SlingMode = SlingMode.FULL_REFRESH,
primary_key: Optional[List[str]] = None,
update_key: Optional[List[str]] = None,
update_key: Optional[str] = None,
source_options: Optional[Dict[str, Any]] = None,
target_options: Optional[Dict[str, Any]] = None,
) -> Generator[str, None, None]:
Expand Down Expand Up @@ -205,7 +205,7 @@ def sync(
target_object: str,
mode: SlingMode,
primary_key: Optional[List[str]] = None,
update_key: Optional[List[str]] = None,
update_key: Optional[str] = None,
source_options: Optional[Dict[str, Any]] = None,
target_options: Optional[Dict[str, Any]] = None,
) -> Generator[str, None, None]:
Expand All @@ -221,7 +221,7 @@ def sync(
name, e.g. TABLE1, SCHEMA1.TABLE2. For file targets, the target object is a path or an url to a file.
mode (SlingMode): The Sling mode to use when syncing, i.e. incremental, full-refresh
See the Sling docs for more information: https://docs.slingdata.io/sling-cli/running-tasks#modes.
primary_key (str): For incremental syncs, a primary key is used during merge statements to update
primary_key (List[str]): For incremental syncs, a primary key is used during merge statements to update
existing rows.
update_key (str): For incremental syncs, an update key is used to stream records after max(update_key)
source_options (Dict[str, Any]): Other source options to pass to Sling,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
SPECIES_CODE,SPECIES_NAME
abcdef,scrubjay
defghi,bluejay
jklnop,blackbird
SPECIES_CODE,SPECIES_NAME,UPDATED_AT
abcdef,scrubjay,1
defghi,bluejay,2
jklnop,blackbird,2
Original file line number Diff line number Diff line change
Expand Up @@ -6,87 +6,159 @@
from dagster import AssetSpec, Definitions, file_relative_path
from dagster._core.definitions import build_assets_job
from dagster_embedded_elt.sling import SlingMode, SlingResource, build_sling_asset
from dagster_embedded_elt.sling.resources import SlingSourceConnection, SlingTargetConnection
from dagster_embedded_elt.sling.resources import (
SlingSourceConnection,
SlingTargetConnection,
)


@pytest.fixture
def test_csv():
return os.path.abspath(file_relative_path(__file__, "test.csv"))


@pytest.fixture
def temp_db():
with tempfile.TemporaryDirectory() as tmpdir_path:
dbpath = os.path.join(tmpdir_path, "sqlite.db")
yield dbpath


@pytest.fixture
def sling_sqlite_resource(temp_db):
return SlingResource(
source_connection=SlingSourceConnection(type="file"),
target_connection=SlingTargetConnection(
type="sqlite", connection_string=f"sqlite://{temp_db}"
),
)


@pytest.fixture
def sqlite_connection(temp_db):
yield sqlite3.connect(temp_db)


ASSET_SPEC = AssetSpec(
key=["main", "tbl"],
group_name="etl",
description="ETL Test",
deps=["foo"],
)


@pytest.mark.parametrize(
"mode,runs,expected", [(SlingMode.INCREMENTAL, 1, 3), (SlingMode.SNAPSHOT, 2, 6)]
)
def test_build_sling_asset(mode: SlingMode, runs: int, expected: int):
with tempfile.TemporaryDirectory() as tmpdir_path:
fpath = os.path.abspath(file_relative_path(__file__, "test.csv"))
dbpath = os.path.join(tmpdir_path, "sqlite.db")
def test_build_sling_asset(
test_csv: str,
sling_sqlite_resource: SlingResource,
mode: SlingMode,
runs: int,
expected: int,
sqlite_connection: sqlite3.Connection,
):
asset_def = build_sling_asset(
asset_spec=ASSET_SPEC,
source_stream=f"file://{test_csv}",
target_object="main.tbl",
mode=mode,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

sling_resource = SlingResource(
source_connection=SlingSourceConnection(type="file"),
target_connection=SlingTargetConnection(
type="sqlite", connection_string=f"sqlite://{dbpath}"
),
)

asset_spec = AssetSpec(
key=["main", "tbl"],
group_name="etl",
description="ETL Test",
deps=["foo"],
)
asset_def = build_sling_asset(
asset_spec=asset_spec,
source_stream=f"file://{fpath}",
target_object="main.tbl",
mode=mode,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

sling_job = build_assets_job(
"sling_job",
[asset_def],
resource_defs={"sling_resource": sling_resource},
)

counts = None
for n in range(runs):
res = sling_job.execute_in_process()
assert res.success
counts = sqlite3.connect(dbpath).execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == expected


def test_can_build_two_assets():
with tempfile.TemporaryDirectory() as tmpdir_path:
fpath = os.path.abspath(file_relative_path(__file__, "test.csv"))
dbpath = os.path.join(tmpdir_path, "sqlite.db")
sling_job = build_assets_job(
"sling_job",
[asset_def],
resource_defs={"sling_resource": sling_sqlite_resource},
)

counts = None
for _ in range(runs):
res = sling_job.execute_in_process()
assert res.success
counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == expected


def test_can_build_two_assets(
test_csv,
sling_sqlite_resource: SlingResource,
):
asset_def = build_sling_asset(
asset_spec=AssetSpec(key="asset1"),
source_stream=f"file://{test_csv}",
target_object="main.first_tbl",
mode=SlingMode.FULL_REFRESH,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

asset_def_two = build_sling_asset(
asset_spec=AssetSpec(key="asset2"),
source_stream=f"file://{test_csv}",
target_object="main.second_tbl",
mode=SlingMode.FULL_REFRESH,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

defs = Definitions(
assets=[asset_def, asset_def_two],
resources={"sling_resource": sling_sqlite_resource},
)

assert defs.get_assets_def("asset1")
assert defs.get_assets_def("asset2")


def test_update_mode(
test_csv: str,
sling_sqlite_resource: SlingResource,
sqlite_connection: sqlite3.Connection,
):
"""Creates a Sling sync using Full Refresh, manually increments the UPDATE KEY to be a higher value,
which should cause the next run not to append new rows.
"""
asset_def_base = build_sling_asset(
asset_spec=ASSET_SPEC,
source_stream=f"file://{test_csv}",
target_object="main.tbl",
mode=SlingMode.FULL_REFRESH,
sling_resource_key="sling_resource",
)

asset_def_update = build_sling_asset(
asset_spec=ASSET_SPEC,
source_stream=f"file://{test_csv}",
target_object="main.tbl",
mode=SlingMode.INCREMENTAL,
primary_key="SPECIES_NAME",
update_key="UPDATED_AT",
sling_resource_key="sling_resource",
)

sling_job_base = build_assets_job(
"sling_job",
[asset_def_base],
resource_defs={"sling_resource": sling_sqlite_resource},
)

# First run should have 3 new rows
res = sling_job_base.execute_in_process()
assert res.success
assert sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] == 3

# Next, manually set the UPDATED_AT to a higher value, this should prevent an append job from adding new rows.
cur = sqlite_connection.cursor()
cur.execute("UPDATE main.tbl set UPDATED_AT=999")
sqlite_connection.commit()

sling_resource = SlingResource(
source_connection=SlingSourceConnection(type="file"),
target_connection=SlingTargetConnection(
type="sqlite", connection_string=f"sqlite://{dbpath}"
),
)

asset_def = build_sling_asset(
asset_spec=AssetSpec(key="asset1"),
source_stream=f"file://{fpath}",
target_object="main.first_tbl",
mode=SlingMode.FULL_REFRESH,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

asset_def_two = build_sling_asset(
asset_spec=AssetSpec(key="asset2"),
source_stream=f"file://{fpath}",
target_object="main.second_tbl",
mode=SlingMode.FULL_REFRESH,
primary_key="SPECIES_CODE",
sling_resource_key="sling_resource",
)

defs = Definitions(
assets=[asset_def, asset_def_two],
resources={"sling_resource": sling_resource},
)

assert defs.get_assets_def("asset1")
assert defs.get_assets_def("asset2")
sling_job_update = build_assets_job(
"sling_job_update",
[asset_def_update],
resource_defs={"sling_resource": sling_sqlite_resource},
)
res = sling_job_update.execute_in_process()
assert res.success
assert sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] == 3
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

from dagster import asset, file_relative_path, materialize
from dagster_embedded_elt.sling import SlingMode, SlingResource
from dagster_embedded_elt.sling.resources import SlingSourceConnection, SlingTargetConnection
from dagster_embedded_elt.sling.resources import (
SlingSourceConnection,
SlingTargetConnection,
)


def test_simple_resource_connection():
Expand Down

0 comments on commit cba181b

Please sign in to comment.