Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dagster elx asset loading #24

Merged
merged 13 commits into from
Nov 21, 2023
13 changes: 13 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ def safe_name(self) -> str:
""""""
return self.name.replace("-", "_")

@property
def is_selected(self) -> bool:
"""Returns boolean flag indicating whether stream has been selected or not."""
# Find the stream metadata by breadcrumb
metadata = self.find_metadata_by_breadcrumb(breadcrumb=[])

# If metadata does not exist, stream should be considered selected
if not metadata:
return True

# If metadata does exists, return value of `selected` property
return metadata.get("selected", True)

def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
Expand Down
1 change: 1 addition & 0 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
code_version=runner.tap.hash_key,
)
for stream in runner.tap.catalog.streams
if stream.is_selected
},
can_subset=True,
group_name=dagster_safe_name(runner.tap.executable),
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fixtures.singer import singer
from fixtures.tap import tap, tap_incremental
from fixtures.tap import tap, tap_incremental, tap_multiple_streams
from fixtures.target import target
from fixtures.runner import runner
from fixtures.runner import runner, runner_with_deselected_stream
from fixtures.state import state_manager
15 changes: 15 additions & 0 deletions tests/fixtures/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ def runner(tmp_path, tap: Tap, target: Target) -> Generator[Runner, None, None]:
target=target,
state_manager=StateManager(base_path=str(tmp_path)),
)


@pytest.fixture
def runner_with_deselected_stream(
BernardWez marked this conversation as resolved.
Show resolved Hide resolved
tmp_path, tap_multiple_streams: Tap, target: Target
) -> Generator[Runner, None, None]:
"""
Return a Runner instance for the tap-smoke-test executable with two streams,
of which one is deselected.
"""
yield Runner(
tap=tap_multiple_streams,
target=target,
state_manager=StateManager(base_path=str(tmp_path)),
)
26 changes: 26 additions & 0 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,29 @@ def tap_incremental() -> Generator[Tap, None, None]:
spec="git+https://github.com/quantile-taps/tap-mock-incremental.git",
config={},
)


@pytest.fixture(scope="session")
def tap_multiple_streams() -> Generator[Tap, None, None]:
BernardWez marked this conversation as resolved.
Show resolved Hide resolved
"""
Return a Tap instance for the tap-smoke-test executable with two streams.

One stream is selected and one stream is deselected.
"""
yield Tap(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
config={
"streams": [
{
"stream_name": "animals",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
{
"stream_name": "animals-two",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
],
},
deselected=["animals-two"],
)
9 changes: 3 additions & 6 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ def test_catalog(tap: Tap):
def test_catalog_select(tap: Tap):
"""If we select a stream, the catalog should be updated."""
catalog = tap.catalog.select(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True
assert catalog.streams[0].is_selected == True

catalog = tap.catalog.select([])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[0].is_selected == False


def test_catalog_no_deselect(tap: Tap):
Expand All @@ -135,9 +133,8 @@ def test_catalog_no_deselect(tap: Tap):
def test_catalog_deselect_stream(tap: Tap):
"""If we deselect a stream, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[0].is_selected == False


def test_catalog_deselect_invalid_stream(tap: Tap):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_elx/test_extensions/test_dagster/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@ def test_asset_loading(runner: Runner):
assets = load_assets(runner)
assert len(assets) == 1
assert isinstance(assets[0], AssetsDefinition)


def test_asset_loading_with_deselected_stream(runner_with_deselected_stream: Runner):
"""
Test that assets are loaded correctly.
"""
# Verifies that the associated tap has multiple streams
assert len(runner_with_deselected_stream.tap.catalog.streams) == 2

assets = load_assets(runner_with_deselected_stream)
assert len(assets) == 1
assert isinstance(assets[0], AssetsDefinition)
Loading