Skip to content

Commit

Permalink
Add catalog deselector feature
Browse files Browse the repository at this point in the history
  • Loading branch information
BernardWez committed Nov 10, 2023
1 parent b025769 commit 9fde976
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 22 deletions.
106 changes: 87 additions & 19 deletions elx/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ def name(self) -> str:
def safe_name(self) -> str:
return self.name.replace("-", "_")

@property
def stream_properties(self) -> List[str]:
"""
List with all property names found in the stream_schema.
"""
return list(self.stream_schema.get("properties", {}).keys())

def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:
"""
Find metadata by breadcrumb.
Expand All @@ -30,10 +37,77 @@ def find_by_breadcrumb(self, breadcrumb: List[str]) -> Optional[dict]:

return None

def upsert_metadata(
self,
metadata: dict | None,
breadcrumb: List[str],
is_selected: bool,
) -> None:
"""
Updates or creates metadata for a given breadcrumb.
"""
# Update metadata if it exists
if metadata:
metadata["metadata"]["selected"] = is_selected
# Otherwise create the metadata
else:
self.metadata.append(
{"breadcrumb": breadcrumb, "metadata": {"selected": is_selected}}
)


class Catalog(BaseModel):
streams: List[Stream] = Field(default_factory=list)

def deselect(self, patterns: Optional[List[str]]) -> "Catalog":
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)

# Return catalog if no patterns to deselect.
if patterns is None:
return catalog

# Transform patterns to set for quick look-up.
patterns = set(patterns)

# Loop through the streams.
for stream in catalog.streams:
# Check if the stream name found in deselection patterns.
is_deselected = (stream.tap_stream_id in patterns) or (
stream.safe_name in patterns
)

# Find the stream metadata.
metadata = stream.find_by_breadcrumb(breadcrumb=[])

# Upsert the metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=[],
is_selected=not is_deselected,
)

# Loop over properties of stream.
for stream_property in stream.stream_properties:
# Check if stream property found in deselection patterns.
is_deselected = (
f"{stream.safe_name}.{stream_property}" in patterns
) or (f"{stream.name}.{stream_property}" in patterns)

# Find the stream metadata.
metadata = stream.find_by_breadcrumb(
breadcrumb=["properties", stream_property]
)

# Upsert the metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=["properties", stream_property],
is_selected=not is_deselected,
)

return catalog

def select(self, streams: Optional[List[str]]) -> "Catalog":
# Make a copy of the existing catalog.
catalog = self.copy(deep=True)
Expand All @@ -45,24 +119,18 @@ def select(self, streams: Optional[List[str]]) -> "Catalog":
# Loop through the streams in the catalog.
for stream in catalog.streams:
# Find the stream metadata.
metadata = stream.find_by_breadcrumb([])

# Update the metadata if it exists.
if metadata:
metadata["metadata"]["selected"] = (
stream.tap_stream_id in streams
) or (stream.safe_name in streams)

# Otherwise, create the metadata.
else:
stream.metadata.append(
{
"breadcrumb": [],
"metadata": {
"selected": (stream.tap_stream_id in streams)
or (stream.safe_name in streams),
},
}
)
metadata = stream.find_by_breadcrumb(breadcrumb=[])

# Check if stream is selected
is_selected = (stream.tap_stream_id in streams) or (
stream.safe_name in streams
)

# Upsert the metadata.
stream.upsert_metadata(
metadata=metadata,
breadcrumb=[],
is_selected=is_selected,
)

return catalog
6 changes: 3 additions & 3 deletions elx/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def __init__(
spec: str,
executable: str | None = None,
config: dict = {},
selected: List[str] = None,
deselected: List[str] = None,
):
super().__init__(spec, executable, config)
self.selected = selected
self.deselected = deselected

def discover(self, config_path: Path) -> dict:
"""
Expand All @@ -45,7 +45,7 @@ def catalog(self) -> Catalog:
with json_temp_file(self.config) as config_path:
catalog = self.discover(config_path)
catalog = Catalog(**catalog)
return catalog.select(streams=self.selected)
return catalog.deselect(patterns=self.deselected)

@contextlib.asynccontextmanager
@require_install
Expand Down

0 comments on commit 9fde976

Please sign in to comment.