Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Adds TIME data type support
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti committed Aug 7, 2019
1 parent 58b6af5 commit 895961d
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 8 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,28 @@ is invoked.
Incremental replication works in conjunction with a state file to only extract
new records each time the tap is invoked. This requires a replication key to be
specified in the table's metadata as well.

### To run tests:

1. Define environment variables that requires running the tests
```
export TAP_SNOWFLAKE_ACCOUNT=<snowflake-account-name>
export TAP_SNOWFLAKE_DBNAME=<snowflake-database-name>
export TAP_SNOWFLAKE_USER=<snowflake-user>
export TAP_SNOWFLAKE_PASSWORD=<snowfale-password>
export TAP_SNOWFLAKE_WAREHOUSE=<snowflake-warehouse>
```

2. Install python dependencies in a virtual env and run nose unit and integration tests
```
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
pip install nose
```

3. To run unit tests:
```
nosetests
```
6 changes: 5 additions & 1 deletion tap_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
NUMBER_TYPES = set(['number', 'decimal', 'numeric'])
INTEGER_TYPES = set(['int', 'integer', 'bigint', 'smallint'])
FLOAT_TYPES = set(['float', 'float4', 'float8', 'real', 'double', 'double precision'])
DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'time', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz'])
DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'timestamp_ltz', 'timestamp_ntz', 'timestamp_tz'])
BINARY_TYPE = set(['binary', 'varbinary'])


Expand Down Expand Up @@ -81,6 +81,10 @@ def schema_for_column(c):
result.type = ['null', 'string']
result.format = 'date-time'

elif data_type == 'time':
result.type = ['null', 'string']
result.format = 'time'

elif data_type in BINARY_TYPE:
result.type = ['null', 'string']

Expand Down
3 changes: 3 additions & 0 deletions tap_snowflake/sync_strategies/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted):
timedelta_from_epoch = epoch + elem
row_to_persist += (timedelta_from_epoch.isoformat() + '+00:00',)

elif isinstance(elem, datetime.time):
row_to_persist += (str(elem),)

elif isinstance(elem, bytes):
# for BIT value, treat 0 as False and anything else as True
if 'boolean' in property_type:
Expand Down
72 changes: 68 additions & 4 deletions tests/test_tap_snowflake.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import unittest
import json
import singer

import tap_snowflake
import tap_snowflake.sync_strategies.common as common

from singer.schema import Schema

Expand All @@ -26,9 +28,9 @@ class TestTypeMapping(unittest.TestCase):

@classmethod
def setUpClass(cls):
snowflake_conn = test_utils.get_test_connection()
cls.snowflake_conn = test_utils.get_test_connection()

with snowflake_conn.open_connection() as open_conn:
with cls.snowflake_conn.open_connection() as open_conn:
with open_conn.cursor() as cur:
cur.execute('''
CREATE TABLE {}.test_type_mapping (
Expand All @@ -41,12 +43,31 @@ def setUpClass(cls):
c_float FLOAT,
c_double DOUBLE,
c_date DATE,
c_datetime DATETIME,
c_time TIME,
c_binary BINARY,
c_varbinary VARBINARY(16)
)'''.format(SCHEMA_NAME))

catalog = test_utils.discover_catalog(snowflake_conn)
cur.execute('''
INSERT INTO {}.test_type_mapping
SELECT 1
,12345
,123456789.12
,123
,12345
,1234567890
,123.123
,123.123
,'2019-08-01'
,'2019-08-01 17:23:59'
,'17:23:59'
,HEX_ENCODE('binary')
,HEX_ENCODE('varbinary')
'''.format(SCHEMA_NAME))

catalog = test_utils.discover_catalog(cls.snowflake_conn)
cls.stream = catalog.streams[0]
cls.schema = catalog.streams[0].schema
cls.metadata = catalog.streams[0].metadata

Expand Down Expand Up @@ -121,7 +142,7 @@ def test_date(self):
def test_time(self):
self.assertEqual(self.schema.properties['C_TIME'],
Schema(['null', 'string'],
format='date-time',
format='time',
inclusion='available'))
self.assertEqual(self.get_metadata_for_column('C_TIME'),
{'selected-by-default': True,
Expand All @@ -143,6 +164,49 @@ def test_varbinary(self):
{'selected-by-default': True,
'sql-datatype': 'binary'})

def test_row_to_singer_record(self):
"""Select every supported data type from snowflake,
generate the singer JSON output message and compare to expected JSON"""
catalog_entry = self.stream
columns = list(catalog_entry.schema.properties.keys())
select_sql = common.generate_select_sql(catalog_entry, columns)

# Run query to export data
with self.snowflake_conn.open_connection() as open_conn:
with open_conn.cursor() as cur:
cur.execute(select_sql, params={})
row = cur.fetchone()

# Convert the exported data to singer JSON
record_message = common.row_to_singer_record(catalog_entry=catalog_entry,
version=1,
row=row,
columns=columns,
time_extracted=singer.utils.now())

# Convert to formatted JSON
formatted_record = singer.messages.format_message(record_message)

# Reload the generated JSON to object and assert keys
self.assertEquals(json.loads(formatted_record)['type'], 'RECORD')
self.assertEquals(json.loads(formatted_record)['stream'], 'TEST_TYPE_MAPPING')
self.assertEquals(json.loads(formatted_record)['record'],
{
'C_PK': 1,
'C_DECIMAL': 12345,
'C_DECIMAL_2': 123456789.12,
'C_SMALLINT': 123,
'C_INT': 12345,
'C_BIGINT': 1234567890,
'C_FLOAT': 123.123,
'C_DOUBLE': 123.123,
'C_DATE': '2019-08-01T00:00:00+00:00',
'C_DATETIME': '2019-08-01T17:23:59+00:00',
'C_TIME': '17:23:59',
'C_BINARY': '62696e617279',
'C_VARBINARY': '76617262696e617279'
})


class TestSelectsAppropriateColumns(unittest.TestCase):

Expand Down
5 changes: 2 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import snowflake.connector

import tap_snowflake
import tap_snowflake.sync_strategies.common as common
from tap_snowflake.connection import SnowflakeConnection

SCHEMA_NAME='tap_snowflake_test'
Expand Down Expand Up @@ -42,9 +41,9 @@ def get_test_connection():
return snowflake_conn


def discover_catalog(connection):
def discover_catalog(snowflake_conn):
tap_config = get_tap_config()
catalog = tap_snowflake.discover_catalog(connection, tap_config)
catalog = tap_snowflake.discover_catalog(snowflake_conn, tap_config)
streams = []

for stream in catalog.streams:
Expand Down

0 comments on commit 895961d

Please sign in to comment.