Skip to content

Commit

Permalink
Feature/deselect pattern (#22)
Browse files Browse the repository at this point in the history
* Add limit based on Meltano default setting

* Add buffer limit to invoke method

* Remove limit from invoke

* Define default buffer size as constant in Runner class

* Add deselection tests

* Add catalog deselector feature

* Remove changes from feature/buffer-size

* Refactored the catalog deselect code

---------

Co-authored-by: Jules Huisman <[email protected]>
  • Loading branch information
BernardWez and JulesHuisman authored Nov 10, 2023
1 parent 454e761 commit 6c33ddf
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 39 deletions.
122 changes: 100 additions & 22 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,112 @@ class Stream(BaseModel):

@property
def name(self) -> str:
"""The name of the stream is the stream_id"""
return self.tap_stream_id

@property
def safe_name(self) -> str:
""""""
return self.name.replace("-", "_")

def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
def find_metadata_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
"""
for metadata in self.metadata:
if metadata["breadcrumb"] == breadcrumb:
return metadata
return metadata["metadata"]

return None

def upsert_metadata(
self,
breadcrumb: List[str],
metadata: dict,
) -> None:
"""
Updates or creates metadata for a given breadcrumb.
"""
# Find metadata by breadcrumb.
metadata_record = self.find_metadata_by_breadcrumb(breadcrumb=breadcrumb)

# Update metadata if it exists
if metadata_record:
metadata_record.update(metadata)

# Otherwise add the metadata
else:
self.metadata.append(
{
"breadcrumb": breadcrumb,
"metadata": metadata,
}
)


class Catalog(BaseModel):
streams: List[Stream] = Field(default_factory=list)

def find_stream(self, stream_id: str) -> Optional[Stream]:
"""
Find a stream by stream_id.
Args:
stream_id (str): The stream_id to find.
Returns:
Optional[Stream]: The stream if found, otherwise None.
"""
for stream in self.streams:
if stream.tap_stream_id == stream_id:
return stream

return None

def deselect(
self,
patterns: Optional[List[str]] = None,
) -> "Catalog":
"""
Deselect streams and properties from the catalog.
Args:
patterns (Optional[List[str]]): List of patterns to deselect. E.g. ["users", "users.email"]
Returns:
Catalog: A new catalog with deselected streams and properties.
"""
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)

# Return catalog if no patterns to deselect.
if patterns is None:
return catalog

for pattern in patterns:
# Split the pattern into nodes.
nodes = pattern.split(".")

# Find the stream.
stream = catalog.find_stream(nodes[0])

# If an invalid stream is found, skip it.
if stream is None:
continue

# Wether to deselect the stream or a property.
breadcrumb = ["properties"] + nodes[1:] if len(nodes) > 1 else []

# Update or create metadata.
stream.upsert_metadata(
breadcrumb=breadcrumb,
metadata={
"selected": False,
},
)

return catalog

def select(self, streams: Optional[List[str]]) -> "Catalog":
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)
Expand All @@ -44,25 +130,17 @@ def select(self, streams: Optional[List[str]]) -> "Catalog":

# Loop through the streams in the catalog.
for stream in catalog.streams:
# Find the stream metadata.
metadata = stream.find_by_breadcrumb([])

# Update the metadata if it exists.
if metadata:
metadata["metadata"]["selected"] = (
stream.tap_stream_id in streams
) or (stream.safe_name in streams)

# Otherwise, create the metadata.
else:
stream.metadata.append(
{
"breadcrumb": [],
"metadata": {
"selected": (stream.tap_stream_id in streams)
or (stream.safe_name in streams),
},
}
)
# Check if stream is selected
is_selected = (stream.tap_stream_id in streams) or (
stream.safe_name in streams
)

# Upsert the metadata.
stream.upsert_metadata(
breadcrumb=[],
metadata={
"selected": is_selected,
},
)

return catalog
6 changes: 3 additions & 3 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def __init__(
spec: str,
executable: str | None = None,
config: dict = {},
selected: List[str] = None,
deselected: List[str] = None,
):
super().__init__(spec, executable, config)
self.selected = selected
self.deselected = deselected

def discover(self, config_path: Path) -> dict:
"""
Expand All @@ -45,7 +45,7 @@ def catalog(self) -> Catalog:
with json_temp_file(self.config) as config_path:
catalog = self.discover(config_path)
catalog = Catalog(**catalog)
return catalog.select(streams=self.selected)
return catalog.deselect(patterns=self.deselected)

@contextlib.asynccontextmanager
@require_install
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from elx import Tap


@pytest.fixture
@pytest.fixture(scope="session")
def tap() -> Generator[Tap, None, None]:
"""
Return a Tap instance for the tap-smoke-test executable.
Expand Down
47 changes: 34 additions & 13 deletions tests/test_elx/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,42 @@ def test_catalog(tap: Tap):
assert tap.catalog.dict(by_alias=True) == DEFAULT_CATALOG


def test_catalog_update(tap: Tap):
def test_catalog_select(tap: Tap):
"""If we select a stream, the catalog should be updated."""
catalog = tap.catalog.select(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][
"selected"
]
== True
)
assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == True

catalog = tap.catalog.select([])
catalog_dict = catalog.dict(by_alias=True)

assert (
catalog.dict(by_alias=True)["streams"][0]["metadata"][-1]["metadata"][
"selected"
]
== False
)
assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False


def test_catalog_no_deselect(tap: Tap):
"""If we don't deselect anything, the catalog should be the same."""
catalog = tap.catalog.deselect()
assert catalog == tap.catalog


def test_catalog_deselect_stream(tap: Tap):
"""If we deselect a stream, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][-1]["metadata"]["selected"] == False


def test_catalog_deselect_invalid_stream(tap: Tap):
"""If we try to deselect an invalid stream, the catalog should be the same."""
catalog = tap.catalog.deselect(["invalid"])
assert catalog == tap.catalog


def test_catalog_deselect_property(tap: Tap):
"""If we deselect a property, the catalog should be updated."""
catalog = tap.catalog.deselect(["animals.id"])
catalog_dict = catalog.dict(by_alias=True)

assert catalog_dict["streams"][0]["metadata"][0]["metadata"]["selected"] == False

0 comments on commit 6c33ddf

Please sign in to comment.