Skip to content

Commit

Permalink
Merge branch 'main' into 753-validate-taptarget-config-options
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Aug 23, 2022
2 parents ad77afa + d94e0d8 commit 90cc541
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 33 deletions.
32 changes: 30 additions & 2 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ def _write_replication_key_signpost(
state = self.get_context_state(context)
write_replication_key_signpost(state, value)

def compare_start_date(self, value: str, start_date_value: str) -> str:
    """Return the more recent of a bookmark value and the configured start date.

    When the replication key is a datetime-formatted string, both values are
    parsed with ``pendulum.parse`` and the later one is returned. For any other
    kind of replication key the bookmark value is returned unchanged.

    Taps whose replication key is not a datetime (e.g. a UNIX timestamp) are
    encouraged to override this method with their own comparison logic.

    Args:
        value: The replication key (bookmark) value.
        start_date_value: The start date value from the config.

    Returns:
        The most recent value between the bookmark and start date.
    """
    if not self.is_timestamp_replication_key:
        # Without a datetime-formatted key there is no way to compare the
        # bookmark against the start date here, so the bookmark is kept.
        return value

    # ``max`` returns the first argument on a tie, so an equal bookmark wins.
    return max(value, start_date_value, key=pendulum.parse)

def _write_starting_replication_value(self, context: Optional[dict]) -> None:
"""Write the starting replication value, if available.
Expand All @@ -320,8 +343,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:
):
value = replication_key_value

elif "start_date" in self.config:
value = self.config["start_date"]
# Use start_date if it is more recent than the replication_key state
start_date_value: Optional[str] = self.config.get("start_date")
if start_date_value:
if not value:
value = start_date_value
else:
value = self.compare_start_date(value, start_date_value)

write_starting_replication_value(state, value)

Expand Down
157 changes: 126 additions & 31 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping
from singer_sdk.helpers.jsonpath import _compile_jsonpath
from singer_sdk.streams.core import (
REPLICATION_FULL_TABLE,
Expand All @@ -30,6 +31,8 @@
StringType,
)

CONFIG_START_DATE = "2021-01-01"


class SimpleTestStream(Stream):
"""Test stream class."""
Expand All @@ -53,6 +56,26 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
    """Test stream with an integer-typed ``updatedAt`` replication key."""

    name = "unix_ts"
    # Three required properties; ``updatedAt`` is declared as an IntegerType
    # (presumably a UNIX timestamp, going by the class name).
    schema = PropertiesList(
        Property("id", IntegerType, required=True),
        Property("value", StringType, required=True),
        Property("updatedAt", IntegerType, required=True),
    ).to_dict()
    replication_key = "updatedAt"


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
    """Integer-keyed test stream that overrides the start-date comparison."""

    name = "unix_ts_override"

    def compare_start_date(self, value: str, start_date_value: str) -> str:
        """Return the numerically larger of the bookmark and the start date.

        The start date is parsed with ``pendulum.parse`` and rendered with the
        ``"X"`` format token so that both candidates can be compared through
        ``float``.

        Args:
            value: The replication key (bookmark) value.
            start_date_value: The start date value from the config.

        Returns:
            Whichever candidate is larger when converted to ``float``; the
            bookmark value is returned on a tie.
        """
        start_as_epoch = pendulum.parse(start_date_value).format("X")
        candidates = (value, start_as_epoch)
        return max(candidates, key=float)


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -92,32 +115,23 @@ class SimpleTestTap(Tap):

def discover_streams(self) -> list[Stream]:
"""List all streams."""
return [SimpleTestStream(self)]
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
catalog_dict = {
"streams": [
{
"key_properties": ["id"],
"tap_stream_id": SimpleTestStream.name,
"stream": SimpleTestStream.name,
"schema": SimpleTestStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return SimpleTestTap(
config={
"username": "utest",
"password": "ptest",
"start_date": "2021-01-01",
},
parse_env_config=False,
catalog=catalog_dict,
)


Expand All @@ -127,47 +141,128 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream:
return cast(SimpleTestStream, tap.load_streams()[0])


@pytest.fixture
def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream:
    """Provide the stream at index 1 of the tap's loaded streams."""
    loaded_streams = tap.load_streams()
    return cast(UnixTimestampIncrementalStream, loaded_streams[1])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.forced_replication_method is None

assert tap.input_catalog is not None
stream.apply_catalog(catalog=tap.input_catalog)
stream.apply_catalog(
catalog=Catalog.from_dict(
{
"streams": [
{
"tap_stream_id": stream.name,
"metadata": MetadataMapping(),
"key_properties": ["id"],
"stream": stream.name,
"schema": stream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
)
)

assert stream.primary_keys == ["id"]
assert stream.replication_key is None
assert stream.replication_method == REPLICATION_FULL_TABLE
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"
@pytest.mark.parametrize(
"stream_name,bookmark_value,expected_starting_value",
[
pytest.param(
"test",
None,
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-no-state",
),
pytest.param(
"test",
"2021-02-01",
pendulum.datetime(2021, 2, 1),
id="datetime-repl-key-recent-bookmark",
),
pytest.param(
"test",
"2020-01-01",
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-old-bookmark",
),
pytest.param(
"unix_ts",
None,
CONFIG_START_DATE,
id="naive-unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts",
"1612137600",
"1612137600",
id="naive-unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts",
"1577858400",
"1577858400",
id="naive-unix-ts-repl-key-old-bookmark",
),
pytest.param(
"unix_ts_override",
None,
CONFIG_START_DATE,
id="unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts_override",
"1612137600",
"1612137600",
id="unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts_override",
"1577858400",
pendulum.parse(CONFIG_START_DATE).format("X"),
id="unix-ts-repl-key-old-bookmark",
),
],
)
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream_name: str,
bookmark_value: str,
expected_starting_value: Any,
):
"""Test the starting timestamp for a stream."""
stream = tap.streams[stream_name]

if stream.is_timestamp_replication_key:
get_starting_value = stream.get_starting_timestamp
else:
get_starting_value = stream.get_starting_replication_key_value

stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
cast(str, stream.config.get("start_date"))
)
tap.load_state(
{
"bookmarks": {
stream.name: {
stream_name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
"replication_key_value": bookmark_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.is_timestamp_replication_key
assert stream.get_starting_timestamp(None) == pendulum.parse(
timestamp_value
), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}"
assert get_starting_value(None) == expected_starting_value


@pytest.mark.parametrize(
Expand Down

0 comments on commit 90cc541

Please sign in to comment.