From 895961d0e9aaef12055230ce3ea045388aaf4a12 Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Wed, 7 Aug 2019 19:07:02 +0100 Subject: [PATCH] Adds TIME data type support --- README.md | 25 +++++++++ tap_snowflake/__init__.py | 6 ++- tap_snowflake/sync_strategies/common.py | 3 ++ tests/test_tap_snowflake.py | 72 +++++++++++++++++++++++-- tests/utils.py | 5 +- 5 files changed, 103 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 119dcea..b25310f 100644 --- a/README.md +++ b/README.md @@ -91,3 +91,28 @@ is invoked. Incremental replication works in conjunction with a state file to only extract new records each time the tap is invoked. This requires a replication key to be specified in the table's metadata as well. + +### To run tests: + +1. Define the environment variables that are required for running the tests +``` + export TAP_SNOWFLAKE_ACCOUNT= + export TAP_SNOWFLAKE_DBNAME= + export TAP_SNOWFLAKE_USER= + export TAP_SNOWFLAKE_PASSWORD= + export TAP_SNOWFLAKE_WAREHOUSE= +``` + +2. Install Python dependencies in a virtual env and run nose unit and integration tests +``` + python3 -m venv venv + . venv/bin/activate + pip install --upgrade pip + pip install . + pip install nose +``` + +3. 
To run unit tests: +``` + nosetests +``` diff --git a/tap_snowflake/__init__.py b/tap_snowflake/__init__.py index ab7a4ed..9846817 100644 --- a/tap_snowflake/__init__.py +++ b/tap_snowflake/__init__.py @@ -50,7 +50,7 @@ NUMBER_TYPES = set(['number', 'decimal', 'numeric']) INTEGER_TYPES = set(['int', 'integer', 'bigint', 'smallint']) FLOAT_TYPES = set(['float', 'float4', 'float8', 'real', 'double', 'double precision']) -DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'time', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz']) +DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz']) BINARY_TYPE = set(['binary', 'varbinary']) @@ -81,6 +81,10 @@ def schema_for_column(c): result.type = ['null', 'string'] result.format = 'date-time' + elif data_type == 'time': + result.type = ['null', 'string'] + result.format = 'time' + elif data_type in BINARY_TYPE: result.type = ['null', 'string'] diff --git a/tap_snowflake/sync_strategies/common.py b/tap_snowflake/sync_strategies/common.py index b8d66ce..5523bcd 100644 --- a/tap_snowflake/sync_strategies/common.py +++ b/tap_snowflake/sync_strategies/common.py @@ -113,6 +113,9 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted): timedelta_from_epoch = epoch + elem row_to_persist += (timedelta_from_epoch.isoformat() + '+00:00',) + elif isinstance(elem, datetime.time): + row_to_persist += (str(elem),) + elif isinstance(elem, bytes): # for BIT value, treat 0 as False and anything else as True if 'boolean' in property_type: diff --git a/tests/test_tap_snowflake.py b/tests/test_tap_snowflake.py index 06bc5f9..c8a4f19 100644 --- a/tests/test_tap_snowflake.py +++ b/tests/test_tap_snowflake.py @@ -1,7 +1,9 @@ import unittest +import json import singer import tap_snowflake +import tap_snowflake.sync_strategies.common as common from singer.schema import Schema @@ -26,9 +28,9 @@ class TestTypeMapping(unittest.TestCase): @classmethod def setUpClass(cls): - 
snowflake_conn = test_utils.get_test_connection() + cls.snowflake_conn = test_utils.get_test_connection() - with snowflake_conn.open_connection() as open_conn: + with cls.snowflake_conn.open_connection() as open_conn: with open_conn.cursor() as cur: cur.execute(''' CREATE TABLE {}.test_type_mapping ( @@ -41,12 +43,31 @@ def setUpClass(cls): c_float FLOAT, c_double DOUBLE, c_date DATE, + c_datetime DATETIME, c_time TIME, c_binary BINARY, c_varbinary VARBINARY(16) )'''.format(SCHEMA_NAME)) - catalog = test_utils.discover_catalog(snowflake_conn) + cur.execute(''' + INSERT INTO {}.test_type_mapping + SELECT 1 + ,12345 + ,123456789.12 + ,123 + ,12345 + ,1234567890 + ,123.123 + ,123.123 + ,'2019-08-01' + ,'2019-08-01 17:23:59' + ,'17:23:59' + ,HEX_ENCODE('binary') + ,HEX_ENCODE('varbinary') + '''.format(SCHEMA_NAME)) + + catalog = test_utils.discover_catalog(cls.snowflake_conn) + cls.stream = catalog.streams[0] cls.schema = catalog.streams[0].schema cls.metadata = catalog.streams[0].metadata @@ -121,7 +142,7 @@ def test_date(self): def test_time(self): self.assertEqual(self.schema.properties['C_TIME'], Schema(['null', 'string'], - format='date-time', + format='time', inclusion='available')) self.assertEqual(self.get_metadata_for_column('C_TIME'), {'selected-by-default': True, @@ -143,6 +164,49 @@ def test_varbinary(self): {'selected-by-default': True, 'sql-datatype': 'binary'}) + def test_row_to_singer_record(self): + """Select every supported data type from snowflake, + generate the singer JSON output message and compare to expected JSON""" + catalog_entry = self.stream + columns = list(catalog_entry.schema.properties.keys()) + select_sql = common.generate_select_sql(catalog_entry, columns) + + # Run query to export data + with self.snowflake_conn.open_connection() as open_conn: + with open_conn.cursor() as cur: + cur.execute(select_sql, params={}) + row = cur.fetchone() + + # Convert the exported data to singer JSON + record_message = 
common.row_to_singer_record(catalog_entry=catalog_entry, + version=1, + row=row, + columns=columns, + time_extracted=singer.utils.now()) + + # Convert to formatted JSON + formatted_record = singer.messages.format_message(record_message) + + # Reload the generated JSON to object and assert keys + self.assertEquals(json.loads(formatted_record)['type'], 'RECORD') + self.assertEquals(json.loads(formatted_record)['stream'], 'TEST_TYPE_MAPPING') + self.assertEquals(json.loads(formatted_record)['record'], + { + 'C_PK': 1, + 'C_DECIMAL': 12345, + 'C_DECIMAL_2': 123456789.12, + 'C_SMALLINT': 123, + 'C_INT': 12345, + 'C_BIGINT': 1234567890, + 'C_FLOAT': 123.123, + 'C_DOUBLE': 123.123, + 'C_DATE': '2019-08-01T00:00:00+00:00', + 'C_DATETIME': '2019-08-01T17:23:59+00:00', + 'C_TIME': '17:23:59', + 'C_BINARY': '62696e617279', + 'C_VARBINARY': '76617262696e617279' + }) + class TestSelectsAppropriateColumns(unittest.TestCase): diff --git a/tests/utils.py b/tests/utils.py index c604f6b..145b8f1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,7 +3,6 @@ import snowflake.connector import tap_snowflake -import tap_snowflake.sync_strategies.common as common from tap_snowflake.connection import SnowflakeConnection SCHEMA_NAME='tap_snowflake_test' @@ -42,9 +41,9 @@ def get_test_connection(): return snowflake_conn -def discover_catalog(connection): +def discover_catalog(snowflake_conn): tap_config = get_tap_config() - catalog = tap_snowflake.discover_catalog(connection, tap_config) + catalog = tap_snowflake.discover_catalog(snowflake_conn, tap_config) streams = [] for stream in catalog.streams: