From e2fa537163da74b3c518ac27cffeb97452825228 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?=
Date: Tue, 17 Sep 2024 11:49:48 -0600
Subject: [PATCH] Incremental replication

Use each file's modification time as the replication key of the sample
CSV stream. Every record is tagged with `_sdc_modified_at`, and when the
stream runs incrementally a file whose modification time is older than
the stored bookmark is skipped. The file path metadata key is renamed
from `_sdc_file` to `_sdc_path`.

The replication method is compared by value (`==`), not identity, and
the modification time is converted to a datetime once per file rather
than once per record.
---
 samples/sample_tap_csv/client.py | 43 ++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/samples/sample_tap_csv/client.py b/samples/sample_tap_csv/client.py
index 15e493bfe..a35fd3510 100644
--- a/samples/sample_tap_csv/client.py
+++ b/samples/sample_tap_csv/client.py
@@ -1,18 +1,28 @@
 from __future__ import annotations
 
 import csv
+import datetime
+import os
 import typing as t
 
 from singer_sdk import Stream
+from singer_sdk.streams.core import REPLICATION_INCREMENTAL
 
 if t.TYPE_CHECKING:
     from singer_sdk.helpers.types import Context, Record
     from singer_sdk.tap_base import Tap
 
-SDC_META_FILEPATH = "_sdc_file"
+SDC_META_FILEPATH = "_sdc_path"
+SDC_META_MODIFIED_AT = "_sdc_modified_at"
+
+
+def _to_datetime(value: float) -> datetime.datetime:
+    return datetime.datetime.fromtimestamp(value).astimezone()
 
 
 class CSVStream(Stream):
+    """CSV stream class."""
+
     def __init__(
         self,
         tap: Tap,
@@ -25,23 +35,23 @@ def __init__(
             "type": ["object"],
             "properties": {
                 SDC_META_FILEPATH: {"type": "string"},
+                SDC_META_MODIFIED_AT: {"type": "string", "format": "date-time"},
             },
             "required": [],
             "additionalProperties": {"type": "string"},
         }
         super().__init__(tap, schema, name)
+
+        # TODO(edgarrmondragon): Make this None if the filesystem does not support it.
+        self.replication_key = SDC_META_MODIFIED_AT
+
         self._partitions = partitions or []
 
     @property
     def partitions(self) -> list[Context]:
         return self._partitions
 
-    def get_records(  # noqa: PLR6301
-        self,
-        context: Context | None,
-    ) -> t.Iterable[Record | tuple[Record, Context | None]]:
-        path: str = context[SDC_META_FILEPATH]
-
+    def _read_file(self, path: str) -> t.Iterable[Record]:  # noqa: PLR6301
         # Make these configurable.
         delimiter = ","
         quotechar = '"'
@@ -60,3 +70,22 @@ def get_records(  # noqa: PLR6301
                 lineterminator=lineterminator,
             )
             yield from reader
+
+    def get_records(
+        self,
+        context: Context | None,
+    ) -> t.Iterable[Record | tuple[Record, Context | None]]:
+        path: str = context[SDC_META_FILEPATH]
+        modified_at = _to_datetime(os.path.getmtime(path))  # noqa: PTH204
+
+        if (
+            self.replication_method == REPLICATION_INCREMENTAL
+            and (previous_bookmark := self.get_starting_timestamp(context))
+            and modified_at < previous_bookmark
+        ):
+            self.logger.info("File has not been modified since last read, skipping")
+            return
+
+        for record in self._read_file(path):
+            record[SDC_META_MODIFIED_AT] = modified_at
+            yield record