From 24127d0ee389a0897fff79bff12c7c3e429425cd Mon Sep 17 00:00:00 2001 From: Jamiel Carter <88984480+jamielxcarter@users.noreply.github.com> Date: Tue, 28 Nov 2023 17:29:52 -0500 Subject: [PATCH] feat: Add Parquet as a batch encoding option (#2044) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add parquet encoding enum and dataclass * WIP * Add parquet support and tests * Write fastparquet file with fs * Change open_with argument * Update fastparquet.write * WIP * WIP * Adding s3fs as dependency * Remove s3fs * Remove fastparquet add pyarrow as dependency * Add parquet dependency * Add support for gzip and snappy compression types for parquet * Add pyarrow as a core dependency * Add numpy for python 3.7-3.11 * Add schema parsing * Change dict to Dict for parsing types * Added Batch Factory * Remove pyarrow as core dependency and wrap logic in dependency checks * Added missing quotes * Removed json schema to pyarrow schema support Don't currently have a way to support different numeric types like Decimal.decimal. Reverting to using pyarrow's schema inference. * Updated poetry.lock to add pyarrow as extra * Updated formating * Updated for readability * Added tests to account for missing pyarrow install * Addressed ambiguous type issue * Adding type ignore * Added type ignore to correct location * Update singer_sdk/batch.py * Adding back normal imports * mypy: install extras * Ignore missig pyarrow types * Move batchers to contrib modules * Increase test coverage * Fix types * Test batcher and target --------- Co-authored-by: Edgar R. M Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- noxfile.py | 4 +- poetry.lock | 27 ++-- pyproject.toml | 22 ++- .../schemas/continents.json | 4 +- singer_sdk/batch.py | 89 ++++++----- singer_sdk/contrib/__init__.py | 1 + singer_sdk/contrib/batch_encoder_jsonl.py | 52 +++++++ singer_sdk/contrib/batch_encoder_parquet.py | 54 +++++++ singer_sdk/helpers/_batch.py | 10 ++ singer_sdk/helpers/_compat.py | 7 +- singer_sdk/helpers/capabilities.py | 2 +- singer_sdk/sinks/core.py | 24 ++- singer_sdk/streams/core.py | 4 +- tests/contrib/__init__.py | 0 tests/contrib/test_batch_encoder_parquet.py | 49 ++++++ tests/core/resources/continents.parquet.gz | Bin 0 -> 949 bytes tests/core/resources/countries.parquet.gz | Bin 0 -> 16006 bytes tests/core/test_batch.py | 139 +++++++++++++++++- tests/core/test_singer_messages.py | 27 +++- tests/samples/test_target_sqlite.py | 46 ++++++ .../countries_write_schemas | 2 +- 21 files changed, 486 insertions(+), 77 deletions(-) create mode 100644 singer_sdk/contrib/__init__.py create mode 100644 singer_sdk/contrib/batch_encoder_jsonl.py create mode 100644 singer_sdk/contrib/batch_encoder_parquet.py create mode 100644 tests/contrib/__init__.py create mode 100644 tests/contrib/test_batch_encoder_parquet.py create mode 100644 tests/core/resources/continents.parquet.gz create mode 100644 tests/core/resources/countries.parquet.gz diff --git a/noxfile.py b/noxfile.py index f66afc223..9c5e07626 100644 --- a/noxfile.py +++ b/noxfile.py @@ -56,7 +56,7 @@ def mypy(session: Session) -> None: """Check types with mypy.""" args = session.posargs or ["singer_sdk"] - session.install(".") + session.install(".[s3,testing,parquet]") session.install( "mypy", "pytest", @@ -77,7 +77,7 @@ def mypy(session: Session) -> None: @session(python=python_versions) def tests(session: Session) -> None: """Execute pytest tests and compute coverage.""" - session.install(".[s3]") + session.install(".[s3,parquet]") session.install(*test_dependencies) sqlalchemy_version = os.environ.get("SQLALCHEMY_VERSION") diff --git a/poetry.lock b/poetry.lock index bb8df2223..128c39edb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -132,17 +132,17 @@ lxml = ["lxml"] [[package]] name = "boto3" -version = "1.33.1" +version = "1.33.2" description = "The AWS SDK for Python" optional = true python-versions = ">= 3.7" files = [ - {file = "boto3-1.33.1-py3-none-any.whl", hash = "sha256:fa5aa92d16763cb906fb4a83d6eba887342202a980bea07862af5ba40827aa5a"}, - {file = "boto3-1.33.1.tar.gz", hash = "sha256:1fe5fa75ff0f0c29a6f55e818d149d33571731e692a7b785ded7a28ac832cae8"}, + {file = "boto3-1.33.2-py3-none-any.whl", hash = "sha256:fc7c0dd5fa74ae0d57e11747695bdba4ad164e62dee35db15b43762c392fbd92"}, + {file = "boto3-1.33.2.tar.gz", hash = "sha256:70626598dd6698d6da8f2854a1ae5010f175572e2a465b2aa86685c745c1013c"}, ] [package.dependencies] -botocore = ">=1.33.1,<1.34.0" +botocore = ">=1.33.2,<1.34.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.8.0,<0.9.0" @@ -151,13 +151,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.33.1" +version = "1.33.2" description = "Low-level, data-driven core of boto 3." optional = true python-versions = ">= 3.7" files = [ - {file = "botocore-1.33.1-py3-none-any.whl", hash = "sha256:c744b90980786c610dd9ad9c50cf2cdde3f1c4634b954a33613f6f8a1865a1de"}, - {file = "botocore-1.33.1.tar.gz", hash = "sha256:d22d29916905e5f0670b91f07688e92b2c4a2075f9a474d6edbe7d22040d8fbf"}, + {file = "botocore-1.33.2-py3-none-any.whl", hash = "sha256:5c46b7e8450efbf7ddc2a0016eee7225a5564583122e25a20ca92a29a105225c"}, + {file = "botocore-1.33.2.tar.gz", hash = "sha256:16a30faac6e6f17961c009defb74ab1a3508b8abc58fab98e7cf96af0d91ea84"}, ] [package.dependencies] @@ -1979,20 +1979,20 @@ files = [ [[package]] name = "s3transfer" -version = "0.8.0" +version = "0.8.1" description = "An Amazon S3 Transfer Manager" optional = true python-versions = ">= 3.7" files = [ - {file = "s3transfer-0.8.0-py3-none-any.whl", hash = "sha256:baa479dc2e63e5c2ed51611b4d46cdf0295e2070d8d0b86b22f335ee5b954986"}, - {file = "s3transfer-0.8.0.tar.gz", hash = "sha256:e8d6bd52ffd99841e3a57b34370a54841f12d3aab072af862cdcc50955288002"}, + {file = "s3transfer-0.8.1-py3-none-any.whl", hash = "sha256:d1c52af7bceca1650d0f27728b29bb4925184aead7b55bccacf893b79a108604"}, + {file = "s3transfer-0.8.1.tar.gz", hash = "sha256:e6cafd5643fc7b44fddfba1e5b521005675b0e07533ddad958a3554bc87d7330"}, ] [package.dependencies] -botocore = ">=1.32.7,<2.0a.0" +botocore = ">=1.33.2,<2.0a.0" [package.extras] -crt = ["botocore[crt] (>=1.32.7,<2.0a.0)"] +crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] [[package]] name = "setuptools" @@ -2853,10 +2853,11 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [extras] docs = ["furo", "myst-parser", "sphinx", "sphinx-autobuild", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-notfound-page", "sphinx-reredirects"] +parquet = ["numpy", "numpy", "pyarrow", "pyarrow"] s3 = ["fs-s3fs"] testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.7.1,<4" -content-hash = "73dc87c9c3b169c0f1a39563036de0a8a29e171f0ba4f2cdacd11768f44f5bc1" +content-hash = "280428df7a49ec32c590d73983e15254a0e1467198f331c1d13817d1b9dfaabb" diff --git a/pyproject.toml b/pyproject.toml index c41ec9f7c..c36e472d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" } click = "~=8.0" cryptography = ">=3.4.6,<42.0.0" fs = ">=2.4.16" -importlib-metadata = {version = "<7.0.0", markers = "python_version < \"3.8\""} +importlib-metadata = {version = "<7.0.0", python = "<3.12"} importlib-resources = {version = ">=5.12.0", markers = "python_version < \"3.9\""} inflection = ">=0.5.1" joblib = ">=1.0.1" @@ -82,6 +82,16 @@ sphinx-reredirects = {version = ">=0.1.1", optional = true} # File storage dependencies installed as optional 'filesystem' extras fs-s3fs = {version = ">=1.1.1", optional = true} +# Parquet file dependencies installed as optional 'parquet' extras +numpy = [ + { version = "<1.22", python = "<3.8", optional = true }, + { version = ">=1.22", python = ">=3.8", optional = true }, +] +pyarrow = [ + { version = ">=11", python = "<3.8", optional = true }, + { version = ">=13", python = ">=3.8", optional = true } +] + # Testing dependencies installed as optional 'testing' extras pytest = {version=">=7.2.1", optional = true} pytest-durations = {version = ">=1.2.0", optional = true} @@ -102,6 +112,7 @@ testing = [ "pytest", "pytest-durations" ] +parquet = ["numpy", "pyarrow"] [tool.poetry.group.dev.dependencies] coverage = [ @@ -114,10 +125,6 @@ mypy = [ { version = ">=1.0,<1.5", python = "<3.8" }, { version = ">=1.0", python = ">=3.8" }, ] -numpy = [ - { version = "<1.22", python = "<3.8" }, - { version = ">=1.22", python = ">=3.8" }, -] pyarrow = [ { version = ">=11,<13", python = "<3.8" }, { version = ">=11", python = ">=3.8" } @@ -217,6 +224,7 @@ module = [ "backports.datetime_fromisoformat.*", "joblib.*", # TODO: Remove when https://github.com/joblib/joblib/issues/1516 is shipped "jsonpath_ng.*", + "pyarrow.*", # TODO: Remove when https://github.com/apache/arrow/issues/32609 if implemented and released ] [build-system] @@ -226,6 +234,10 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] pytest11 = { reference = "singer_sdk:testing.pytest_plugin", extras = ["testing"], type = "console" } +[tool.poetry.plugins."singer_sdk.batch_encoders"] +jsonl = "singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher" +parquet = "singer_sdk.contrib.batch_encoder_parquet:ParquetBatcher" + [tool.ruff] line-length = 88 src = ["samples", "singer_sdk", "tests"] diff --git a/samples/sample_tap_countries/schemas/continents.json b/samples/sample_tap_countries/schemas/continents.json index 2fab0c1ed..5d4a9a0bb 100644 --- a/samples/sample_tap_countries/schemas/continents.json +++ b/samples/sample_tap_countries/schemas/continents.json @@ -1,7 +1,7 @@ { "type": "object", "properties": { - "code": { "type": ["null", "string"] }, - "name": { "type": ["null", "string"] } + "code": { "type": ["string", "null"] }, + "name": { "type": ["string", "null"] } } } \ No newline at end of file diff --git a/singer_sdk/batch.py b/singer_sdk/batch.py index 0cbf11917..4545285e4 100644 --- a/singer_sdk/batch.py +++ b/singer_sdk/batch.py @@ -1,12 +1,12 @@ """Batching utilities for Singer SDK.""" from __future__ import annotations -import gzip import itertools -import json import typing as t +import warnings from abc import ABC, abstractmethod -from uuid import uuid4 + +from singer_sdk.helpers._compat import entry_points if t.TYPE_CHECKING: from singer_sdk.helpers._batch import BatchConfig @@ -14,6 +14,22 @@ _T = t.TypeVar("_T") +def __getattr__(name: str) -> t.Any: # noqa: ANN401 # pragma: no cover + if name == "JSONLinesBatcher": + warnings.warn( + "The class JSONLinesBatcher was moved to singer_sdk.contrib.batch_encoder_jsonl.", # noqa: E501 + DeprecationWarning, + stacklevel=2, + ) + + from singer_sdk.contrib.batch_encoder_jsonl import JSONLinesBatcher + + return JSONLinesBatcher + + msg = f"module {__name__} has no attribute {name}" + raise AttributeError(msg) + + def lazy_chunked_generator( iterable: t.Iterable[_T], chunk_size: int, @@ -71,41 +87,46 @@ def get_batches( raise NotImplementedError -class JSONLinesBatcher(BaseBatcher): - """JSON Lines Record Batcher.""" +class Batcher(BaseBatcher): + """Determines batch type and then serializes batches to that format.""" - def get_batches( - self, - records: t.Iterator[dict], - ) -> t.Iterator[list[str]]: - """Yield manifest of batches. + def get_batches(self, records: t.Iterator[dict]) -> t.Iterator[list[str]]: + """Manifest of batches. Args: records: The records to batch. - Yields: + Returns: A list of file paths (called a manifest). """ - sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}" - prefix = self.batch_config.storage.prefix or "" - - for i, chunk in enumerate( - lazy_chunked_generator( - records, - self.batch_config.batch_size, - ), - start=1, - ): - filename = f"{prefix}{sync_id}-{i}.json.gz" - with self.batch_config.storage.fs(create=True) as fs: - # TODO: Determine compression from config. - with fs.open(filename, "wb") as f, gzip.GzipFile( - fileobj=f, - mode="wb", - ) as gz: - gz.writelines( - (json.dumps(record, default=str) + "\n").encode() - for record in chunk - ) - file_url = fs.geturl(filename) - yield [file_url] + encoding_format = self.batch_config.encoding.format + batcher_type = self.get_batcher(encoding_format) + batcher = batcher_type( + self.tap_name, + self.stream_name, + self.batch_config, + ) + return batcher.get_batches(records) + + @classmethod + def get_batcher(cls, name: str) -> type[BaseBatcher]: + """Get a batcher by name. + + Args: + name: The name of the batcher. + + Returns: + The batcher class. + + Raises: + ValueError: If the batcher is not found. + """ + plugins = entry_points(group="singer_sdk.batch_encoders") + + try: + plugin = next(filter(lambda x: x.name == name, plugins)) + except StopIteration: + message = f"Unsupported batcher: {name}" + raise ValueError(message) from None + + return plugin.load() # type: ignore[no-any-return] diff --git a/singer_sdk/contrib/__init__.py b/singer_sdk/contrib/__init__.py new file mode 100644 index 000000000..ebad0cbc7 --- /dev/null +++ b/singer_sdk/contrib/__init__.py @@ -0,0 +1 @@ +"""Singer SDK contrib modules.""" diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py new file mode 100644 index 000000000..6ce4c8793 --- /dev/null +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -0,0 +1,52 @@ +"""JSON Lines Record Batcher.""" + +from __future__ import annotations + +import gzip +import json +import typing as t +from uuid import uuid4 + +from singer_sdk.batch import BaseBatcher, lazy_chunked_generator + +__all__ = ["JSONLinesBatcher"] + + +class JSONLinesBatcher(BaseBatcher): + """JSON Lines Record Batcher.""" + + def get_batches( + self, + records: t.Iterator[dict], + ) -> t.Iterator[list[str]]: + """Yield manifest of batches. + + Args: + records: The records to batch. + + Yields: + A list of file paths (called a manifest). + """ + sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}" + prefix = self.batch_config.storage.prefix or "" + + for i, chunk in enumerate( + lazy_chunked_generator( + records, + self.batch_config.batch_size, + ), + start=1, + ): + filename = f"{prefix}{sync_id}-{i}.json.gz" + with self.batch_config.storage.fs(create=True) as fs: + # TODO: Determine compression from config. + with fs.open(filename, "wb") as f, gzip.GzipFile( + fileobj=f, + mode="wb", + ) as gz: + gz.writelines( + (json.dumps(record, default=str) + "\n").encode() + for record in chunk + ) + file_url = fs.geturl(filename) + yield [file_url] diff --git a/singer_sdk/contrib/batch_encoder_parquet.py b/singer_sdk/contrib/batch_encoder_parquet.py new file mode 100644 index 000000000..1d5ad9cc1 --- /dev/null +++ b/singer_sdk/contrib/batch_encoder_parquet.py @@ -0,0 +1,54 @@ +"""Parquet Record Batcher.""" + +from __future__ import annotations + +import typing as t +from uuid import uuid4 + +from singer_sdk.batch import BaseBatcher, lazy_chunked_generator + +__all__ = ["ParquetBatcher"] + + +class ParquetBatcher(BaseBatcher): + """Parquet Record Batcher.""" + + def get_batches( + self, + records: t.Iterator[dict], + ) -> t.Iterator[list[str]]: + """Yield manifest of batches. + + Args: + records: The records to batch. + + Yields: + A list of file paths (called a manifest). + """ + import pyarrow as pa + import pyarrow.parquet as pq + + sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}" + prefix = self.batch_config.storage.prefix or "" + + for i, chunk in enumerate( + lazy_chunked_generator( + records, + self.batch_config.batch_size, + ), + start=1, + ): + filename = f"{prefix}{sync_id}={i}.parquet" + if self.batch_config.encoding.compression == "gzip": + filename = f"{filename}.gz" + with self.batch_config.storage.fs() as fs: + with fs.open(filename, "wb") as f: + pylist = list(chunk) + table = pa.Table.from_pylist(pylist) + if self.batch_config.encoding.compression == "gzip": + pq.write_table(table, f, compression="GZIP") + else: + pq.write_table(table, f) + + file_url = fs.geturl(filename) + yield [file_url] diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py index 62447ddb3..0d30d0540 100644 --- a/singer_sdk/helpers/_batch.py +++ b/singer_sdk/helpers/_batch.py @@ -25,6 +25,9 @@ class BatchFileFormat(str, enum.Enum): JSONL = "jsonl" """JSON Lines format.""" + PARQUET = "parquet" + """Parquet format.""" + @dataclass class BaseBatchFileEncoding: @@ -69,6 +72,13 @@ class JSONLinesEncoding(BaseBatchFileEncoding): __encoding_format__ = "jsonl" +@dataclass +class ParquetEncoding(BaseBatchFileEncoding): + """Parquet encoding for batch files.""" + + __encoding_format__ = "parquet" + + @dataclass class SDKBatchMessage(Message): """Singer batch message in the Meltano Singer SDK flavor.""" diff --git a/singer_sdk/helpers/_compat.py b/singer_sdk/helpers/_compat.py index 20b7a399a..39042e532 100644 --- a/singer_sdk/helpers/_compat.py +++ b/singer_sdk/helpers/_compat.py @@ -11,9 +11,14 @@ from importlib import metadata from typing import final # noqa: ICN003 +if sys.version_info < (3, 12): + from importlib_metadata import entry_points +else: + from importlib.metadata import entry_points + if sys.version_info < (3, 9): import importlib_resources as resources else: from importlib import resources -__all__ = ["metadata", "final", "resources"] +__all__ = ["metadata", "final", "resources", "entry_points"] diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 690f04db5..b7719c87c 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -62,7 +62,7 @@ Property( "format", StringType, - allowed_values=["jsonl"], + allowed_values=["jsonl", "parquet"], description="Format to use for batch files.", ), Property( diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 32d027cfa..f3a1975cf 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -5,6 +5,7 @@ import abc import copy import datetime +import importlib.util import json import sys import time @@ -541,15 +542,24 @@ def process_batch_files( tail, mode="rb", ) as file: - open_file = ( + context_file = ( gzip_open(file) if encoding.compression == "gzip" else file ) - context = { - "records": [ - json.loads(line) - for line in open_file # type: ignore[attr-defined] - ], - } + context = {"records": [json.loads(line) for line in context_file]} # type: ignore[attr-defined] + self.process_batch(context) + elif ( + importlib.util.find_spec("pyarrow") + and encoding.format == BatchFileFormat.PARQUET + ): + import pyarrow.parquet as pq + + with storage.fs(create=False) as batch_fs, batch_fs.open( + tail, + mode="rb", + ) as file: + context_file = file + table = pq.read_table(context_file) + context = {"records": table.to_pylist()} self.process_batch(context) else: msg = f"Unsupported batch encoding format: {encoding.format}" diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index f306f1ec4..d235987a7 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -15,7 +15,7 @@ import singer_sdk._singerlib as singer from singer_sdk import metrics -from singer_sdk.batch import JSONLinesBatcher +from singer_sdk.batch import Batcher from singer_sdk.exceptions import ( AbortedSyncFailedException, AbortedSyncPausedException, @@ -1349,7 +1349,7 @@ def get_batches( Yields: A tuple of (encoding, manifest) for each batch. """ - batcher = JSONLinesBatcher( + batcher = Batcher( tap_name=self.tap_name, stream_name=self.name, batch_config=batch_config, diff --git a/tests/contrib/__init__.py b/tests/contrib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/test_batch_encoder_parquet.py b/tests/contrib/test_batch_encoder_parquet.py new file mode 100644 index 000000000..0318e41d3 --- /dev/null +++ b/tests/contrib/test_batch_encoder_parquet.py @@ -0,0 +1,49 @@ +"""Tests for the Parquet batch encoder.""" + +from __future__ import annotations + +import typing as t + +from singer_sdk.contrib.batch_encoder_parquet import ParquetBatcher +from singer_sdk.helpers._batch import BatchConfig, ParquetEncoding, StorageTarget + +if t.TYPE_CHECKING: + from pathlib import Path + + +def test_batcher(tmp_path: Path) -> None: + root = tmp_path.joinpath("batches") + root.mkdir() + config = BatchConfig( + encoding=ParquetEncoding(), + storage=StorageTarget(root=str(root)), + batch_size=2, + ) + batcher = ParquetBatcher("tap", "stream", config) + records = [ + {"id": 1, "numeric": "1.0"}, + {"id": 2, "numeric": "2.0"}, + {"id": 3, "numeric": "3.0"}, + ] + batches = list(batcher.get_batches(records)) + assert len(batches) == 2 + assert batches[0][0].endswith(".parquet") + + +def test_batcher_gzip(tmp_path: Path) -> None: + root = tmp_path.joinpath("batches") + root.mkdir() + config = BatchConfig( + encoding=ParquetEncoding(compression="gzip"), + storage=StorageTarget(root=str(root)), + batch_size=2, + ) + batcher = ParquetBatcher("tap", "stream", config) + records = [ + {"id": 1, "numeric": "1.0"}, + {"id": 2, "numeric": "2.0"}, + {"id": 3, "numeric": "3.0"}, + ] + batches = list(batcher.get_batches(records)) + assert len(batches) == 2 + assert batches[0][0].endswith(".parquet.gz") diff --git a/tests/core/resources/continents.parquet.gz b/tests/core/resources/continents.parquet.gz new file mode 100644 index 0000000000000000000000000000000000000000..8df64eb5a1f39dfe59fc21a2b46174fe2c2f9753 GIT binary patch literal 949 zcmWG=3^EjD5e*TI@e$<{We{RukniST00ZIVq=bZml4PcdbqPiiF1y;AJnY>CUcTp1 zYL;a@JO9%+Eub<{22n9lI~|~20Z}$lHW@Pp4W?j62_{E3pd_kM50X>T-dU)zG9+%R zEaC=AtNmb<5#^CIWndGKlwe8DPe~PJ5#tj(z$o^CQLTYdjRV;tHVObK zpw%p*>lj5>F=BCeGSJ~Z+NXT{o-l3vX@ zioi&}ETEwlK%T3cD_9V02$%`tLacHsjSM%;0dsx9>H{5}0zu}xJ31yix_~5rM#41( zhJxMa0oDL>D%4hYzp^OzP_PU{3qqbjLZKkBsIWA(L^rvhKsT|dD8F2xEVZaOGe1wk Y&_vHb&rnha7@MF7{mjU~5CF`u0Ij3hAOHXW literal 0 HcmV?d00001 diff --git a/tests/core/resources/countries.parquet.gz b/tests/core/resources/countries.parquet.gz new file mode 100644 index 0000000000000000000000000000000000000000..e0206506494a4b9505e9c0ea59966ab069d0ff00 GIT binary patch literal 16006 zcmeHuWmsHIwl3}-8Yft2+#y&9?oMzC?$Efq2e;tv1ouD)uEEoI&;S8~1_=`EHsqU6 z=49^7ea@WwoFB7y@7=vi*M6&3^;>IIDG95v0pI}5_<$dXa)47<02B@s6j>w^)Wh+J zdQ#c3fE8CvS;G|%8H+$#J+8C%ydzFE#_xym&d2kn%*%I6pq0s4zcI&nX%PD`CE2m$ zc?pLz^v|uXYY@501y|&ktM0AI@RDliQZ57N=$2!>%>fNH&F07%@twvCk$EmAzh{0IjA?JC}4ppJbc zpif`>l4njOO%<3V`bCLXwUypeyy6w0e5cy;@nU*YamiOp&vd5IPaUS6U@bC!;iHJ!^ zACr+&P*Odi2GY>d(K9eIF|)9;v2$>8J>}-%<>P-Q@LW(xSVUAzTtZSxT1Hk*UO`bw zS>=VQn!1LjmbQ+r-b;N0LnC7oQ!{f5ODk&|TacZ-gQJtPi>sTvho_gfkFTG9KwwaC zNNCur@QBE$=$P2J_=LpQNy#axY3Ui6S=l+cdHDr}Ma3nhZ_3ImDyyn%YU}D78k^oW zx4dh8-`3vI+11_C+t)uZ_+e;xWOQtN;v;x+YIgTofjW3&9+h2Ef z_x2ACza1T)oPIw$zqou*6)2ZGuV7?oC@5edItc)ckP8X{i;w`$*v`Zh0Ec%9i&sH{ zmx&H6L;@oH8Ban+i1tANVD4eZf_y>nhXY)20Zxg3p^dg0vQESU*fRlm;=6{_F^^hx zlRhmzxO@ zlks?;ZhvIZxWd|IA-Bs6*Tt`fVTw$gfs0raNMbn50Ps$VJFe_RTBM7?x4h#iu94#s zPYILS;&|4D*iVl;LZ4obnXSLu$=+h%&PGx%HzHWzjH;{C?iAz_GVec%px zV!h4alblR{ek+LARJuqSMxBPaaT`%>ydqrx$z#lLAbLe8e2(7i#eQpbl52EPdTSCb zzu7|S)bmH|A^eLh3U5w+(9Va7ob9Bl2So9Y%qEnNYW$cZJg32;9?OTLsX7qU0$nlk zHM~mF^p2vjdHuC=9*wJd-QOmM4cT4<3HjMP>2job88?tI#2@ z^=r8J0X$uX@_vtoG};+_+e|WBHs1H@y=@nnkyq0_;bUS#x@&*Mzzlrca`iHs z>cybY(T>j;*!_q<^X4-Zv{DNfb{e+$u}AGu-TrF34SRYmqCHY-Gk-~Dc*T=fUtO^F|~I;l__Fw@S9AF@X% zru&kL-?1!;V+39SLaxbE2{i4JtjT3KvwM?{b{W4-@i?pteZZ8^eMap?^PVjmCpPU( zpxYv$XBBtg){!QqJ<*^Vqs*Aa*QLlX343J!srG3lI6SB?r56)ub)DiPV+nbgne}j( zdoD%kz#H`B~+m{?nkZTSE2@;evN@ zn8mCJ>Mo;Yv$R^Rm}iOC`3P%#Auqqt0!LSP)6lX{HYUV8v3&!^$}O>P#aG$r@%6}? zv}v52(0(N_cVI;_3?8!)nwdjNI}{VVW@HOg9u^fAvd?i!`_ROw$9iPg1jd-OoR zNtooeU=ylh_bIJ!;%*r0rEY!s!f8INc;y2DL7CInJpFCiMv}j#z~OECzEdpljX+PajI_6uSBXWrCK*X-C}>TZBR`P;U|m=e<*~jr z**0cb#x7QZphh~M*A$gln$m^)!qJWB`x~v8Y2Ghm14)7^AYJM-{e1g$Uvr*xkrPco zzx)_&_Sp}61iSJfL3Ob57mmpp-vXGq6C9RFWL4xRs?B0FKh4m24ARg1Y(MG|u)zNo=r=8G8pS`1v7*Rm5c~ zj~}(a#i^d$ArwP|7D>c~h2MuK@XwkX=WCj9CK|Y8jt+~Ddw9Pu(b~}fF!gW>7_3Iw z&A&#x?F+U!Q*zcLY#7C#KRsWLIr(h=Y-qO=AViGR`pPJxZ@@>Vr-#Xs>fIWb7PWyJ z--KIu4u;hVTeSQsQw>mY3rV>=Alpfbg^Xxvap5JyS<$k#F)|KzOTep6UWpW~>Z3Qc zX_HwXdHSiybh0z7RARVfC$jignun@o;eaN8flW2~cUGtDTe#UyaAl%n&JttspQcRi z?1kudr%OxE1t0NMvsNDpQk%bT3bI}S*St=eOJ#S-dI!p_k)3+|Io+y&I4{qxL50od z1|Oj1s9!nP4J1{av!7aV9Ol}J=po4CinxNl+p^(Ri~|3N-V3X%P9pAJCUA7}5rvt} zUUEmR9RW(OmZq|&m+X{8S(k)p$h?WKQlQRkXc0_;VssqB9F^FA{fYMQZZ|T-?ojZ} zCy(a4obnOv>TFFp?AG`VHJX309_{pZgA|=Gj51aSU zQJ!gnh_lTx%uDMFK2qcxI!W)(DN`5T&}kFO&E)l?`@X^;?t@cqx4P>cBacZW)!vef zoS_^j(=E}Xr{W7#y9K){Yds3e5|?UuA|g0Z5vbg))aEO_Av*Iy@l)a9KDA*Zm0ih! zck6NE#Nn#xCaZXsD|xSQCD+~Gb7`WUY2LUt9LYANI$HCt2t8Y@*cQvyqbr=8V$yP>z)c#y&Oga)+4AKM)@6MO-5dQM zQ4xfm5F0~~2?44wgbWyhES;PULI2?;|D2ahvHXdbfDCPa<0YZ|cr_Hjcl5wQjC*48 zOLTw9Qsx8z?Z98K6jO~Qr$@7Zt$G;g*&!itj}I zwknDyoIA@~&$GP_c(-oH4l|b;KuhoT6=aC1Zv72yt9&bNg)42Qi@!O&@hyC+rqj9N z%vl?i!tnj5(WOFR*384oc_+1dyQyZ$I{Q^H;W1M^_QWK99 zS5iYd>l9Auv>6W)(d5KOltoLU&s@PxQqtV@;md_t>RlsvL_dJ29_5nF>6s{Zs-&29 zVUeG`8wu3Qa^8owl=La`6l-90hx~BA*)FyyI4r~^oB_Nh{Ss%QOFOXJ+w~GiyTGpT zN`ZqH?nEB#%2b=c(8aJ^bq2%Gdlt|%Sex3U_A=aJ8_D#^?$d6yr-~6#gUDx_)owWb zif(w`89tM~FFl2euskXS+{Ac{i1{BecM&$LF!}JSla^=C5qMNfXnAmNsH~N6f~EJh zJN6SboxmOSuC2J%=Ng)y$X#TRL1gh%Wa{v}OyX3i3 zU5mcJ&sJ0!Xp?YuHC9|4VNO@rK6$(LVdRx+L2%^3YG_#B%HEi@dq>D9cs`jaIOHIz z?u)DqFKc0==*#Oi_N!PYHxKm%z7(aX4$!`5_b1HE$`WpaSLWg(I-=l0N~U(~uGg!P zhhqd^JdwVA@^HQms>c-c)%4~$Tip!{`|5d^69;;lS`%fWQ5UsD3O-*n2|bG&BMnm( zxyq(oaYdq`kxMRdQQ&3WL`MlB!BDReq^p>5eoYWnSKhcQwLaVk53b?(3fmp(y+Ws_ zsO(CfmSTmf7)i5YWvfMLU^FTA`ih})k%laNJ#o~yel#{YeZWk!fS)lC{)nDMFZXhN zHl5~MW}fKUbF_T_@1>#<-C(Lc#+I3P@F+%d)X}bmQChZJd0cM_75vbSPtQQPQvsCOJ9eYQ18L?X5csLsq~ zB=d>6z!_D-xC%&tC-h5gDX3g^^nAa)64c}0oq9c+`2t1ED_(QkG6;dht z(j}doIN6nt5jj}YW5eDGS%mdmemlV1%t3AWY-g6ZswZKZ!%h#KP08iM7dGr&&^O+h zztuu=;G3qw9R93>_blhdmh{JyN8(Z0<4c$4V4LA$Pvl0|_fJQkP^0i(bhrxk*2Ru> zztZ`9UEstNWYJX1zNE=JaL2->Ud63mOv|n+uH=fH7SqJ*byJAr9PO1PXZGxCE*^cD z()B2>lV5+GxKqpPHo;``y#qC5)o}}2y?s+>P6h$Naoo4Fe)^k2!oX3~?%{ILipQpC zRZ$#Va*cZYI3MBRDG;`)we;i`=(6x%<9s#^z5?;sh2(^svu3VChQDRL%h zRkc*PBYgK3=p3Y$J5H=sSOX}X1$3?6@Wvul8th28V?IjSN%?&3scy&+p;PZ(to449 z(RZ1#>e~f3G zd1=yecx9DZkH$IrvA0~8Xu{uOOZ;1(DMeZys@npMM|MB>`V?lXjfkcEMY2$Nj z?!APjs3D!j8IP~zNofPe-3m%yCoHXF$tUm&PugB*eDUw219yL7P)41@#V!gf(8ATT zR+6}Om)<2oDUR?Qoj4TLn@$GJ9wz9@LV$cB-v36A?s`fB8Pf7^dZN1S8 z#|XP*3z$m8m<{f0Y{*#ak?bwaC%|S#_!uPVy%;T%yM)i*-en?D=&Kl1x|;i1%VNFd zc2!bLAB8{~lPF&iBXk+Vbh)h&S|XBatg^?srI74~tEW4tx4iI19_NEk0{N&oSs*HKxWbuWS5UWOG%hoZ!@NM$IA(Is)>i(vFDW z;6b;UMz8Ybu-l$Sqh?Lx{S$#!DDuX@=I>#S8?(^f&&j1NY*!}NQVNizFNvrJ;&Nl3 zO6u5RVbF29#0-Q6nN*erh?Z=C)i9(bV<=q1O~^RM)#Z7rS$?f z2qJ%BXlrNqA2$8Z+4MKvKiD(^$k5r+^*27fCxYkC2%OLWeq(*$)BgbCUsCFJLclVT z9AM(_&G1~WWs?_Kk;ECkj*%yhNmv&lJ7+hQ+ zkU?|Fol#OP?WTWPGq1N&@2nszNgd@h=G@OFtjOy>*61q`pPRpk(T+f(HUi{B>4e*( zQ)AQlVClV`sUnMBjO;+%yi879mkHuH-qPCLIj8q}iK5fp{M9?XCQ|>~Ou>>w7M0oR zZhEH$oysGru;*Gaz8Hy1SV5W+8_!i?Bi=N9Ktpm@9%U7cP$7M)H^XMM0ZTWL_#~Mk?RZaWp79gs#{BC@N`Q~T72zvx<&Q7FFR;V6#y5?JqoTS5PogS7= z=Rl$F_0Bq~M#vm(Y zGe=5dDN6L-HJI9C8Jx%jS^sBo-K*Cs!6L4?{(g>DMla1Mp%579%ryQXp6D;^T%0Y) zfBU;5KoB;&|J(cD0B@Y@FX8>ePIC*3R|*HLa{_KV+{611-sG2Pzh?%#Cz1nf|NY;J zyt;rBu_TTfdE#sF;krXioBmfnFv$&7HWA@aS=HWExVbg(&&^v-c|!$)uaIL%Uh1{z zqLAqFw&G$Om@n-`~7XQb=st?k9VXBwGBn1vfEn*{6%EK+|jMjM9UEuisGft~7_#R0Ed?|37Bse+a?mal$ zd{jx%)4A1_4a~K78JmGPqK&Lw>iES4OqMW_i*QuRAXwgYomQe#e<6pv42G>ST8W{7 zh$7P2wi#!6F2pm5z(pN0#LDjG_*wWVZo1D{Z?;G$-n7SJ5Dw<+R2nJjtiFJxTEGm~>X*Vnm2r1ZY) zwH8gRJt@5nInGHStbO;c!SYl^97F-8d>99hb;uZ-O>TqJ%08zt&id2z> zm?;A@GadQp-O5h>Mg&(U`Mlasb*MW(lC{4xP4<|Z4CvGot78p)Q*Gf&?~uc^Ma;q= z+NFG5y&7x$@kc^fmWmDVeOaSzX93wr?sKr$)fl zZN&Jz4T_qUM>D6HSlVQoFJyARrJ0$=F~CO%iX1|$RV|$(>opNo=+ie2j4fk*TsoSA z!b=j{xO0>3vpQP9AYtHD(UtkO1&hazQ!_)fgf=f+X4YVK&sq0FUa;THvC-t!B0 zw>CFqcZ1saZ%JQGY)d$}Xojz{A0@O7#G1NSEMZD`zR~fTlNj&XYDM#L%LKEkubVq3G3cvKKmDs25a+9b=xFE zGt7RdMiNYa*o;e(_Pciep~>QIVbsPyBB}#|Vb_&#RxLV?5h>iyDdWjmkgQUOv%>8o z+mv((>hjCC@i*q#nclal&`Nr`E5yicM!J`n>SKJ~v1^dySdF2KkxFD|OgJnb8)(co z(#@=-vlwaIvp@v^Uia>3*6>?-37>KW!)IA~VV)HbNmK-@Z(qnPq6xGfLtk5r78Mh7 zN?9kwwgS-1;h!i*+Pd+J=7qAQWFS(b<~}95p(o(#u)7<-hQrBwXWo|aD)wtur~_$$ zVyUf>1XP8;A&t=DhW4B0xw}XTyNS(HsQti2gp}0WwtY6>3^snUf+RIeFK`#GWj(z= zy z#fE);^d3KU2cOZI>FVfl+&B`6+Kl2e`q|9aj)5cTL#Z7@+0P9|%4tNqBvy(SkEI-k z_QyamYVhks+E?0_1XtY`RzG%*r&IczaW<{3RlQ_!Fwd-5Ft?RqFCK@@b`8F5ADI)9 zPb0QV(}v$E;=_kWtY@8ud)DMh<0YTHUq66j5dG=9myGkR@le8!iJp5X9bV#hRk;t7 zgh`Q%&q{83TNv47h8?wHpc-`xVCHTG?>L)RvQZy+P#GLx?k~$vP-qdf4b2@*jR=s1 zjapPxm2a=3dwORUUh7XljXB?QsZJV_8U!rugD51COcT!ms9P!b7psoye-Djb@n^+d=ffADtam`8+sMxWC zi+!N*U(aAuuGu)$J1<1JC!6pGx`uq*f9#PXd_dN`7os9gWtY(Qh{Bj%fohB}P~0C? zYM;G$2j#O^rm@RG|LslSRJK5jnvmn*8fw$$a{O}yn1bl)y5Rw!DB^Yt+ zHqCKWEiA|$d=mLFEOLlnVT@j1zvfObWGF4Op40D%y)nOUMV1!EO7!ODwH9o_LL4a`!tdA@w4*s|#+BrF#;q84RcMBaL{7f@hG#H0IEJRXF zB8@j6yP$54N5or%G9fNggx(~&NJLgvgM*zybeJ+etHNCui9>cdZ%P=G$yjLh@^trx@dA2! zOs{;4i5+LTzJez??oL6!SI&syKPe}Yv5TXlDahF4r*`h(pFspZ)PYBicyeWGW&vl;Ub z?2h%T&-`F%mO0T87_Wm2zKIX|r1e_nN!QvNDfRYb?xs4i34*M{WyP%P9SCps_&u_+ z&s4~4@sVM(46Ih0Cv;0IspK=okLN1`XQlm;cIKTIDZZi0>wl8ZYDsZ*V%8DWfUKoR z{YaYsEMBCLWMNc3`a-(ngyqc*T0W?D#)IG(HMyYopMFJO<)36e}Q^qa0{tNFF>m zSPJQc_waCtdj9?fLbSMOh+-nOw5&w%1F$&paVK$R%qFNBRUIQ;utFBZTIK7 zB2vM$v?$4eregGIj5NcghQeAX5rNc2O0+mJl6kq>q?(nX)T{KgFcBI;(5Se$Z2mS9 z<-#aU#8pazT8trqV=6VcD6N5CjFdH1;l%?=q;OS20*#EmUix;5uqt6Kql9$EX^H_D zL;P!%_$cE-{EOn`5Kv@#*$wD8Q%6O*E*m&=KMHkH7D=Hc2KE`I;gzaIeovRdbx#(8 z)}b{))eZqF&uK#{>i&G`-y8zUTSceEO%}=XztTpD5FuAqVuZn?gNJ6;gn^f(Q1Zv+ z995;*vjiyNIdm#Z(UOD+8S2)+G2%HADobHu1q6Di(H6>tIOVC*(jp+?zRRV>1@8p* zb<+~iLJv39xWxqWkE6G&dA4(#H3|RtV?qXI7#4SpkA*YGZ2q zCv(IaJYFw0UK1*Cu^bwhUGktee{1GnYE1(!AO`gpTBD|_tiHgCwE-)GTy*FVq?=o9 zPLBM_n4Ix!7AiWWshaHNY{rO*|3vO0f3T9RUHOibuRAnaYtXF%5{ zyg)oGd13_ufxTJH9Cl6WGU8L1YM(DQoQGdt=>)x91w1-{DxT5fV z2o#TT(dmRT08A~kJU7jr5bnvWP3e_gJmp8*>3QTYLtBeqVDof!% zvKl8$EIp2LISaBW;qp#*c%|!-ufCa1x0SAg7X5v%!zLDa<=6z))V6uc%L0x0Sx^L_ zrHo_!?p$b44uekocmB8t=w5wfQI*YjX%=GBvsxf0^qQhY!~%-|MD~c`fo0q%$S!Rvjcn zOW9rY=Dj-9tY7viw$a!PS#d=j)^rCS&>PM zBFWp#4AJ~4UT?&JQ(mUP;g>6ReO>j>RIBeos2R-H5<#Ilqkg8Ht}`|^mHuDoPDJGm zd}87c?YNdCBP+vzcS;CP`7Bn~3@rU#t`U*CD$;Lg9KC8+{k9a0)cu31rI20jn`b)N zmxZq#5-H>xL_e2u|6;g<({{0OfiP)^>C4*C5ajfqaqoZkxYyhNFN}M?*$E-zUL7&s zI~?G<4rpLR!^61uU)uYZ1K>AiKtF`c|Cb#gdSK<43|3tD3GcD7)~H)9=qPCZUIL_f zm6Qoez;}!15`0wRv0f`Dv-_@BHzxzt9e$&;es7z5pe3Ju4$iprK?zxN7b!;b0CucdPBR-&DwOb9M!IeYVg#&d7l>>h>vwa;AWFHB8;Lr>E_$c_%*hTm~=*0KHxm7z?I9Dvb z1J^>%^1qx#;3CIrncJj=$_?GigP|(*vl@1Dx*X0O0=BV*h4v_;b0x&vX7%^zW<= z00Tgfdj=7c3y$D{;X(GsKSF{t4!9RY3OP5RwqS|L<M(cK69A9HQ}+l^ZPzKQt%;A{o>|A7zy|J?%;Uc?>@MEih9c%2`haiO3HNeIw{ zRa6u;_Qmz7H{mB!>h-f|ZmZ(PGl~B`OPxL!us@?-TtjZ)s-%5Ib0wTDuu%V`kjnPkJ>{wh`ShRl6ul5l7 zv%a4R?$iGig3H24<3T)Owfpp(RtzjGJdjv%iHAZV;NCyn-+mtV`9-XZ*i~#Eo~7>7 zGq~N?cOUT^KF0TPf5PWMbS(~b4`W+(7cGtlc!lln>*MwOC-vPIC1RzeDPnITDd8k6 z|Bzn%zAM~Eh^ait_fX)^`tGX|d64%{_(J66Vl~v@vM~bveE<9Wib{~%L;L(|_{iR; zXBUIq{vYs>xKGa@4Y~appI`PLV@V5FBisA75aGN}@AS{)yMJ{_E(;S$8+L8TYuVkW z=e2^|{;7N}S{iIW+fV)>y)vZV-|GtmGGk#eNU`^g_j{u&|HMb?KKy5|d-(OU|0uX> zOaAOXKeIgGD*#16Zg1%5;9}~`WNdHGWa#K<=SJ>o>gZ%?2O?+VWM*Y%BYY2G8UObW OR0(9Cxe{at`u_poKR9Oq literal 0 HcmV?d00001 diff --git a/tests/core/test_batch.py b/tests/core/test_batch.py index 6efb3b34a..c2076e37b 100644 --- a/tests/core/test_batch.py +++ b/tests/core/test_batch.py @@ -1,40 +1,80 @@ from __future__ import annotations import decimal +import importlib.util import re from dataclasses import asdict import pytest -from singer_sdk.batch import JSONLinesBatcher +from singer_sdk.batch import Batcher +from singer_sdk.contrib.batch_encoder_jsonl import JSONLinesBatcher +from singer_sdk.contrib.batch_encoder_parquet import ParquetBatcher from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchConfig, JSONLinesEncoding, + ParquetEncoding, StorageTarget, ) +def is_pyarrow_installed(): + module_spec = importlib.util.find_spec("pyarrow") + return module_spec is not None + + +skip_if_no_pyarrow = pytest.mark.skipif( + not is_pyarrow_installed(), + reason="requires pyarrow", +) + + @pytest.mark.parametrize( "encoding,expected", [ (JSONLinesEncoding("gzip"), {"compression": "gzip", "format": "jsonl"}), (JSONLinesEncoding(), {"compression": None, "format": "jsonl"}), + (ParquetEncoding("gzip"), {"compression": "gzip", "format": "parquet"}), + (ParquetEncoding(), {"compression": None, "format": "parquet"}), + ], + ids=[ + "jsonl-compression-gzip", + "jsonl-compression-none", + "parquet-compression-gzip", + "parquet-compression-none", ], - ids=["jsonl-compression-gzip", "jsonl-compression-none"], ) def test_encoding_as_dict(encoding: BaseBatchFileEncoding, expected: dict) -> None: """Test encoding as dict.""" assert asdict(encoding) == expected -def test_storage_get_url(): - storage = StorageTarget("file://root_dir") +@pytest.mark.parametrize( + "file_scheme,root,prefix,expected", + [ + ( + "file://", + "root_dir", + "prefix--file.jsonl.gz", + "root_dir/prefix--file.jsonl.gz", + ), + ( + "file://", + "root_dir", + "prefix--file.parquet.gz", + "root_dir/prefix--file.parquet.gz", + ), + ], + ids=["jsonl-url", "parquet-url"], +) +def test_storage_get_url(file_scheme, root, prefix, expected): + storage = StorageTarget(file_scheme + root) with storage.fs(create=True) as fs: - url = fs.geturl("prefix--file.jsonl.gz") - assert url.startswith("file://") - assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz") + url = fs.geturl(prefix) + assert url.startswith(file_scheme) + assert url.replace("\\", "/").endswith(expected) def test_storage_get_s3_url(): @@ -69,6 +109,11 @@ def test_storage_from_url(file_url: str, root: str): assert target.root == root +def test_get_unsupported_batcher(): + with pytest.raises(ValueError, match="Unsupported batcher"): + Batcher.get_batcher("unsupported") + + @pytest.mark.parametrize( "file_url,expected", [ @@ -125,3 +170,83 @@ def test_json_lines_batcher(): for batch in batches for filepath in batch ) + + +def test_batcher_with_jsonl_encoding(): + batcher = Batcher( + "tap-test", + "stream-test", + batch_config=BatchConfig( + encoding=JSONLinesEncoding("gzip"), + storage=StorageTarget("file:///tmp/sdk-batches"), + batch_size=2, + ), + ) + records = [ + {"id": 1, "numeric": decimal.Decimal("1.0")}, + {"id": 2, "numeric": decimal.Decimal("2.0")}, + {"id": 3, "numeric": decimal.Decimal("3.0")}, + ] + + batches = list(batcher.get_batches(records)) + assert len(batches) == 2 + assert all(len(batch) == 1 for batch in batches) + assert all( + re.match(r".*tap-test--stream-test-.*\.json.gz", filepath) + for batch in batches + for filepath in batch + ) + + +@skip_if_no_pyarrow +def test_parquet_batcher(): + batcher = ParquetBatcher( + "tap-test", + "stream-test", + batch_config=BatchConfig( + encoding=ParquetEncoding("gzip"), + storage=StorageTarget("file:///tmp/sdk-batches"), + batch_size=2, + ), + ) + records = [ + {"id": 1, "numeric": decimal.Decimal("1.0")}, + {"id": 2, "numeric": decimal.Decimal("2.0")}, + {"id": 3, "numeric": decimal.Decimal("3.0")}, + ] + + batches = list(batcher.get_batches(records)) + assert len(batches) == 2 + assert all(len(batch) == 1 for batch in batches) + assert all( + re.match(r".*tap-test--stream-test-.*\.parquet.gz", filepath) + for batch in batches + for filepath in batch + ) + + +@skip_if_no_pyarrow +def test_batcher_with_parquet_encoding(): + batcher = Batcher( + "tap-test", + "stream-test", + batch_config=BatchConfig( + encoding=ParquetEncoding("gzip"), + storage=StorageTarget("file:///tmp/sdk-batches"), + batch_size=2, + ), + ) + records = [ + {"id": 1, "numeric": decimal.Decimal("1.0")}, + {"id": 2, "numeric": decimal.Decimal("2.0")}, + {"id": 3, "numeric": decimal.Decimal("3.0")}, + ] + + batches = list(batcher.get_batches(records)) + assert len(batches) == 2 + assert all(len(batch) == 1 for batch in batches) + assert all( + re.match(r".*tap-test--stream-test-.*\.parquet.gz", filepath) + for batch in batches + for filepath in batch + ) diff --git a/tests/core/test_singer_messages.py b/tests/core/test_singer_messages.py index 3a2253611..5ea161e8e 100644 --- a/tests/core/test_singer_messages.py +++ b/tests/core/test_singer_messages.py @@ -3,7 +3,11 @@ import pytest from singer_sdk._singerlib import SingerMessageType -from singer_sdk.helpers._batch import JSONLinesEncoding, SDKBatchMessage +from singer_sdk.helpers._batch import ( + JSONLinesEncoding, + ParquetEncoding, + SDKBatchMessage, +) @pytest.mark.parametrize( @@ -28,8 +32,27 @@ ], }, ), + ( + SDKBatchMessage( + stream="test_stream", + encoding=ParquetEncoding("gzip"), + manifest=[ + "path/to/file1.parquet.gz", + "path/to/file2.parquet.gz", + ], + ), + { + "type": SingerMessageType.BATCH, + "stream": "test_stream", + "encoding": {"compression": "gzip", "format": "parquet"}, + "manifest": [ + "path/to/file1.parquet.gz", + "path/to/file2.parquet.gz", + ], + }, + ), ], - ids=["batch-message-jsonl"], + ids=["batch-message-jsonl", "batch-message-parquet"], ) def test_batch_message_as_dict(message, expected): """Test batch message as dict.""" diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index a66805a09..59c8565c1 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -353,6 +353,52 @@ def test_sqlite_process_batch_message( assert cursor.fetchone()[0] == 4 +def test_sqlite_process_batch_parquet( + sqlite_target_test_config: dict, + sqlite_sample_target_batch: SQLiteTarget, +): + """Test handling a Parquet batch message for the SQLite target.""" + config = { + **sqlite_target_test_config, + "batch_config": { + "encoding": {"format": "parquet", "compression": "gzip"}, + "batch_size": 100, + }, + } + schema_message = { + "type": "SCHEMA", + "stream": "continents", + "key_properties": ["id"], + "schema": { + "required": ["id"], + "type": "object", + "properties": { + "code": {"type": "string"}, + "name": {"type": "string"}, + }, + }, + } + batch_message = { + "type": "BATCH", + "stream": "continents", + "encoding": {"format": "parquet", "compression": "gzip"}, + "manifest": [ + "file://tests/core/resources/continents.parquet.gz", + ], + } + tap_output = "\n".join([json.dumps(schema_message), json.dumps(batch_message)]) + + target_sync_test( + sqlite_sample_target_batch, + input=StringIO(tap_output), + finalize=True, + ) + db = sqlite3.connect(config["path_to_db"]) + cursor = db.cursor() + cursor.execute("SELECT COUNT(*) as count FROM continents") + assert cursor.fetchone()[0] == 7 + + def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. diff --git a/tests/snapshots/countries_write_schemas/countries_write_schemas b/tests/snapshots/countries_write_schemas/countries_write_schemas index b0808ce23..02b567fc9 100644 --- a/tests/snapshots/countries_write_schemas/countries_write_schemas +++ b/tests/snapshots/countries_write_schemas/countries_write_schemas @@ -1,2 +1,2 @@ -{"type": "SCHEMA", "stream": "continents", "schema": {"properties": {"code": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": ["code"]} +{"type": "SCHEMA", "stream": "continents", "schema": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": "object"}, "key_properties": ["code"]} {"type": "SCHEMA", "stream": "countries", "schema": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}, "native": {"type": ["string", "null"]}, "phone": {"type": ["string", "null"]}, "capital": {"type": ["string", "null"]}, "currency": {"type": ["string", "null"]}, "emoji": {"type": ["string", "null"]}, "continent": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": ["object", "null"]}, "languages": {"items": {"properties": {"code": {"type": ["string", "null"]}, "name": {"type": ["string", "null"]}}, "type": "object"}, "type": ["array", "null"]}}, "type": "object"}, "key_properties": ["code"]}