diff --git a/poetry.lock b/poetry.lock index 195ce1e..701f890 100644 --- a/poetry.lock +++ b/poetry.lock @@ -290,6 +290,17 @@ files = [ {file = "pathspec-0.11.2.tar.gz", hash = "sha256:e0d8d0ac2f12da61956eb2306b69f9469b42f4deb0f3cb6ed47b9cce9996ced3"}, ] +[[package]] +name = "petname" +version = "2.6" +description = "Generate human-readable, random object names" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "petname-2.6.tar.gz", hash = "sha256:981c31ef772356a373640d1bb7c67c102e0159eda14578c67a1c99d5b34c9e4c"}, +] + [[package]] name = "platformdirs" version = "3.10.0" @@ -509,4 +520,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.12" -content-hash = "eca2e0b9855012e2b572979bdacc928b97e61ce16eb2e90431c912841b771c5b" +content-hash = "49775160f0532737c14ecf229fb3de71fe16d45d831f01b2c46f4da7cba53d10" diff --git a/pyproject.toml b/pyproject.toml index f35ce4b..1161c55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ filelock = "^3.11.0" pyfarmhash = "^0.3.2" numpy = "^1.24.2" base58 = "^2.1.1" +petname = "^2.6" [tool.poetry.group.dev.dependencies] tox = "^4.4.12" diff --git a/src/singerlake/config.py b/src/singerlake/config.py index bf05c8a..a933841 100644 --- a/src/singerlake/config.py +++ b/src/singerlake/config.py @@ -1,11 +1,18 @@ from pydantic import BaseModel +class GenericPathModel(BaseModel): + """Generic Path Model.""" + + segments: tuple[str, ...] + relative: bool = False + + class PathConfig(BaseModel): """Singer Lake Path Config.""" path_type: str = "hive" - lake_root: tuple[str, ...] + lake_root: GenericPathModel class LockConfig(BaseModel): diff --git a/src/singerlake/discovery/discovery_service.py b/src/singerlake/discovery/discovery_service.py index 6581246..99103c6 100644 --- a/src/singerlake/discovery/discovery_service.py +++ b/src/singerlake/discovery/discovery_service.py @@ -14,22 +14,19 @@ class DiscoveryService: def __init__(self, singerlake: "Singerlake"): self.singerlake = singerlake - self.tap_cache: dict | None = None + self._tap_cache: t.List[str] | None = None - def list_taps(self): + def list_taps(self) -> t.List[str]: """List available Taps.""" - if self.tap_cache is None: - lake_manifest = self.singerlake.manifest_service.get_lake_manifest() - self.tap_cache = { - tap_definition["tap_id"]: Tap(**tap_definition) - for tap_definition in lake_manifest.taps - } + if self._tap_cache is None: + lake_manifest = self.singerlake.manifest_service.lake_manifest + self._tap_cache = lake_manifest.taps - yield iter(self.tap_cache.values()) + return self._tap_cache def get_tap(self, tap_id): """Get a Tap by ID.""" - lake_manifest = self.singerlake.manifest_service.get_lake_manifest() + lake_manifest = self.singerlake.manifest_service.lake_manifest for tap_definition in lake_manifest.taps: if tap_definition["id"] == tap_id: return Tap(**tap_definition) diff --git a/src/singerlake/manifest/manifest_service.py b/src/singerlake/manifest/manifest_service.py index 0a5f077..ca964a6 100644 --- a/src/singerlake/manifest/manifest_service.py +++ b/src/singerlake/manifest/manifest_service.py @@ -1,6 +1,6 @@ import typing as t -from .models import LakeManifest +from .models import LakeManifest, TapManifest if t.TYPE_CHECKING: from singerlake import Singerlake @@ -18,12 +18,14 @@ def __init__(self, singerlake: "Singerlake"): def lake_manifest(self): """Get the Lake Manifest.""" if self._lake_manifest is None: - ( - raw_lake_manifest, - lake_manifest_checksum, - ) = self.singerlake.store.read_lake_manifest() + raw_lake_manifest = self.singerlake.store.read_lake_manifest() if raw_lake_manifest: - raw_lake_manifest["checksum"] = lake_manifest_checksum - self._lake_manifest = LakeManifest.model_validate(raw_lake_manifest) + self._lake_manifest = LakeManifest(**raw_lake_manifest) return self._lake_manifest + + def get_tap_manifest(self, tap_id: str): + """Get a Tap Manifest by ID.""" + tap_manifest = self.singerlake.store.read_tap_manifest(tap_id=tap_id) + if tap_manifest: + return TapManifest(**tap_manifest) diff --git a/src/singerlake/manifest/models.py b/src/singerlake/manifest/models.py index 9ba80f8..faf27d8 100644 --- a/src/singerlake/manifest/models.py +++ b/src/singerlake/manifest/models.py @@ -4,7 +4,7 @@ from pydantic import BaseModel -class StreamDefinition(BaseModel): +class StreamManifest(BaseModel): """Stream Manifest.""" stream_id: str @@ -13,15 +13,15 @@ class StreamDefinition(BaseModel): versions: t.Mapping[str, str] = {} -class TapDefinition(BaseModel): +class TapManifest(BaseModel): """Tap Manifest.""" tap_id: str - streams: t.List[StreamDefinition] = [] + streams: t.List[str] = [] class LakeManifest(BaseModel): """Lake Manifest.""" lake_id: str - taps: t.List[TapDefinition] = [] + taps: t.List[str] = [] diff --git a/src/singerlake/singerlake.py b/src/singerlake/singerlake.py index d180bfd..6e48668 100644 --- a/src/singerlake/singerlake.py +++ b/src/singerlake/singerlake.py @@ -1,5 +1,6 @@ from __future__ import annotations +import typing as t from uuid import uuid4 from singerlake.config import SingerlakeConfig @@ -7,6 +8,9 @@ from singerlake.manifest import ManifestService from singerlake.store import StoreService +if t.TYPE_CHECKING: + from singerlake.tap import Tap + class Singerlake: """Singer Lake.""" @@ -21,10 +25,19 @@ def __init__(self, config: dict | None = None): self.discovery_service = DiscoveryService(singerlake=self) self.store = StoreService(singerlake=self, config=self.config.store).get_store() - def list_taps(self): + self._lake_id = None + + @property + def lake_id(self) -> str: + """Return the Lake ID.""" + if self._lake_id is None: + self._lake_id = self.manifest_service.lake_manifest.lake_id + return self._lake_id + + def list_taps(self) -> list[str]: """Return Taps stored in this Singerlake.""" return self.discovery_service.list_taps() - def get_tap(self, tap_id: str): + def get_tap(self, tap_id: str) -> "Tap": """Return a Tap stored in this Singerlake.""" - return self.discovery_service.get_tap(tap_id) + return self.discovery_service.get_tap(tap_id=tap_id) diff --git a/src/singerlake/store/__init__.py b/src/singerlake/store/__init__.py index ef2eeda..15e1534 100644 --- a/src/singerlake/store/__init__.py +++ b/src/singerlake/store/__init__.py @@ -1,10 +1,10 @@ from .base import BaseStore -from .constant import ( +from .local import LocalStore +from .path_manager.constant import ( LAKE_MANIFEST_FILENAME, STREAM_MANIFEST_FILENAME, TAP_MANIFEST_FILENAME, ) -from .local import LocalStore from .store_service import StoreService __all__ = [ diff --git a/src/singerlake/store/base.py b/src/singerlake/store/base.py index 2cdd9f3..0cb5bb0 100644 --- a/src/singerlake/store/base.py +++ b/src/singerlake/store/base.py @@ -12,7 +12,7 @@ TapDefinition, ) -from .constant import ( +from .path_manager.constant import ( LAKE_MANIFEST_FILENAME, STREAM_MANIFEST_FILENAME, TAP_MANIFEST_FILENAME, diff --git a/src/singerlake/store/local.py b/src/singerlake/store/local.py index 972bf25..f5432af 100644 --- a/src/singerlake/store/local.py +++ b/src/singerlake/store/local.py @@ -6,6 +6,8 @@ from .base import BaseStore if t.TYPE_CHECKING: + from singerlake.store.path_manager.base import GenericPath + from .locker.base import BaseLocker from .path_manager.base import BasePathManager @@ -32,14 +34,18 @@ def _read_json(self, file_path: Path): with file_path.open("r", encoding="utf-8") as json_file: return json.load(json_file) + def _to_path(self, generic_path: "GenericPath") -> Path: + """Convert a GenericPath to a pathlib Path.""" + return Path(*generic_path.segments) + def read_lake_manifest_checksum(self) -> str | None: """Read the Lake Manifest checksum.""" - lake_manifest_path = Path(self.path_manager.get_lake_manifest_path()) + lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path) return self._md5(lake_manifest_path) def read_lake_manifest(self) -> tuple[dict, str] | None: """Read the Lake Manifest.""" - lake_manifest_path = Path(self.path_manager.get_lake_manifest_path()) + lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path) lake_manifest = self._read_json(lake_manifest_path) if lake_manifest is not None: self._lake_manifest_checksum = self.read_lake_manifest_checksum() diff --git a/src/singerlake/store/path_manager/__init__.py b/src/singerlake/store/path_manager/__init__.py index a8f5757..b9ce6ab 100644 --- a/src/singerlake/store/path_manager/__init__.py +++ b/src/singerlake/store/path_manager/__init__.py @@ -1,4 +1,15 @@ from .base import BasePathManager +from .constant import ( + LAKE_MANIFEST_FILENAME, + STREAM_MANIFEST_FILENAME, + TAP_MANIFEST_FILENAME, +) from .path_service import PathService -__all__ = ["BasePathManager", "PathService"] +__all__ = [ + "BasePathManager", + "PathService", + "LAKE_MANIFEST_FILENAME", + "TAP_MANIFEST_FILENAME", + "STREAM_MANIFEST_FILENAME", +] diff --git a/src/singerlake/store/path_manager/base.py b/src/singerlake/store/path_manager/base.py index ee8cd17..9580fa1 100644 --- a/src/singerlake/store/path_manager/base.py +++ b/src/singerlake/store/path_manager/base.py @@ -5,13 +5,55 @@ import farmhash import numpy as np +from .constant import ( + LAKE_MANIFEST_FILENAME, + STREAM_MANIFEST_FILENAME, + TAP_MANIFEST_FILENAME, +) + if t.TYPE_CHECKING: from singerlake.config import PathConfig +class GenericPath: + + """Generic path class.""" + + def __init__(self, segments: tuple[str], relative: bool = False): + self.segments = segments + self.relative = relative + + def __str__(self): + return "/".join(self.segments) + + def __repr__(self): + return f"GenericPath({self.segments})" + + def __eq__(self, other): + return (self.segments == other.segments) and (self.relative == other.relative) + + def __hash__(self): + return hash(self.segments) + + def extend(self, *args: str) -> "GenericPath": + """Extend the path.""" + return GenericPath(self.segments + args, relative=self.relative) + + @classmethod + def from_dict(cls, data: t.Mapping[str, t.Any]) -> "GenericPath": + """Extend the path with a dict.""" + return GenericPath(data["segments"], relative=data.get("relative", False)) + + @classmethod + def from_model(cls, model: t.Any) -> "GenericPath": + """Extend the path with a dict.""" + return GenericPath(model.segments, relative=model.relative) + + class BasePathManager: def __init__(self, config: "PathConfig"): self.config = config + self.lake_root = GenericPath.from_model(self.config.lake_root) def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str: """Calculate a unique short-hash for given schema.""" @@ -22,11 +64,16 @@ def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str: return base58.b58encode(int64_hash_bytes).decode("utf-8") @property - def lake_root(self): - """Get the lake root.""" - return self.config.lake_root - - @property - def lake_manifest_path(self) -> tuple[str]: + def lake_manifest_path(self) -> GenericPath: """Get the lake manifest path.""" - return (self.lake_root, "catalog.json") + return self.lake_root.extend(*("raw", LAKE_MANIFEST_FILENAME)) + + def get_tap_manifest_path(self, tap_id: str) -> GenericPath: + """Get the tap manifest path.""" + return self.lake_root.extend(*("raw", tap_id, TAP_MANIFEST_FILENAME)) + + def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> GenericPath: + """Get the stream manifest path.""" + return self.lake_root.extend( + *("raw", tap_id, stream_id, STREAM_MANIFEST_FILENAME) + ) diff --git a/src/singerlake/store/constant.py b/src/singerlake/store/path_manager/constant.py similarity index 100% rename from src/singerlake/store/constant.py rename to src/singerlake/store/path_manager/constant.py diff --git a/src/singerlake/stream/stream.py b/src/singerlake/stream/stream.py index 8aa8536..a0b4a10 100644 --- a/src/singerlake/stream/stream.py +++ b/src/singerlake/stream/stream.py @@ -1,6 +1,6 @@ import typing as t -from singerlake.manifest.models import StreamDefinition +from singerlake.manifest.models import StreamManifest if t.TYPE_CHECKING: from singerlake import Singerlake @@ -12,12 +12,12 @@ def __init__( self, singerlake: "Singerlake", tap: "Tap", - stream_definition: StreamDefinition, + stream_manifest: StreamManifest, ) -> None: self.singerlake = singerlake self.tap = tap - self.stream_definition = stream_definition + self.stream_manifest = stream_manifest @property def stream_id(self) -> str: - return self.stream_definition.stream_id + return self.stream_manifest.stream_id diff --git a/src/singerlake/tap/tap.py b/src/singerlake/tap/tap.py index f53496e..47e3575 100644 --- a/src/singerlake/tap/tap.py +++ b/src/singerlake/tap/tap.py @@ -4,39 +4,44 @@ if t.TYPE_CHECKING: from singerlake import Singerlake - from singerlake.manifest.models import TapDefinition + from singerlake.manifest.models import TapManifest class Tap: """Tap.""" - def __init__( - self, singerlake: "Singerlake", tap_definition: "TapDefinition" - ) -> None: + def __init__(self, singerlake: "Singerlake", tap_manifest: "TapManifest") -> None: """Tap.""" self.singerlake = singerlake - self.tap_definition = tap_definition - self.stream_cache = None + self.tap_manifest = tap_manifest + self._stream_cache = None @property def tap_id(self) -> str: """Tap ID.""" - return self.tap_definition.tap_id + return self.tap_manifest.tap_id @property - def streams(self) -> list[Stream]: + def streams(self) -> t.Mapping[str, Stream]: """Streams.""" - if self.stream_cache is None: - self.stream_cache = { - stream_definition.stream_id: Stream( + if self._stream_cache is None: + self._stream_cache = {} + for stream_id in self.tap_manifest.streams: + stream_manifest = self.singerlake.manifest_service.get_stream_manifest( + tap_id=self.tap_id, stream_id=stream_id + ) + self._stream_cache[stream_id] = Stream( singerlake=self.singerlake, tap=self, - stream_definition=stream_definition, + stream_manifest=stream_manifest, ) - for stream_definition in self.tap_definition.streams - } - return [self.stream_cache[stream_id] for stream_id in self.stream_cache] + return self._stream_cache + + @property + def stream_ids(self) -> list[str]: + """Stream IDs.""" + return self.tap_manifest.streams def get_stream(self, stream_id: str) -> Stream: """Get Stream.""" - return self.stream_cache.get(stream_id) + return self.streams.get(stream_id) diff --git a/tests/conftest.py b/tests/conftest.py index c7a4b39..193f3fd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,10 @@ def singerlake_config(): "store_type": "local", "path": { "path_type": "hive", - "lake_root": ("tests", "data", "lake"), + "lake_root": { + "segments": ("tests", "data", "lake"), + "relative": True, + }, }, "lock": { "lock_type": "local", diff --git a/tests/data/lake/raw/manifest.json b/tests/data/lake/raw/manifest.json new file mode 100644 index 0000000..d5f50f9 --- /dev/null +++ b/tests/data/lake/raw/manifest.json @@ -0,0 +1,4 @@ +{ + "lake_id": "sound-oryx", + "taps": ["tap-carbon-intensity"] +} diff --git a/tests/data/lake/raw/tap-carbon-intensity/entry/manifest.json b/tests/data/lake/raw/tap-carbon-intensity/entry/manifest.json new file mode 100644 index 0000000..e69de29 diff --git a/tests/data/lake/raw/tap-carbon-intensity/generationmix/manifest.json b/tests/data/lake/raw/tap-carbon-intensity/generationmix/manifest.json new file mode 100644 index 0000000..e69de29 diff --git a/tests/data/lake/raw/tap-carbon-intensity/manifest.json b/tests/data/lake/raw/tap-carbon-intensity/manifest.json new file mode 100644 index 0000000..e69de29 diff --git a/tests/data/lake/raw/tap-carbon-intensity/region/manifest.json b/tests/data/lake/raw/tap-carbon-intensity/region/manifest.json new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_singerlake.py b/tests/test_singerlake.py index cef1558..5d7d562 100644 --- a/tests/test_singerlake.py +++ b/tests/test_singerlake.py @@ -7,3 +7,16 @@ def test_singerlake(singerlake): assert singerlake.store is not None assert singerlake.manifest_service is not None assert singerlake.discovery_service is not None + + +def test_discovery(singerlake): + assert singerlake.lake_id == "sound-oryx" + assert singerlake.list_taps() == ["tap-carbon-intensity"] + + tap = singerlake.get_tap("tap-carbon-intensity") + + assert tap.stream_ids == [ + "entry", + "generationmix", + "region", + ]