Skip to content

Commit

Permalink
Use DirFileSystem wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 30, 2024
1 parent cbc4486 commit edbd7e0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 14 deletions.
30 changes: 18 additions & 12 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import enum
import functools
import logging
import os
import typing as t
from pathlib import Path

import fsspec
import fsspec.implementations
import fsspec.implementations.dirfs

import singer_sdk.typing as th
from singer_sdk import Tap
Expand Down Expand Up @@ -138,6 +139,11 @@ def read_mode(self) -> ReadMode:
"""Folder read mode."""
return ReadMode(self.config["read_mode"])

@functools.cached_property
def path(self) -> str:
"""Return the path to the directory."""
return self.config["path"]

@functools.cached_property
def fs(self) -> fsspec.AbstractFileSystem:
"""Return the filesystem object.
Expand All @@ -147,13 +153,14 @@ def fs(self) -> fsspec.AbstractFileSystem:
"""
protocol = self.config["filesystem"]
if protocol != "local" and protocol not in self.config: # pragma: no cover
msg = "Filesytem configuration is missing"
msg = "Filesystem configuration is missing"
raise ConfigValidationError(
msg,
errors=[f"Missing configuration for filesystem {protocol}"],
)
logger.info("Instatiating filesystem inteface: '%s'", protocol)
return fsspec.filesystem(protocol, **self.config.get(protocol, {}))
logger.info("Instantiating filesystem interface: '%s'", protocol)
fs = fsspec.filesystem(protocol, **self.config.get(protocol, {}))
return fsspec.implementations.dirfs.DirFileSystem(path=self.path, fs=fs)

def discover_streams(self) -> list:
"""Return a list of discovered streams.
Expand All @@ -162,11 +169,9 @@ def discover_streams(self) -> list:
ValueError: If the path does not exist or is not a directory.
"""
# A directory for now, but could be a glob pattern.
path: str = self.config["path"]

if not self.fs.exists(path) or not self.fs.isdir(path): # pragma: no cover
if not self.fs.exists(".") or not self.fs.isdir("."): # pragma: no cover
# Raise a more specific error if the path is not a directory.
msg = f"Path {path} does not exist or is not a directory"
msg = f"Path {self.path} does not exist or is not a directory"
raise ValueError(msg)

# One stream per file
Expand All @@ -175,11 +180,12 @@ def discover_streams(self) -> list:
self.default_stream_class(
tap=self,
name=file_path_to_stream_name(member["name"]),
filepaths=[os.path.join(path, member["name"])], # noqa: PTH118
filepaths=[member["name"]],
filesystem=self.fs,
)
for member in self.fs.listdir(path)
if member["name"].endswith(self.valid_extensions)
for member in self.fs.listdir(".")
if member["type"] == "file"
and member["name"].endswith(self.valid_extensions)
]

# Merge
Expand All @@ -189,7 +195,7 @@ def discover_streams(self) -> list:
name=self.config["stream_name"],
filepaths=[
member["name"]
for member in self.fs.listdir(path)
for member in self.fs.listdir(".")
if member["type"] == "file"
and member["name"].endswith(self.valid_extensions)
],
Expand Down
19 changes: 17 additions & 2 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
"customers": {
"partitions": [
{
"context": {"_sdc_path": "fixtures/csv/customers.csv"},
"context": {"_sdc_path": "./customers.csv"},
"replication_key": "_sdc_modified_at",
"replication_key_value": FUTURE.isoformat(),
}
Expand All @@ -57,7 +57,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
"employees": {
"partitions": [
{
"context": {"_sdc_path": "fixtures/csv/employees.csv"},
"context": {"_sdc_path": "./employees.csv"},
"replication_key": "_sdc_modified_at",
"replication_key_value": FUTURE.isoformat(),
}
Expand All @@ -80,6 +80,21 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):


class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental):
def test_tap_stream_transformed_catalog_schema_matches_record(
self,
config: SuiteConfig,
resource: t.Any,
runner: TapTestRunner,
stream: CSVStream,
):
with pytest.warns(UserWarning):
super().test_tap_stream_transformed_catalog_schema_matches_record(
config,
resource,
runner,
stream,
)

def test_tap_stream_returns_record(
self,
config: SuiteConfig,
Expand Down

0 comments on commit edbd7e0

Please sign in to comment.