From ce600f6ac3fa7b9d1b1259ad2ef72d817723236f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Wed, 11 Sep 2024 18:01:56 -0600 Subject: [PATCH] Simpler interface --- poetry.lock | 17 ++- pyproject.toml | 1 + singer_sdk/contrib/filesystem/base.py | 121 ++++++++++++++------ singer_sdk/contrib/filesystem/local.py | 115 ++++++------------- singer_sdk/contrib/filesystem/s3.py | 147 +++++++++++-------------- tests/contrib/filesystem/test_local.py | 88 +++++---------- tests/contrib/filesystem/test_s3.py | 62 +++-------- 7 files changed, 252 insertions(+), 299 deletions(-) diff --git a/poetry.lock b/poetry.lock index d730125ac..2f81946db 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2070,6 +2070,21 @@ files = [ [package.dependencies] pytest = ">=3.0.0" +[[package]] +name = "pytest-subtests" +version = "0.13.1" +description = "unittest subTest() support and subtests fixture" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pytest_subtests-0.13.1-py3-none-any.whl", hash = "sha256:ab616a22f64cd17c1aee65f18af94dbc30c444f8683de2b30895c3778265e3bd"}, + {file = "pytest_subtests-0.13.1.tar.gz", hash = "sha256:989e38f0f1c01bc7c6b2e04db7d9fd859db35d77c2c1a430c831a70cbf3fde2d"}, +] + +[package.dependencies] +attrs = ">=19.2.0" +pytest = ">=7.0" + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -3162,4 +3177,4 @@ testing = ["pytest"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "d0a19ba182fa413102368e142cf2b7ac4415aa310c4264ae307cb9a0c140f264" +content-hash = "df14798dc3d9c7c2b19a5e551465e7f16ec88eda8a4df7fc5f5e9745c7b569ef" diff --git a/pyproject.toml b/pyproject.toml index 6b0d483f9..0bdd6df99 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -135,6 +135,7 @@ fastjsonschema = ">=2.19.1" moto = ">=5.0.14" pytest-benchmark = ">=4.0.0" pytest-snapshot = ">=0.9.0" +pytest-subtests = ">=0.13.1" pytz = ">=2022.2.1" requests-mock = ">=1.10.0" rfc3339-validator = ">=0.1.4" diff --git a/singer_sdk/contrib/filesystem/base.py b/singer_sdk/contrib/filesystem/base.py index 5b5520090..0d44fb554 100644 --- a/singer_sdk/contrib/filesystem/base.py +++ b/singer_sdk/contrib/filesystem/base.py @@ -3,66 +3,121 @@ from __future__ import annotations import abc +import enum +import io import typing as t if t.TYPE_CHECKING: import datetime + import types -__all__ = ["AbstractDirectory", "AbstractFile", "AbstractFileSystem"] +__all__ = ["AbstractFile", "AbstractFileSystem"] + +_F = t.TypeVar("_F") + + +class FileMode(str, enum.Enum): + read = "rb" + write = "wb" class AbstractFile(abc.ABC): """Abstract class for file operations.""" - def read_text(self, *, encoding: str = "utf-8") -> str: - """Read the entire file as text. + def __init__(self, buffer: io.BytesIO, filename: str): + """Create a new AbstractFile instance.""" + self.buffer = buffer + self.filename = filename + + def read(self, size: int = -1) -> bytes: + """Read the file contents. Args: - encoding: The text encoding to use. + size: The number of bytes to read. If -1, read the entire file. Returns: - The file contents as a string. + The file contents as bytes. """ - return self.read().decode(encoding) + return self.buffer.read(size) - @abc.abstractmethod - def read(self, size: int = -1) -> bytes: - """Read the file contents.""" + def write(self, data: bytes) -> int: + """Write data to the file. - @property - def creation_time(self) -> datetime.datetime: - """Get the creation time of the file.""" - raise NotImplementedError + Args: + data: The data to write. - @property - def modified_time(self) -> datetime.datetime: - """Get the last modified time of the file.""" - raise NotImplementedError + Returns: + The number of bytes written. + """ + return self.buffer.write(data) + def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: + """Seek to a position in the file. -_F = t.TypeVar("_F") -_D = t.TypeVar("_D") + Args: + offset: The offset to seek to. + whence: The reference point for the offset. + Returns: + The new position in the file. + """ + return self.buffer.seek(offset, whence) -class AbstractDirectory(abc.ABC, t.Generic[_F]): - """Abstract class for directory operations.""" + def tell(self) -> int: + """Get the current position in the file. - @abc.abstractmethod - def list_contents(self: _D) -> t.Generator[_F | _D, None, None]: - """List files in the directory. + Returns: + The current position in the file. + """ + return self.buffer.tell() + + def __enter__(self: _F) -> _F: + """Enter the context manager. - Yields: - A file or directory node + Returns: + The file object. """ - yield self - yield from [] + return self + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: types.TracebackType | None, + ) -> None: + """Close the file. -class AbstractFileSystem(abc.ABC, t.Generic[_F, _D]): + Args: + exc_type: The exception type. + exc_value: The exception value. + traceback: The traceback. + """ + self.close() + + @abc.abstractmethod + def close(self) -> None: + """Close the file.""" + + @abc.abstractmethod + def __iter__(self) -> t.Iterator[str]: + """Iterate over the file contents as lines.""" + + @abc.abstractmethod + def seekable(self) -> bool: + """Whether the file is seekable.""" + + +class AbstractFileSystem(abc.ABC, t.Generic[_F]): """Abstract class for file system operations.""" - @property @abc.abstractmethod - def root(self) -> _D: - """Get the root path.""" - raise NotImplementedError + def open(self, filename: str, mode: str, newline: str, encoding: str) -> _F: + """Open a file.""" + + @abc.abstractmethod + def modified(self, filename: str) -> datetime.datetime: + """Get the last modified time of a file.""" + + @abc.abstractmethod + def created(self, filename: str) -> datetime.datetime: + """Get the creation time of a file.""" diff --git a/singer_sdk/contrib/filesystem/local.py b/singer_sdk/contrib/filesystem/local.py index 0dd742ec1..d23546a7c 100644 --- a/singer_sdk/contrib/filesystem/local.py +++ b/singer_sdk/contrib/filesystem/local.py @@ -8,99 +8,54 @@ from singer_sdk.contrib.filesystem import base -__all__ = ["LocalDirectory", "LocalFile", "LocalFileSystem"] +if t.TYPE_CHECKING: + import contextlib + +__all__ = ["LocalFile", "LocalFileSystem"] class LocalFile(base.AbstractFile): """Local file operations.""" - def __init__(self, filepath: str | Path): - """Create a new LocalFile instance.""" - self._filepath = filepath - self.path = Path(self._filepath).absolute() - - def __repr__(self) -> str: - """A string representation of the LocalFile. - - Returns: - A string representation of the LocalFile. - """ - return f"LocalFile({self._filepath})" - - def read(self, size: int = -1) -> bytes: - """Read the file contents. + def close(self) -> None: + """Close the file.""" + return self.buffer.close() - Args: - size: Number of bytes to read. If not specified, the entire file is read. - - Returns: - The file contents as a string. - """ - with self.path.open("rb") as file: - return file.read(size) - - @property - def creation_time(self) -> datetime: - """Get the creation time of the file. + def seekable(self) -> bool: # noqa: D102, PLR6301 + return True - Returns: - The creation time of the file. - """ - stat = self.path.stat() - try: - return datetime.fromtimestamp(stat.st_birthtime).astimezone() # type: ignore[attr-defined] - except AttributeError: - return datetime.fromtimestamp(stat.st_ctime).astimezone() - - @property - def modified_time(self) -> datetime: - """Get the last modified time of the file. + def __iter__(self) -> contextlib.Iterator[str]: # noqa: D105 + return iter(self.buffer) - Returns: - The last modified time of the file. - """ - return datetime.fromtimestamp(self.path.stat().st_mtime).astimezone() - -class LocalDirectory(base.AbstractDirectory[LocalFile]): - """Local directory operations.""" - - def __init__(self, dirpath: str | Path): - """Create a new LocalDirectory instance.""" - self._dirpath = dirpath - self.path = Path(self._dirpath).absolute() - - def __repr__(self) -> str: - """A string representation of the LocalDirectory. - - Returns: - A string representation of the LocalDirectory. - """ - return f"LocalDirectory({self._dirpath})" - - def list_contents(self) -> t.Generator[LocalFile | LocalDirectory, None, None]: - """List files in the directory. - - Yields: - A file or directory node - """ - for child in self.path.iterdir(): - if child.is_dir(): - subdir = LocalDirectory(child) - yield subdir - yield from subdir.list_contents() - else: - yield LocalFile(child) - - -class LocalFileSystem(base.AbstractFileSystem[LocalFile, LocalDirectory]): +class LocalFileSystem(base.AbstractFileSystem[LocalFile]): """Local filesystem operations.""" def __init__(self, root: str) -> None: """Create a new LocalFileSystem instance.""" - self._root_dir = LocalDirectory(root) + self._root_path = Path(root).absolute() @property - def root(self) -> LocalDirectory: + def root(self) -> Path: """Get the root path.""" - return self._root_dir + return self._root_path + + def open( # noqa: D102 + self, + filename: str, + *, + mode: base.FileMode = base.FileMode.read, + ) -> LocalFile: + filepath = self.root / filename + return LocalFile(filepath.open(mode=mode), filename) + + def modified(self, filename: str) -> datetime: # noqa: D102 + stat = (self.root / filename).stat() + return datetime.fromtimestamp(stat.st_mtime).astimezone() + + def created(self, filename: str) -> datetime: # noqa: D102 + stat = (self.root / filename).stat() + try: + return datetime.fromtimestamp(stat.st_birthtime).astimezone() # type: ignore[attr-defined] + except AttributeError: + return datetime.fromtimestamp(stat.st_ctime).astimezone() diff --git a/singer_sdk/contrib/filesystem/s3.py b/singer_sdk/contrib/filesystem/s3.py index 423385517..6d9e90fba 100644 --- a/singer_sdk/contrib/filesystem/s3.py +++ b/singer_sdk/contrib/filesystem/s3.py @@ -2,7 +2,9 @@ from __future__ import annotations +import io import typing as t +from datetime import datetime from singer_sdk.contrib.filesystem import base @@ -10,105 +12,88 @@ from datetime import datetime from mypy_boto3_s3.client import S3Client - from mypy_boto3_s3.type_defs import GetObjectOutputTypeDef -__all__ = ["S3Directory", "S3File", "S3FileSystem"] +__all__ = ["S3File", "S3FileSystem"] class S3File(base.AbstractFile): """S3 file operations.""" - def __init__(self, client: S3Client, bucket: str, key: str): - """Create a new S3File instance.""" - self._client = client - self._bucket = bucket - self._key = key - self._s3_object: GetObjectOutputTypeDef | None = None - - def __repr__(self) -> str: - """A string representation of the S3File. + def seekable(self) -> bool: # noqa: PLR6301 + """Check if the file is seekable. Returns: - A string representation of the S3File. + False """ - return f"S3File({self._key})" - - @property - def s3_object(self) -> GetObjectOutputTypeDef: - """Get the S3 object.""" - if self._s3_object is None: - self._s3_object = self._client.get_object( - Bucket=self._bucket, - Key=self._key, - ) + return False - return self._s3_object + def __iter__(self) -> t.Iterator[str]: # noqa: D105 + return iter(self.buffer) - def read(self, size: int = -1) -> bytes: - """Read the file contents. - - Args: - size: Number of bytes to read. If not specified, the entire file is read. - - Returns: - The file contents as a string. - """ - data = self.s3_object["Body"].read(amt=size) - # Clear the cache so that the next read will re-fetch the object - self._s3_object = None - return data - - @property - def modified_time(self) -> datetime: - """Get the last modified time of the file. - - Returns: - The last modified time of the file. - """ - return self.s3_object["LastModified"] - - -class S3Directory(base.AbstractDirectory[S3File]): - """S3 directory operations.""" +class S3FileSystem(base.AbstractFileSystem): + """S3 file system operations.""" - def __init__(self, client: S3Client, bucket: str, prefix: str): - """Create a new S3Directory instance.""" - self._client = client + def __init__( # noqa: D107 + self, + *, + bucket: str, + prefix: str = "", + region: str | None = None, + endpoint_url: str | None = None, + ): + super().__init__() self._bucket = bucket self._prefix = prefix + self._region = region + self._endpoint_url = endpoint_url - def __repr__(self) -> str: - """A string representation of the S3Directory. - - Returns: - A string representation of the S3Directory. - """ - return f"S3Directory({self._prefix})" - - def list_contents(self) -> t.Generator[S3File | S3Directory, None, None]: - """List files in the directory. - - Yields: - A file or directory node - """ - paginator = self._client.get_paginator("list_objects_v2") - for page in paginator.paginate(Bucket=self._bucket, Prefix=self._prefix): - for obj in page.get("Contents", []): - yield S3File(self._client, bucket=self._bucket, key=obj["Key"]) + self._client: S3Client | None = None + @property + def client(self) -> S3Client: + """Get the S3 client.""" + if self._client is None: + import boto3 # noqa: PLC0415 + + self._client = boto3.client( + "s3", + region_name=self._region, + endpoint_url=self._endpoint_url, + ) -class S3FileSystem(base.AbstractFileSystem): - """S3 file system operations.""" + return self._client - def __init__(self, client: S3Client, *, bucket: str, prefix: str): - """Create a new S3FileSystem instance.""" - super().__init__() - self._client = client - self._bucket = bucket - self._prefix = prefix + @property + def bucket(self) -> str: # noqa: D102 + return self._bucket @property - def root(self) -> S3Directory: - """Get the root directory.""" - return S3Directory(self._client, self._bucket, self._prefix) + def prefix(self) -> str: # noqa: D102 + return self._prefix + + def open( # noqa: D102 + self, + filename: str, + *, + mode: base.FileMode = base.FileMode.read, + ) -> S3File: + key = f"{self.prefix}{filename}" + if mode == base.FileMode.read: + response = self._client.get_object(Bucket=self._bucket, Key=key) + return S3File(response["Body"], filename) + if mode == base.FileMode.write: + buffer = io.BytesIO() + self._client.upload_fileobj(buffer, self._bucket, key) + return S3File(buffer, filename) + + msg = "Only read mode is supported." + raise ValueError(msg) + + def modified(self, filename: str) -> datetime: # noqa: D102 + response = self._client.get_object_attributes(Bucket=self._bucket, Key=filename) + return response["LastModified"] + + def created(self, filename: str) -> datetime: # noqa: ARG002, D102, PLR6301 + msg = "S3 does not support file creation time." + raise NotImplementedError(msg) diff --git a/tests/contrib/filesystem/test_local.py b/tests/contrib/filesystem/test_local.py index 109db82b9..75c47e4e1 100644 --- a/tests/contrib/filesystem/test_local.py +++ b/tests/contrib/filesystem/test_local.py @@ -2,85 +2,57 @@ import datetime import typing as t -import unittest.mock + +import pytest from singer_sdk.contrib.filesystem import local if t.TYPE_CHECKING: import pathlib + from pytest_subtests import SubTests + class TestLocalFilesystem: - def test_file_read_text(self, tmp_path: pathlib.Path): + @pytest.fixture + def filesystem(self, tmp_path: pathlib.Path) -> local.LocalFileSystem: + return local.LocalFileSystem(tmp_path) + + def test_file_read(self, filesystem: local.LocalFileSystem): """Test reading a file.""" # Write a test file - path = tmp_path / "test.txt" + path = filesystem.root / "test.txt" path.write_text("Hello, world!") - file = local.LocalFile(path) - assert file.read_text() == "Hello, world!" + with filesystem.open("test.txt") as file: + assert file.read() == b"Hello, world!" - def test_file_read(self, tmp_path: pathlib.Path): - """Test reading a file.""" + def test_file_read_write( + self, + filesystem: local.LocalFileSystem, + subtests: SubTests, + ): + """Test writing a file.""" - # Write a test file - path = tmp_path / "test.txt" - path.write_text("Hello, world!") + with subtests.test("write"), filesystem.open("test.txt", mode="wb") as file: + file.write(b"Hello, world!") - file = local.LocalFile(path) - assert file.read(3) == b"Hel" + with subtests.test("read"), filesystem.open("test.txt") as file: + assert file.read() == b"Hello, world!" - def test_file_creation_time(self, tmp_path: pathlib.Path): + def test_file_creation_time(self, filesystem: local.LocalFileSystem): """Test getting the creation time of a file.""" - path = tmp_path / "test.txt" - path.write_text("Hello, world!") - - file = local.LocalFile(path) - assert isinstance(file.creation_time, datetime.datetime) + with filesystem.open("test.txt", mode="wb") as file: + file.write(b"Hello, world!") - with unittest.mock.patch("pathlib.Path.stat") as mock_stat: - ts = 1704067200 - mock_stat.return_value = unittest.mock.Mock(st_birthtime=ts, st_ctime=ts) - assert file.creation_time.timestamp() == ts + assert isinstance(filesystem.created("test.txt"), datetime.datetime) - def test_file_modified_time(self, tmp_path: pathlib.Path): + def test_file_modified_time(self, filesystem: local.LocalFileSystem): """Test getting the last modified time of a file.""" - path = tmp_path / "test.txt" - path.write_text("Hello, world!") - - file = local.LocalFile(path) - assert isinstance(file.modified_time, datetime.datetime) - - with unittest.mock.patch("pathlib.Path.stat") as mock_stat: - ts = 1704067200 - mock_stat.return_value = unittest.mock.Mock(st_mtime=ts) - assert file.modified_time.timestamp() == ts - - def test_root_list_contents(self, tmp_path: pathlib.Path): - """Test listing a directory.""" - - # Create a directory with a file and a root-level file - (tmp_path / "a.txt").write_text("Hello from the root!") - dirpath = tmp_path / "b" - dirpath.mkdir() - (dirpath / "c.txt").write_text("Hello from a directory!") - - fs = local.LocalFileSystem(tmp_path) - contents = list(fs.root.list_contents()) - assert len(contents) == 3 - - # Get the root file, the directory, and the nested file regardless of order - root_file, directory, nested_file = sorted(contents, key=lambda x: x.path.name) - assert isinstance(root_file, local.LocalFile) - assert root_file.path.name == "a.txt" - assert root_file.read_text() == "Hello from the root!" - - assert isinstance(directory, local.LocalDirectory) - assert directory.path.name == "b" + with filesystem.open("test.txt", mode="wb") as file: + file.write(b"Hello, world!") - assert isinstance(nested_file, local.LocalFile) - assert nested_file.path.name == "c.txt" - assert nested_file.read_text() == "Hello from a directory!" + assert isinstance(filesystem.modified("test.txt"), datetime.datetime) diff --git a/tests/contrib/filesystem/test_s3.py b/tests/contrib/filesystem/test_s3.py index 325e7f594..99f2e098f 100644 --- a/tests/contrib/filesystem/test_s3.py +++ b/tests/contrib/filesystem/test_s3.py @@ -1,63 +1,33 @@ from __future__ import annotations -import datetime import typing as t -import boto3 import moto import pytest from singer_sdk.contrib.filesystem import s3 as s3fs -if t.TYPE_CHECKING: - from mypy_boto3_s3.client import S3Client - class TestS3Filesystem: @pytest.fixture - def client(self) -> t.Generator[S3Client, None, None]: + def filesystem(self) -> t.Generator[s3fs.S3FileSystem, None, None]: + bucket = "test-bucket" + prefix = "path/to/dir/" with moto.mock_aws(): - yield boto3.client("s3") - - @pytest.fixture - def bucket(self, client: S3Client) -> str: - """Return the name of a bucket.""" - value = "test-bucket" - client.create_bucket(Bucket=value) - return "test-bucket" - - def test_file_read_text(self, client: S3Client, bucket: str): - """Test reading a file.""" - client.put_object(Bucket=bucket, Key="test.txt", Body=b"Hello, world!") + fs = s3fs.S3FileSystem(bucket=bucket, prefix=prefix) + fs.client.create_bucket(Bucket=bucket) + yield fs - file = s3fs.S3File(client, bucket=bucket, key="test.txt") - assert file.read_text() == "Hello, world!" - - def test_file_read(self, client: S3Client, bucket: str): + @pytest.mark.xfail + def test_file_read_write(self, filesystem: s3fs.S3FileSystem): """Test reading a file.""" - client.put_object(Bucket=bucket, Key="test.txt", Body=b"Hello, world!") - - file = s3fs.S3File(client, bucket=bucket, key="test.txt") - assert file.read(3) == b"Hel" - - def test_file_modified_time(self, client: S3Client, bucket: str): - """Test getting the last modified time of a file.""" - client.put_object(Bucket=bucket, Key="test.txt", Body=b"Hello, world!") - - file = s3fs.S3File(client, bucket=bucket, key="test.txt") - assert file.modified_time is not None - assert isinstance(file.modified_time, datetime.datetime) - - def test_root_list_contents(self, client: S3Client, bucket: str): - """Test listing a directory.""" - prefix = "path/to/dir/" - key = "path/to/dir/test.txt" - client.put_object(Bucket=bucket, Key=key, Body=b"Hello, world!") - fs = s3fs.S3FileSystem(client, bucket=bucket, prefix=prefix) + # Write a test file + filesystem.client.put_object( + Bucket=filesystem.bucket, + Key=f"{filesystem.prefix}/test.txt", + Body=b"Hello, world!", + ) - contents = list(fs.root.list_contents()) - assert len(contents) == 1 - assert isinstance(contents[0], s3fs.S3File) - assert contents[0]._key == "path/to/dir/test.txt" - assert contents[0].read_text() == "Hello, world!" + with filesystem.open("test.txt") as file: + assert file.read() == b"Hello, world!"