Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: commit singer files to store #11

Merged
merged 8 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.vscode/
.DS_Store
.singerlake/
.ruff_cache/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
12 changes: 6 additions & 6 deletions src/singerlake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseModel


class Partition(BaseModel):
class PartitionBy(BaseModel):
"""Partition Model."""

by: t.Literal["year", "month", "day", "hour", "minute", "second"]
Expand All @@ -21,11 +21,11 @@ class PathConfig(BaseModel):

path_type: str = "hive"
lake_root: GenericPathModel
partition_by: t.Optional[t.List[Partition]] = [
Partition(by="year"),
Partition(by="month"),
Partition(by="day"),
Partition(by="hour"),
partition_by: t.Optional[t.List[PartitionBy]] = [
PartitionBy(by="year"),
PartitionBy(by="month"),
PartitionBy(by="day"),
PartitionBy(by="hour"),
]


Expand Down
10 changes: 6 additions & 4 deletions src/singerlake/discovery/discovery_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import typing as t

from singerlake.tap import Tap
Expand All @@ -19,14 +21,14 @@ def __init__(self, singerlake: "Singerlake"):
def list_taps(self) -> t.List[str]:
"""List available Taps."""
if self._tap_cache is None:
lake_manifest = self.singerlake.manifest_service.lake_manifest
lake_manifest = self.singerlake.store.lake_manifest
self._tap_cache = lake_manifest.taps

return self._tap_cache

def get_tap(self, tap_id):
def get_tap(self, tap_id) -> Tap | None:
"""Get a Tap by ID."""
tap_manifest = self.singerlake.manifest_service.get_tap_manifest(tap_id=tap_id)
tap_manifest = self.singerlake.store.get_tap_manifest(tap_id=tap_id)
if tap_manifest:
return Tap(singerlake=self.singerlake, tap_manifest=tap_manifest)
raise ValueError(f"Tap {tap_id} not found.")
return None
4 changes: 0 additions & 4 deletions src/singerlake/manifest/__init__.py

This file was deleted.

Empty file removed src/singerlake/manifest/base.py
Empty file.
39 changes: 0 additions & 39 deletions src/singerlake/manifest/manifest_service.py

This file was deleted.

34 changes: 25 additions & 9 deletions src/singerlake/singerlake.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from uuid import uuid4

from singerlake.config import SingerlakeConfig
from singerlake.discovery import DiscoveryService
from singerlake.manifest import ManifestService
from singerlake.store import StoreService
from singerlake.store import BaseStore, StoreService

from singerlake.discovery import DiscoveryService # isort:skip

if t.TYPE_CHECKING:
from singerlake.tap import Tap
Expand All @@ -23,17 +23,16 @@ def __init__(self, config: dict | None = None):

self.instance_id = str(uuid4())
self.config = SingerlakeConfig(**config_dict)
self.manifest_service = ManifestService(singerlake=self)
self.discovery_service = DiscoveryService(singerlake=self)
self.store = StoreService(singerlake=self, config=self.config.store).get_store()

self._store: BaseStore | None = None
self._lake_id: str | None = None

@property
def lake_id(self) -> str:
"""Return the Lake ID."""
if self._lake_id is None:
self._lake_id = self.manifest_service.lake_manifest.lake_id
self._lake_id = self.store.lake_manifest.lake_id
return self._lake_id

@property
Expand All @@ -51,6 +50,15 @@ def working_dir(self) -> Path:
working_dir.mkdir(parents=True, exist_ok=True)
return working_dir

@property
def store(self) -> BaseStore:
"""Return the store instance."""
if self._store is None:
self._store = StoreService(
singerlake=self, config=self.config.store
).get_store()
return self._store

def clean_working_dir(self) -> None:
"""Clean the local working directory."""
shutil.rmtree(self.working_dir, ignore_errors=True)
Expand All @@ -59,6 +67,14 @@ def list_taps(self) -> list[str]:
"""Return Taps stored in this Singerlake."""
return self.discovery_service.list_taps()

def get_tap(self, tap_id: str) -> "Tap":
"""Return a Tap stored in this Singerlake."""
return self.discovery_service.get_tap(tap_id=tap_id)
def get_tap(self, tap_id: str, create: bool = False) -> "Tap" | None:
"""Return a Tap stored in this Singerlake.

Args:
tap_id: Tap ID.
create: If True, create a new Tap if it does not exist.
"""
tap = self.discovery_service.get_tap(tap_id=tap_id)
if tap is None and create:
tap = self.store.create_tap(tap_id=tap_id)
return tap
59 changes: 58 additions & 1 deletion src/singerlake/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,80 @@
import typing as t
from abc import ABC

from singerlake.store.manifest import LakeManifest, TapManifest

if t.TYPE_CHECKING:
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager
from singerlake.stream.file_writer import SingerFile
from singerlake.tap.tap import Tap


class BaseStore(ABC):
"""Base SingerLake storage interface."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager") -> None:
def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
"""Base SingerLake storage interface."""
self.singerlake = singerlake
self.locker = locker
self.path_manager = path_manager

self._lake_manifest: LakeManifest | None = None
self._lake_manifest_checksum: str | None = None

@property
def lake_root(self) -> t.Any:
"""Return the Lake root path."""
return self.path_manager.lake_root

@property
def lake_manifest(self) -> LakeManifest:
"""Return the Lake Manifest."""
if self._lake_manifest is None:
read_lake_manifest = self.read_lake_manifest()
if read_lake_manifest is not None:
self._lake_manifest = LakeManifest(**read_lake_manifest)
else:
raise ValueError("Lake Manifest not found.")
return self._lake_manifest

@t.final
def get_tap_manifest(self, tap_id: str) -> TapManifest | None:
"""Get a Tap Manifest by ID."""
read_tap_manifest = self.read_tap_manifest(tap_id=tap_id)
return None if read_tap_manifest is None else TapManifest(**read_tap_manifest)

# override these methods to implement a custom store
def get_lake_root(self) -> t.Any:
"""Return the Lake root path."""
raise NotImplementedError()

def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
raise NotImplementedError()

def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
raise NotImplementedError()

def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
raise NotImplementedError()

def create_tap(self, tap_id: str) -> "Tap":
"""Create a Tap."""
raise NotImplementedError()

def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit stream files to storage."""
raise NotImplementedError()

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
raise NotImplementedError()
95 changes: 66 additions & 29 deletions src/singerlake/store/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,50 @@

import hashlib
import json
import shutil
import typing as t
from pathlib import Path

from singerlake.store.manifest import TapManifest
from singerlake.store.path_manager.base import BasePathTransformer
from singerlake.tap import Tap

from .base import BaseStore

if t.TYPE_CHECKING:
from singerlake.store.path_manager.base import GenericPath

from .locker.base import BaseLocker
from .path_manager.base import BasePathManager
from singerlake import Singerlake
from singerlake.store.locker.base import BaseLocker
from singerlake.store.path_manager.base import BasePathManager, GenericPath
from singerlake.stream.file_writer import SingerFile


class LocalPathTransformer(BasePathTransformer):
@staticmethod
def transform(generic_path: GenericPath) -> Path:
"""Transform a GenericPath to a pathlib Path."""
if generic_path.relative:
return Path.cwd() / Path(*generic_path.segments)
return Path(*generic_path.segments)


class LocalStore(BaseStore):
"""Local directory store."""

def __init__(self, locker: "BaseLocker", path_manager: "BasePathManager"):
"""Local directory store."""
super().__init__(locker=locker, path_manager=path_manager)
def __init__(
self,
singerlake: "Singerlake",
locker: "BaseLocker",
path_manager: "BasePathManager",
) -> None:
super().__init__(
singerlake=singerlake, locker=locker, path_manager=path_manager
)
self.path_manager.transformer = LocalPathTransformer()

self._lake_manifest_checksum: str | None = None
@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

def _md5(self, file_path: Path):
"""Return the md5 checksum of a file."""
Expand All @@ -33,47 +57,60 @@ def _md5(self, file_path: Path):

def _read_json(self, file_path: Path):
"""Read a JSON file."""
if not file_path.exists():
return None
with file_path.open("r", encoding="utf-8") as json_file:
return json.load(json_file)

def _to_path(self, generic_path: "GenericPath") -> Path:
"""Convert a GenericPath to a pathlib Path."""
return Path(*generic_path.segments)

# Lake Manifest
def read_lake_manifest_checksum(self) -> str | None:
"""Read the Lake Manifest checksum."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
return self._md5(lake_manifest_path)
return self._md5(self.path_manager.lake_manifest_path)

def read_lake_manifest(self) -> tuple[dict, str] | None:
def read_lake_manifest(self) -> dict | None:
"""Read the Lake Manifest."""
lake_manifest_path = self._to_path(self.path_manager.lake_manifest_path)
lake_manifest = self._read_json(lake_manifest_path)
lake_manifest = self._read_json(self.path_manager.lake_manifest_path)
if lake_manifest is not None:
self._lake_manifest_checksum = self.read_lake_manifest_checksum()
return lake_manifest
return None

@property
def lake_manifest_has_changed(self) -> bool:
"""Return True if the Lake Manifest has changed."""
return self.read_lake_manifest_checksum() != self._lake_manifest_checksum

# Tap Manifest
def read_tap_manifest(self, tap_id: str) -> dict | None:
"""Read a Tap Manifest."""
tap_manifest_path = self._to_path(
self.path_manager.get_tap_manifest_path(tap_id=tap_id)
)
return self._read_json(tap_manifest_path)
return self._read_json(self.path_manager.get_tap_manifest_path(tap_id=tap_id))

# Stream Manifest
def read_stream_manifest(self, tap_id: str, stream_id: str) -> dict | None:
"""Read a Stream Manifest."""
stream_manifest_path = self._to_path(
return self._read_json(
self.path_manager.get_stream_manifest_path(
tap_id=tap_id, stream_id=stream_id
)
)
return self._read_json(stream_manifest_path)

# Stream Files
def _commit_stream_file(self, stream_file: "SingerFile") -> None:
"""Commit a singer file to storage."""
file_path = self.path_manager.get_stream_file_path(stream_file=stream_file)
if not file_path.parent.exists():
file_path.parent.mkdir(parents=True)
shutil.copy(stream_file.path, file_path)

def commit_stream_files(self, stream_files: list["SingerFile"]) -> None:
"""Commit singer files to storage."""
for stream_file in stream_files:
self._commit_stream_file(stream_file=stream_file)

def create_tap(self, tap_id: str) -> Tap:
"""Create a Tap."""
file_path = self.path_manager.get_tap_path(tap_id=tap_id)
file_path.mkdir(parents=True)
tap_manifest = self.write_tap_manifest(tap_id=tap_id, manifest=TapManifest())
return Tap(**tap_manifest.dict())

def write_tap_manifest(self, tap_id: str, manifest: TapManifest) -> TapManifest:
"""Write a Tap Manifest."""
file_path = self.path_manager.get_tap_manifest_path(tap_id=tap_id)
with file_path.open("w", encoding="utf-8") as json_file:
json.dump(manifest.dict(), json_file, indent=2)
return manifest
Loading
Loading