Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Payne committed Sep 22, 2023
1 parent dc9cc91 commit 387567a
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 88 deletions.
8 changes: 4 additions & 4 deletions src/singerlake/discovery/discovery_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ def __init__(self, singerlake: "Singerlake"):
def list_taps(self) -> t.List[str]:
"""List available Taps."""
if self._tap_cache is None:
lake_manifest = self.singerlake.manifest_service.lake_manifest
lake_manifest = self.singerlake.store.lake_manifest
self._tap_cache = lake_manifest.taps

return self._tap_cache

def get_tap(self, tap_id):
def get_tap(self, tap_id) -> Tap | None:
"""Get a Tap by ID."""
tap_manifest = self.singerlake.manifest_service.get_tap_manifest(tap_id=tap_id)
tap_manifest = self.singerlake.store.get_tap_manifest(tap_id=tap_id)
if tap_manifest:
return Tap(singerlake=self.singerlake, tap_manifest=tap_manifest)
raise ValueError(f"Tap {tap_id} not found.")
return None
4 changes: 0 additions & 4 deletions src/singerlake/manifest/__init__.py

This file was deleted.

Empty file removed src/singerlake/manifest/base.py
Empty file.
39 changes: 0 additions & 39 deletions src/singerlake/manifest/manifest_service.py

This file was deleted.

31 changes: 23 additions & 8 deletions src/singerlake/singerlake.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

from singerlake.config import SingerlakeConfig
from singerlake.discovery import DiscoveryService
from singerlake.manifest import ManifestService
from singerlake.store import StoreService
from singerlake.store import BaseStore, StoreService

if t.TYPE_CHECKING:
from singerlake.tap import Tap
Expand All @@ -23,17 +22,16 @@ def __init__(self, config: dict | None = None):

self.instance_id = str(uuid4())
self.config = SingerlakeConfig(**config_dict)
self.manifest_service = ManifestService(singerlake=self)
self.discovery_service = DiscoveryService(singerlake=self)
self.store = StoreService(singerlake=self, config=self.config.store).get_store()

self._store: BaseStore | None = None
self._lake_id: str | None = None

@property
def lake_id(self) -> str:
"""Return the Lake ID."""
if self._lake_id is None:
self._lake_id = self.manifest_service.lake_manifest.lake_id
self._lake_id = self.store.lake_manifest.lake_id
return self._lake_id

@property
Expand All @@ -51,6 +49,15 @@ def working_dir(self) -> Path:
working_dir.mkdir(parents=True, exist_ok=True)
return working_dir

@property
def store(self) -> BaseStore:
    """Lazily build and cache the store for this Singerlake.

    The first access asks a ``StoreService`` (configured from
    ``self.config.store``) for a store; later accesses reuse the cached
    instance.
    """
    cached = self._store
    if cached is None:
        service = StoreService(singerlake=self, config=self.config.store)
        cached = self._store = service.get_store()
    return cached

def clean_working_dir(self) -> None:
    """Remove the local working directory tree, ignoring removal errors."""
    target = self.working_dir
    shutil.rmtree(target, ignore_errors=True)
Expand All @@ -59,6 +66,14 @@ def list_taps(self) -> list[str]:
"""Return Taps stored in this Singerlake."""
return self.discovery_service.list_taps()

def get_tap(self, tap_id: str) -> "Tap":
"""Return a Tap stored in this Singerlake."""
return self.discovery_service.get_tap(tap_id=tap_id)
def get_tap(self, tap_id: str, create: bool = False) -> "Tap | None":
    """Return a Tap stored in this Singerlake.

    Args:
        tap_id: Tap ID.
        create: If True, create a new Tap if it does not exist.

    Returns:
        The Tap found by the discovery service. If none is found and
        ``create`` is true, the Tap created through the store. Otherwise
        ``None``.
    """
    # The whole annotation is quoted on purpose: ``"Tap" | None`` would be
    # evaluated as ``str | None`` when the method is defined and raise
    # TypeError unless annotations are postponed. ``Tap`` itself is only
    # imported under TYPE_CHECKING.
    tap = self.discovery_service.get_tap(tap_id=tap_id)
    if tap is None and create:
        tap = self.store.create_tap(tap_id=tap_id)
    return tap
60 changes: 55 additions & 5 deletions src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,64 @@
import typing as t
from abc import ABC

if t.TYPE_CHECKING:
from pathlib import Path
from singerlake.store.manifest import LakeManifest, TapManifest

if t.TYPE_CHECKING:
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager
from singerlake.stream.stream import Stream
from singerlake.stream.file_writer import SingerFile
from singerlake.tap.tap import Tap


class BaseStore(ABC):
"""Base SingerLake storage interface."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager") -> None:
def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
"""Base SingerLake storage interface."""
self.singerlake = singerlake
self.locker = locker
self.path_manager = path_manager

self._lake_manifest: LakeManifest | None = None
self._lake_manifest_checksum: str | None = None

@property
def lake_root(self) -> t.Any:
    """Lake root path, as reported by ``get_lake_root``."""
    root = self.get_lake_root()
    return root

@property
def lake_manifest(self) -> LakeManifest:
    """Return the Lake Manifest, reading it on first access.

    Raises:
        ValueError: If ``read_lake_manifest`` returns ``None``.
    """
    if self._lake_manifest is not None:
        return self._lake_manifest

    raw_manifest = self.read_lake_manifest()
    if raw_manifest is None:
        raise ValueError("Lake Manifest not found.")

    self._lake_manifest = LakeManifest(**raw_manifest)
    return self._lake_manifest

@t.final
def get_tap_manifest(self, tap_id: str) -> TapManifest | None:
    """Load the manifest for ``tap_id``.

    Returns:
        A ``TapManifest`` built from the dict returned by
        ``read_tap_manifest``, or ``None`` when that read returns ``None``.
    """
    raw_manifest = self.read_tap_manifest(tap_id=tap_id)
    if raw_manifest is None:
        return None
    return TapManifest(**raw_manifest)

# override these methods to implement a custom store
def get_lake_root(self) -> t.Any:
"""Return the Lake root path."""
raise NotImplementedError()

def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
raise NotImplementedError()

def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
raise NotImplementedError()
Expand All @@ -27,6 +69,14 @@ def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
raise NotImplementedError()

def commit_stream_files(self, stream: "Stream", stream_files: list["Path"]) -> None:
def create_tap(self, tap_id: str) -> "Tap":
"""Create a Tap."""
raise NotImplementedError()

def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit stream files to storage."""
raise NotImplementedError()

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
raise NotImplementedError()
60 changes: 42 additions & 18 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,23 @@

import hashlib
import json
import shutil
import typing as t
from pathlib import Path

from singerlake.store.manifest import TapManifest
from singerlake.tap import Tap

from .base import BaseStore

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath
from singerlake.stream.stream import Stream

from .locker.base import BaseLocker
from .path_manager.base import BasePathManager
from singerlake.stream.file_writer import SingerFile


class LocalStore(BaseStore):
"""Local directory store."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager"):
"""Local directory store."""
super().__init__(locker=locker, path_manager=path_manager)

self._lake_manifest_checksum: str | None = None

def _md5(self, file_path: Path):
"""Return the md5 checksum of a file."""
hash_md5 = hashlib.md5()
Expand All @@ -34,20 +29,28 @@ def _md5(self, file_path: Path):

def _read_json(self, file_path: Path):
"""Read a JSON file."""
if not file_path.exists():
return None
with file_path.open("r", encoding="utf-8") as json_file:
return json.load(json_file)

def _to_path(self, generic_path: "GenericPath") -> Path:
"""Convert a GenericPath to a pathlib Path."""
return Path(*generic_path.segments)

# Lake Manifest
def get_lake_root(self) -> Path:
    """Resolve the lake root to a ``pathlib.Path``.

    A root the path manager flags as relative is anchored at the current
    working directory; otherwise its segments are used as they are.
    """
    root = self.path_manager.lake_root
    resolved = Path(*root.segments)
    return Path.cwd() / resolved if root.relative else resolved

def read_lake_manifest_checksum(self) -> str | None:
"""Read the Lake Manifest checksum."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
return self._md5(lake_manifest_path)

def read_lake_manifest(self) -> tuple[dict, str] | None:
def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
lake_manifest = self._read_json(lake_manifest_path)
Expand Down Expand Up @@ -80,11 +83,32 @@ def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
return self._read_json(stream_manifest_path)

# Stream Files
def _commit_stream_file(self, stream: "Stream", stream_file: "Path") -> None:
"""Commit a stream file to storage."""
pass
def _commit_stream_file(self, stream_file: "SingerFile") -> None:
"""Commit a singer file to storage."""
file_path = self._to_path(
self.path_manager.get_stream_file_path(stream_file=stream_file)
)
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
shutil.copy(stream_file.path, file_path)

def commit_stream_files(self, stream: "Stream", stream_files: list["Path"]) -> None:
"""Commit stream files to storage."""
def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit singer files to storage."""
for stream_file in stream_files:
self._commit_stream_file(stream=stream, stream_file=stream_file)
self._commit_stream_file(stream_file=stream_file)

def create_tap(self, tap_id: str) -> Tap:
    """Create a Tap in the lake and return it.

    Creates the tap directory, writes a default ``TapManifest`` into it and
    wraps that manifest in a ``Tap``.

    Args:
        tap_id: ID of the Tap to create.

    Raises:
        FileExistsError: If the tap directory already exists.
    """
    tap_dir = self._to_path(self.path_manager.get_tap_path(tap_id=tap_id))
    tap_dir.mkdir(parents=True)
    tap_manifest = self.write_tap_manifest(tap_id=tap_id, manifest=TapManifest())
    # Build the Tap the way DiscoveryService.get_tap does. Unpacking the
    # manifest's fields as keyword arguments does not match that call.
    # NOTE(review): Tap's signature is not visible here - confirm.
    return Tap(singerlake=self.singerlake, tap_manifest=tap_manifest)

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
    """Write ``manifest`` as indented JSON to the tap's manifest path.

    Returns:
        The same ``manifest`` object that was passed in.
    """
    generic_path = self.path_manager.get_tap_manifest_path(tap_id=tap_id)
    target = self._to_path(generic_path)
    with target.open("w", encoding="utf-8") as handle:
        serialised = manifest.dict()
        json.dump(serialised, handle, indent=2)
    return manifest
File renamed without changes.
25 changes: 22 additions & 3 deletions src/singerlake/store/path_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

if t.TYPE_CHECKING:
from singerlake.config import PathConfig
from singerlake.stream.file_writer import SingerFile


class GenericPath:
Expand Down Expand Up @@ -70,12 +71,30 @@ def lake_manifest_path(self) -> GenericPath:
"""Get the lake manifest path."""
return self.lake_root.extend(*("raw", LAKE_MANIFEST_FILENAME))

def get_tap_path(self, tap_id: str) -> GenericPath:
    """Return the tap's directory path: the lake root extended by ``raw`` and ``tap_id``."""
    return self.lake_root.extend("raw", tap_id)

def get_tap_manifest_path(self, tap_id: str) -> GenericPath:
"""Get the tap manifest path."""
return self.lake_root.extend(*("raw", tap_id, TAP_MANIFEST_FILENAME))
return self.get_tap_path(tap_id=tap_id).extend(TAP_MANIFEST_FILENAME)

def get_stream_path(self, tap_id: str, stream_id: str) -> GenericPath:
    """Return the stream's directory path: the tap path extended by ``stream_id``."""
    tap_path = self.get_tap_path(tap_id=tap_id)
    return tap_path.extend(stream_id)

def get_stream_manifest_path(self, tap_id: str, stream_id: str) -> GenericPath:
"""Get the stream manifest path."""
return self.lake_root.extend(
*("raw", tap_id, stream_id, STREAM_MANIFEST_FILENAME)
return self.get_stream_path(tap_id=tap_id, stream_id=stream_id).extend(
STREAM_MANIFEST_FILENAME
)

def get_stream_file_path(self, stream_file: "SingerFile") -> GenericPath:
    """Return the path where ``stream_file`` is stored.

    The path is the stream path for the file's tap and stream IDs, extended
    by the hash of the file's schema and then by its filename.
    """
    stream_path = self.get_stream_path(
        tap_id=stream_file.tap_id, stream_id=stream_file.stream_id
    )
    schema_hash = self.hash_stream_schema(stream_file.schema)
    return stream_path.extend(schema_hash).extend(stream_file.filename)
6 changes: 3 additions & 3 deletions src/singerlake/store/store_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def get_store(self) -> "BaseStore":
singerlake=self.singerlake
)
path_manager = PathService(config=self.config.path).get_path_manager()

if self.config.store_type == "local":
return LocalStore(locker=locker, path_manager=path_manager)

return LocalStore(
singerlake=self.singerlake, locker=locker, path_manager=path_manager
)
raise ValueError(f"Unknown store type: {self.config.store_type}")
Loading

0 comments on commit 387567a

Please sign in to comment.