diff --git a/elx/catalog.py b/elx/catalog.py index d4e1caa..20109be 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -23,6 +23,19 @@ def safe_name(self) -> str: """""" return self.name.replace("-", "_") + @property + def is_selected(self) -> bool: + """Returns boolean flag indicating whether stream has been selected or not.""" + # Find the stream metadata by breadcrumb + metadata = self.find_metadata_by_breadcrumb(breadcrumb=[]) + + # If metadata does not exist, stream should be considered selected + if not metadata: + return True + + # If metadata does exist, return value of `selected` property + return metadata.get("selected", True) + def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]: """ Find metadata by breadcrumb. diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index 21ad38f..b385004 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -75,6 +75,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: code_version=runner.tap.hash_key, ) for stream in runner.tap.catalog.streams + if stream.is_selected }, can_subset=True, group_name=dagster_safe_name(runner.tap.executable), diff --git a/tests/conftest.py b/tests/conftest.py index 067a7c7..746ec38 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ from fixtures.singer import singer -from fixtures.tap import tap, tap_incremental +from fixtures.tap import tap from fixtures.target import target from fixtures.runner import runner from fixtures.state import state_manager diff --git a/tests/fixtures/runner.py b/tests/fixtures/runner.py index 311de7a..f7b2d83 100644 --- a/tests/fixtures/runner.py +++ b/tests/fixtures/runner.py @@ -8,7 +8,7 @@ @pytest.fixture def runner(tmp_path, tap: Tap, target: Target) -> Generator[Runner, None, None]: """ - Return a Runner instance for the tap-smoke-test executable. + Return a Runner instance for the tap-mock-fixture executable. 
""" yield Runner( tap=tap, diff --git a/tests/fixtures/singer.py b/tests/fixtures/singer.py index 4648b33..634818b 100644 --- a/tests/fixtures/singer.py +++ b/tests/fixtures/singer.py @@ -3,21 +3,14 @@ from elx.singer import Singer -@pytest.fixture(params=["tap-smoke-test", None]) +@pytest.fixture(params=["tap-mock-fixture", None]) def singer(request) -> Generator[Singer, None, None]: """ - Return a Singer instance for the tap-smoke-test executable. + Return a Singer instance for the tap-mock-fixture executable. """ yield Singer( # Test with and without an executable. executable=request.param, - spec="git+https://github.com/meltano/tap-smoke-test.git", - config={ - "streams": [ - { - "stream_name": "users", - "input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl", - }, - ], - }, + spec="git+https://github.com/quantile-taps/tap-mock-fixture.git", + config={}, ) diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index 5780457..b334d9b 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -5,30 +5,11 @@ @pytest.fixture(scope="session") def tap() -> Generator[Tap, None, None]: - """ - Return a Tap instance for the tap-smoke-test executable. - """ - yield Tap( - executable="tap-smoke-test", - spec="git+https://github.com/meltano/tap-smoke-test.git", - config={ - "streams": [ - { - "stream_name": "animals", - "input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl", - }, - ], - }, - ) - - -@pytest.fixture(scope="session") -def tap_incremental() -> Generator[Tap, None, None]: """ Return a Tap instance for the executable with an incremental stream. 
""" yield Tap( - executable="tap-mock-incremental", - spec="git+https://github.com/quantile-taps/tap-mock-incremental.git", + executable="tap-mock-fixture", + spec="git+https://github.com/quantile-taps/tap-mock-fixture.git", config={}, ) diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index ab4b026..90e3792 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -7,73 +7,54 @@ { "tap_stream_id": "animals", "replication_method": "FULL_TABLE", + "key_properties": ["id"], "replication_key": None, - "table_name": None, "is_view": False, - "key_properties": [], + "table_name": None, "schema": { "properties": { - "id": {"type": "integer"}, - "description": {"type": "string"}, - "verified": {"type": "boolean"}, - "views": {"type": "integer"}, - "created_at": {"type": "string"}, + "id": {"type": ["integer", "null"]}, + "animal_name": {"type": ["string", "null"]}, + "updated_at": {"format": "date-time", "type": ["string", "null"]}, }, "type": "object", - "required": ["created_at", "description", "id", "verified", "views"], }, "metadata": [ { "breadcrumb": ["properties", "id"], - "metadata": {"inclusion": "available"}, - }, - { - "breadcrumb": ["properties", "description"], - "metadata": {"inclusion": "available"}, - }, - { - "breadcrumb": ["properties", "verified"], - "metadata": {"inclusion": "available"}, + "metadata": {"inclusion": "automatic"}, }, { - "breadcrumb": ["properties", "views"], + "breadcrumb": ["properties", "animal_name"], "metadata": {"inclusion": "available"}, }, { - "breadcrumb": ["properties", "created_at"], + "breadcrumb": ["properties", "updated_at"], "metadata": {"inclusion": "available"}, }, { "breadcrumb": [], "metadata": { "inclusion": "available", - "selected": True, - "selected-by-default": True, - "table-key-properties": [], + "selected": False, + "selected-by-default": False, + "table-key-properties": ["id"], }, }, ], - } - ] -} - -INCREMENTAL_CATALOG = { - "streams": [ + }, { 
"tap_stream_id": "users", - "replication_method": "INCREMENTAL", "replication_key": "updated_at", - "table_name": None, + "replication_method": "INCREMENTAL", "is_view": False, + "table_name": None, "key_properties": ["id"], "schema": { "properties": { - "id": {"type": ["string", "null"]}, + "id": {"type": ["integer", "null"]}, "name": {"type": ["string", "null"]}, - "updated_at": { - "format": "date-time", - "type": ["string", "null"], - }, + "updated_at": {"format": "date-time", "type": ["string", "null"]}, }, "type": "object", }, @@ -101,7 +82,7 @@ }, }, ], - } + }, ] } @@ -116,14 +97,12 @@ def test_catalog(tap: Tap): def test_catalog_select(tap: Tap): """If we select a stream, the catalog should be updated.""" catalog = tap.catalog.select(["animals"]) - catalog_dict = catalog.dict(by_alias=True) - assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True + assert catalog.streams[0].is_selected == True catalog = tap.catalog.select([]) - catalog_dict = catalog.dict(by_alias=True) - assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + assert catalog.streams[0].is_selected == False def test_catalog_no_deselect(tap: Tap): @@ -134,10 +113,9 @@ def test_catalog_no_deselect(tap: Tap): def test_catalog_deselect_stream(tap: Tap): """If we deselect a stream, the catalog should be updated.""" - catalog = tap.catalog.deselect(["animals"]) - catalog_dict = catalog.dict(by_alias=True) + catalog = tap.catalog.deselect(["users"]) - assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + assert catalog.streams[1].is_selected == False def test_catalog_deselect_invalid_stream(tap: Tap): @@ -154,27 +132,29 @@ def test_catalog_deselect_property(tap: Tap): assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False -def test_catalog_replication_method(tap_incremental: Tap): +def test_catalog_replication_method(tap: Tap): """If we have an incremental stream, the 
replication_method in the catalog should be `INCREMENTAL`.""" - catalog_dict = tap_incremental.catalog.dict(by_alias=True) + catalog_dict = tap.catalog.dict(by_alias=True) assert ( - catalog_dict["streams"][0]["replication_method"] - == INCREMENTAL_CATALOG["streams"][0]["replication_method"] + catalog_dict["streams"][1]["replication_method"] + == DEFAULT_CATALOG["streams"][1]["replication_method"] ) -def test_catalog_replication_key(tap_incremental: Tap): +def test_catalog_replication_key(tap: Tap): """If we have an incremental stream, the catalog should have a `replication_key`.""" - catalog_dict = tap_incremental.catalog.dict(by_alias=True) + catalog_dict = tap.catalog.dict(by_alias=True) + + assert catalog_dict["streams"][1]["replication_key"] != None assert ( - catalog_dict["streams"][0]["replication_key"] - == INCREMENTAL_CATALOG["streams"][0]["replication_key"] + catalog_dict["streams"][1]["replication_key"] + == DEFAULT_CATALOG["streams"][1]["replication_key"] ) -def test_catalog_valid_replication_keys(tap_incremental: Tap): +def test_catalog_valid_replication_keys(tap: Tap): """ If we have an incremental stream, the catalog should have a metadata breadcrumb for the incremental stream containing the key: `valid-replication-keys`. 
@@ -190,11 +170,11 @@ def test_catalog_valid_replication_keys(tap_incremental: Tap): "valid-replication-keys": ["updated_at"], } """ - catalog_dict = tap_incremental.catalog.dict(by_alias=True) + catalog_dict = tap.catalog.dict(by_alias=True) - replication_keys = catalog_dict["streams"][0]["metadata"][-1]["metadata"].get( + replication_keys = catalog_dict["streams"][1]["metadata"][-1]["metadata"].get( "valid-replication-keys", None ) # Checks that value of `valid-replication-keys` equals to the replication-key - assert replication_keys == [INCREMENTAL_CATALOG["streams"][0]["replication_key"]] + assert replication_keys == [DEFAULT_CATALOG["streams"][1]["replication_key"]] diff --git a/tests/test_elx/test_extensions/test_dagster/test_assets.py b/tests/test_elx/test_extensions/test_dagster/test_assets.py index 7d81a67..251204f 100644 --- a/tests/test_elx/test_extensions/test_dagster/test_assets.py +++ b/tests/test_elx/test_extensions/test_dagster/test_assets.py @@ -7,6 +7,12 @@ def test_asset_loading(runner: Runner): """ Test that assets are loaded correctly. """ + # Verifies that the tap associated with the runner has 2 streams + assert len(runner.tap.catalog.streams) == 2 + + # Load assets assets = load_assets(runner) + + # Length of assets should be 1 as one stream is deselected by default assert len(assets) == 1 assert isinstance(assets[0], AssetsDefinition) diff --git a/tests/test_elx/test_runner.py b/tests/test_elx/test_runner.py index 69e62de..27ee397 100644 --- a/tests/test_elx/test_runner.py +++ b/tests/test_elx/test_runner.py @@ -22,7 +22,7 @@ def test_config_interpolation_values(runner: Runner): """ Make sure the tap and target names are correct. 
""" - assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-smoke-test" + assert runner.interpolation_values["TAP_EXECUTABLE"] == "tap-mock-fixture" assert runner.interpolation_values["TARGET_EXECUTABLE"] == "target-jsonl" @@ -39,4 +39,4 @@ def test_config_interpolation_target_values(tap: Tap): runner = Runner(tap, target) - assert runner.target.config["tap_name"] == "tap_smoke_test" + assert runner.target.config["tap_name"] == "tap_mock_fixture" diff --git a/tests/test_elx/test_singer.py b/tests/test_elx/test_singer.py index 234301e..eb9a082 100644 --- a/tests/test_elx/test_singer.py +++ b/tests/test_elx/test_singer.py @@ -28,7 +28,7 @@ def test_singer_can_discover_executable(singer: Singer): """ Test that the singer executable can be discovered. """ - assert singer.executable == "tap-smoke-test" + assert singer.executable == "tap-mock-fixture" def test_singer_can_install(singer: Singer): @@ -71,8 +71,8 @@ def test_singer_dynamic_config(): Make sure the Singer instance is able to handle dynamic config. """ singer = Singer( - executable="tap-smoke-test", - spec="git+https://github.com/meltano/tap-smoke-test.git", + executable="tap-mock-fixture", + spec="git+https://github.com/quantile-taps/tap-mock-fixture.git", config=lambda: {}, ) @@ -84,8 +84,8 @@ def test_singer_config_interpolation(runner: Runner): Make sure the Singer instance is able to handle config interpolation. 
""" singer = Singer( - executable="tap-smoke-test", - spec="git+https://github.com/meltano/tap-smoke-test.git", + executable="tap-mock-fixture", + spec="git+https://github.com/quantile-taps/tap-mock-fixture.git", config={ "target_schema": "{TAP_NAME}", }, @@ -94,4 +94,4 @@ def test_singer_config_interpolation(runner: Runner): # Attach the runner singer.runner = runner - assert singer.config == {"target_schema": "tap_smoke_test"} + assert singer.config == {"target_schema": "tap_mock_fixture"} diff --git a/tests/test_elx/test_tap.py b/tests/test_elx/test_tap.py index 95dfe77..461087f 100644 --- a/tests/test_elx/test_tap.py +++ b/tests/test_elx/test_tap.py @@ -11,7 +11,7 @@ def test_tap_discovery(tap: Tap): # Make sure the catalog is of the right type. assert type(tap.catalog) == Catalog # Make sure the catalog has the right number of streams. - assert len(tap.catalog.streams) == 1 + assert len(tap.catalog.streams) == 2 # Make sure the streams are of the right type. assert type(tap.catalog.streams[0]) == Stream