From fe92e4567c19dc9ce6e94b2d92b2557c619c89e9 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 13 Dec 2023 02:08:25 +0100 Subject: [PATCH 1/8] chore: Add `.idea` to `.gitignore` --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 56313116..4f8c62d2 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ smoke-test .meltano/** .tox/** .secrets/** +.idea .vscode/** output/** .env From c0d4a8901c2d71fbeb2bd8cd888113df409f74ca Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 13 Dec 2023 02:07:57 +0100 Subject: [PATCH 2/8] test: Add test cases for arrays and objects In PostgreSQL, all boils down to the `jsonb[]` type, but arrays are reflected as `sqlalchemy.dialects.postgresql.ARRAY` instead of `sqlalchemy.dialects.postgresql.JSONB`. In order to prepare for more advanced type mangling & validation, and to better support databases pretending to be compatible with PostgreSQL, the new test cases exercise arrays with different kinds of inner values, because, on other databases, ARRAYs may need to have uniform content. Along the lines, it adds a `verify_schema` utility function in the spirit of the `verify_data` function, refactored and generalized from the `test_anyof` test case. --- .../tests/data_files/array_boolean.singer | 5 + .../tests/data_files/array_data.singer | 6 - .../tests/data_files/array_number.singer | 5 + .../tests/data_files/array_string.singer | 6 + .../tests/data_files/array_timestamp.singer | 5 + .../tests/data_files/object_mixed.singer | 3 + target_postgres/tests/test_target_postgres.py | 167 +++++++++++++++--- 7 files changed, 162 insertions(+), 35 deletions(-) create mode 100644 target_postgres/tests/data_files/array_boolean.singer delete mode 100644 target_postgres/tests/data_files/array_data.singer create mode 100644 target_postgres/tests/data_files/array_number.singer create mode 100644 target_postgres/tests/data_files/array_string.singer create mode 100644 target_postgres/tests/data_files/array_timestamp.singer create mode 100644 target_postgres/tests/data_files/object_mixed.singer diff --git a/target_postgres/tests/data_files/array_boolean.singer b/target_postgres/tests/data_files/array_boolean.singer new file mode 100644 index 00000000..268a64a0 --- /dev/null +++ b/target_postgres/tests/data_files/array_boolean.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}} +{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}} +{"type": "STATE", "value": {"array_boolean": 3}} diff --git a/target_postgres/tests/data_files/array_data.singer b/target_postgres/tests/data_files/array_data.singer deleted file mode 100644 index 0d132ac6..00000000 --- a/target_postgres/tests/data_files/array_data.singer +++ /dev/null @@ -1,6 +0,0 @@ -{"type": "SCHEMA", "stream": "test_carts", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "fruits": {"type": "array","items": {"type": "string"}}}}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 1, "fruits": [ "apple", "orange", "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 
2, "fruits": [ "banana", "apple" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 3, "fruits": [ "pear" ]}} -{"type": "RECORD", "stream": "test_carts", "record": {"id": 4, "fruits": [ "orange", "banana", "apple", "pear" ]}} -{"type": "STATE", "value": {"test_carts": 4}} diff --git a/target_postgres/tests/data_files/array_number.singer b/target_postgres/tests/data_files/array_number.singer new file mode 100644 index 00000000..4eac276e --- /dev/null +++ b/target_postgres/tests/data_files/array_number.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}} +{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}} +{"type": "STATE", "value": {"array_number": 3}} diff --git a/target_postgres/tests/data_files/array_string.singer b/target_postgres/tests/data_files/array_string.singer new file mode 100644 index 00000000..f14e7870 --- /dev/null +++ b/target_postgres/tests/data_files/array_string.singer @@ -0,0 +1,6 @@ +{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}} +{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}} +{"type": "STATE", "value": {"array_string": 4}} diff --git a/target_postgres/tests/data_files/array_timestamp.singer b/target_postgres/tests/data_files/array_timestamp.singer new file mode 100644 index 00000000..e5192cec --- /dev/null +++ b/target_postgres/tests/data_files/array_timestamp.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}} +{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}} +{"type": "STATE", "value": {"array_timestamp": 3}} diff --git a/target_postgres/tests/data_files/object_mixed.singer b/target_postgres/tests/data_files/object_mixed.singer new file mode 100644 index 00000000..2ed86261 --- /dev/null +++ b/target_postgres/tests/data_files/object_mixed.singer @@ -0,0 +1,3 @@ +{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}} +{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, 
"float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}} +{"type": "STATE", "value": {"object_mixed": 1}} diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 1eaa9978..ab4cd11d 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -11,8 +11,8 @@ import sqlalchemy from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end -from sqlalchemy.dialects.postgresql import ARRAY -from sqlalchemy.types import TEXT, TIMESTAMP +from sqlalchemy.dialects.postgresql import ARRAY, JSONB +from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -94,7 +94,7 @@ def verify_data( Args: target: The target to obtain a database connection from. - full_table_name: The schema and table name of the table to check data for. + table_name: The schema and table name of the table to check data for. primary_key: The primary key of the table. number_of_rows: The expected number of rows that should be in the table. check_data: A dictionary representing the full contents of the first row in the @@ -134,6 +134,43 @@ def verify_data( assert result.first()[0] == number_of_rows +def verify_schema( + target: TargetPostgres, + table_name: str, + check_columns: dict = None, +): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + """ + engine = create_engine(target) + schema = target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection + ) + for column in table.c: + # Ignore `_sdc` columns for now. 
+ if column.name.startswith("_sdc"): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + + def test_sqlalchemy_url_config(postgres_config_no_ssl): """Be sure that passing a sqlalchemy_url works @@ -406,11 +443,92 @@ def test_duplicate_records(postgres_target): verify_data(postgres_target, "test_duplicate_records", 2, "id", row) -def test_array_data(postgres_target): - file_name = "array_data.singer" +def test_array_boolean(postgres_target): + file_name = "array_boolean.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [True, False]} + verify_data(postgres_target, "array_boolean", 3, "id", row) + verify_schema( + postgres_target, + "array_boolean", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_number(postgres_target): + file_name = "array_number.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} + verify_data(postgres_target, "array_number", 3, "id", row) + verify_schema( + postgres_target, + "array_number", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_string(postgres_target): + file_name = "array_string.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["apple", "orange", "pear"]} + verify_data(postgres_target, "array_string", 4, "id", row) + verify_schema( + postgres_target, + "array_string", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_array_timestamp(postgres_target): + file_name = "array_timestamp.singer" + singer_file_to_target(file_name, postgres_target) + row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} + verify_data(postgres_target, "array_timestamp", 3, "id", row) + verify_schema( + postgres_target, + "array_timestamp", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": ARRAY}, + }, + ) + + +def test_object_mixed(postgres_target): + file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) - row = {"id": 1, "fruits": ["apple", "orange", "pear"]} - verify_data(postgres_target, "test_carts", 4, "id", row) + row = { + "id": 1, + "value": { + "string": "foo", + "integer": 42, + "float": Decimal("42.42"), + "timestamp": "2023-12-13T01:15:02", + "array_boolean": [True, False], + "array_float": [Decimal("42.42"), Decimal("84.84")], + "array_integer": [42, 84], + "array_string": ["foo", "bar"], + "nested_object": {"foo": "bar"}, + }, + } + verify_data(postgres_target, "object_mixed", 1, "id", row) + verify_schema( + postgres_target, + "object_mixed", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": JSONB}, + }, + ) def test_encoded_string_data(postgres_target): @@ -456,41 +574,32 @@ def test_large_int(postgres_target): def test_anyof(postgres_target): """Test that anyOf is handled correctly""" - engine = create_engine(postgres_target) table_name = "commits" file_name = f"{table_name}.singer" - schema = postgres_target.config["default_target_schema"] singer_file_to_target(file_name, postgres_target) - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - 
table = sqlalchemy.Table( - "commits", meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # {"type":"string"} - if column.name == "id": - assert isinstance(column.type, TEXT) + verify_schema( + postgres_target, + table_name, + check_columns={ + # {"type":"string"} + "id": {"type": TEXT}, # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} - if column.name in {"authored_date", "committed_date"}: - assert isinstance(column.type, TIMESTAMP) - + "authored_date": {"type": TIMESTAMP}, + "committed_date": {"type": TIMESTAMP}, # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} - if column.name == "parent_ids": - assert isinstance(column.type, ARRAY) - + "parent_ids": {"type": ARRAY}, # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} - if column.name == "commit_message": - assert isinstance(column.type, TEXT) - + "commit_message": {"type": TEXT}, # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - if column.name == "legacy_id": - assert isinstance(column.type, TEXT) + "legacy_id": {"type": TEXT}, + }, + ) def test_new_array_column(postgres_target): From 8b3ea4f55c3d3e69df5f7db7c9d95fce95317308 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 19:05:45 +0100 Subject: [PATCH 3/8] test: Fix `FATAL: sorry, too many clients already` Dispose the SQLAlchemy engine object after use within test utility functions. --- target_postgres/tests/test_target_postgres.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index ab4cd11d..f676f8f1 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -132,6 +132,7 @@ def verify_data( sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") ) assert result.first()[0] == number_of_rows + engine.dispose() def verify_schema( @@ -169,6 +170,7 @@ def verify_schema( f"Column '{column.name}' (with type '{column.type}') " f"does not match expected type: {column_type_expected}" ) + engine.dispose() def test_sqlalchemy_url_config(postgres_config_no_ssl): From a9d179694aa8b51290928865da7f267ed27e44cc Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 20:10:36 +0100 Subject: [PATCH 4/8] test: Fix `FATAL: sorry, too many clients already` Within `BasePostgresSDKTests`, new database connections via SQLAlchemy haven't been closed, and started filling up the connection pool, eventually saturating it. 
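As a rough sketch of the failure mode and the remedy (illustrative only, not part of the diff below; the actual change is confined to the `connection` fixture), an engine that is never disposed keeps its pooled connections checked out on the server, while scoping both the connection and the engine releases them again:

    import sqlalchemy
    from contextlib import contextmanager

    # Leaky: the pooled connection stays open on the server after the caller is done,
    # and the engine's pool is never released.
    def leaky_connection(url: str) -> sqlalchemy.engine.Connection:
        engine = sqlalchemy.create_engine(url)
        return engine.connect()

    @contextmanager
    def scoped_connection(url: str):
        # Scoped: the context manager closes the connection, and dispose()
        # returns every pooled connection before the engine goes away.
        engine = sqlalchemy.create_engine(url)
        try:
            with engine.connect() as connection:
                yield connection
        finally:
            engine.dispose()
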
--- target_postgres/tests/test_sdk.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/tests/test_sdk.py b/target_postgres/tests/test_sdk.py index 3f95c393..5d4207ad 100644 --- a/target_postgres/tests/test_sdk.py +++ b/target_postgres/tests/test_sdk.py @@ -61,7 +61,9 @@ class BasePostgresSDKTests: @pytest.fixture() def connection(self, runner): engine = create_engine(runner) - return engine.connect() + with engine.connect() as connection: + yield connection + engine.dispose() SDKTests = get_target_test_class( From 723e1fa699b188a3d8e3ccd5589e6f64cfacdea3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 19 Dec 2023 21:31:43 +0100 Subject: [PATCH 5/8] test: Fix `FATAL: sorry, too many clients already` Dispose the SQLAlchemy engine object after use within `PostgresConnector`. --- target_postgres/connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index d6730539..369eb462 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -180,8 +180,10 @@ def copy_table_structure( @contextmanager def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: - with self._engine.connect().execution_options() as conn: + engine = self._engine + with engine.connect().execution_options() as conn: yield conn + engine.dispose() def drop_table( self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection From cf732e1d905bd49deeb98313bb40f8c3d7af38a3 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:23:20 +0100 Subject: [PATCH 6/8] chore: Fix parameter names in docstrings --- target_postgres/connector.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 369eb462..59314d60 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -332,7 +332,7 @@ def create_empty_table( # type: ignore[override] """Create an empty target table. Args: - full_table_name: the target table name. + table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. @@ -427,7 +427,7 @@ def _create_empty_column( # type: ignore[override] """Create a new column. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. @@ -491,7 +491,7 @@ def _adapt_column_type( # type: ignore[override] """Adapt table column type to support the new JSON schema type. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. @@ -722,7 +722,7 @@ def _get_column_type( # type: ignore[override] """Get the SQL type of the declared column. Args: - full_table_name: The name of the table. + table_name: The name of the table. column_name: The name of the column. Returns: From 860baf44ae72a58d1165c313de0515e2ff3a1f4e Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 14 Dec 2023 21:27:48 +0100 Subject: [PATCH 7/8] test: Refactor utility functions `verify_data` and `verify_schema` By wrapping them into a container class `AssertionHelper`, it is easy to parameterize them, and to provide them to the test functions using a pytest fixture. This way, they are reusable from database adapter implementations which derive from PostgreSQL. 
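For illustration, a derived adapter's test suite could reuse the helper simply by overriding the fixture with its own prefix (a sketch; apart from `AssertionHelper` and `metadata_column_prefix`, the names here are assumptions, not part of this patch):

    import pytest

    @pytest.fixture
    def helper(other_db_target) -> AssertionHelper:
        # A derived target only needs to swap in its own metadata column prefix.
        return AssertionHelper(target=other_db_target, metadata_column_prefix="__sdc")
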
The motivation for this is because the metadata column prefix `_sdc` needs to be adjusted for other database systems, as they reject such columns, being reserved for system purposes. In the specific case of CrateDB, it is enough to rename it like `__sdc`. Sad but true. --- target_postgres/tests/test_target_postgres.py | 272 +++++++++--------- 1 file changed, 140 insertions(+), 132 deletions(-) diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index f676f8f1..d20fcf0a 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -28,6 +28,8 @@ postgres_config_ssh_tunnel, ) +METADATA_COLUMN_PREFIX = "_sdc" + # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly @pytest.fixture(scope="session", name="postgres_config") @@ -75,102 +77,114 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row - - -def verify_data( - target: TargetPostgres, - table_name: str, - number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, -): - """Checks whether the data in a table matches a provided data sample. - - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - primary_key: The primary key of the table. - number_of_rows: The expected number of rows that should be in the table. - check_data: A dictionary representing the full contents of the first row in the - table, as determined by lowest primary_key value, or else a list of - dictionaries representing every row in the table. - """ - engine = create_engine(target) - full_table_name = f"{target.config['default_target_schema']}.{table_name}" - with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" +class AssertionHelper: + def __init__(self, target: TargetPostgres, metadata_column_prefix: str): + self.target = target + self.metadata_column_prefix = metadata_column_prefix + self.engine = create_engine(self.target) + + def remove_metadata_columns(self, row: dict) -> dict: + new_row = {} + for column in row.keys(): + if not column.startswith(self.metadata_column_prefix): + new_row[column] = row[column] + return new_row + + def verify_data( + self, + table_name: str, + number_of_rows: int = 1, + primary_key: str | None = None, + check_data: dict | list[dict] | None = None, + ): + """Checks whether the data in a table matches a provided data sample. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table. + number_of_rows: The expected number of rows that should be in the table. + check_data: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value, or else a list of + dictionaries representing every row in the table. 
+ """ + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with self.engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + schema = self.target.config["default_target_schema"] + with self.engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection ) - assert result.first()[0] == number_of_rows - engine.dispose() + for column in table.c: + # Ignore `_sdc` metadata columns when veriying table schema. + if column.name.startswith(self.metadata_column_prefix): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + def __del__(self): + self.engine.dispose() -def verify_schema( - target: TargetPostgres, - table_name: str, - check_columns: dict = None, -): - """Checks whether the schema of a database table matches the provided column definitions. - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - check_columns: A dictionary mapping column names to their definitions. Currently, - it is all about the `type` attribute which is compared. 
- """ - engine = create_engine(target) - schema = target.config["default_target_schema"] - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # Ignore `_sdc` columns for now. - if column.name.startswith("_sdc"): - continue - try: - column_type_expected = check_columns[column.name]["type"] - except KeyError: - raise ValueError( - f"Invalid check_columns - missing definition for column: {column.name}" - ) - if not isinstance(column.type, column_type_expected): - raise TypeError( - f"Column '{column.name}' (with type '{column.type}') " - f"does not match expected type: {column_type_expected}" - ) - engine.dispose() +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -287,11 +301,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -311,7 +325,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -323,16 +337,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -349,7 +363,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -406,12 +420,12 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def test_no_primary_keys(postgres_target, helper): """We run both of these 
tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" @@ -430,7 +444,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -438,20 +452,19 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_boolean(postgres_target): +def test_array_boolean(postgres_target, helper): file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [True, False]} - verify_data(postgres_target, "array_boolean", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( "array_boolean", check_columns={ "id": {"type": BIGINT}, @@ -460,13 +473,12 @@ def test_array_boolean(postgres_target): ) -def test_array_number(postgres_target): +def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} - verify_data(postgres_target, "array_number", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( "array_number", check_columns={ "id": {"type": BIGINT}, @@ -475,13 +487,12 @@ def test_array_number(postgres_target): ) -def test_array_string(postgres_target): +def test_array_string(postgres_target, helper): file_name = "array_string.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["apple", "orange", "pear"]} - verify_data(postgres_target, "array_string", 4, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( "array_string", check_columns={ "id": {"type": BIGINT}, @@ -490,13 +501,12 @@ def test_array_string(postgres_target): ) -def test_array_timestamp(postgres_target): +def test_array_timestamp(postgres_target, helper): file_name = "array_timestamp.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} - verify_data(postgres_target, "array_timestamp", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( "array_timestamp", check_columns={ "id": {"type": BIGINT}, @@ -505,7 +515,7 @@ def test_array_timestamp(postgres_target): ) -def test_object_mixed(postgres_target): +def test_object_mixed(postgres_target, helper): file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -522,9 +532,8 @@ def test_object_mixed(postgres_target): "nested_object": {"foo": "bar"}, }, } - verify_data(postgres_target, "object_mixed", 1, "id", row) - verify_schema( - postgres_target, + helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( "object_mixed", check_columns={ "id": {"type": BIGINT}, @@ -533,7 +542,7 @@ def 
test_object_mixed(postgres_target): ) -def test_encoded_string_data(postgres_target): +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -546,11 +555,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -574,14 +583,13 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" table_name = "commits" file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) - verify_schema( - postgres_target, + helper.verify_schema( table_name, check_columns={ # {"type":"string"} @@ -690,7 +698,7 @@ def test_activate_version_soft_delete(postgres_target): result = connection.execute( sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2 From 1f92935aa79e46f8af1a370afea3947a21b70f76 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 16 Dec 2023 06:15:46 +0100 Subject: [PATCH 8/8] chore: Harmonize imports of sqlalchemy module, use `sa` where applicable It follows a convention to import SQLAlchemy like `import sqlalchemy as sa`. In this spirit, all references, even simple ones like symbols to SQLAlchemy base types like `TEXT`, or `BIGINT`, will be referenced by `sa.TEXT`, `sa.BIGINT`, etc., so it is easy to tell them apart when harmonizing type definitions coming from SA's built-in dialects vs. type definitions coming from 3rd-party dialects. 
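For a quick illustration of the convention (a sketch only; the table and column names are made up and do not come from the codebase):

    import sqlalchemy as sa
    from sqlalchemy.dialects.postgresql import ARRAY, JSONB

    # Built-in types are spelled through the alias, so they stand apart from the
    # dialect-specific types, which keep their explicit imports.
    example = sa.Table(
        "example",
        sa.MetaData(),
        sa.Column("id", sa.BIGINT, primary_key=True),
        sa.Column("value", ARRAY(JSONB())),
        sa.Column("updated_at", sa.TIMESTAMP),
    )
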
--- target_postgres/connector.py | 163 ++++++++---------- target_postgres/sinks.py | 77 ++++----- target_postgres/tests/core.py | 4 +- target_postgres/tests/test_target_postgres.py | 91 +++++----- 4 files changed, 154 insertions(+), 181 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 59314d60..927d2ed4 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -12,24 +12,11 @@ import paramiko import simplejson -import sqlalchemy +import sqlalchemy as sa from singer_sdk import SQLConnector from singer_sdk import typing as th from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB -from sqlalchemy.engine import URL -from sqlalchemy.engine.url import make_url -from sqlalchemy.types import ( - BOOLEAN, - DATE, - DATETIME, - DECIMAL, - INTEGER, - TEXT, - TIME, - TIMESTAMP, - VARCHAR, - TypeDecorator, -) +from sqlalchemy.sql.type_api import TypeEngine from sshtunnel import SSHTunnelForwarder @@ -48,7 +35,7 @@ def __init__(self, config: dict) -> None: Args: config: Configuration for the connector. """ - url: URL = make_url(self.get_sqlalchemy_url(config=config)) + url: sa.engine.URL = sa.engine.make_url(self.get_sqlalchemy_url(config=config)) ssh_config = config.get("ssh_tunnel", {}) self.ssh_tunnel: SSHTunnelForwarder @@ -84,10 +71,10 @@ def prepare_table( # type: ignore[override] full_table_name: str, schema: dict, primary_keys: list[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Adapt target table to provided schema if possible. Args: @@ -102,8 +89,8 @@ def prepare_table( # type: ignore[override] The table object. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + table: sa.Table if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( table_name=table_name, @@ -144,10 +131,10 @@ def prepare_table( # type: ignore[override] def copy_table_structure( self, full_table_name: str, - from_table: sqlalchemy.Table, - connection: sqlalchemy.engine.Connection, + from_table: sa.Table, + connection: sa.engine.Connection, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Copy table structure. Args: @@ -160,60 +147,56 @@ def copy_table_structure( The new table object. 
""" _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - new_table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + new_table: sa.Table columns = [] if self.table_exists(full_table_name=full_table_name): raise RuntimeError("Table already exists") for column in from_table.columns: columns.append(column._copy()) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table else: - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table @contextmanager - def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: + def _connect(self) -> t.Iterator[sa.engine.Connection]: engine = self._engine with engine.connect().execution_options() as conn: yield conn engine.dispose() - def drop_table( - self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection - ): + def drop_table(self, table: sa.Table, connection: sa.engine.Connection): """Drop table data.""" table.drop(bind=connection) def clone_table( self, new_table_name, table, metadata, connection, temp_table - ) -> sqlalchemy.Table: + ) -> sa.Table: """Clone a table.""" new_columns = [] for column in table.columns: new_columns.append( - sqlalchemy.Column( + sa.Column( column.name, column.type, ) ) if temp_table is True: - new_table = sqlalchemy.Table( + new_table = sa.Table( new_table_name, metadata, *new_columns, prefixes=["TEMPORARY"] ) else: - new_table = sqlalchemy.Table(new_table_name, metadata, *new_columns) + new_table = sa.Table(new_table_name, metadata, *new_columns) new_table.create(bind=connection) return new_table @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + def to_sql_type(jsonschema_type: dict) -> TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -282,10 +265,10 @@ def pick_individual_type(jsonschema_type: dict): if "array" in jsonschema_type["type"]: return ARRAY(JSONB()) if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() + return sa.TIMESTAMP() individual_type = th.to_sql_type(jsonschema_type) - if isinstance(individual_type, VARCHAR): - return TEXT() + if isinstance(individual_type, sa.VARCHAR): + return sa.TEXT() return individual_type @staticmethod @@ -301,15 +284,15 @@ def pick_best_sql_type(sql_type_array: list): precedence_order = [ ARRAY, JSONB, - TEXT, - TIMESTAMP, - DATETIME, - DATE, - TIME, - DECIMAL, + sa.TEXT, + sa.TIMESTAMP, + sa.DATETIME, + sa.DATE, + sa.TIME, + sa.DECIMAL, BIGINT, - INTEGER, - BOOLEAN, + sa.INTEGER, + sa.BOOLEAN, NOTYPE, ] @@ -317,18 +300,18 @@ def pick_best_sql_type(sql_type_array: list): for obj in sql_type_array: if isinstance(obj, sql_type): return obj - return TEXT() + return sa.TEXT() def create_empty_table( # type: ignore[override] self, table_name: str, - meta: sqlalchemy.MetaData, + meta: sa.MetaData, schema: dict, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, primary_keys: list[str] | None = None, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Create an empty target table. 
Args: @@ -342,7 +325,7 @@ def create_empty_table( # type: ignore[override] NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. """ - columns: list[sqlalchemy.Column] = [] + columns: list[sa.Column] = [] primary_keys = primary_keys or [] try: properties: dict = schema["properties"] @@ -355,7 +338,7 @@ def create_empty_table( # type: ignore[override] for property_name, property_jsonschema in properties.items(): is_primary_key = property_name in primary_keys columns.append( - sqlalchemy.Column( + sa.Column( property_name, self.to_sql_type(property_jsonschema), primary_key=is_primary_key, @@ -363,24 +346,22 @@ def create_empty_table( # type: ignore[override] ) ) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table def prepare_column( # type: ignore[override] self, schema_name: str, - table: sqlalchemy.Table, + table: sa.Table, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None = None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None = None, ) -> None: """Adapt target table to provided schema if possible. @@ -397,7 +378,7 @@ def prepare_column( # type: ignore[override] if not column_exists: self._create_empty_column( - # We should migrate every function to use sqlalchemy.Table + # We should migrate every function to use sa.Table # instead of having to know what the function wants table_name=table.name, column_name=column_name, @@ -421,8 +402,8 @@ def _create_empty_column( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, + sql_type: TypeEngine, + connection: sa.engine.Connection, ) -> None: """Create a new column. @@ -451,8 +432,8 @@ def get_column_add_ddl( # type: ignore[override] table_name: str, schema_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the create column DDL statement. Args: @@ -464,9 +445,9 @@ def get_column_add_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ - column = sqlalchemy.Column(column_name, column_type) + column = sa.Column(column_name, column_type) - return sqlalchemy.DDL( + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ADD COLUMN %(column_name)s %(column_type)s" @@ -484,9 +465,9 @@ def _adapt_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None, ) -> None: """Adapt table column type to support the new JSON schema type. @@ -498,9 +479,9 @@ def _adapt_column_type( # type: ignore[override] Raises: NotImplementedError: if altering columns is not supported. 
""" - current_type: sqlalchemy.types.TypeEngine + current_type: TypeEngine if column_object is not None: - current_type = t.cast(sqlalchemy.types.TypeEngine, column_object.type) + current_type = t.cast(TypeEngine, column_object.type) else: current_type = self._get_column_type( schema_name=schema_name, @@ -551,8 +532,8 @@ def get_column_alter_ddl( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the alter column DDL statement. Override this if your database uses a different syntax for altering columns. @@ -565,8 +546,8 @@ def get_column_alter_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ - column = sqlalchemy.Column(column_name, column_type) - return sqlalchemy.DDL( + column = sa.Column(column_name, column_type) + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ALTER COLUMN %(column_name)s %(column_type)s" @@ -589,7 +570,7 @@ def get_sqlalchemy_url(self, config: dict) -> str: return cast(str, config["sqlalchemy_url"]) else: - sqlalchemy_url = URL.create( + sqlalchemy_url = sa.engine.URL.create( drivername=config["dialect+driver"], username=config["user"], password=config["password"], @@ -717,8 +698,8 @@ def _get_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, - ) -> sqlalchemy.types.TypeEngine: + connection: sa.engine.Connection, + ) -> TypeEngine: """Get the SQL type of the declared column. Args: @@ -744,15 +725,15 @@ def _get_column_type( # type: ignore[override] ) raise KeyError(msg) from ex - return t.cast(sqlalchemy.types.TypeEngine, column.type) + return t.cast(TypeEngine, column.type) def get_table_columns( # type: ignore[override] self, schema_name: str, table_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, column_names: list[str] | None = None, - ) -> dict[str, sqlalchemy.Column]: + ) -> dict[str, sa.Column]: """Return a list of table columns. Overrode to support schema_name @@ -765,11 +746,11 @@ def get_table_columns( # type: ignore[override] Returns: An ordered list of column objects. """ - inspector = sqlalchemy.inspect(connection) + inspector = sa.inspect(connection) columns = inspector.get_columns(table_name, schema_name) return { - col_meta["name"]: sqlalchemy.Column( + col_meta["name"]: sa.Column( col_meta["name"], col_meta["type"], nullable=col_meta.get("nullable", False), @@ -783,7 +764,7 @@ def column_exists( # type: ignore[override] self, full_table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> bool: """Determine if the target column already exists. 
@@ -802,10 +783,10 @@ def column_exists( # type: ignore[override] ) -class NOTYPE(TypeDecorator): +class NOTYPE(sa.TypeDecorator): """Type to use when none is provided in the schema.""" - impl = TEXT + impl = sa.TEXT cache_ok = True def process_bind_param(self, value, dialect): @@ -824,4 +805,4 @@ def python_type(self): def as_generic(self, *args: t.Any, **kwargs: t.Any): """Return the generic type for this column.""" - return TEXT() + return sa.TEXT() diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 5b5f7c99..1b9a7c71 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -3,12 +3,9 @@ import uuid from typing import Any, Dict, Iterable, List, Optional, Union, cast -import sqlalchemy +import sqlalchemy as sa from pendulum import now from singer_sdk.sinks import SQLSink -from sqlalchemy import Column, MetaData, Table, insert, select, update -from sqlalchemy.sql import Executable -from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -72,10 +69,10 @@ def process_batch(self, context: dict) -> None: Args: context: Stream partition or context dictionary. """ - # Use one connection so we do this all in a single transaction + # Use one connection, so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): # Check structure of table - table: sqlalchemy.Table = self.connector.prepare_table( + table: sa.Table = self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, primary_keys=self.key_properties, @@ -83,7 +80,7 @@ def process_batch(self, context: dict) -> None: connection=connection, ) # Create a temp table (Creates from the table above) - temp_table: sqlalchemy.Table = self.connector.copy_table_structure( + temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, from_table=table, as_temp_table=True, @@ -119,11 +116,11 @@ def generate_temp_table_name(self): def bulk_insert_records( # type: ignore[override] self, - table: sqlalchemy.Table, + table: sa.Table, schema: dict, records: Iterable[Dict[str, Any]], primary_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Bulk insert records to an existing destination table. @@ -174,11 +171,11 @@ def bulk_insert_records( # type: ignore[override] def upsert( self, - from_table: sqlalchemy.Table, - to_table: sqlalchemy.Table, + from_table: sa.Table, + to_table: sa.Table, schema: dict, join_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Merge upsert data from one table to another. 
@@ -196,33 +193,33 @@ def upsert( """ if self.append_only is True: # Insert - select_stmt = select(from_table.columns).select_from(from_table) + select_stmt = sa.select(from_table.columns).select_from(from_table) insert_stmt = to_table.insert().from_select( names=from_table.columns, select=select_stmt ) connection.execute(insert_stmt) else: join_predicates = [] - to_table_key: sqlalchemy.Column + to_table_key: sa.Column for key in join_keys: - from_table_key: sqlalchemy.Column = from_table.columns[key] + from_table_key: sa.Column = from_table.columns[key] to_table_key = to_table.columns[key] join_predicates.append(from_table_key == to_table_key) - join_condition = sqlalchemy.and_(*join_predicates) + join_condition = sa.and_(*join_predicates) where_predicates = [] for key in join_keys: to_table_key = to_table.columns[key] where_predicates.append(to_table_key.is_(None)) - where_condition = sqlalchemy.and_(*where_predicates) + where_condition = sa.and_(*where_predicates) select_stmt = ( - select(from_table.columns) + sa.select(from_table.columns) .select_from(from_table.outerjoin(to_table, join_condition)) .where(where_condition) ) - insert_stmt = insert(to_table).from_select( + insert_stmt = sa.insert(to_table).from_select( names=from_table.columns, select=select_stmt ) @@ -232,11 +229,13 @@ def upsert( where_condition = join_condition update_columns = {} for column_name in self.schema["properties"].keys(): - from_table_column: sqlalchemy.Column = from_table.columns[column_name] - to_table_column: sqlalchemy.Column = to_table.columns[column_name] + from_table_column: sa.Column = from_table.columns[column_name] + to_table_column: sa.Column = to_table.columns[column_name] update_columns[to_table_column] = from_table_column - update_stmt = update(to_table).where(where_condition).values(update_columns) + update_stmt = ( + sa.update(to_table).where(where_condition).values(update_columns) + ) connection.execute(update_stmt) return None @@ -244,12 +243,12 @@ def upsert( def column_representation( self, schema: dict, - ) -> List[Column]: + ) -> List[sa.Column]: """Return a sqlalchemy table representation for the current schema.""" - columns: list[Column] = [] + columns: list[sa.Column] = [] for property_name, property_jsonschema in schema["properties"].items(): columns.append( - Column( + sa.Column( property_name, self.connector.to_sql_type(property_jsonschema), ) @@ -259,8 +258,8 @@ def column_representation( def generate_insert_statement( self, full_table_name: str, - columns: List[Column], # type: ignore[override] - ) -> Union[str, Executable]: + columns: List[sa.Column], # type: ignore[override] + ) -> Union[str, sa.sql.Executable]: """Generate an insert statement for the given records. Args: @@ -270,9 +269,9 @@ def generate_insert_statement( Returns: An insert statement. 
""" - metadata = MetaData() - table = Table(full_table_name, metadata, *columns) - return insert(table) + metadata = sa.MetaData() + table = sa.Table(full_table_name, metadata, *columns) + return sa.insert(table) def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conforming names of tables, schemas, column names.""" @@ -343,8 +342,8 @@ def activate_version(self, new_version: int) -> None: connection=connection, ) - metadata = MetaData() - target_table = Table( + metadata = sa.MetaData() + target_table = sa.Table( self.table_name, metadata, autoload_with=connection.engine, @@ -353,8 +352,8 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: - delete_stmt = sqlalchemy.delete(target_table).where( - sqlalchemy.or_( + delete_stmt = sa.delete(target_table).where( + sa.or_( target_table.c[self.version_column_name].is_(None), target_table.c[self.version_column_name] <= new_version, ) @@ -375,19 +374,19 @@ def activate_version(self, new_version: int) -> None: ) # Need to deal with the case where data doesn't exist for the version column update_stmt = ( - update(target_table) + sa.update(target_table) .values( { - target_table.c[self.soft_delete_column_name]: bindparam( + target_table.c[self.soft_delete_column_name]: sa.bindparam( "deletedate" ) } ) .where( - sqlalchemy.and_( - sqlalchemy.or_( + sa.and_( + sa.or_( target_table.c[self.version_column_name] - < bindparam("version"), + < sa.bindparam("version"), target_table.c[self.version_column_name].is_(None), ), target_table.c[self.soft_delete_column_name].is_(None), diff --git a/target_postgres/tests/core.py b/target_postgres/tests/core.py index ba5662be..dc0ece69 100644 --- a/target_postgres/tests/core.py +++ b/target_postgres/tests/core.py @@ -1,6 +1,6 @@ """ Config and base values for target-postgres testing """ # flake8: noqa -import sqlalchemy +import sqlalchemy as sa from target_postgres.target import TargetPostgres @@ -53,7 +53,7 @@ def postgres_config_ssh_tunnel(): } -def create_engine(target_postgres: TargetPostgres) -> sqlalchemy.engine.Engine: +def create_engine(target_postgres: TargetPostgres) -> sa.engine.Engine: return TargetPostgres.default_sink_class.connector_class( config=target_postgres.config )._engine diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index d20fcf0a..9b04a738 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -8,11 +8,10 @@ import jsonschema import pytest -import sqlalchemy +import sqlalchemy as sa from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end from sqlalchemy.dialects.postgresql import ARRAY, JSONB -from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -113,7 +112,7 @@ def verify_data( if primary_key is not None and check_data is not None: if isinstance(check_data, dict): result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" ) ) @@ -122,7 +121,7 @@ def verify_data( assert result_dict == check_data elif isinstance(check_data, list): result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" ) ) @@ -136,7 +135,7 @@ def verify_data( raise 
ValueError("Invalid check_data - not dict or list of dicts") else: result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + sa.text(f"SELECT COUNT(*) FROM {full_table_name}") ) assert result.first()[0] == number_of_rows @@ -156,10 +155,8 @@ def verify_schema( """ schema = self.target.config["default_target_schema"] with self.engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) + meta = sa.MetaData() + table = sa.Table(table_name, meta, schema=schema, autoload_with=connection) for column in table.c: # Ignore `_sdc` metadata columns when veriying table schema. if column.name.startswith(self.metadata_column_prefix): @@ -224,7 +221,7 @@ def test_port_default_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5432/{database}" @@ -249,7 +246,7 @@ def test_port_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5433/{database}" @@ -431,7 +428,7 @@ def test_no_primary_keys(postgres_target, helper): table_name = "test_no_pk" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) @@ -467,7 +464,7 @@ def test_array_boolean(postgres_target, helper): helper.verify_schema( "array_boolean", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -481,7 +478,7 @@ def test_array_number(postgres_target, helper): helper.verify_schema( "array_number", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -495,7 +492,7 @@ def test_array_string(postgres_target, helper): helper.verify_schema( "array_string", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -509,7 +506,7 @@ def test_array_timestamp(postgres_target, helper): helper.verify_schema( "array_timestamp", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -536,7 +533,7 @@ def test_object_mixed(postgres_target, helper): helper.verify_schema( "object_mixed", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": JSONB}, }, ) @@ -593,21 +590,21 @@ def test_anyof(postgres_target, helper): table_name, check_columns={ # {"type":"string"} - "id": {"type": TEXT}, + "id": {"type": sa.TEXT}, # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. 
# {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} - "authored_date": {"type": TIMESTAMP}, - "committed_date": {"type": TIMESTAMP}, + "authored_date": {"type": sa.TIMESTAMP}, + "committed_date": {"type": sa.TIMESTAMP}, # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} "parent_ids": {"type": ARRAY}, # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} - "commit_message": {"type": TEXT}, + "commit_message": {"type": sa.TEXT}, # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - "legacy_id": {"type": TEXT}, + "legacy_id": {"type": sa.TEXT}, }, ) @@ -629,29 +626,29 @@ def test_activate_version_hard_delete(postgres_config_no_ssl): engine = create_engine(pg_hard_delete_true) singer_file_to_target(file_name, pg_hard_delete_true) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_hard_delete_true) # Should remove the 2 records we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 @@ -662,42 +659,40 @@ def test_activate_version_soft_delete(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." 
+ table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = False pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_soft_delete) # Should have all records including the 2 we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) @@ -711,9 +706,7 @@ def test_activate_version_deletes_data_properly(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." 
+ table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = True @@ -721,27 +714,27 @@ def test_activate_version_deletes_data_properly(postgres_target): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table file_name = f"{table_name}_2.singer" singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 0 @@ -782,7 +775,7 @@ def test_postgres_ssl_no_config(postgres_config_no_ssl): postgres_config_modified = copy.deepcopy(postgres_config_no_ssl) postgres_config_modified["port"] = 5432 - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -808,8 +801,8 @@ def test_postgres_ssl_public_pkey(postgres_config): postgres_config_modified["ssl_client_private_key"] = "./ssl/public_pkey.key" # If the private key exists but access is too public, the target won't fail until - # the it attempts to establish a connection to the database. - with pytest.raises(sqlalchemy.exc.OperationalError): + # it attempts to establish a connection to the database. + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -838,7 +831,7 @@ def test_postgres_ssl_invalid_cn(postgres_config): postgres_config_modified["host"] = "127.0.0.1" postgres_config_modified["ssl_mode"] = "verify-full" - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -871,7 +864,7 @@ def test_postgres_ssl_unsupported(postgres_config): postgres_config_modified = copy.deepcopy(postgres_config) postgres_config_modified["port"] = 5433 # Alternate service: postgres_no_ssl - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target)
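Editor's note on the `upsert()` hunks earlier in this patch: the change only swaps the long `sqlalchemy.*` spellings for the `sa` alias; the merge strategy itself (an anti-join insert followed by a keyed update) is unchanged. Below is a minimal, self-contained sketch of that pattern, assuming hypothetical `users` / `users_tmp` tables with a single `id` join key and one `name` payload column, and compiling against the PostgreSQL dialect purely to show the generated SQL -- it is not the sink's actual temp-table naming or column handling.

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

metadata = sa.MetaData()
to_table = sa.Table(
    "users", metadata,
    sa.Column("id", sa.BIGINT, primary_key=True),
    sa.Column("name", sa.TEXT),
)
from_table = sa.Table(
    "users_tmp", metadata,
    sa.Column("id", sa.BIGINT, primary_key=True),
    sa.Column("name", sa.TEXT),
)

join_condition = sa.and_(from_table.c.id == to_table.c.id)

# Step 1: anti-join -- insert staging rows that have no counterpart in the target.
select_stmt = (
    sa.select(from_table.c.id, from_table.c.name)
    .select_from(from_table.outerjoin(to_table, join_condition))
    .where(to_table.c.id.is_(None))
)
insert_stmt = sa.insert(to_table).from_select(
    names=["id", "name"], select=select_stmt
)

# Step 2: keyed update -- overwrite payload columns on rows that do match.
update_stmt = (
    sa.update(to_table)
    .where(join_condition)
    .values({to_table.c.name: from_table.c.name})
)

print(insert_stmt.compile(dialect=postgresql.dialect()))
print(update_stmt.compile(dialect=postgresql.dialect()))

Nothing in the sketch is specific to target-postgres; it is the generic SQLAlchemy Core shape that the refactored method now writes with the shorter alias.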
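Likewise, the new array/object test expectations assert only the reflected column class (`sa.BIGINT` for the key, ARRAY for array payloads, JSONB for the mixed object). A quick way to see the distinction those checks rely on -- assuming, as the expectations suggest, that array properties land in a PostgreSQL array column while a free-form object lands in plain jsonb; the table shapes below are hypothetical and not the connector's actual DDL path:

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ARRAY, JSONB
from sqlalchemy.schema import CreateTable

metadata = sa.MetaData()

array_table = sa.Table(
    "array_boolean", metadata,
    sa.Column("id", sa.BIGINT, primary_key=True),
    sa.Column("value", ARRAY(JSONB)),
)
object_table = sa.Table(
    "object_mixed", metadata,
    sa.Column("id", sa.BIGINT, primary_key=True),
    sa.Column("value", JSONB),
)

# Renders the "value" column as JSONB[]; per the test expectations above,
# such a column is reflected back as ARRAY.
print(CreateTable(array_table).compile(dialect=postgresql.dialect()))

# Renders the "value" column as plain JSONB, which reflects back as JSONB.
print(CreateTable(object_table).compile(dialect=postgresql.dialect()))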