diff --git a/samples/sample_tap_csv/client.py b/samples/sample_tap_csv/client.py index b0ff13923c..c5a40bfe21 100644 --- a/samples/sample_tap_csv/client.py +++ b/samples/sample_tap_csv/client.py @@ -4,14 +4,22 @@ import typing as t from singer_sdk.contrib.filesystem import FileStream +from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH if t.TYPE_CHECKING: from singer_sdk.helpers.types import Record +SDC_META_LINE_NUMBER = "_sdc_line_number" + + class CSVStream(FileStream): """CSV stream class.""" + @property + def primary_keys(self) -> t.Sequence[str]: + return (SDC_META_FILEPATH, SDC_META_LINE_NUMBER) + def get_schema(self, path: str) -> dict[str, t.Any]: with self.filesystem.open(path, mode="r") as file: reader = csv.DictReader( @@ -22,10 +30,12 @@ def get_schema(self, path: str) -> dict[str, t.Any]: doublequote=self.config["doublequote"], lineterminator=self.config["lineterminator"], ) - return { + schema = { "type": "object", "properties": {key: {"type": "string"} for key in reader.fieldnames}, } + schema["properties"][SDC_META_LINE_NUMBER] = {"type": "integer"} + return schema def read_file(self, path: str) -> t.Iterable[Record]: with self.filesystem.open(path, mode="r") as file: @@ -37,4 +47,6 @@ def read_file(self, path: str) -> t.Iterable[Record]: doublequote=self.config["doublequote"], lineterminator=self.config["lineterminator"], ) - yield from reader + for record in reader: + record[SDC_META_LINE_NUMBER] = reader.line_num + yield record diff --git a/singer_sdk/contrib/filesystem/stream.py b/singer_sdk/contrib/filesystem/stream.py index a1b71da160..ed6045086b 100644 --- a/singer_sdk/contrib/filesystem/stream.py +++ b/singer_sdk/contrib/filesystem/stream.py @@ -132,10 +132,15 @@ def get_records( record[SDC_META_FILEPATH] = path yield record - @abc.abstractmethod - def read_file(self, path: str) -> t.Iterable[Record]: - """Return a generator of records from the file.""" - @abc.abstractmethod def get_schema(self, path: str) -> dict[str, t.Any]: """Return the schema for the file.""" + + @property + @abc.abstractmethod + def primary_keys(self) -> t.Sequence[str]: + """Return the primary key for the stream.""" + + @abc.abstractmethod + def read_file(self, path: str) -> t.Iterable[Record]: + """Return a generator of records from the file.""" diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index 6e3b07f47c..f2fd7c3520 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -89,8 +89,16 @@ class FolderTap(Tap, t.Generic[_T]): """Singer tap for files in a directory.""" valid_extensions: tuple[str, ...] + """Valid file extensions for this tap. + + Files with extensions not in this list will be ignored. + """ default_stream_class: type[_T] + """The default stream class to use for this tap. + + This should be a subclass of `FileStream`. + """ config_jsonschema: t.ClassVar[dict] = {"properties": {}}