Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add a file (CSV) tap for testing (WIP!) #2668

Merged
merged 10 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ jobs:
nox --version

- uses: actions/cache@v4
if: always() && (matrix.session == 'tests')
if: matrix.session == 'tests'
with:
path: http_cache.sqlite
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.session }}-${{ matrix.sqlalchemy }}
key: http_cache-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.sqlalchemy }}

- name: Run Nox
env:
Expand All @@ -106,7 +106,7 @@ jobs:
if: always() && (matrix.session == 'tests')
with:
include-hidden-files: true
name: coverage-data-nox_${{ matrix.session }}-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
name: coverage-data-nox_-${{ matrix.os }}-py${{ matrix.python-version }}_sqlalchemy_${{ matrix.sqlalchemy }}
path: ".coverage.*"

tests-external:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# HTTP cache
http_cache.sqlite

# Local Poetry configuration file

poetry.toml
Expand Down
1,001 changes: 1,001 additions & 0 deletions fixtures/csv/customers.csv

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions fixtures/csv/employees.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
id,first_name,last_name,email,ip_address
1,Tobye,Tallach,[email protected],159.80.54.64
2,Bret,Auchterlonie,[email protected],63.179.228.179
3,Chester,Leban,[email protected],109.23.123.220
4,Weston,Venny,[email protected],80.78.0.69
5,Alejoa,Hassen,[email protected],193.70.126.231
6,Otes,Ioselevich,[email protected],55.238.240.160
7,Dolley,Mc Ilwrick,[email protected],225.224.151.67
8,Cliff,Druitt,[email protected],216.35.85.142
9,Alfreda,Parysiak,[email protected],234.124.93.69
10,Alfonso,Wotherspoon,[email protected],34.94.1.132
11,Jemmy,Gavriel,[email protected],69.13.142.245
12,Ezechiel,Binion,[email protected],85.203.127.191
13,Burk,Blowfelde,[email protected],74.133.42.177
14,Danette,Brealey,[email protected],249.85.157.243
15,Brent,Collcutt,[email protected],68.202.67.52
16,Filbert,Wane,[email protected],51.190.146.189
17,Amory,Brewers,[email protected],147.155.225.194
18,Giraud,Reen,[email protected],134.254.177.66
19,Burtie,Siebert,[email protected],47.194.48.217
20,Adam,Maddick,[email protected],165.16.248.228
21,Callean,Vernall,[email protected],243.145.198.197
22,Olympie,Itzakovitz,[email protected],40.55.240.15
23,Jacky,Emney,[email protected],216.72.80.81
24,Isidoro,Novello,[email protected],153.171.11.150
25,Kora,Liversedge,[email protected],131.126.97.242
26,Salaidh,McMenamie,[email protected],228.231.31.219
27,Corey,Dowdeswell,[email protected],203.73.30.64
28,Brodie,Holwell,[email protected],64.200.225.25
29,Trudey,Ungerer,[email protected],122.82.88.41
30,Doralin,Maxted,[email protected],37.78.14.199
31,Maurie,Marklin,[email protected],22.181.178.6
32,Hermann,Voase,[email protected],155.126.157.84
33,Fanchette,Callaway,[email protected],121.161.80.246
34,Sara-ann,Birdall,[email protected],35.64.166.83
35,Harriot,Clipsham,[email protected],78.103.253.219
36,Bonita,Woolway,[email protected],70.114.50.135
37,Arleyne,MacComiskey,[email protected],80.22.221.216
38,Ethelbert,Covill,[email protected],52.66.186.124
39,Irita,Knee,[email protected],239.247.34.120
40,Naoma,Janca,[email protected],189.63.152.60
41,Kayne,Mizzen,[email protected],84.133.236.10
42,Estell,Stuckford,[email protected],246.168.153.22
43,Larine,Stack,[email protected],197.176.195.68
44,Rikki,Newbold,[email protected],27.245.43.243
45,Romonda,Charer,[email protected],137.144.236.93
46,Letizia,Monksfield,[email protected],209.47.5.147
47,Sinclare,McAreavey,[email protected],251.250.216.206
48,Athene,Haysham,[email protected],81.227.231.240
49,Gale,Tracy,[email protected],93.138.226.205
50,Dareen,O'Shields,[email protected],27.226.127.240
Empty file.
5 changes: 5 additions & 0 deletions samples/sample_tap_csv/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV

SampleTapCSV.cli()
91 changes: 91 additions & 0 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

import csv
import datetime
import os
import typing as t

from singer_sdk import Stream
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
from singer_sdk.tap_base import Tap

SDC_META_FILEPATH = "_sdc_path"
SDC_META_MODIFIED_AT = "_sdc_modified_at"


def _to_datetime(value: float) -> str:
return datetime.datetime.fromtimestamp(value).astimezone()


class CSVStream(Stream):
"""CSV stream class."""

def __init__(
self,
tap: Tap,
name: str | None = None,
*,
partitions: list[str] | None = None,
) -> None:
# TODO(edgarmondragon): Build schema from CSV file.
schema = {
"type": ["object"],
"properties": {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": "string", "format": "date-time"},
},
"required": [],
"additionalProperties": {"type": "string"},
}
super().__init__(tap, schema, name)

# TODO(edgarrmondragon): Make this None if the filesytem does not support it.
self.replication_key = SDC_META_MODIFIED_AT

self._partitions = partitions or []

@property
def partitions(self) -> list[Context]:
return self._partitions

def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301
# Make these configurable.
delimiter = ","
quotechar = '"'
escapechar = None
doublequote = True
lineterminator = "\r\n"

# TODO: Use filesytem-specific file open method.
with open(path, encoding="utf-8") as file: # noqa: PTH123
reader = csv.DictReader(
file,
delimiter=delimiter,
quotechar=quotechar,
escapechar=escapechar,
doublequote=doublequote,
lineterminator=lineterminator,
)
yield from reader

def get_records(
self,
context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
path: str = context[SDC_META_FILEPATH]
mtime = os.path.getmtime(path) # noqa: PTH204

if (
self.replication_method is REPLICATION_INCREMENTAL
and (previous_bookmark := self.get_starting_timestamp(context))
and _to_datetime(mtime) < previous_bookmark
):
self.logger.info("File has not been modified since last read, skipping")
return

for record in self._read_file(path):
record[SDC_META_MODIFIED_AT] = _to_datetime(mtime)
yield record
106 changes: 106 additions & 0 deletions samples/sample_tap_csv/sample_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Sample Tap for CSV files."""

from __future__ import annotations

import enum
import functools
import os

import singer_sdk.typing as th
from samples.sample_tap_csv.client import SDC_META_FILEPATH, CSVStream
from singer_sdk import Tap

DEFAULT_MERGE_STREAM_NAME = "files"


def file_path_to_stream_name(file_path: str) -> str:
"""Convert a file path to a stream name."""
return os.path.basename(file_path).replace(".csv", "").replace(os.sep, "__") # noqa: PTH119


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


class SampleTapCSV(Tap):
"""Sample Tap for CSV files."""

name = "sample-tap-csv"

config_jsonschema = th.PropertiesList(
th.Property(
"path",
th.StringType,
required=True,
description="Path to CSV files.",
),
th.Property(
"read_mode",
th.StringType,
required=True,
description=(
"Use `one_stream_per_file` to read each file as a separate stream, or "
"`merge` to merge all files into a single stream."
),
allowed_values=[
ReadMode.one_stream_per_file,
ReadMode.merge,
],
),
th.Property(
"stream_name",
th.StringType,
required=True,
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
# TODO(edgarmondragon): Other configuration options.
).to_dict()

@functools.cached_property
def read_mode(self) -> ReadMode:
return ReadMode(self.config["read_mode"])

def discover_streams(self) -> list:
# TODO(edgarmondragon): Implement stream discovery, based on the configured path
# and read mode.
path: str = self.config[
"path"
] # a directory for now, but could be a glob pattern

# One stream per file
if self.read_mode == ReadMode.one_stream_per_file:
if os.path.isdir(path): # noqa: PTH112
return [
CSVStream(
tap=self,
name=file_path_to_stream_name(member),
partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118
)
for member in os.listdir(path)
if member.endswith(".csv")
]

msg = f"Path {path} is not a directory."
raise ValueError(msg)

# Merge
if os.path.isdir(path): # noqa: PTH112
contexts = [
{
SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118
}
for member in os.listdir(path)
if member.endswith(".csv")
]
return [
CSVStream(
tap=self,
name=self.config["stream_name"],
partitions=contexts,
)
]
return []
36 changes: 36 additions & 0 deletions tests/samples/test_tap_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest

from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV
from singer_sdk.testing import get_tap_test_class

_TestCSVMerge = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "merge",
"stream_name": "people",
},
)


class TestCSVMerge(_TestCSVMerge):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)


TestCSVOneStreamPerFile = get_tap_test_class(
tap_class=SampleTapCSV,
config={
"path": "fixtures/csv",
"read_mode": "one_stream_per_file",
},
)


class TestCSVOneStreamPerFile(TestCSVOneStreamPerFile):
@pytest.mark.xfail(reason="Schema generation not implemented", strict=True)
def test_tap_stream_record_schema_matches_transformed_catalog(self, stream: str):
super().test_tap_stream_record_schema_matches_transformed_catalog(stream)
Loading