diff --git a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index 0e1062642..7274f51f1 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -2,6 +2,8 @@ from __future__ import annotations +import datetime +import sqlite3 import typing as t from singer_sdk import SQLConnector, SQLSink, SQLTarget @@ -10,6 +12,46 @@ DB_PATH_CONFIG = "path_to_db" +def adapt_date_iso(val): + """Adapt datetime.date to ISO 8601 date.""" + return val.isoformat() + + +def adapt_datetime_iso(val): + """Adapt datetime.datetime to timezone-naive ISO 8601 date.""" + return val.isoformat() + + +def adapt_datetime_epoch(val): + """Adapt datetime.datetime to Unix timestamp.""" + return int(val.timestamp()) + + +sqlite3.register_adapter(datetime.date, adapt_date_iso) +sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso) +# NOTE: adapt_datetime_epoch is intentionally not registered: sqlite3 keeps one adapter per type, so registering it would override adapt_datetime_iso above and break the "datetime" converter below. + + +def convert_date(val): + """Convert ISO 8601 date to datetime.date object.""" + return datetime.date.fromisoformat(val.decode()) + + +def convert_datetime(val): + """Convert ISO 8601 datetime to datetime.datetime object.""" + return datetime.datetime.fromisoformat(val.decode()) + + +def convert_timestamp(val): + """Convert Unix epoch timestamp to datetime.datetime object.""" + return datetime.datetime.fromtimestamp(int(val), tz=datetime.timezone.utc) + + +sqlite3.register_converter("date", convert_date) +sqlite3.register_converter("datetime", convert_datetime) +sqlite3.register_converter("timestamp", convert_timestamp) + + class SQLiteConnector(SQLConnector): """The connector for SQLite. 
diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index 0f5c42918..a679bfd84 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -144,6 +144,15 @@ description="The default target database schema name to use for all streams.", ), ).to_dict() +ACTIVATE_VERSION_CONFIG = PropertiesList( + Property( + "activate_version", + BooleanType, + default=True, + title="Process `ACTIVATE_VERSION` messages", + description="Whether to process `ACTIVATE_VERSION` messages.", + ), +).to_dict() ADD_RECORD_METADATA_CONFIG = PropertiesList( Property( "add_record_metadata", diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 5070655d4..7d9d24a3c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -399,6 +399,15 @@ def include_sdc_metadata_properties(self) -> bool: """ return self.config.get("add_record_metadata", False) + @property + def process_activate_version_messages(self) -> bool: + """Check if activate version messages should be processed. + + Returns: + True if activate version messages should be processed. + """ + return self.config.get("activate_version", True) + @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. 
diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 8907b6201..c30c2084a 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -16,6 +16,7 @@ from singer_sdk.helpers._batch import BaseBatchFileEncoding from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers.capabilities import ( + ACTIVATE_VERSION_CONFIG, ADD_RECORD_METADATA_CONFIG, BATCH_CONFIG, TARGET_BATCH_SIZE_ROWS_CONFIG, @@ -454,6 +455,20 @@ def _process_activate_version_message(self, message_dict: dict) -> None: for stream_map in self.mapper.stream_maps[stream_name]: sink = self.get_sink(stream_map.stream_alias) + if not sink.process_activate_version_messages: + self.logger.warning( + "`ACTIVATE_VERSION` messages are not enabled for '%s'. Ignoring.", + stream_map.stream_alias, + ) + continue + if not sink.include_sdc_metadata_properties: + self.logger.warning( + "The `ACTIVATE_VERSION` feature uses the `_sdc_deleted_at` and " + "`_sdc_table_version` metadata properties so they will be added to " + "the schema for '%s' even though `add_record_metadata` is " + "disabled.", + stream_map.stream_alias, + ) sink.activate_version(message_dict["version"]) def _process_batch_message(self, message_dict: dict) -> None: @@ -621,6 +635,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: capabilities = cls.capabilities + if PluginCapabilities.ACTIVATE_VERSION in capabilities: + _merge_missing(ACTIVATE_VERSION_CONFIG, config_jsonschema) + if PluginCapabilities.BATCH in capabilities: _merge_missing(BATCH_CONFIG, config_jsonschema) @@ -660,6 +677,7 @@ def capabilities(self) -> list[CapabilitiesEnum]: sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities sql_target_capabilities.extend( [ + PluginCapabilities.ACTIVATE_VERSION, TargetCapabilities.TARGET_SCHEMA, TargetCapabilities.HARD_DELETE, ] diff --git a/tests/samples/conftest.py b/tests/samples/conftest.py index 6580c0d0c..d1bd024d3 100644 --- a/tests/samples/conftest.py +++ 
b/tests/samples/conftest.py @@ -15,7 +15,7 @@ @pytest.fixture def csv_config(outdir: str) -> dict: """Get configuration dictionary for target-csv.""" - return {"target_folder": outdir} + return {"target_folder": outdir, "add_record_metadata": False} @pytest.fixture diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 4f6d54e60..bb659763d 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -30,6 +30,23 @@ from singer_sdk.target_base import SQLTarget +def get_table(config: dict, table_name: str) -> sa.Table: + """Reflect the target database and return the named table for inspection. + + Args: + config: Target configuration dictionary containing database path + table_name: Name of the table to inspect + + Returns: + The reflected SQLAlchemy table object. + """ + db_path = config["path_to_db"] + engine = sa.create_engine(f"sqlite:///{db_path}") + meta = sa.MetaData() + meta.reflect(bind=engine) + return meta.tables[table_name] + + @pytest.fixture def path_to_target_db(tmp_path: Path) -> Path: return Path(f"{tmp_path}/target_test.db") @@ -50,7 +67,27 @@ def sqlite_sample_target(sqlite_target_test_config): @pytest.fixture def sqlite_sample_target_hard_delete(sqlite_target_test_config): """Get a sample target object with hard_delete disabled.""" - return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True}) + return SQLiteTarget( + config={ + **sqlite_target_test_config, + "hard_delete": True, + "add_record_metadata": False, + } + ) + + +@pytest.fixture +def sqlite_sample_target_no_activate_version(sqlite_target_test_config): + """Get a sample target object with activate_version disabled.""" + return SQLiteTarget(config={**sqlite_target_test_config, "activate_version": False}) + + +@pytest.fixture +def sqlite_target_add_record_metadata(sqlite_target_test_config): + """Get a sample target object with add_record_metadata enabled.""" + return SQLiteTarget( + config={**sqlite_target_test_config, 
"add_record_metadata": True} + ) @pytest.fixture @@ -268,6 +305,108 @@ def test_sqlite_activate_version( finalize=True, ) + # Check that the record metadata was added + table = get_table(sqlite_sample_target_hard_delete.config, test_tbl) + + assert "_sdc_table_version" in table.columns + assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER + + assert "_sdc_deleted_at" in table.columns + assert type(table.columns["_sdc_deleted_at"].type) is sa.DATETIME + + +def test_sqlite_no_activate_version( + sqlite_sample_target_no_activate_version: SQLTarget, +): + """Test handling the activate_version message for the SQLite target. + + Test performs the following actions: + + - Sends an activate_version message that should be ignored because the + `activate_version` setting is disabled for the target + """ + test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}" + schema_msg = { + "type": "SCHEMA", + "stream": test_tbl, + "schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(), + } + + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + schema_msg, + {"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345}, + { + "type": "RECORD", + "stream": test_tbl, + "record": {"col_a": "samplerow1"}, + "version": 12345, + }, + ] + ) + + target_sync_test( + sqlite_sample_target_no_activate_version, + input=StringIO(tap_output), + finalize=True, + ) + + # Check that the version/metadata columns were not added + table = get_table(sqlite_sample_target_no_activate_version.config, test_tbl) + + assert "col_a" in table.columns + assert "_sdc_table_version" not in table.columns + assert "_sdc_deleted_at" not in table.columns + + +def test_sqlite_add_record_metadata(sqlite_target_add_record_metadata: SQLTarget): + """Test handling the activate_version message for the SQLite target. 
+ + Test performs the following actions: + + - Sends schema, activate_version, and record messages with + `add_record_metadata` enabled and verifies the metadata columns are created + """ + test_tbl = f"zzz_tmp_{str(uuid4()).split('-')[-1]}" + schema_msg = { + "type": "SCHEMA", + "stream": test_tbl, + "schema": th.PropertiesList(th.Property("col_a", th.StringType())).to_dict(), + } + + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + schema_msg, + {"type": "ACTIVATE_VERSION", "stream": test_tbl, "version": 12345}, + { + "type": "RECORD", + "stream": test_tbl, + "record": {"col_a": "samplerow1"}, + "version": 12345, + }, + ] + ) + + target_sync_test( + sqlite_target_add_record_metadata, + input=StringIO(tap_output), + finalize=True, + ) + + # Check that the record metadata was added + table = get_table(sqlite_target_add_record_metadata.config, test_tbl) + + assert "_sdc_received_at" in table.columns + assert type(table.columns["_sdc_received_at"].type) is sa.DATETIME + + assert "_sdc_sync_started_at" in table.columns + assert type(table.columns["_sdc_sync_started_at"].type) is sa.INTEGER + + assert "_sdc_table_version" in table.columns + assert type(table.columns["_sdc_table_version"].type) is sa.INTEGER + def test_sqlite_column_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target.