feat: Add Parquet as a batch encoding option (meltano#2044)
* Add parquet encoding enum and dataclass

* WIP

* Add parquet support and tests

* Write fastparquet file with fs

* Change open_with argument

* Update fastparquet.write

* WIP

* WIP

* Adding s3fs as dependency

* Remove s3fs

* Remove fastparquet add pyarrow as dependency

* Add parquet dependency

* Add support for gzip and snappy compression types for parquet

* Add pyarrow as a core dependency

* Add numpy for python 3.7-3.11

* Add schema parsing

* Change dict to Dict for parsing types

* Added Batch Factory

* Remove pyarrow as core dependency and wrap logic in dependency checks

* Added missing quotes

* Removed json schema to pyarrow schema support

We don't currently have a way to support different numeric types like
decimal.Decimal, so this reverts to using pyarrow's schema inference
(see the inference sketch after the commit message).

* Updated poetry.lock to add pyarrow as extra

* Updated formatting

* Updated for readability

* Added tests to account for missing pyarrow install

* Addressed ambiguous type issue

* Adding type ignore

* Added type ignore to correct location

* Update singer_sdk/batch.py

* Adding back normal imports

* mypy: install extras

* Ignore missing pyarrow types

* Move batchers to contrib modules

* Increase test coverage

* Fix types

* Test batcher and target

---------

Co-authored-by: Edgar R. M <[email protected]>
Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
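
As referenced in the schema-inference note above, pyarrow can derive a schema directly from the record values. A minimal sketch of that inference (the records here are illustrative, not from this commit):

import pyarrow as pa

# pyarrow infers the column types (int64, string, double) from the values;
# specialized numeric types such as decimal.Decimal may need an explicit schema.
records = [
    {"id": 1, "name": "Alice", "score": 98.5},
    {"id": 2, "name": "Bob", "score": None},
]
table = pa.Table.from_pylist(records)
print(table.schema)
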
3 people authored Nov 28, 2023
1 parent 2289173 commit 24127d0
Showing 21 changed files with 486 additions and 77 deletions.
4 changes: 2 additions & 2 deletions noxfile.py
@@ -56,7 +56,7 @@
 def mypy(session: Session) -> None:
     """Check types with mypy."""
     args = session.posargs or ["singer_sdk"]
-    session.install(".")
+    session.install(".[s3,testing,parquet]")
     session.install(
         "mypy",
         "pytest",
@@ -77,7 +77,7 @@ def mypy(session: Session) -> None:
 @session(python=python_versions)
 def tests(session: Session) -> None:
     """Execute pytest tests and compute coverage."""
-    session.install(".[s3]")
+    session.install(".[s3,parquet]")
     session.install(*test_dependencies)

     sqlalchemy_version = os.environ.get("SQLALCHEMY_VERSION")
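
In user terms, the extras exercised by these sessions correspond to optional installs; assuming the published package name singer-sdk, Parquet batching support would be pulled in with something like:

pip install "singer-sdk[s3,parquet]"
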
27 changes: 14 additions & 13 deletions poetry.lock

Some generated files are not rendered by default.

22 changes: 17 additions & 5 deletions pyproject.toml
@@ -44,7 +44,7 @@ backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
 click = "~=8.0"
 cryptography = ">=3.4.6,<42.0.0"
 fs = ">=2.4.16"
-importlib-metadata = {version = "<7.0.0", markers = "python_version < \"3.8\""}
+importlib-metadata = {version = "<7.0.0", python = "<3.12"}
 importlib-resources = {version = ">=5.12.0", markers = "python_version < \"3.9\""}
 inflection = ">=0.5.1"
 joblib = ">=1.0.1"
@@ -82,6 +82,16 @@ sphinx-reredirects = {version = ">=0.1.1", optional = true}
 # File storage dependencies installed as optional 'filesystem' extras
 fs-s3fs = {version = ">=1.1.1", optional = true}

+# Parquet file dependencies installed as optional 'parquet' extras
+numpy = [
+    { version = "<1.22", python = "<3.8", optional = true },
+    { version = ">=1.22", python = ">=3.8", optional = true },
+]
+pyarrow = [
+    { version = ">=11", python = "<3.8", optional = true },
+    { version = ">=13", python = ">=3.8", optional = true }
+]
+
 # Testing dependencies installed as optional 'testing' extras
 pytest = {version=">=7.2.1", optional = true}
 pytest-durations = {version = ">=1.2.0", optional = true}
@@ -102,6 +112,7 @@ testing = [
     "pytest",
     "pytest-durations"
 ]
+parquet = ["numpy", "pyarrow"]

 [tool.poetry.group.dev.dependencies]
 coverage = [
@@ -114,10 +125,6 @@ mypy = [
     { version = ">=1.0,<1.5", python = "<3.8" },
     { version = ">=1.0", python = ">=3.8" },
 ]
-numpy = [
-    { version = "<1.22", python = "<3.8" },
-    { version = ">=1.22", python = ">=3.8" },
-]
 pyarrow = [
     { version = ">=11,<13", python = "<3.8" },
     { version = ">=11", python = ">=3.8" }
@@ -217,6 +224,7 @@ module = [
     "backports.datetime_fromisoformat.*",
     "joblib.*", # TODO: Remove when https://github.com/joblib/joblib/issues/1516 is shipped
     "jsonpath_ng.*",
+    "pyarrow.*", # TODO: Remove when https://github.com/apache/arrow/issues/32609 is implemented and released
 ]

 [build-system]
@@ -226,6 +234,10 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry.scripts]
 pytest11 = { reference = "singer_sdk:testing.pytest_plugin", extras = ["testing"], type = "console" }

+[tool.poetry.plugins."singer_sdk.batch_encoders"]
+jsonl = "singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher"
+parquet = "singer_sdk.contrib.batch_encoder_parquet:ParquetBatcher"
+
 [tool.ruff]
 line-length = 88
 src = ["samples", "singer_sdk", "tests"]
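
The new [tool.poetry.plugins."singer_sdk.batch_encoders"] table registers the two batchers as Python entry points, so they can be looked up by format name at runtime. A minimal discovery sketch, assuming Python 3.10+ importlib.metadata semantics (the SDK itself goes through singer_sdk.helpers._compat.entry_points):

from importlib.metadata import entry_points

# Lists every batcher registered under the "singer_sdk.batch_encoders" group,
# e.g. "jsonl singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher".
for ep in entry_points(group="singer_sdk.batch_encoders"):
    print(ep.name, ep.value)
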
4 changes: 2 additions & 2 deletions samples/sample_tap_countries/schemas/continents.json
@@ -1,7 +1,7 @@
 {
   "type": "object",
   "properties": {
-    "code": { "type": ["null", "string"] },
-    "name": { "type": ["null", "string"] }
+    "code": { "type": ["string", "null"] },
+    "name": { "type": ["string", "null"] }
   }
 }
89 changes: 55 additions & 34 deletions singer_sdk/batch.py
@@ -1,19 +1,35 @@
"""Batching utilities for Singer SDK."""
from __future__ import annotations

import gzip
import itertools
import json
import typing as t
import warnings
from abc import ABC, abstractmethod
from uuid import uuid4

from singer_sdk.helpers._compat import entry_points

if t.TYPE_CHECKING:
from singer_sdk.helpers._batch import BatchConfig

_T = t.TypeVar("_T")


def __getattr__(name: str) -> t.Any: # noqa: ANN401 # pragma: no cover
if name == "JSONLinesBatcher":
warnings.warn(
"The class JSONLinesBatcher was moved to singer_sdk.contrib.batch_encoder_jsonl.", # noqa: E501
DeprecationWarning,
stacklevel=2,
)

from singer_sdk.contrib.batch_encoder_jsonl import JSONLinesBatcher

return JSONLinesBatcher

msg = f"module {__name__} has no attribute {name}"
raise AttributeError(msg)


def lazy_chunked_generator(
iterable: t.Iterable[_T],
chunk_size: int,
@@ -71,41 +87,46 @@ def get_batches(
         raise NotImplementedError


-class JSONLinesBatcher(BaseBatcher):
-    """JSON Lines Record Batcher."""
+class Batcher(BaseBatcher):
+    """Determines batch type and then serializes batches to that format."""

-    def get_batches(
-        self,
-        records: t.Iterator[dict],
-    ) -> t.Iterator[list[str]]:
-        """Yield manifest of batches.
+    def get_batches(self, records: t.Iterator[dict]) -> t.Iterator[list[str]]:
+        """Manifest of batches.

         Args:
             records: The records to batch.

-        Yields:
+        Returns:
             A list of file paths (called a manifest).
         """
-        sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
-        prefix = self.batch_config.storage.prefix or ""
-
-        for i, chunk in enumerate(
-            lazy_chunked_generator(
-                records,
-                self.batch_config.batch_size,
-            ),
-            start=1,
-        ):
-            filename = f"{prefix}{sync_id}-{i}.json.gz"
-            with self.batch_config.storage.fs(create=True) as fs:
-                # TODO: Determine compression from config.
-                with fs.open(filename, "wb") as f, gzip.GzipFile(
-                    fileobj=f,
-                    mode="wb",
-                ) as gz:
-                    gz.writelines(
-                        (json.dumps(record, default=str) + "\n").encode()
-                        for record in chunk
-                    )
-                file_url = fs.geturl(filename)
-            yield [file_url]
+        encoding_format = self.batch_config.encoding.format
+        batcher_type = self.get_batcher(encoding_format)
+        batcher = batcher_type(
+            self.tap_name,
+            self.stream_name,
+            self.batch_config,
+        )
+        return batcher.get_batches(records)
+
+    @classmethod
+    def get_batcher(cls, name: str) -> type[BaseBatcher]:
+        """Get a batcher by name.
+
+        Args:
+            name: The name of the batcher.
+
+        Returns:
+            The batcher class.
+
+        Raises:
+            ValueError: If the batcher is not found.
+        """
+        plugins = entry_points(group="singer_sdk.batch_encoders")
+
+        try:
+            plugin = next(filter(lambda x: x.name == name, plugins))
+        except StopIteration:
+            message = f"Unsupported batcher: {name}"
+            raise ValueError(message) from None
+
+        return plugin.load()  # type: ignore[no-any-return]
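
Taken together, the new Batcher resolves the configured encoding format to a registered batcher class and delegates to it. A hypothetical usage sketch (the batch_config value is assumed to be an already-built BatchConfig; constructing one is outside this commit):

from singer_sdk.batch import Batcher

# batch_config.encoding.format ("jsonl" or "parquet") selects the class via get_batcher().
batcher = Batcher("tap-countries", "continents", batch_config)
for manifest in batcher.get_batches(iter(records)):
    print(manifest)  # e.g. ["file:///tmp/tap-countries--continents-<uuid>-1.json.gz"]
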
1 change: 1 addition & 0 deletions singer_sdk/contrib/__init__.py
@@ -0,0 +1 @@
"""Singer SDK contrib modules."""
52 changes: 52 additions & 0 deletions singer_sdk/contrib/batch_encoder_jsonl.py
@@ -0,0 +1,52 @@
"""JSON Lines Record Batcher."""

from __future__ import annotations

import gzip
import json
import typing as t
from uuid import uuid4

from singer_sdk.batch import BaseBatcher, lazy_chunked_generator

__all__ = ["JSONLinesBatcher"]


class JSONLinesBatcher(BaseBatcher):
"""JSON Lines Record Batcher."""

def get_batches(
self,
records: t.Iterator[dict],
) -> t.Iterator[list[str]]:
"""Yield manifest of batches.
Args:
records: The records to batch.
Yields:
A list of file paths (called a manifest).
"""
sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
prefix = self.batch_config.storage.prefix or ""

for i, chunk in enumerate(
lazy_chunked_generator(
records,
self.batch_config.batch_size,
),
start=1,
):
filename = f"{prefix}{sync_id}-{i}.json.gz"
with self.batch_config.storage.fs(create=True) as fs:
# TODO: Determine compression from config.
with fs.open(filename, "wb") as f, gzip.GzipFile(
fileobj=f,
mode="wb",
) as gz:
gz.writelines(
(json.dumps(record, default=str) + "\n").encode()
for record in chunk
)
file_url = fs.geturl(filename)
yield [file_url]
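
Each manifest entry yielded above points at a gzipped JSON Lines file; a consumer could read one back as follows (the path is illustrative):

import gzip
import json

# Decompress and parse one record per line.
with gzip.open("tap-countries--continents-<uuid>-1.json.gz", "rt", encoding="utf-8") as f:
    records = [json.loads(line) for line in f]
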
54 changes: 54 additions & 0 deletions singer_sdk/contrib/batch_encoder_parquet.py
@@ -0,0 +1,54 @@
"""Parquet Record Batcher."""

from __future__ import annotations

import typing as t
from uuid import uuid4

from singer_sdk.batch import BaseBatcher, lazy_chunked_generator

__all__ = ["ParquetBatcher"]


class ParquetBatcher(BaseBatcher):
"""Parquet Record Batcher."""

def get_batches(
self,
records: t.Iterator[dict],
) -> t.Iterator[list[str]]:
"""Yield manifest of batches.
Args:
records: The records to batch.
Yields:
A list of file paths (called a manifest).
"""
import pyarrow as pa
import pyarrow.parquet as pq

sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
prefix = self.batch_config.storage.prefix or ""

for i, chunk in enumerate(
lazy_chunked_generator(
records,
self.batch_config.batch_size,
),
start=1,
):
filename = f"{prefix}{sync_id}={i}.parquet"
if self.batch_config.encoding.compression == "gzip":
filename = f"{filename}.gz"
with self.batch_config.storage.fs() as fs:
with fs.open(filename, "wb") as f:
pylist = list(chunk)
table = pa.Table.from_pylist(pylist)
if self.batch_config.encoding.compression == "gzip":
pq.write_table(table, f, compression="GZIP")
else:
pq.write_table(table, f)

file_url = fs.geturl(filename)
yield [file_url]
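
And the Parquet counterpart: a batch file written by ParquetBatcher can be loaded back into plain dicts with pyarrow (the path is illustrative):

import pyarrow.parquet as pq

# read_table loads the whole file; to_pylist() converts rows back to dicts.
table = pq.read_table("tap-countries--continents-<uuid>=1.parquet")
records = table.to_pylist()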