Skip to content

Commit

Permalink
feat(targets): Added a new built-in setting activate_version for ta…
Browse files Browse the repository at this point in the history
…rgets to optionally disable processing of `ACTIVATE_VERSION` messages (#2784)

* feat(targets): Added a new built-in setting `activate_version` for targets to optionally disable processing of `ACTIVATE_VERSION` messages

* Update capabilities

* Default `activate_version` to `True`

* Default to enabling SDC columns

* Revert "Default to enabling SDC columns"

This reverts commit 12fb01a.

* Enable SDC metadata in SQLite tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Make Ruff happy

* Test specifically for _sdc attributes

* Test _sdc_table_version

* Use `add_record_metadata` with hard delete

* Do not make a breaking change

* Test with activate version disabled

* DRY

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
edgarrmondragon and pre-commit-ci[bot] authored Dec 2, 2024
1 parent a553c38 commit 0ac558a
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 2 deletions.
42 changes: 42 additions & 0 deletions samples/sample_target_sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import datetime
import sqlite3
import typing as t

from singer_sdk import SQLConnector, SQLSink, SQLTarget
Expand All @@ -10,6 +12,46 @@
DB_PATH_CONFIG = "path_to_db"


def adapt_date_iso(val):
    """Render a ``datetime.date`` as its ISO 8601 string (``YYYY-MM-DD``)."""
    iso_text = val.isoformat()
    return iso_text


def adapt_datetime_iso(val):
    """Render a ``datetime.datetime`` as an ISO 8601 string.

    The value is formatted with ``isoformat()`` exactly as given, so a
    timezone-aware input keeps its UTC offset in the result.
    """
    iso_text = val.isoformat()
    return iso_text


def adapt_datetime_epoch(val):
    """Reduce a ``datetime.datetime`` to an integer Unix timestamp.

    Fractional seconds are discarded by the ``int()`` conversion.
    """
    seconds = val.timestamp()
    return int(seconds)


# Register the adapters sqlite3 uses when binding date/datetime parameters.
# NOTE(review): `datetime.datetime` is registered twice. sqlite3 appears to keep
# one adapter per Python type, so the later (epoch) registration presumably
# replaces the ISO one — confirm which representation is intended.
sqlite3.register_adapter(datetime.date, adapt_date_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_adapter(datetime.datetime, adapt_datetime_epoch)


def convert_date(val):
    """Decode ISO 8601 date bytes and parse them into a ``datetime.date``."""
    text = val.decode()
    return datetime.date.fromisoformat(text)


def convert_datetime(val):
    """Decode ISO 8601 datetime bytes and parse them into a ``datetime.datetime``."""
    text = val.decode()
    return datetime.datetime.fromisoformat(text)


def convert_timestamp(val):
    """Interpret a Unix epoch value as a UTC-aware ``datetime.datetime``.

    The input is passed through ``int()`` first, so it must be something
    ``int()`` accepts (e.g. ASCII digits as ``bytes`` or ``str``).
    """
    epoch_seconds = int(val)
    utc = datetime.timezone.utc
    return datetime.datetime.fromtimestamp(epoch_seconds, tz=utc)


# Register each converter with sqlite3 under its type name.
for _type_name, _converter in (
    ("date", convert_date),
    ("datetime", convert_datetime),
    ("timestamp", convert_timestamp),
):
    sqlite3.register_converter(_type_name, _converter)


class SQLiteConnector(SQLConnector):
"""The connector for SQLite.
Expand Down
9 changes: 9 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@
description="The default target database schema name to use for all streams.",
),
).to_dict()
# Definition of the `activate_version` setting: a boolean that defaults to
# True, converted to a dict with `PropertiesList.to_dict()`.
ACTIVATE_VERSION_CONFIG = PropertiesList(
    Property(
        "activate_version",
        BooleanType,
        default=True,
        title="Process `ACTIVATE_VERSION` messages",
        description="Whether to process `ACTIVATE_VERSION` messages.",
    ),
).to_dict()
ADD_RECORD_METADATA_CONFIG = PropertiesList(
Property(
"add_record_metadata",
Expand Down
9 changes: 9 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool:
"""
return self.config.get("add_record_metadata", False)

@property
def process_activate_version_messages(self) -> bool:
    """Report whether `ACTIVATE_VERSION` messages should be acted upon.

    Reads the `activate_version` config value; a missing value counts as
    enabled.

    Returns:
        True if activate version messages should be processed.
    """
    settings = self.config
    return settings.get("activate_version", True)

@property
def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
"""Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL.
Expand Down
18 changes: 18 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import (
ACTIVATE_VERSION_CONFIG,
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_BATCH_SIZE_ROWS_CONFIG,
Expand Down Expand Up @@ -454,6 +455,19 @@ def _process_activate_version_message(self, message_dict: dict) -> None:

for stream_map in self.mapper.stream_maps[stream_name]:
    sink = self.get_sink(stream_map.stream_alias)

    # Honor the `activate_version` setting: a sink that opted out never
    # receives the version activation.
    if not sink.process_activate_version_messages:
        self.logger.warning(
            "`ACTIVATE_VERSION` messages are not enabled for '%s'. Ignoring.",
            stream_map.stream_alias,
        )
        continue

    # Activation still proceeds without `add_record_metadata`; the warning
    # only tells the user that metadata columns will appear anyway.
    if not sink.include_sdc_metadata_properties:
        self.logger.warning(
            "The `ACTIVATE_VERSION` feature uses the `_sdc_table_version` and "
            "`_sdc_deleted_at` metadata properties so they will be added to "
            "the schema for '%s' even though `add_record_metadata` is "
            "disabled.",
            # The '%s' placeholder needs its argument, otherwise logging
            # fails to format the record.
            stream_map.stream_alias,
        )
    sink.activate_version(message_dict["version"])

def _process_batch_message(self, message_dict: dict) -> None:
Expand Down Expand Up @@ -621,6 +635,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:

capabilities = cls.capabilities

if PluginCapabilities.ACTIVATE_VERSION in capabilities:
_merge_missing(ACTIVATE_VERSION_CONFIG, config_jsonschema)

if PluginCapabilities.BATCH in capabilities:
_merge_missing(BATCH_CONFIG, config_jsonschema)

Expand Down Expand Up @@ -660,6 +677,7 @@ def capabilities(self) -> list[CapabilitiesEnum]:
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend(
[
PluginCapabilities.ACTIVATE_VERSION,
TargetCapabilities.TARGET_SCHEMA,
TargetCapabilities.HARD_DELETE,
]
Expand Down
2 changes: 1 addition & 1 deletion tests/samples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@pytest.fixture
def csv_config(outdir: str) -> dict:
    """Get configuration dictionary for target-csv.

    Args:
        outdir: Folder the target should write to.

    Returns:
        Config with `target_folder` set to ``outdir`` and record metadata
        explicitly disabled.
    """
    # Only the post-change return is kept; the earlier one made it unreachable.
    return {"target_folder": outdir, "add_record_metadata": False}


@pytest.fixture
Expand Down
141 changes: 140 additions & 1 deletion tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@
from singer_sdk.target_base import SQLTarget


def get_table(config: dict, table_name: str) -> sa.Table:
    """Reflect the target database and return a single table for inspection.

    Args:
        config: Target configuration dictionary containing database path
        table_name: Name of the table to inspect

    Returns:
        The reflected SQLAlchemy table named ``table_name``.

    Raises:
        KeyError: If ``path_to_db`` is missing from ``config`` or the table
            is not present in the reflected metadata.
    """
    db_path = config["path_to_db"]
    engine = sa.create_engine(f"sqlite:///{db_path}")
    try:
        meta = sa.MetaData()
        meta.reflect(bind=engine)
        return meta.tables[table_name]
    finally:
        # Release pooled connections so the database file is not kept open
        # after the schema has been read.
        engine.dispose()


@pytest.fixture
def path_to_target_db(tmp_path: Path) -> Path:
    """Return the path of the `target_test.db` file under ``tmp_path``."""
    return tmp_path / "target_test.db"
Expand All @@ -50,7 +67,27 @@ def sqlite_sample_target(sqlite_target_test_config):
@pytest.fixture
def sqlite_sample_target_hard_delete(sqlite_target_test_config):
    """Get a sample target object with hard_delete enabled.

    The base test config is extended with ``hard_delete=True`` and
    ``add_record_metadata=False``.
    """
    # Only the post-change return is kept; the earlier one made it unreachable.
    return SQLiteTarget(
        config={
            **sqlite_target_test_config,
            "hard_delete": True,
            "add_record_metadata": False,
        }
    )


@pytest.fixture
def sqlite_sample_target_no_activate_version(sqlite_target_test_config):
    """Get a sample target object with `activate_version` disabled."""
    config = {**sqlite_target_test_config, "activate_version": False}
    return SQLiteTarget(config=config)


@pytest.fixture
def sqlite_target_add_record_metadata(sqlite_target_test_config):
    """Build a sample target whose config turns `add_record_metadata` on."""
    config = {**sqlite_target_test_config, "add_record_metadata": True}
    return SQLiteTarget(config=config)


@pytest.fixture
Expand Down Expand Up @@ -268,6 +305,108 @@ def test_sqlite_activate_version(
finalize=True,
)

# Check that the record metadata was added
table = get_table(sqlite_sample_target_hard_delete.config, test_tbl)

assert "_sdc_table_version" in table.columns
assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER

assert "_sdc_deleted_at" in table.columns
assert type(table.columns["_sdc_deleted_at"].type) is sa.DATETIME


def test_sqlite_no_activate_version(
    sqlite_sample_target_no_activate_version: SQLTarget,
):
    """Check the resulting table when the no-activate-version target is used.

    Test performs the following actions:

    - Sends SCHEMA, ACTIVATE_VERSION and RECORD messages for a new table
    - Verifies the table has the record column but neither
      `_sdc_table_version` nor `_sdc_deleted_at`
    """
    target = sqlite_sample_target_no_activate_version
    table_name = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
    properties = th.PropertiesList(th.Property("col_a", th.StringType()))
    messages = [
        {"type": "SCHEMA", "stream": table_name, "schema": properties.to_dict()},
        {"type": "ACTIVATE_VERSION", "stream": table_name, "version": 12345},
        {
            "type": "RECORD",
            "stream": table_name,
            "record": {"col_a": "samplerow1"},
            "version": 12345,
        },
    ]
    tap_output = "\n".join(map(json.dumps, messages))

    target_sync_test(target, input=StringIO(tap_output), finalize=True)

    # The data column must exist, while the version-tracking metadata
    # columns must NOT have been added.
    table = get_table(target.config, table_name)

    assert "col_a" in table.columns
    assert "_sdc_table_version" not in table.columns
    assert "_sdc_deleted_at" not in table.columns


def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget):
    """Check the `_sdc_*` columns created by the add-record-metadata target.

    Test performs the following actions:

    - Sends SCHEMA, ACTIVATE_VERSION and RECORD messages for a new table
    - Verifies `_sdc_received_at`, `_sdc_sync_started_at` and
      `_sdc_table_version` exist with the expected column types
    """
    target = sqlite_target_add_record_metadata
    table_name = f"zzz_tmp_{str(uuid4()).split('-')[-1]}"
    properties = th.PropertiesList(th.Property("col_a", th.StringType()))
    messages = [
        {"type": "SCHEMA", "stream": table_name, "schema": properties.to_dict()},
        {"type": "ACTIVATE_VERSION", "stream": table_name, "version": 12345},
        {
            "type": "RECORD",
            "stream": table_name,
            "record": {"col_a": "samplerow1"},
            "version": 12345,
        },
    ]
    tap_output = "\n".join(map(json.dumps, messages))

    target_sync_test(target, input=StringIO(tap_output), finalize=True)

    # Each metadata column must be present with the expected SQL type.
    table = get_table(target.config, table_name)

    assert "_sdc_received_at" in table.columns
    assert type(table.columns["_sdc_received_at"].type) is sa.DATETIME

    assert "_sdc_sync_started_at" in table.columns
    assert type(table.columns["_sdc_sync_started_at"].type) is sa.INTEGER

    assert "_sdc_table_version" in table.columns
    assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER


def test_sqlite_column_morph(sqlite_sample_target: SQLTarget):
"""End-to-end-to-end test for SQLite tap and target.
Expand Down

0 comments on commit 0ac558a

Please sign in to comment.