Skip to content

Commit

Permalink
Incremental replication
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 17, 2024
1 parent 4276912 commit e2fa537
Showing 1 changed file with 36 additions and 7 deletions.
43 changes: 36 additions & 7 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from __future__ import annotations

import csv
import datetime
import os
import typing as t

from singer_sdk import Stream
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context, Record
from singer_sdk.tap_base import Tap

SDC_META_FILEPATH = "_sdc_file"
SDC_META_FILEPATH = "_sdc_path"
SDC_META_MODIFIED_AT = "_sdc_modified_at"


def _to_datetime(value: float) -> str:
return datetime.datetime.fromtimestamp(value).astimezone()


class CSVStream(Stream):
"""CSV stream class."""

def __init__(
self,
tap: Tap,
Expand All @@ -25,23 +35,23 @@ def __init__(
"type": ["object"],
"properties": {
SDC_META_FILEPATH: {"type": "string"},
SDC_META_MODIFIED_AT: {"type": "string", "format": "date-time"},
},
"required": [],
"additionalProperties": {"type": "string"},
}
super().__init__(tap, schema, name)

# TODO(edgarrmondragon): Make this None if the filesystem does not support it.
self.replication_key = SDC_META_MODIFIED_AT

self._partitions = partitions or []

@property
def partitions(self) -> list[Context]:
    """Return the partition contexts stored on this stream at construction."""
    stored_contexts = self._partitions
    return stored_contexts

def get_records( # noqa: PLR6301
self,
context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
path: str = context[SDC_META_FILEPATH]

def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301
# Make these configurable.
delimiter = ","
quotechar = '"'
Expand All @@ -60,3 +70,22 @@ def get_records( # noqa: PLR6301
lineterminator=lineterminator,
)
yield from reader

def get_records(
    self,
    context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
    """Yield the rows of the file named in ``context``, stamped with its mtime.

    When the stream is replicated incrementally and the file's modification
    time is earlier than the stored bookmark, nothing is yielded.

    Args:
        context: Partition context. It is subscripted with
            ``SDC_META_FILEPATH`` to obtain the file path, so despite the
            ``None`` in the annotation it must be a mapping here.

    Yields:
        Each record produced by ``_read_file``, with
        ``SDC_META_MODIFIED_AT`` set to the file's modification time.
    """
    path: str = context[SDC_META_FILEPATH]
    # Convert the mtime once: the same value is used for the bookmark
    # comparison and for every record read from this file.
    modified_at = _to_datetime(os.path.getmtime(path))  # noqa: PTH204

    # NOTE(review): the comparison is strict, so a file whose mtime equals
    # the bookmark is read again -- confirm this is the intended semantics.
    if (
        self.replication_method is REPLICATION_INCREMENTAL
        and (previous_bookmark := self.get_starting_timestamp(context))
        and modified_at < previous_bookmark
    ):
        self.logger.info("File has not been modified since last read, skipping")
        return

    for record in self._read_file(path):
        record[SDC_META_MODIFIED_AT] = modified_at
        yield record

0 comments on commit e2fa537

Please sign in to comment.