Skip to content

Commit

Permalink
feat: partition files on write (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpayne authored Sep 21, 2023
1 parent 380db3f commit d1a3fc7
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 72 deletions.
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pyfarmhash = "^0.3.2"
numpy = "^1.24.2"
base58 = "^2.1.1"
petname = "^2.6"
flexdict = "^0.0.1a1"

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.0"
Expand Down
12 changes: 12 additions & 0 deletions src/singerlake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
from pydantic import BaseModel


class Partition(BaseModel):
    """One partitioning level, identified by the name of a time component."""

    # Only these six granularities are accepted by validation.
    by: t.Literal[
        "year",
        "month",
        "day",
        "hour",
        "minute",
        "second",
    ]


class GenericPathModel(BaseModel):
"""Generic Path Model."""

Expand All @@ -15,6 +21,12 @@ class PathConfig(BaseModel):

path_type: str = "hive"
lake_root: GenericPathModel
partition_by: t.Optional[t.List[Partition]] = [
Partition(by="year"),
Partition(by="month"),
Partition(by="day"),
Partition(by="hour"),
]


class LockConfig(BaseModel):
Expand Down
12 changes: 12 additions & 0 deletions src/singerlake/singer/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from datetime import datetime


def get_time_extracted(record: dict) -> datetime:
    """Return the extraction timestamp carried by a record message.

    The top-level ``time_extracted`` value is preferred. When it is missing or
    empty, the ``_sdc_extracted_at`` value of the nested ``record`` payload is
    used instead.

    Args:
        record: The record message as a dict.

    Returns:
        The parsed timestamp. It is timezone-aware only when the source string
        carries an offset (or a ``Z`` suffix).

    Raises:
        ValueError: If neither field holds a value, or the value is not a
            parseable ISO 8601 string.
    """
    time_extracted = record.get("time_extracted")
    if not time_extracted:
        # ``or {}`` also covers a nested payload that is present but null.
        time_extracted = (record.get("record") or {}).get("_sdc_extracted_at")
    if not time_extracted:
        raise ValueError("Record does not contain time_extracted")

    # ``datetime.fromisoformat`` only accepts the "Z" (UTC) suffix from
    # Python 3.11 onwards; rewrite it as an explicit offset so that older
    # interpreters parse the same input.
    if isinstance(time_extracted, str) and time_extracted[-1:] in ("Z", "z"):
        time_extracted = f"{time_extracted[:-1]}+00:00"

    return datetime.fromisoformat(time_extracted)
33 changes: 16 additions & 17 deletions src/singerlake/stream/file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from pathlib import Path
from uuid import uuid4

import singerlake.singer.utils as su

if t.TYPE_CHECKING:
from .stream import Stream

Expand All @@ -18,6 +20,7 @@ class SingerFileWriter:

def __init__(self, stream: "Stream") -> None:
self.stream = stream
self.records_written = 0

self._tmp_dir: Path | None = None
self._file: TextIOWrapper | None = None
Expand Down Expand Up @@ -64,22 +67,6 @@ def tmp_dir(self, value: Path) -> None:
"""Set the temporary directory."""
self._tmp_dir = value

def _get_time_extracted(self, record: dict) -> datetime:
"""Return the time extracted from a record."""
time_extracted = record.get("time_extracted") or record.get("record", {}).get(
"_sdc_extracted_at"
)
if not time_extracted:
raise ValueError("Record does not contain time_extracted")

return datetime.fromisoformat(time_extracted)

def _open_file(self, tmp_dir: Path) -> TextIOWrapper:
"""Open a file for writing."""
self.file_path = tmp_dir / f"{uuid4()}.jsonl"
self.file = self.file_path.open("w", encoding="utf-8")
return self.file

@property
def file_name(self) -> str:
"""Return the file name."""
Expand All @@ -90,6 +77,17 @@ def file_name(self) -> str:
file_stop_time = self._max_time_extracted.strftime("%Y%m%dT%H%M%SZ")
return f"{self.stream.stream_id}-{file_start_time}-{file_stop_time}.singer"

@property
def closed(self) -> bool:
    """Report whether this writer currently holds no file handle."""
    has_handle = self._file is not None
    return not has_handle

def _open_file(self, tmp_dir: Path) -> TextIOWrapper:
"""Open a file for writing."""
self.file_path = tmp_dir / f"{uuid4()}.jsonl"
self.file = self.file_path.open("w", encoding="utf-8")
return self.file

def open(self) -> SingerFileWriter:
"""Create a temporary directory and new file to write records to."""
if self._tmp_dir is None:
Expand Down Expand Up @@ -119,7 +117,7 @@ def write_record(self, record: dict) -> None:
if self._file is None:
raise ValueError("File not open")

time_extracted = self._get_time_extracted(record)
time_extracted = su.get_time_extracted(record)

if self._min_time_extracted is None:
self._min_time_extracted = time_extracted
Expand All @@ -135,6 +133,7 @@ def write_record(self, record: dict) -> None:

payload = json.dumps(record, ensure_ascii=False)
self.file.write(f"{payload}\n")
self.records_written += 1

def write_schema(self, schema: dict) -> None:
"""Write a schema to the file."""
Expand Down
71 changes: 34 additions & 37 deletions src/singerlake/stream/record_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import typing as t
from pathlib import Path

from flexdict import FlexDict

import singerlake.singer.utils as su

from .file_writer import SingerFileWriter

if t.TYPE_CHECKING:
Expand All @@ -20,50 +24,43 @@ def __init__(self, stream: Stream, output_dir: Path) -> None:
self.output_dir = output_dir

self.files: list[Path] = []
self._current_file: SingerFileWriter | None = None
self._record_count = 0

@property
def current_file(self) -> SingerFileWriter:
"""Return the current file."""
if self._current_file is None:
raise ValueError("File not open.")

return self._current_file

@current_file.setter
def current_file(self, value: SingerFileWriter) -> None:
"""Set the current file."""
self._current_file = value

def open(self) -> RecordWriter:
self.current_file = SingerFileWriter(stream=self.stream).open()
return self
self.is_finalized = False
self._open_files: FlexDict = FlexDict()

def close(self):
"""Finalize the last file."""
if self._current_file is None:
raise ValueError("File not open.")

self._finalize_current_file()

def _finalize_current_file(self):
finalized_file_path = self.current_file.close(output_dir=self.output_dir)
self._current_file = None
def _finalize_file(self, file: SingerFileWriter) -> None:
finalized_file_path = file.close(output_dir=self.output_dir)
self.files.append(finalized_file_path)

def _new_file(self, partition: t.Tuple[t.Any, ...]) -> SingerFileWriter:
    """Open a fresh file writer and register it under *partition*."""
    writer = SingerFileWriter(stream=self.stream)
    opened = writer.open()
    # Any writer previously stored for this partition is replaced.
    self._open_files.set(keys=partition, value=opened)
    return opened

def write(self, schema: dict, record: dict) -> None:
"""Write a record to the stream."""

if self._record_count == MAX_RECORD_COUNT:
self._finalize_current_file()
# partition the record
time_extracted = su.get_time_extracted(record)
partition = self.stream.partition_record(time_extracted) or ("default",)
file = self._open_files.get(partition)

if not file or file.closed:
file = self._new_file(partition)

if file.records_written == MAX_RECORD_COUNT:
self._finalize_file(file)
# open a new file
self.current_file = SingerFileWriter(stream=self.stream).open()
self._record_count = 0
file = self._new_file(partition)

if self._record_count == 0:
if file.records_written == 0:
# write the stream schema
self.current_file.write_schema(schema)
file.write_schema(schema)

file.write_record(record)

self.current_file.write_record(record)
self._record_count += 1
def finalize(self) -> None:
    """Finalize every file writer that is still open and mark the writer done.

    Each open writer is passed to ``_finalize_file`` and ``is_finalized`` is
    then set to ``True``. Writers that already report ``closed`` are skipped,
    so calling this method again does not close them a second time or record
    their paths twice.
    """
    for file in self._open_files.values(nested=True):
        # NOTE(review): relies on the writer reporting ``closed`` once its
        # ``close()`` has run -- confirm against SingerFileWriter.close.
        if file.closed:
            continue
        self._finalize_file(file)
    self.is_finalized = True
27 changes: 23 additions & 4 deletions src/singerlake/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from .record_writer import RecordWriter

if t.TYPE_CHECKING:
from datetime import datetime
from pathlib import Path

from singerlake import Singerlake
from singerlake.config import Partition
from singerlake.tap import Tap


Expand All @@ -18,22 +20,39 @@ class Stream:
- committing files to storage
"""

def __init__(self, singerlake: "Singerlake", tap: "Tap", stream_id: str) -> None:
def __init__(
self,
singerlake: "Singerlake",
tap: "Tap",
stream_id: str,
) -> None:
self.singerlake = singerlake
self.tap = tap
self.stream_id = stream_id
self.partitions: t.List["Partition"] = (
self.singerlake.config.store.path.partition_by or []
)

self.files: list[Path] = []

def partition_record(self, time_extracted: "datetime") -> t.Tuple[int, ...]:
    """Return the partition key for a record extracted at *time_extracted*.

    One value is read from *time_extracted* per entry in ``self.partitions``,
    in order, using the entry's ``by`` name as the attribute to read (for
    example ``"year"`` reads ``time_extracted.year``).

    Args:
        time_extracted: The record's extraction timestamp.

    Returns:
        A tuple of the selected components; empty when no partitions are
        configured. The values are the attributes as read from the timestamp
        (integers for datetime components), not strings.
    """
    return tuple(
        getattr(time_extracted, partition.by) for partition in self.partitions
    )

@contextmanager
def record_writer(self):
"""Create a record writer for this stream."""
writer = RecordWriter(stream=self, output_dir=self.singerlake.working_dir)
writer = RecordWriter(
stream=self,
output_dir=self.singerlake.working_dir,
)
try:
writer.open()
yield writer
finally:
writer.close()
writer.finalize()
self.files.extend(writer.files)

def commit(self):
Expand Down
Loading

0 comments on commit d1a3fc7

Please sign in to comment.