diff --git a/tap_snowflake/__init__.py b/tap_snowflake/__init__.py index 6a0b005..f08f3ee 100644 --- a/tap_snowflake/__init__.py +++ b/tap_snowflake/__init__.py @@ -20,6 +20,7 @@ import tap_snowflake.sync_strategies.common as common import tap_snowflake.sync_strategies.full_table as full_table import tap_snowflake.sync_strategies.incremental as incremental +import tap_snowflake.sync_strategies.n_rows as n_rows from tap_snowflake.connection import SnowflakeConnection LOGGER = singer.get_logger('tap_snowflake') @@ -56,7 +57,8 @@ INTEGER_TYPES = set(['int', 'integer', 'bigint', 'smallint']) FLOAT_TYPES = set(['float', 'float4', 'float8', 'real', 'double', 'double precision']) DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz']) -BINARY_TYPE = set(['binary', 'varbinary']) +BINARY_TYPES = set(['binary', 'varbinary']) +OBJECT_TYPES = set(['object', 'array']) def schema_for_column(c): @@ -90,10 +92,13 @@ def schema_for_column(c): result.type = ['null', 'string'] result.format = 'time' - elif data_type in BINARY_TYPE: + elif data_type in BINARY_TYPES: result.type = ['null', 'string'] result.format = 'binary' + elif data_type in OBJECT_TYPES: + result.type = ['null', 'object', 'array'] + else: result = Schema(None, inclusion='unsupported', @@ -158,6 +163,7 @@ def get_table_columns(snowflake_conn, tables): ,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision ,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale FROM show_columns + ORDER BY table_name ASC, column_name ASC """ queries.extend([show_columns, select]) @@ -431,6 +437,18 @@ def do_sync_full_table(snowflake_conn, catalog_entry, state, columns): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) +def do_sync_n_rows(snowflake_conn, catalog_entry, state, columns): + LOGGER.info('Stream %s is using n-rows replication', catalog_entry.stream) + + write_schema_message(catalog_entry) + + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + + n_rows.sync_table(snowflake_conn, catalog_entry, state, columns, stream_version) + + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + def sync_streams(snowflake_conn, catalog, state): for catalog_entry in catalog.streams: columns = list(catalog_entry.schema.properties.keys()) @@ -461,6 +479,8 @@ def sync_streams(snowflake_conn, catalog, state): do_sync_incremental(snowflake_conn, catalog_entry, state, columns) elif replication_method == 'FULL_TABLE': do_sync_full_table(snowflake_conn, catalog_entry, state, columns) + elif replication_method == 'N_ROWS': + do_sync_n_rows(snowflake_conn, catalog_entry, state, columns) else: raise Exception('Only INCREMENTAL and FULL TABLE replication methods are supported') diff --git a/tap_snowflake/sync_strategies/common.py b/tap_snowflake/sync_strategies/common.py index c0ba297..bc0da73 100644 --- a/tap_snowflake/sync_strategies/common.py +++ b/tap_snowflake/sync_strategies/common.py @@ -5,6 +5,7 @@ import datetime import singer import time +import json import singer.metrics as metrics from singer import metadata @@ -153,6 +154,12 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted): boolean_representation = True row_to_persist += (boolean_representation,) + elif 'object' in property_type or property_type == 'object': + obj_rep = None + if elem: + obj_rep = json.loads(elem) + row_to_persist += (obj_rep,) + else: row_to_persist += (elem,) rec = dict(zip(columns, row_to_persist)) diff --git a/tap_snowflake/sync_strategies/n_rows.py b/tap_snowflake/sync_strategies/n_rows.py new file mode 100644 index 0000000..4380912 --- /dev/null +++ b/tap_snowflake/sync_strategies/n_rows.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression + +import singer +import tap_snowflake.sync_strategies.common as common + +LOGGER = singer.get_logger('tap_snowflake') + +def sync_table(snowflake_conn, catalog_entry, state, columns, stream_version): + """Sync table with N_ROWS""" + # common.whitelist_bookmark_keys(BOOKMARK_KEYS, catalog_entry.tap_stream_id, state) + + with snowflake_conn.connect_with_backoff() as open_conn: + with open_conn.cursor() as cur: + select_sql = common.generate_select_sql(catalog_entry, columns) + ' LIMIT 10' + params = {} + + common.sync_query(cur, + catalog_entry, + state, + select_sql, + columns, + stream_version, + params) + + # clear max pk value and last pk fetched upon successful sync + singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values') + singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'last_pk_fetched') \ No newline at end of file diff --git a/tests/integration/test_tap_snowflake.py b/tests/integration/test_tap_snowflake.py index c195403..a9b8e50 100644 --- a/tests/integration/test_tap_snowflake.py +++ b/tests/integration/test_tap_snowflake.py @@ -52,7 +52,9 @@ def setUpClass(cls): c_datetime DATETIME, c_time TIME, c_binary BINARY, - c_varbinary VARBINARY(16) + c_varbinary VARBINARY(16), + c_object OBJECT, + c_array ARRAY )'''.format(SCHEMA_NAME)) cur.execute(''' @@ -70,6 +72,8 @@ def setUpClass(cls): ,'17:23:59' ,HEX_ENCODE('binary') ,HEX_ENCODE('varbinary') + ,parse_json($${{'test_key':['test_val']}}$$) + ,array_construct(1, object_construct('foo', 'bar'), array_construct(3,3,3)) '''.format(SCHEMA_NAME)) cur.execute(''' @@ -241,6 +245,28 @@ def test_varbinary(self): {'selected-by-default': True, 'sql-datatype': 'binary'}) + def test_object(self): + self.assertEqual( + self.dt_schema.properties['C_OBJECT'], + Schema(['null', 'object', 'array'], inclusion='available'), + ) + self.assertEqual( + self.get_dt_metadata_for_column('C_OBJECT'), + {'selected-by-default': True, + 'sql-datatype': 'object'}, + ) + + def test_array(self): + self.assertEqual( + self.dt_schema.properties['C_ARRAY'], + Schema(['null', 'object', 'array'], inclusion='available'), + ) + self.assertEqual( + self.get_dt_metadata_for_column('C_ARRAY'), + {'selected-by-default': True, + 'sql-datatype': 'array'}, + ) + def test_row_to_singer_record(self): """Select every supported data type from snowflake, generate the singer JSON output message and compare to expected JSON""" @@ -281,7 +307,9 @@ def test_row_to_singer_record(self): 'C_DATETIME': '2019-08-01T17:23:59+00:00', 'C_TIME': '17:23:59', 'C_BINARY': '62696E617279', - 'C_VARBINARY': '76617262696E617279' + 'C_VARBINARY': '76617262696E617279', + 'C_OBJECT': {'test_key': ['test_val']}, + 'C_ARRAY': [1, {'foo': 'bar'}, [3, 3, 3]] })