Skip to content

Commit

Permalink
Refactored the catalog deselect code
Browse files Browse the repository at this point in the history
  • Loading branch information
JulesHuisman committed Nov 10, 2023
1 parent ec87b42 commit e5d7e3b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 77 deletions.
118 changes: 64 additions & 54 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,98 +14,110 @@ class Stream(BaseModel):

@property
def name(self) -> str:
"""The name of the stream is the stream_id"""
return self.tap_stream_id

@property
def safe_name(self) -> str:
""""""
return self.name.replace("-", "_")

@property
def stream_properties(self) -> List[str]:
"""
List with all property names found in the stream_schema.
"""
return list(self.stream_schema.get("properties", {}).keys())

def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
"""
for metadata in self.metadata:
if metadata["breadcrumb"] == breadcrumb:
return metadata
return metadata["metadata"]

return None

def upsert_metadata(
self,
metadata: dict | None,
breadcrumb: List[str],
is_selected: bool,
metadata: dict,
) -> None:
"""
Updates or creates metadata for a given breadcrumb.
"""
# Find metadata by breadcrumb.
metadata_record = self.find_metadata_by_breadcrumb(breadcrumb=breadcrumb)

# Update metadata if it exists
if metadata:
metadata["metadata"]["selected"] = is_selected
# Otherwise create the metadata
if metadata_record:
metadata_record.update(metadata)

# Otherwise add the metadata
else:
self.metadata.append(
{"breadcrumb": breadcrumb, "metadata": {"selected": is_selected}}
{
"breadcrumb": breadcrumb,
"metadata": metadata,
}
)


class Catalog(BaseModel):
streams: List[Stream] = Field(default_factory=list)

def deselect(self, patterns: Optional[List[str]]) -> "Catalog":
def find_stream(self, stream_id: str) -> Optional[Stream]:
"""
Find a stream by stream_id.
Args:
stream_id (str): The stream_id to find.
Returns:
Optional[Stream]: The stream if found, otherwise None.
"""
for stream in self.streams:
if stream.tap_stream_id == stream_id:
return stream

return None

def deselect(
self,
patterns: Optional[List[str]] = None,
) -> "Catalog":
"""
Deselect streams and properties from the catalog.
Args:
patterns (Optional[List[str]]): List of patterns to deselect. E.g. ["users", "users.email"]
Returns:
Catalog: A new catalog with deselected streams and properties.
"""
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)

# Return catalog if no patterns to deselect.
if patterns is None:
return catalog

# Transform patterns to set for quick look-up.
patterns = set(patterns)
for pattern in patterns:
# Split the pattern into nodes.
nodes = pattern.split(".")

# Loop through the streams.
for stream in catalog.streams:
# Check if the stream name found in deselection patterns.
is_deselected = (stream.tap_stream_id in patterns) or (
stream.safe_name in patterns
)
# Find the stream.
stream = catalog.find_stream(nodes[0])

# Find the stream metadata.
metadata = stream.find_by_breadcrumb(breadcrumb=[])
# If an invalid stream is found, skip it.
if stream is None:
continue

# Upsert the metadata.
# Wether to deselect the stream or a property.
breadcrumb = ["properties"] + nodes[1:] if len(nodes) > 1 else []

# Update or create metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=[],
is_selected=not is_deselected,
breadcrumb=breadcrumb,
metadata={
"selected": False,
},
)

# Loop over properties of stream.
for stream_property in stream.stream_properties:
# Check if stream property found in deselection patterns.
is_deselected = (
f"{stream.safe_name}.{stream_property}" in patterns
) or (f"{stream.name}.{stream_property}" in patterns)

# Find the stream metadata.
metadata = stream.find_by_breadcrumb(
breadcrumb=["properties", stream_property]
)

# Upsert the metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=["properties", stream_property],
is_selected=not is_deselected,
)

return catalog

def select(self, streams: Optional[List[str]]) -> "Catalog":
Expand All @@ -118,19 +130,17 @@ def select(self, streams: Optional[List[str]]) -> "Catalog":

# Loop through the streams in the catalog.
for stream in catalog.streams:
# Find the stream metadata.
metadata = stream.find_by_breadcrumb(breadcrumb=[])

# Check if stream is selected
is_selected = (stream.tap_stream_id in streams) or (
stream.safe_name in streams
)

# Upsert the metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=[],
is_selected=is_selected,
metadata={
"selected": is_selected,
},
)

return catalog
2 changes: 1 addition & 1 deletion tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from elx import Tap


@pytest.fixture
@pytest.fixture(scope="session")
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the tap-smoke-test executable.
Expand Down
45 changes: 23 additions & 22 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,40 +65,41 @@ def test_catalog(tap: Tap):


def test_catalog_select(tap: Tap):
"""If we select a stream, the catalog should be updated."""
catalog = tap.catalog.select(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][
"selected"
]
== True
)
assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True

catalog = tap.catalog.select([])
catalog_dict = catalog.dict(by_alias=True)

assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][
"selected"
]
== False
)
assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False


def test_catalog_no_deselect(tap: Tap):
"""If we don't deselect anything, the catalog should be the same."""
catalog = tap.catalog.deselect()
assert catalog == tap.catalog


def test_catalog_deselect_stream(tap: Tap):
"""If we deselect a stream, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False


assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][
"selected"
]
== False
)
def test_catalog_deselect_invalid_stream(tap: Tap):
"""If we try to deselect an invalid stream, the catalog should be the same."""
catalog = tap.catalog.deselect(["invalid"])
assert catalog == tap.catalog


def test_catalog_deselect_property(tap: Tap):
"""If we deselect a property, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals.id"])
catalog_dict = catalog.dict(by_alias=True)

assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][0]["metadata"]["selected"]
== False
)
assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False

0 comments on commit e5d7e3b

Please sign in to comment.