Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dagster elx asset loading #24

Merged
merged 13 commits into from
Nov 21, 2023
13 changes: 13 additions & 0 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ def safe_name(self) -> str:
""""""
return self.name.replace("-", "_")

@property
def is_selected(self) -> bool:
"""Returns boolean flag indicating whether stream has been selected or not."""
# Find the stream metadata by breadcrumb
metadata = self.find_metadata_by_breadcrumb(breadcrumb=[])

# If metadata does not exist, stream should be considered selected
if not metadata:
return True

# If metadata does exists, return value of `selected` property
return metadata.get("selected", True)

def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
Expand Down
1 change: 1 addition & 0 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
code_version=runner.tap.hash_key,
)
for stream in runner.tap.catalog.streams
if stream.is_selected
},
can_subset=True,
group_name=dagster_safe_name(runner.tap.executable),
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fixtures.singer import singer
from fixtures.tap import tap, tap_incremental
from fixtures.tap import tap
from fixtures.target import target
from fixtures.runner import runner
from fixtures.state import state_manager
2 changes: 1 addition & 1 deletion tests/fixtures/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
@pytest.fixture
def runner(tmp_path, tap: Tap, target: Target) -> Generator[Runner, None, None]:
"""
Return a Runner instance for the tap-smoke-test executable.
Return a Runner instance for the tap-mock-fixture executable.
"""
yield Runner(
tap=tap,
Expand Down
15 changes: 4 additions & 11 deletions tests/fixtures/singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@
from elx.singer import Singer


@pytest.fixture(params=["tap-smoke-test", None])
@pytest.fixture(params=["tap-mock-fixture", None])
def singer(request) -> Generator[Singer, None, None]:
"""
Return a Singer instance for the tap-smoke-test executable.
Return a Singer instance for the tap-mock-fixture executable.
"""
yield Singer(
# Test with and without an executable.
executable=request.param,
spec="git+https://github.com/meltano/tap-smoke-test.git",
config={
"streams": [
{
"stream_name": "users",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
],
},
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={},
)
23 changes: 2 additions & 21 deletions tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,11 @@

@pytest.fixture(scope="session")
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the tap-smoke-test executable.
"""
yield Tap(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
config={
"streams": [
{
"stream_name": "animals",
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl",
},
],
},
)


@pytest.fixture(scope="session")
def tap_incremental() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the executable with an incremental stream.
"""
yield Tap(
executable="tap-mock-incremental",
spec="git+https://github.com/quantile-taps/tap-mock-incremental.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={},
)
90 changes: 35 additions & 55 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,54 @@
{
"tap_stream_id": "animals",
"replication_method": "FULL_TABLE",
"key_properties": ["id"],
"replication_key": None,
"table_name": None,
"is_view": False,
"key_properties": [],
"table_name": None,
"schema": {
"properties": {
"id": {"type": "integer"},
"description": {"type": "string"},
"verified": {"type": "boolean"},
"views": {"type": "integer"},
"created_at": {"type": "string"},
"id": {"type": ["integer", "null"]},
"animal_name": {"type": ["string", "null"]},
"updated_at": {"format": "date-time", "type": ["string", "null"]},
},
"type": "object",
"required": ["created_at", "description", "id", "verified", "views"],
},
"metadata": [
{
"breadcrumb": ["properties", "id"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "description"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "verified"],
"metadata": {"inclusion": "available"},
"metadata": {"inclusion": "automatic"},
},
{
"breadcrumb": ["properties", "views"],
"breadcrumb": ["properties", "animal_name"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": ["properties", "created_at"],
"breadcrumb": ["properties", "updated_at"],
"metadata": {"inclusion": "available"},
},
{
"breadcrumb": [],
"metadata": {
"inclusion": "available",
"selected": True,
"selected-by-default": True,
"table-key-properties": [],
"selected": False,
"selected-by-default": False,
"table-key-properties": ["id"],
},
},
],
}
]
}

INCREMENTAL_CATALOG = {
"streams": [
},
{
"tap_stream_id": "users",
"replication_method": "INCREMENTAL",
"replication_key": "updated_at",
"table_name": None,
"replication_method": "INCREMENTAL",
"is_view": False,
"table_name": None,
"key_properties": ["id"],
"schema": {
"properties": {
"id": {"type": ["string", "null"]},
"id": {"type": ["integer", "null"]},
"name": {"type": ["string", "null"]},
"updated_at": {
"format": "date-time",
"type": ["string", "null"],
},
"updated_at": {"format": "date-time", "type": ["string", "null"]},
},
"type": "object",
},
Expand Down Expand Up @@ -101,7 +82,7 @@
},
},
],
}
},
]
}

Expand All @@ -116,14 +97,12 @@ def test_catalog(tap: Tap):
def test_catalog_select(tap: Tap):
"""If we select a stream, the catalog should be updated."""
catalog = tap.catalog.select(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True
assert catalog.streams[0].is_selected == True

catalog = tap.catalog.select([])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[0].is_selected == False


def test_catalog_no_deselect(tap: Tap):
Expand All @@ -134,10 +113,9 @@ def test_catalog_no_deselect(tap: Tap):

def test_catalog_deselect_stream(tap: Tap):
"""If we deselect a stream, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals"])
catalog_dict = catalog.dict(by_alias=True)
catalog = tap.catalog.deselect(["users"])

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False
assert catalog.streams[1].is_selected == False


def test_catalog_deselect_invalid_stream(tap: Tap):
Expand All @@ -154,27 +132,29 @@ def test_catalog_deselect_property(tap: Tap):
assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False


def test_catalog_replication_method(tap_incremental: Tap):
def test_catalog_replication_method(tap: Tap):
"""If we have an incremental stream, the replication_method in the catalog should be `INCREMENTAL`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

assert (
catalog_dict["streams"][0]["replication_method"]
== INCREMENTAL_CATALOG["streams"][0]["replication_method"]
catalog_dict["streams"][1]["replication_method"]
== DEFAULT_CATALOG["streams"][1]["replication_method"]
)


def test_catalog_replication_key(tap_incremental: Tap):
def test_catalog_replication_key(tap: Tap):
"""If we have an incremental stream, the catalog should have a `replication_key`."""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

assert catalog_dict["streams"][1]["replication_key"] != None

assert (
catalog_dict["streams"][0]["replication_key"]
== INCREMENTAL_CATALOG["streams"][0]["replication_key"]
catalog_dict["streams"][1]["replication_key"]
== DEFAULT_CATALOG["streams"][1]["replication_key"]
)


def test_catalog_valid_replication_keys(tap_incremental: Tap):
def test_catalog_valid_replication_keys(tap: Tap):
"""
If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental
stream containing the key: `valid-replication-keys`.
Expand All @@ -190,11 +170,11 @@ def test_catalog_valid_replication_keys(tap_incremental: Tap):
"valid-replication-keys": ["updated_at"],
}
"""
catalog_dict = tap_incremental.catalog.dict(by_alias=True)
catalog_dict = tap.catalog.dict(by_alias=True)

replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get(
replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get(
"valid-replication-keys", None
)

# Checks that value of `valid-replication-keys` equals to the replication-key
assert replication_keys == [INCREMENTAL_CATALOG["streams"][0]["replication_key"]]
assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]]
6 changes: 6 additions & 0 deletions tests/test_elx/test_extensions/test_dagster/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ def test_asset_loading(runner: Runner):
"""
Test that assets are loaded correctly.
"""
# Verifies that the tap associated with the runner has 2 streams
assert len(runner.tap.catalog.streams) == 2

# Load assets
assets = load_assets(runner)

# Length of assets should be 1 as one stream is deselected per default
assert len(assets) == 1
assert isinstance(assets[0], AssetsDefinition)
4 changes: 2 additions & 2 deletions tests/test_elx/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_config_interpolation_values(runner: Runner):
"""
Make sure the tap and target names are correct.
"""
assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-smoke-test"
assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-mock-fixture"
assert runner.interpolation_values["TARGET_EXECUTABLE"] == "target-jsonl"


Expand All @@ -39,4 +39,4 @@ def test_config_interpolation_target_values(tap: Tap):

runner = Runner(tap, target)

assert runner.target.config["tap_name"] == "tap_smoke_test"
assert runner.target.config["tap_name"] == "tap_mock_fixture"
12 changes: 6 additions & 6 deletions tests/test_elx/test_singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_singer_can_discover_executable(singer: Singer):
"""
Test that the singer executable can be discovered.
"""
assert singer.executable == "tap-smoke-test"
assert singer.executable == "tap-mock-fixture"


def test_singer_can_install(singer: Singer):
Expand Down Expand Up @@ -71,8 +71,8 @@ def test_singer_dynamic_config():
Make sure the Singer instance is able to handle dynamic config.
"""
singer = Singer(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config=lambda: {},
)

Expand All @@ -84,8 +84,8 @@ def test_singer_config_interpolation(runner: Runner):
Make sure the Singer instance is able to handle config interpolation.
"""
singer = Singer(
executable="tap-smoke-test",
spec="git+https://github.com/meltano/tap-smoke-test.git",
executable="tap-mock-fixture",
spec="git+https://github.com/quantile-taps/tap-mock-fixture.git",
config={
"target_schema": "{TAP_NAME}",
},
Expand All @@ -94,4 +94,4 @@ def test_singer_config_interpolation(runner: Runner):
# Attach the runner
singer.runner = runner

assert singer.config == {"target_schema": "tap_smoke_test"}
assert singer.config == {"target_schema": "tap_mock_fixture"}
2 changes: 1 addition & 1 deletion tests/test_elx/test_tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_tap_discovery(tap: Tap):
# Make sure the catalog is of the right type.
assert type(tap.catalog) == Catalog
# Make sure the catalog has the right number of streams.
assert len(tap.catalog.streams) == 1
assert len(tap.catalog.streams) == 2
# Make sure the streams are of the right type.
assert type(tap.catalog.streams[0]) == Stream

Expand Down
Loading