Skip to content

Commit

Permalink
basic implmentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Sep 19, 2023
1 parent e2141e7 commit ac5f713
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 60 deletions.
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ filelock = "^3.11.0"
pyfarmhash = "^0.3.2"
numpy = "^1.24.2"
base58 = "^2.1.1"
petname = "^2.6"

[tool.poetry.group.dev.dependencies]
tox = "^4.4.12"
Expand Down
9 changes: 8 additions & 1 deletion src/singerlake/config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from pydantic import BaseModel


class GenericPathModel(BaseModel):
"""Generic Path Model."""

segments: tuple[str, ...]
relative: bool = False


class PathConfig(BaseModel):
"""Singer Lake Path Config."""

path_type: str = "hive"
lake_root: tuple[str, ...]
lake_root: GenericPathModel


class LockConfig(BaseModel):
Expand Down
17 changes: 7 additions & 10 deletions src/singerlake/discovery/discovery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,19 @@ class DiscoveryService:

def __init__(self, singerlake: "Singerlake"):
self.singerlake = singerlake
self.tap_cache: dict | None = None
self._tap_cache: t.List[str] | None = None

def list_taps(self):
def list_taps(self) -> t.List[str]:
"""List available Taps."""
if self.tap_cache is None:
lake_manifest = self.singerlake.manifest_service.get_lake_manifest()
self.tap_cache = {
tap_definition["tap_id"]: Tap(**tap_definition)
for tap_definition in lake_manifest.taps
}
if self._tap_cache is None:
lake_manifest = self.singerlake.manifest_service.lake_manifest
self._tap_cache = lake_manifest.taps

yield iter(self.tap_cache.values())
return self._tap_cache

def get_tap(self, tap_id):
"""Get a Tap by ID."""
lake_manifest = self.singerlake.manifest_service.get_lake_manifest()
lake_manifest = self.singerlake.manifest_service.lake_manifest
for tap_definition in lake_manifest.taps:
if tap_definition["id"] == tap_id:
return Tap(**tap_definition)
Expand Down
16 changes: 9 additions & 7 deletions src/singerlake/manifest/manifest_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing as t

from .models import LakeManifest
from .models import LakeManifest, TapManifest

if t.TYPE_CHECKING:
from singerlake import Singerlake
Expand All @@ -18,12 +18,14 @@ def __init__(self, singerlake: "Singerlake"):
def lake_manifest(self):
"""Get the Lake Manifest."""
if self._lake_manifest is None:
(
raw_lake_manifest,
lake_manifest_checksum,
) = self.singerlake.store.read_lake_manifest()
raw_lake_manifest = self.singerlake.store.read_lake_manifest()
if raw_lake_manifest:
raw_lake_manifest["checksum"] = lake_manifest_checksum
self._lake_manifest = LakeManifest.model_validate(raw_lake_manifest)
self._lake_manifest = LakeManifest(**raw_lake_manifest)

return self._lake_manifest

def get_tap_manifest(self, tap_id: str):
"""Get a Tap Manifest by ID."""
tap_manifest = self.singerlake.store.read_tap_manifest(tap_id=tap_id)
if tap_manifest:
return TapManifest(**tap_manifest)
8 changes: 4 additions & 4 deletions src/singerlake/manifest/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel


class StreamDefinition(BaseModel):
class StreamManifest(BaseModel):
"""Stream Manifest."""

stream_id: str
Expand All @@ -13,15 +13,15 @@ class StreamDefinition(BaseModel):
versions: t.Mapping[str, str] = {}


class TapDefinition(BaseModel):
class TapManifest(BaseModel):
"""Tap Manifest."""

tap_id: str
streams: t.List[StreamDefinition] = []
streams: t.List[str] = []


class LakeManifest(BaseModel):
"""Lake Manifest."""

lake_id: str
taps: t.List[TapDefinition] = []
taps: t.List[str] = []
19 changes: 16 additions & 3 deletions src/singerlake/singerlake.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from __future__ import annotations

import typing as t
from uuid import uuid4

from singerlake.config import SingerlakeConfig
from singerlake.discovery import DiscoveryService
from singerlake.manifest import ManifestService
from singerlake.store import StoreService

if t.TYPE_CHECKING:
from singerlake.tap import Tap


class Singerlake:
"""Singer Lake."""
Expand All @@ -21,10 +25,19 @@ def __init__(self, config: dict | None = None):
self.discovery_service = DiscoveryService(singerlake=self)
self.store = StoreService(singerlake=self, config=self.config.store).get_store()

def list_taps(self):
self._lake_id = None

@property
def lake_id(self) -> str:
"""Return the Lake ID."""
if self._lake_id is None:
self._lake_id = self.manifest_service.lake_manifest.lake_id
return self._lake_id

def list_taps(self) -> list[str]:
"""Return Taps stored in this Singerlake."""
return self.discovery_service.list_taps()

def get_tap(self, tap_id: str):
def get_tap(self, tap_id: str) -> "Tap":
"""Return a Tap stored in this Singerlake."""
return self.discovery_service.get_tap(tap_id)
return self.discovery_service.get_tap(tap_id=tap_id)
4 changes: 2 additions & 2 deletions src/singerlake/store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .base import BaseStore
from .constant import (
from .local import LocalStore
from .path_manager.constant import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
)
from .local import LocalStore
from .store_service import StoreService

__all__ = [
Expand Down
2 changes: 1 addition & 1 deletion src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TapDefinition,
)

from .constant import (
from .path_manager.constant import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
Expand Down
10 changes: 8 additions & 2 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from .base import BaseStore

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath

from .locker.base import BaseLocker
from .path_manager.base import BasePathManager

Expand All @@ -32,14 +34,18 @@ def _read_json(self, file_path: Path):
with file_path.open("r", encoding="utf-8") as json_file:
return json.load(json_file)

def _to_path(self, generic_path: "GenericPath") -> Path:
"""Convert a GenericPath to a pathlib Path."""
return Path(*generic_path.segments)

def read_lake_manifest_checksum(self) -> str | None:
"""Read the Lake Manifest checksum."""
lake_manifest_path = Path(self.path_manager.get_lake_manifest_path())
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
return self._md5(lake_manifest_path)

def read_lake_manifest(self) -> tuple[dict, str] | None:
"""Read the Lake Manifest."""
lake_manifest_path = Path(self.path_manager.get_lake_manifest_path())
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
lake_manifest = self._read_json(lake_manifest_path)
if lake_manifest is not None:
self._lake_manifest_checksum = self.read_lake_manifest_checksum()
Expand Down
13 changes: 12 additions & 1 deletion src/singerlake/store/path_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
from .base import BasePathManager
from .constant import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
)
from .path_service import PathService

__all__ = ["BasePathManager", "PathService"]
__all__ = [
"BasePathManager",
"PathService",
"LAKE_MANIFEST_FILENAME",
"TAP_MANIFEST_FILENAME",
"STREAM_MANIFEST_FILENAME",
]
61 changes: 54 additions & 7 deletions src/singerlake/store/path_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,55 @@
import farmhash
import numpy as np

from .constant import (
LAKE_MANIFEST_FILENAME,
STREAM_MANIFEST_FILENAME,
TAP_MANIFEST_FILENAME,
)

if t.TYPE_CHECKING:
from singerlake.config import PathConfig


class GenericPath:

"""Generic path class."""

def __init__(self, segments: tuple[str], relative: bool = False):
self.segments = segments
self.relative = relative

def __str__(self):
return "/".join(self.segments)

def __repr__(self):
return f"GenericPath({self.segments})"

def __eq__(self, other):
return (self.segments == other.segments) and (self.relative == other.relative)

def __hash__(self):
return hash(self.segments)

def extend(self, *args: str) -> "GenericPath":
"""Extend the path."""
return GenericPath(self.segments + args, relative=self.relative)

@classmethod
def from_dict(cls, data: t.Mapping[str, t.Any]) -> "GenericPath":
"""Extend the path with a dict."""
return GenericPath(data["segments"], relative=data.get("relative", False))

@classmethod
def from_model(cls, model: t.Any) -> "GenericPath":
"""Extend the path with a dict."""
return GenericPath(model.segments, relative=model.relative)


class BasePathManager:
def __init__(self, config: "PathConfig"):
self.config = config
self.lake_root = GenericPath.from_model(self.config.lake_root)

def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str:
"""Calculate a unique short-hash for given schema."""
Expand All @@ -22,11 +64,16 @@ def hash_stream_schema(self, stream_schema: t.Mapping[str, t.Any]) -> str:
return base58.b58encode(int64_hash_bytes).decode("utf-8")

@property
def lake_root(self):
"""Get the lake root."""
return self.config.lake_root

@property
def lake_manifest_path(self) -> tuple[str]:
def lake_manifest_path(self) -> GenericPath:
"""Get the lake manifest path."""
return (self.lake_root, "catalog.json")
return self.lake_root.extend(*("raw", LAKE_MANIFEST_FILENAME))

def get_tap_manifest_path(self, tap_id: str) -> GenericPath:
"""Get the tap manifest path."""
return self.lake_root.extend(*("raw", tap_id, TAP_MANIFEST_FILENAME))

def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> GenericPath:
"""Get the stream manifest path."""
return self.lake_root.extend(
*("raw", tap_id, stream_id, STREAM_MANIFEST_FILENAME)
)
File renamed without changes.
8 changes: 4 additions & 4 deletions src/singerlake/stream/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import typing as t

from singerlake.manifest.models import StreamDefinition
from singerlake.manifest.models import StreamManifest

if t.TYPE_CHECKING:
from singerlake import Singerlake
Expand All @@ -12,12 +12,12 @@ def __init__(
self,
singerlake: "Singerlake",
tap: "Tap",
stream_definition: StreamDefinition,
stream_manifest: StreamManifest,
) -> None:
self.singerlake = singerlake
self.tap = tap
self.stream_definition = stream_definition
self.stream_manifest = stream_manifest

@property
def stream_id(self) -> str:
return self.stream_definition.stream_id
return self.stream_manifest.stream_id
Loading

0 comments on commit ac5f713

Please sign in to comment.