From 0fd556a556f18f44b2c02eb07e384e3e1aab083d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 17 Sep 2024 13:16:55 -0600 Subject: [PATCH] Use fsspec --- poetry.lock | 41 +++++++++++++++++++++++++++++++- pyproject.toml | 5 ++-- samples/sample_tap_csv/client.py | 25 +++++++++++++------ 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/poetry.lock b/poetry.lock index b261a08cc..430180608 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1187,6 +1187,45 @@ boto3 = ">=1.9,<2.0" fs = ">=2.4,<3.0" six = ">=1.10,<2.0" +[[package]] +name = "fsspec" +version = "2024.9.0" +description = "File-system specification" +optional = false +python-versions = ">=3.8" +files = [ + {file = "fsspec-2024.9.0-py3-none-any.whl", hash = "sha256:a0947d552d8a6efa72cc2c730b12c41d043509156966cca4fb157b0f2a0c574b"}, + {file = "fsspec-2024.9.0.tar.gz", hash = "sha256:4b0afb90c2f21832df142f292649035d80b421f60a9e1c027802e5a0da2b04e8"}, +] + +[package.extras] +abfs = ["adlfs"] +adl = ["adlfs"] +arrow = ["pyarrow (>=1)"] +dask = ["dask", "distributed"] +dev = ["pre-commit", "ruff"] +doc = ["numpydoc", "sphinx", "sphinx-design", "sphinx-rtd-theme", "yarl"] +dropbox = ["dropbox", "dropboxdrivefs", "requests"] +full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "dask", "distributed", "dropbox", "dropboxdrivefs", "fusepy", "gcsfs", "libarchive-c", "ocifs", "panel", "paramiko", "pyarrow (>=1)", "pygit2", "requests", "s3fs", "smbprotocol", "tqdm"] +fuse = ["fusepy"] +gcs = ["gcsfs"] +git = ["pygit2"] +github = ["requests"] +gs = ["gcsfs"] +gui = ["panel"] +hdfs = ["pyarrow (>=1)"] +http = ["aiohttp (!=4.0.0a0,!=4.0.0a1)"] +libarchive = ["libarchive-c"] +oci = ["ocifs"] +s3 = ["s3fs"] +sftp = ["paramiko"] +smb = ["smbprotocol"] +ssh = ["paramiko"] +test = ["aiohttp (!=4.0.0a0,!=4.0.0a1)", "numpy", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "requests"] +test-downstream = ["aiobotocore (>=2.5.4,<3.0.0)", "dask-expr", "dask[dataframe,test]", "moto[server] (>4,<5)", "pytest-timeout", "xarray"] +test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard"] +tqdm = ["tqdm"] + [[package]] name = "furo" version = "2024.8.6" @@ -3282,4 +3321,4 @@ testing = ["pytest"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "e2e23042459ce09c426685f6dfd64ae78cb76539e87674e614215d76d7de1c9c" +content-hash = "69fad05e83a513220857484b23016656005116deb6f541b32311cd366ef047d3" diff --git a/pyproject.toml b/pyproject.toml index 37651e4f8..deab6f0d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,8 +43,9 @@ backoff = { version = ">=2.0.0", python = "<4" } backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" } click = "~=8.0" fs = ">=2.4.16" -importlib-metadata = {version = "<9.0.0", python = "<3.12"} -importlib-resources = {version = ">=5.12.0,!=6.2.0,!=6.3.0,!=6.3.1", python = "<3.10"} +fsspec = ">=2024.9.0" +importlib-metadata = { version = "<9.0.0", python = "<3.12" } +importlib-resources = { version = ">=5.12.0,!=6.2.0,!=6.3.0,!=6.3.1", python = "<3.10" } inflection = ">=0.5.1" joblib = ">=1.3.0" jsonpath-ng = ">=1.5.3" diff --git a/samples/sample_tap_csv/client.py b/samples/sample_tap_csv/client.py index a35fd3510..6aa3b1850 100644 --- a/samples/sample_tap_csv/client.py +++ b/samples/sample_tap_csv/client.py @@ -2,10 +2,12 @@ import csv import datetime -import os import typing as t +import fsspec + from singer_sdk import Stream +from singer_sdk.helpers._util import utc_now # noqa: PLC2701 from singer_sdk.streams.core import REPLICATION_INCREMENTAL if t.TYPE_CHECKING: @@ -47,11 +49,14 @@ def __init__( self._partitions = partitions or [] + self.filesystem: fsspec.AbstractFileSystem = fsspec.filesystem("local") + self._sync_start_time = utc_now() + @property def partitions(self) -> list[Context]: return self._partitions - def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301 + def _read_file(self, path: str) -> t.Iterable[Record]: # Make these configurable. delimiter = "," quotechar = '"' @@ -59,8 +64,7 @@ def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301 doublequote = True lineterminator = "\r\n" - # TODO: Use filesytem-specific file open method. - with open(path, encoding="utf-8") as file: # noqa: PTH123 + with self.filesystem.open(path, mode="r") as file: reader = csv.DictReader( file, delimiter=delimiter, @@ -76,16 +80,23 @@ def get_records( context: Context | None, ) -> t.Iterable[Record | tuple[Record, Context | None]]: path: str = context[SDC_META_FILEPATH] - mtime = os.path.getmtime(path) # noqa: PTH204 + + mtime: datetime.datetime | None + try: + mtime: datetime.datetime = self.filesystem.modified(path) + except NotImplementedError: + self.logger.warning("Filesystem does not support modified time") + mtime = None if ( self.replication_method is REPLICATION_INCREMENTAL and (previous_bookmark := self.get_starting_timestamp(context)) - and _to_datetime(mtime) < previous_bookmark + and mtime is not None + and mtime < previous_bookmark ): self.logger.info("File has not been modified since last read, skipping") return for record in self._read_file(path): - record[SDC_META_MODIFIED_AT] = _to_datetime(mtime) + record[SDC_META_MODIFIED_AT] = mtime or self._sync_start_time yield record