diff --git a/elx/catalog.py b/elx/catalog.py index 3c99fa4..d09d7ce 100644 --- a/elx/catalog.py +++ b/elx/catalog.py @@ -14,52 +14,81 @@ class Stream(BaseModel): @property def name(self) -> str: + """The name of the stream is its tap_stream_id.""" return self.tap_stream_id @property def safe_name(self) -> str: + """The stream name with dashes replaced by underscores.""" return self.name.replace("-", "_") - @property - def stream_properties(self) -> List[str]: - """ - List with all property names found in the stream_schema. - """ - return list(self.stream_schema.get("properties", {}).keys()) - - def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]: + def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]: """ Find metadata by breadcrumb. """ for metadata in self.metadata: if metadata["breadcrumb"] == breadcrumb: - return metadata + return metadata["metadata"] return None def upsert_metadata( self, - metadata: dict | None, breadcrumb: List[str], - is_selected: bool, + metadata: dict, ) -> None: """ Updates or creates metadata for a given breadcrumb. """ + # Find metadata by breadcrumb. + metadata_record = self.find_metadata_by_breadcrumb(breadcrumb=breadcrumb) + # Update metadata if it exists - if metadata: - metadata["metadata"]["selected"] = is_selected - # Otherwise create the metadata + if metadata_record: + metadata_record.update(metadata) + + # Otherwise add the metadata else: self.metadata.append( - {"breadcrumb": breadcrumb, "metadata": {"selected": is_selected}} + { + "breadcrumb": breadcrumb, + "metadata": metadata, + } ) class Catalog(BaseModel): streams: List[Stream] = Field(default_factory=list) - def deselect(self, patterns: Optional[List[str]]) -> "Catalog": + def find_stream(self, stream_id: str) -> Optional[Stream]: + """ + Find a stream by stream_id. + + Args: + stream_id (str): The stream_id to find. + + Returns: + Optional[Stream]: The stream if found, otherwise None. 
+ """ + for stream in self.streams: + if stream.tap_stream_id == stream_id: + return stream + + return None + + def deselect( + self, + patterns: Optional[List[str]] = None, + ) -> "Catalog": + """ + Deselect streams and properties from the catalog. + + Args: + patterns (Optional[List[str]]): List of patterns to deselect. E.g. ["users", "users.email"] + + Returns: + Catalog: A new catalog with deselected streams and properties. + """ # Make a copy of the existing catalog. catalog = self.copy(deep=True) @@ -67,45 +96,28 @@ def deselect(self, patterns: Optional[List[str]]) -> "Catalog": if patterns is None: return catalog - # Transform patterns to set for quick look-up. - patterns = set(patterns) + for pattern in patterns: + # Split the pattern into nodes. + nodes = pattern.split(".") - # Loop through the streams. - for stream in catalog.streams: - # Check if the stream name found in deselection patterns. - is_deselected = (stream.tap_stream_id in patterns) or ( - stream.safe_name in patterns - ) + # Find the stream. + stream = catalog.find_stream(nodes[0]) - # Find the stream metadata. - metadata = stream.find_by_breadcrumb(breadcrumb=[]) + # If no matching stream is found, skip the pattern. + if stream is None: + continue - # Upsert the metadata. + # Whether to deselect the stream or a property. + breadcrumb = ["properties"] + nodes[1:] if len(nodes) > 1 else [] + + # Update or create metadata. stream.upsert_metadata( - metadata=metadata, - breadcrumb=[], - is_selected=not is_deselected, + breadcrumb=breadcrumb, + metadata={ + "selected": False, + }, ) - # Loop over properties of stream. - for stream_property in stream.stream_properties: - # Check if stream property found in deselection patterns. - is_deselected = ( - f"{stream.safe_name}.{stream_property}" in patterns - ) or (f"{stream.name}.{stream_property}" in patterns) - - # Find the stream metadata. - metadata = stream.find_by_breadcrumb( - breadcrumb=["properties", stream_property] - ) - - # Upsert the metadata. 
- stream.upsert_metadata( - metadata=metadata, - breadcrumb=["properties", stream_property], - is_selected=not is_deselected, - ) - return catalog def select(self, streams: Optional[List[str]]) -> "Catalog": @@ -118,9 +130,6 @@ def select(self, streams: Optional[List[str]]) -> "Catalog": # Loop through the streams in the catalog. for stream in catalog.streams: - # Find the stream metadata. - metadata = stream.find_by_breadcrumb(breadcrumb=[]) - # Check if stream is selected is_selected = (stream.tap_stream_id in streams) or ( stream.safe_name in streams @@ -128,9 +137,10 @@ def select(self, streams: Optional[List[str]]) -> "Catalog": # Upsert the metadata. stream.upsert_metadata( - metadata=metadata, breadcrumb=[], - is_selected=is_selected, + metadata={ + "selected": is_selected, + }, ) return catalog diff --git a/tests/fixtures/tap.py b/tests/fixtures/tap.py index a77a41b..c16b870 100644 --- a/tests/fixtures/tap.py +++ b/tests/fixtures/tap.py @@ -3,7 +3,7 @@ from elx import Tap -@pytest.fixture +@pytest.fixture(scope="session") def tap() -> Generator[Tap, None, None]: """ Return a Tap instance for the tap-smoke-test executable. 
diff --git a/tests/test_elx/test_catalog.py b/tests/test_elx/test_catalog.py index 843e569..b5f2d1c 100644 --- a/tests/test_elx/test_catalog.py +++ b/tests/test_elx/test_catalog.py @@ -65,40 +65,41 @@ def test_catalog(tap: Tap): def test_catalog_select(tap: Tap): + """If we select a stream, the catalog should be updated.""" catalog = tap.catalog.select(["animals"]) + catalog_dict = catalog.dict(by_alias=True) - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][ - "selected" - ] - == True - ) + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True catalog = tap.catalog.select([]) + catalog_dict = catalog.dict(by_alias=True) - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][ - "selected" - ] - == False - ) + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + + +def test_catalog_no_deselect(tap: Tap): + """If we don't deselect anything, the catalog should be the same.""" + catalog = tap.catalog.deselect() + assert catalog == tap.catalog def test_catalog_deselect_stream(tap: Tap): + """If we deselect a stream, the catalog should be updated.""" catalog = tap.catalog.deselect(["animals"]) + catalog_dict = catalog.dict(by_alias=True) + + assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False + - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][ - "selected" - ] - == False - ) +def test_catalog_deselect_invalid_stream(tap: Tap): + """If we try to deselect an invalid stream, the catalog should be the same.""" + catalog = tap.catalog.deselect(["invalid"]) + assert catalog == tap.catalog def test_catalog_deselect_property(tap: Tap): + """If we deselect a property, the catalog should be updated.""" catalog = tap.catalog.deselect(["animals.id"]) + catalog_dict = catalog.dict(by_alias=True) - assert ( - catalog.dict(by_alias=True)["streams"][0]["metadata"][0]["metadata"]["selected"] - == False - 
) + assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False