This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Rab/test run n rows #93

Open · wants to merge 3 commits into master
24 changes: 22 additions & 2 deletions tap_snowflake/__init__.py
@@ -20,6 +20,7 @@
import tap_snowflake.sync_strategies.common as common
import tap_snowflake.sync_strategies.full_table as full_table
import tap_snowflake.sync_strategies.incremental as incremental
import tap_snowflake.sync_strategies.n_rows as n_rows
from tap_snowflake.connection import SnowflakeConnection

LOGGER = singer.get_logger('tap_snowflake')
@@ -56,7 +57,8 @@
INTEGER_TYPES = set(['int', 'integer', 'bigint', 'smallint'])
FLOAT_TYPES = set(['float', 'float4', 'float8', 'real', 'double', 'double precision'])
DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz'])
BINARY_TYPE = set(['binary', 'varbinary'])
BINARY_TYPES = set(['binary', 'varbinary'])
OBJECT_TYPES = set(['object', 'array'])


def schema_for_column(c):
@@ -90,10 +92,13 @@ def schema_for_column(c):
result.type = ['null', 'string']
result.format = 'time'

elif data_type in BINARY_TYPE:
elif data_type in BINARY_TYPES:
result.type = ['null', 'string']
result.format = 'binary'

elif data_type in OBJECT_TYPES:
result.type = ['null', 'object', 'array']

else:
result = Schema(None,
inclusion='unsupported',
@@ -158,6 +163,7 @@ def get_table_columns(snowflake_conn, tables):
,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision
,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale
FROM show_columns
ORDER BY table_name ASC, column_name ASC
"""
queries.extend([show_columns, select])

@@ -431,6 +437,18 @@ def do_sync_full_table(snowflake_conn, catalog_entry, state, columns):
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))


def do_sync_n_rows(snowflake_conn, catalog_entry, state, columns):
LOGGER.info('Stream %s is using n-rows replication', catalog_entry.stream)

write_schema_message(catalog_entry)

stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)

n_rows.sync_table(snowflake_conn, catalog_entry, state, columns, stream_version)

singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))


def sync_streams(snowflake_conn, catalog, state):
for catalog_entry in catalog.streams:
columns = list(catalog_entry.schema.properties.keys())
@@ -461,6 +479,8 @@ def sync_streams(snowflake_conn, catalog, state):
do_sync_incremental(snowflake_conn, catalog_entry, state, columns)
elif replication_method == 'FULL_TABLE':
do_sync_full_table(snowflake_conn, catalog_entry, state, columns)
elif replication_method == 'N_ROWS':
do_sync_n_rows(snowflake_conn, catalog_entry, state, columns)
else:
raise Exception('Only INCREMENTAL and FULL TABLE replication methods are supported')

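Note: the new do_sync_n_rows() branch is reached the same way as INCREMENTAL and FULL_TABLE, via the replication method resolved in sync_streams() (the resolution code sits above this hunk and is not shown here). Assuming the tap follows the usual Singer convention of reading replication-method from catalog metadata, a catalog entry that routes a stream through the new branch would look roughly like the sketch below; the stream and table names are placeholders, not values from this PR.

# Hypothetical catalog fragment (as a Python dict) selecting the new
# replication method for one stream. Names are placeholders; the metadata
# layout assumes the standard Singer 'replication-method' convention.
catalog_entry = {
    'tap_stream_id': 'TAP_SNOWFLAKE_TEST-DATA_TYPES',
    'metadata': [
        {
            'breadcrumb': [],
            'metadata': {
                'selected': True,
                'replication-method': 'N_ROWS',  # dispatched to do_sync_n_rows()
            },
        }
    ],
}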
7 changes: 7 additions & 0 deletions tap_snowflake/sync_strategies/common.py
@@ -5,6 +5,7 @@
import datetime
import singer
import time
import json

import singer.metrics as metrics
from singer import metadata
@@ -153,6 +154,12 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted):
boolean_representation = True
row_to_persist += (boolean_representation,)

elif 'object' in property_type or property_type == 'object':
obj_rep = None
if elem:
obj_rep = json.loads(elem)
row_to_persist += (obj_rep,)

else:
row_to_persist += (elem,)
rec = dict(zip(columns, row_to_persist))
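Note: the json.loads() call added above assumes the Snowflake connector hands OBJECT and ARRAY column values back as JSON text. A minimal standalone sketch of that branch, with illustrative sample values:

import json

# Minimal sketch of the new semi-structured branch in row_to_singer_record():
# NULL/empty values stay None, anything else is parsed from the JSON text the
# connector is assumed to return for OBJECT/ARRAY columns.
def parse_semi_structured(elem):
    return json.loads(elem) if elem else None

print(parse_semi_structured('{"test_key": ["test_val"]}'))  # {'test_key': ['test_val']}
print(parse_semi_structured(None))                           # None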
28 changes: 28 additions & 0 deletions tap_snowflake/sync_strategies/n_rows.py
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression

import singer
import tap_snowflake.sync_strategies.common as common

LOGGER = singer.get_logger('tap_snowflake')

def sync_table(snowflake_conn, catalog_entry, state, columns, stream_version):
"""Sync table with N_ROWS"""
# common.whitelist_bookmark_keys(BOOKMARK_KEYS, catalog_entry.tap_stream_id, state)

with snowflake_conn.connect_with_backoff() as open_conn:
with open_conn.cursor() as cur:
select_sql = common.generate_select_sql(catalog_entry, columns) + ' LIMIT 10'
params = {}

common.sync_query(cur,
catalog_entry,
state,
select_sql,
columns,
stream_version,
params)

# clear max pk value and last pk fetched upon successful sync
singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values')
singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'last_pk_fetched')
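Note: the strategy reuses the same SELECT that full-table sync would run and simply appends a fixed LIMIT. A rough sketch of the resulting query shape, with a hypothetical helper and placeholder identifiers standing in for common.generate_select_sql():

# Illustrative stand-in for common.generate_select_sql() plus the LIMIT suffix
# appended above; the identifiers are placeholders, not the tap's real output.
def build_limited_select(table, columns, limit=10):
    select_sql = 'SELECT {} FROM {}'.format(', '.join(columns), table)
    return '{} LIMIT {}'.format(select_sql, limit)

print(build_limited_select('"TAP_SNOWFLAKE_TEST"."DATA_TYPES"', ['"C_PK"', '"C_OBJECT"', '"C_ARRAY"']))
# SELECT "C_PK", "C_OBJECT", "C_ARRAY" FROM "TAP_SNOWFLAKE_TEST"."DATA_TYPES" LIMIT 10

As written, the 10-row cap is hard-coded in the strategy rather than taken from config.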
32 changes: 30 additions & 2 deletions tests/integration/test_tap_snowflake.py
@@ -52,7 +52,9 @@ def setUpClass(cls):
c_datetime DATETIME,
c_time TIME,
c_binary BINARY,
c_varbinary VARBINARY(16)
c_varbinary VARBINARY(16),
c_object OBJECT,
c_array ARRAY
)'''.format(SCHEMA_NAME))

cur.execute('''
@@ -70,6 +72,8 @@ def setUpClass(cls):
,'17:23:59'
,HEX_ENCODE('binary')
,HEX_ENCODE('varbinary')
,parse_json($${{'test_key':['test_val']}}$$)
,array_construct(1, object_construct('foo', 'bar'), array_construct(3,3,3))
'''.format(SCHEMA_NAME))

cur.execute('''
@@ -241,6 +245,28 @@ def test_varbinary(self):
{'selected-by-default': True,
'sql-datatype': 'binary'})

def test_object(self):
self.assertEqual(
self.dt_schema.properties['C_OBJECT'],
Schema(['null', 'object', 'array'], inclusion='available'),
)
self.assertEqual(
self.get_dt_metadata_for_column('C_OBJECT'),
{'selected-by-default': True,
'sql-datatype': 'object'},
)

def test_array(self):
self.assertEqual(
self.dt_schema.properties['C_ARRAY'],
Schema(['null', 'object', 'array'], inclusion='available'),
)
self.assertEqual(
self.get_dt_metadata_for_column('C_ARRAY'),
{'selected-by-default': True,
'sql-datatype': 'array'},
)

def test_row_to_singer_record(self):
"""Select every supported data type from snowflake,
generate the singer JSON output message and compare to expected JSON"""
@@ -281,7 +307,9 @@ def test_row_to_singer_record(self):
'C_DATETIME': '2019-08-01T17:23:59+00:00',
'C_TIME': '17:23:59',
'C_BINARY': '62696E617279',
'C_VARBINARY': '76617262696E617279'
'C_VARBINARY': '76617262696E617279',
'C_OBJECT': {'test_key': ['test_val']},
'C_ARRAY': [1, {'foo': 'bar'}, [3, 3, 3]]
})


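Note: once row_to_singer_record() has parsed the two new columns, the RECORD message emitted for the seeded row would carry the same structures the expected record above asserts. An illustrative fragment follows; the stream name is a placeholder and fields such as version and time_extracted are omitted.

# Illustrative fragment of the Singer RECORD message for the new columns;
# the stream name and any omitted record fields are placeholders.
record_message = {
    'type': 'RECORD',
    'stream': 'data_types',
    'record': {
        'C_OBJECT': {'test_key': ['test_val']},
        'C_ARRAY': [1, {'foo': 'bar'}, [3, 3, 3]],
    },
}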