Skip to content

Commit

Permalink
Merge pull request #24 from quantile-development/feature/dagster-elx-…
Browse files Browse the repository at this point in the history
…asset-loading

Feature/dagster elx asset loading
  • Loading branch information
BernardWez authored Nov 21, 2023
2 parents 9a65d15 + 29a2f39 commit eaec5ac
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 98 deletions.
13 changes: 13 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ def safe_name(self) -> str:
""""""
return self.name.replace("-", "_")

@property
def is_selected(self) -> bool:
"""Returns boolean flag indicating whether stream has been selected or not."""
# Find the stream metadata by breadcrumb
metadata = self.find_metadata_by_breadcrumb(breadcrumb=[])

# If metadata does not exist, stream should be considered selected
if not metadata:
return True

# If metadata does exists, return value of `selected` property
return metadata.get("selected", True)

def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
Expand Down
1 change: 1 addition & 0 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
code_version=runner.tap.hash_key,
)
for stream in runner.tap.catalog.streams
if stream.is_selected
},
can_subset=True,
group_name=dagster_safe_name(runner.tap.executable),
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fixtures.singer import singer
from fixtures.tap import tap, tap_incremental
from fixtures.tap import tap
from fixtures.target import target
from fixtures.runner import runner
from fixtures.state import state_manager
2 changes: 1 addition & 1 deletion tests/fixtures/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
@pytest.fixture
def runner(tmp_path, tap: Tap, target: Target) -> Generator[Runner, None, None]:
"""
Return a Runner instance for the tap-smoke-test executable.
Return a Runner instance for the tap-mock-fixture executable.
"""
yield Runner(
tap=tap,
Expand Down
15 changes: 4 additions & 11 deletions tests/fixtures/singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@
from elx.singer import Singer


@pytest.fixture(params=["tap-smoke-test", None])
@pytest.fixture(params=["tap-mock-fixture", None])
def singer(request) -> Generator[Singer, None, None]:
"""
Return a Singer instance for the tap-smoke-test executable.
Return a Singer instance for the tap-mock-fixture executable.
"""
yield Singer(
# Test with and without an executable.
executable=request.param,
spec="git+https://github.com/meltano/tap-smoke-test.git",
config={
"streams": [
{
"stream_name": "users",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
],
},
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={},
)
23 changes: 2 additions & 21 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,11 @@

@pytest.fixture(scope="session")
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the tap-smoke-test executable.
"""
yield Tap(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
config={
"streams": [
{
"stream_name": "animals",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
],
},
)


@pytest.fixture(scope="session")
def tap_incremental() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
"""
yield Tap(
executable="tap-mock-incremental",
spec="git+https://github.com/quantile-taps/tap-mock-incremental.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={},
)
90 changes: 35 additions & 55 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,54 @@
{
"tap_stream_id": "animals",
"replication_method": "FULL_TABLE",
"key_properties": ["id"],
"replication_key": None,
"table_name": None,
"is_view": False,
"key_properties": [],
"table_name": None,
"schema": {
"properties": {
"id": {"type": "integer"},
"description": {"type": "string"},
"verified": {"type": "boolean"},
"views": {"type": "integer"},
"created_at": {"type": "string"},
"id": {"type": ["integer", "null"]},
"animal_name": {"type": ["string", "null"]},
"updated_at": {"format": "date-time", "type": ["string", "null"]},
},
"type": "object",
"required": ["created_at", "description", "id", "verified", "views"],
},
"metadata": [
{
"breadcrumb": ["properties", "id"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "description"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "verified"],
"metadata": {"inclusion": "available"},
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": ["properties", "views"],
"breadcrumb": ["properties", "animal_name"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "created_at"],
"breadcrumb": ["properties", "updated_at"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": [],
"selected": False,
"selected-by-default": False,
"table-key-properties": ["id"],
},
},
],
}
]
}

INCREMENTAL_CATALOG = {
"streams": [
},
{
"tap_stream_id": "users",
"replication_method": "INCREMENTAL",
"replication_key": "updated_at",
"table_name": None,
"replication_method": "INCREMENTAL",
"is_view": False,
"table_name": None,
"key_properties": ["id"],
"schema": {
"properties": {
"id": {"type": ["string", "null"]},
"id": {"type": ["integer", "null"]},
"name": {"type": ["string", "null"]},
"updated_at": {
"format": "date-time",
"type": ["string", "null"],
},
"updated_at": {"format": "date-time", "type": ["string", "null"]},
},
"type": "object",
},
Expand Down Expand Up @@ -101,7 +82,7 @@
},
},
],
}
},
]
}

Expand All @@ -116,14 +97,12 @@ def test_catalog(tap: Tap):
def test_catalog_select(tap: Tap):
"""If we select a stream, the catalog should be updated."""
catalog = tap.catalog.select(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True
assert catalog.streams[0].is_selected == True

catalog = tap.catalog.select([])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[0].is_selected == False


def test_catalog_no_deselect(tap: Tap):
Expand All @@ -134,10 +113,9 @@ def test_catalog_no_deselect(tap: Tap):

def test_catalog_deselect_stream(tap: Tap):
"""If we deselect a stream, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals"])
catalog_dict = catalog.dict(by_alias=True)
catalog = tap.catalog.deselect(["users"])

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[1].is_selected == False


def test_catalog_deselect_invalid_stream(tap: Tap):
Expand All @@ -154,27 +132,29 @@ def test_catalog_deselect_property(tap: Tap):
assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False


def test_catalog_replication_method(tap_incremental: Tap):
def test_catalog_replication_method(tap: Tap):
"""If we have an incremental stream, the replication_method in the catalog should be `INCREMENTAL`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

assert (
catalog_dict["streams"][0]["replication_method"]
== INCREMENTAL_CATALOG["streams"][0]["replication_method"]
catalog_dict["streams"][1]["replication_method"]
== DEFAULT_CATALOG["streams"][1]["replication_method"]
)


def test_catalog_replication_key(tap_incremental: Tap):
def test_catalog_replication_key(tap: Tap):
"""If we have an incremental stream, the catalog should have a `replication_key`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

assert catalog_dict["streams"][1]["replication_key"] != None

assert (
catalog_dict["streams"][0]["replication_key"]
== INCREMENTAL_CATALOG["streams"][0]["replication_key"]
catalog_dict["streams"][1]["replication_key"]
== DEFAULT_CATALOG["streams"][1]["replication_key"]
)


def test_catalog_valid_replication_keys(tap_incremental: Tap):
def test_catalog_valid_replication_keys(tap: Tap):
"""
If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
stream containing the key: `valid-replication-keys`.
Expand All @@ -190,11 +170,11 @@ def test_catalog_valid_replication_keys(tap_incremental: Tap):
"valid-replication-keys": ["updated_at"],
}
"""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get(
replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get(
"valid-replication-keys", None
)

# Checks that value of `valid-replication-keys` equals to the replication-key
assert replication_keys == [INCREMENTAL_CATALOG["streams"][0]["replication_key"]]
assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]]
6 changes: 6 additions & 0 deletions tests/test_elx/test_extensions/test_dagster/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ def test_asset_loading(runner: Runner):
"""
Test that assets are loaded correctly.
"""
# Verifies that the tap associated with the runner has 2 streams
assert len(runner.tap.catalog.streams) == 2

# Load assets
assets = load_assets(runner)

# Length of assets should be 1 as one stream is deselected per default
assert len(assets) == 1
assert isinstance(assets[0], AssetsDefinition)
4 changes: 2 additions & 2 deletions tests/test_elx/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_config_interpolation_values(runner: Runner):
"""
Make sure the tap and target names are correct.
"""
assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-smoke-test"
assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-mock-fixture"
assert runner.interpolation_values["TARGET_EXECUTABLE"] == "target-jsonl"


Expand All @@ -39,4 +39,4 @@ def test_config_interpolation_target_values(tap: Tap):

runner = Runner(tap, target)

assert runner.target.config["tap_name"] == "tap_smoke_test"
assert runner.target.config["tap_name"] == "tap_mock_fixture"
12 changes: 6 additions & 6 deletions tests/test_elx/test_singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_singer_can_discover_executable(singer: Singer):
"""
Test that the singer executable can be discovered.
"""
assert singer.executable == "tap-smoke-test"
assert singer.executable == "tap-mock-fixture"


def test_singer_can_install(singer: Singer):
Expand Down Expand Up @@ -71,8 +71,8 @@ def test_singer_dynamic_config():
Make sure the Singer instance is able to handle dynamic config.
"""
singer = Singer(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config=lambda: {},
)

Expand All @@ -84,8 +84,8 @@ def test_singer_config_interpolation(runner: Runner):
Make sure the Singer instance is able to handle config interpolation.
"""
singer = Singer(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={
"target_schema": "{TAP_NAME}",
},
Expand All @@ -94,4 +94,4 @@ def test_singer_config_interpolation(runner: Runner):
# Attach the runner
singer.runner = runner

assert singer.config == {"target_schema": "tap_smoke_test"}
assert singer.config == {"target_schema": "tap_mock_fixture"}
2 changes: 1 addition & 1 deletion tests/test_elx/test_tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_tap_discovery(tap: Tap):
# Make sure the catalog is of the right type.
assert type(tap.catalog) == Catalog
# Make sure the catalog has the right number of streams.
assert len(tap.catalog.streams) == 1
assert len(tap.catalog.streams) == 2
# Make sure the streams are of the right type.
assert type(tap.catalog.streams[0]) == Stream

Expand Down

0 comments on commit eaec5ac

Please sign in to comment.