From 6c33ddfcfe1901e435a906074d419662fb53b877 Mon Sep 17 00:00:00 2001 From: Bernard Wezeman Date: Fri, 10 Nov 2023 19:35:22 +0100 Subject: [PATCH] Feature/deselect pattern (#22) * Add limit based on Meltano default setting * Add buffer limit to invoke method * Remove limit from invoke * Define default buffer size as constant in Runner class * Add deselection tests * Add catalog deselector feature * Remove changes from feature/buffer-size * Refactored the catalog deselect code --------- Co-authored-by: Jules Huisman --- elx/catalog.py | 122 +++++++++++++++++++++++++++------ elx/tap.py | 6 +- tests/fixtures/tap.py | 2 +- tests/test_elx/test_catalog.py | 47 +++++++++---- 4 files changed, 138 insertions(+), 39 deletions(-) diff --git a/elx/catalog.py b/elx/catalog.py index 755580e..d09d7ce 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -14,26 +14,112 @@ class Stream(BaseModel): @property def name(self) -> str: + """The name of the stream is the stream_id""" return self.tap_stream_id @property def safe_name(self) -> str: + """The stream name with dashes replaced by underscores.""" return self.name.replace("-", "_") - def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]: + def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]: """ Find metadata by breadcrumb. """ for metadata in self.metadata: if metadata["breadcrumb"] == breadcrumb: - return metadata + return metadata["metadata"] return None + def upsert_metadata( + self, + breadcrumb: List[str], + metadata: dict, + ) -> None: + """ + Updates or creates metadata for a given breadcrumb. + """ + # Find metadata by breadcrumb. 
+ metadata_record = self.find_metadata_by_breadcrumb(breadcrumb=breadcrumb) + + # Update metadata if it exists + if metadata_record: + metadata_record.update(metadata) + + # Otherwise add the metadata + else: + self.metadata.append( + { + "breadcrumb": breadcrumb, + "metadata": metadata, + } + ) + class Catalog(BaseModel): streams: List[Stream] = Field(default_factory=list) + def find_stream(self, stream_id: str) -> Optional[Stream]: + """ + Find a stream by stream_id. + + Args: + stream_id (str): The stream_id to find. + + Returns: + Optional[Stream]: The stream if found, otherwise None. + """ + for stream in self.streams: + if stream.tap_stream_id == stream_id: + return stream + + return None + + def deselect( + self, + patterns: Optional[List[str]] = None, + ) -> "Catalog": + """ + Deselect streams and properties from the catalog. + + Args: + patterns (Optional[List[str]]): List of patterns to deselect. E.g. ["users", "users.email"] + + Returns: + Catalog: A new catalog with deselected streams and properties. + """ + # Make a copy of the existing catalog. + catalog = self.copy(deep=True) + + # Return catalog if no patterns to deselect. + if patterns is None: + return catalog + + for pattern in patterns: + # Split the pattern into nodes. + nodes = pattern.split(".") + + # Find the stream. + stream = catalog.find_stream(nodes[0]) + + # If an invalid stream is found, skip it. + if stream is None: + continue + + # Whether to deselect the stream or a property. + breadcrumb = ["properties"] + nodes[1:] if len(nodes) > 1 else [] + + # Update or create metadata. + stream.upsert_metadata( + breadcrumb=breadcrumb, + metadata={ + "selected": False, + }, + ) + + return catalog + def select(self, streams: Optional[List[str]]) -> "Catalog": # Make a copy of the existing catalog. catalog = self.copy(deep=True) @@ -44,25 +130,17 @@ def select(self, streams: Optional[List[str]]) -> "Catalog": # Loop through the streams in the catalog. 
for stream in catalog.streams: - # Find the stream metadata. - metadata = stream.find_by_breadcrumb([]) - - # Update the metadata if it exists. - if metadata: - metadata["metadata"]["selected"] = ( - stream.tap_stream_id in streams - ) or (stream.safe_name in streams) - - # Otherwise, create the metadata. - else: - stream.metadata.append( - { - "breadcrumb": [], - "metadata": { - "selected": (stream.tap_stream_id in streams) - or (stream.safe_name in streams), - }, - } - ) + # Check if stream is selected + is_selected = (stream.tap_stream_id in streams) or ( + stream.safe_name in streams + ) + + # Upsert the metadata. + stream.upsert_metadata( + breadcrumb=[], + metadata={ + "selected": is_selected, + }, + ) return catalog diff --git a/elx/tap.py b/elx/tap.py index 3ad1a2d..da352fc 100644 --- a/elx/tap.py +++ b/elx/tap.py @@ -16,10 +16,10 @@ def __init__( spec: str, executable: str | None = None, config: dict = {}, - selected: List[str] = None, + deselected: List[str] = None, ): super().__init__(spec, executable, config) - self.selected = selected + self.deselected = deselected def discover(self, config_path: Path) -> dict: """ @@ -45,7 +45,7 @@ def catalog(self) -> Catalog: with json_temp_file(self.config) as config_path: catalog = self.discover(config_path) catalog = Catalog(**catalog) - return catalog.select(streams=self.selected) + return catalog.deselect(patterns=self.deselected) @contextlib.asynccontextmanager @require_install diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index a77a41b..c16b870 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -3,7 +3,7 @@ from elx import Tap -@pytest.fixture +@pytest.fixture(scope="session") def tap() -> Generator[Tap, None, None]: """ Return a Tap instance for the tap-smoke-test executable. 
diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 80bcd06..b5f2d1c 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -64,21 +64,42 @@ def test_catalog(tap: Tap): assert tap.catalog.dict(by_alias=True) == DEFAULT_CATALOG -def test_catalog_update(tap: Tap): +def test_catalog_select(tap: Tap): + """If we select a stream, the catalog should be updated.""" catalog = tap.catalog.select(["animals"]) + catalog_dict = catalog.dict(by_alias=True) - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][ - "selected" - ] - == True - ) + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True catalog = tap.catalog.select([]) + catalog_dict = catalog.dict(by_alias=True) - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][ - "selected" - ] - == False - ) + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + + +def test_catalog_no_deselect(tap: Tap): + """If we don't deselect anything, the catalog should be the same.""" + catalog = tap.catalog.deselect() + assert catalog == tap.catalog + + +def test_catalog_deselect_stream(tap: Tap): + """If we deselect a stream, the catalog should be updated.""" + catalog = tap.catalog.deselect(["animals"]) + catalog_dict = catalog.dict(by_alias=True) + + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + + +def test_catalog_deselect_invalid_stream(tap: Tap): + """If we try to deselect an invalid stream, the catalog should be the same.""" + catalog = tap.catalog.deselect(["invalid"]) + assert catalog == tap.catalog + + +def test_catalog_deselect_property(tap: Tap): + """If we deselect a property, the catalog should be updated.""" + catalog = tap.catalog.deselect(["animals.id"]) + catalog_dict = catalog.dict(by_alias=True) + + assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False