Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Add support for transferring postgres range types as json objects
Browse files Browse the repository at this point in the history
  • Loading branch information
gwax committed Dec 28, 2022
1 parent a97d7d8 commit 76851a8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 6 deletions.
16 changes: 16 additions & 0 deletions tap_postgres/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,22 @@ def selected_value_to_singer_value_impl(elem, sql_datatype):
cleaned_elem = elem
else:
raise Exception(f"do not know how to marshall a dict if its not an hstore or json: {sql_datatype}")
elif 'range' in sql_datatype:
child_type = {
'int4range': 'int4',
'int8range': 'int8',
'numrange': 'numeric',
'tsrange': 'timestamp without time zone',
'tstzrange': 'timestamp with time zone',
'daterange': 'date',
}.get(sql_datatype, None)
if child_type is None:
raise Exception(f"do not know how to marshall a range of type {sql_datatype}")
cleaned_elem = {
'lower': selected_value_to_singer_value_impl(elem.lower, child_type),
'upper': selected_value_to_singer_value_impl(elem.upper, child_type),
'bounds': elem._bounds,
}
else:
raise Exception(
f"do not know how to marshall value of class( {elem.__class__} ) and sql_datatype ( {sql_datatype} )")
Expand Down
4 changes: 4 additions & 0 deletions tap_postgres/discovery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ def schema_for_column_datatype(col):
schema['type'] = nullable_column('string', col.is_primary_key)
return schema

if 'range' in data_type:
schema['type'] = nullable_column('object', col.is_primary_key)
return schema

return schema


Expand Down
66 changes: 66 additions & 0 deletions tests/integration/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,72 @@ def test_catalog(self):
'type': 'object'},
stream_dict.get('schema'))

class TestRangeTables(unittest.TestCase):
maxDiff = None
table_name = 'CHICKEN TIMES'

def setUp(self):
table_spec = {"columns": [{"name": 'our_int_range', "type": "int4range"},
{"name": 'our_tstz_range', "type": "tstzrange"}],
"name": TestRangeTables.table_name}
ensure_test_table(table_spec)

def test_catalog(self):
conn_config = get_test_connection_config()

my_stdout = io.StringIO()
with contextlib.redirect_stdout(my_stdout):
streams = tap_postgres.do_discovery(conn_config)

chicken_streams = [s for s in streams if s['tap_stream_id'] == 'public-CHICKEN TIMES']
self.assertEqual(len(chicken_streams), 1)
stream_dict = chicken_streams[0]

stream_dict.get('metadata').sort(key=lambda md: md['breadcrumb'])

self.assertEqual(metadata.to_map(stream_dict.get('metadata')),
{(): {'database-name': 'postgres',
'is-view': False,
'row-count': 0,
'schema-name': 'public',
'table-key-properties': []},
('properties', 'our_int_range'): {'inclusion': 'available',
'selected-by-default': True,
'sql-datatype': 'int4range'},
('properties', 'our_tstz_range'): {'inclusion': 'available',
'selected-by-default': True,
'sql-datatype': 'tstzrange'}})

self.assertEqual(stream_dict.get('schema'),
{'definitions': {'sdc_recursive_boolean_array': {'items': {'$ref': '#/definitions/sdc_recursive_boolean_array'},
'type': ['null',
'boolean',
'array']},
'sdc_recursive_integer_array': {'items': {'$ref': '#/definitions/sdc_recursive_integer_array'},
'type': ['null',
'integer',
'array']},
'sdc_recursive_number_array': {'items': {'$ref': '#/definitions/sdc_recursive_number_array'},
'type': ['null',
'number',
'array']},
'sdc_recursive_object_array': {'items': {'$ref': '#/definitions/sdc_recursive_object_array'},
'type': ['null',
'object',
'array']},
'sdc_recursive_string_array': {'items': {'$ref': '#/definitions/sdc_recursive_string_array'},
'type': ['null',
'string',
'array']},
'sdc_recursive_timestamp_array': {'format': 'date-time',
'items': {'$ref': '#/definitions/sdc_recursive_timestamp_array'},
'type': ['null',
'string',
'array']}},
'properties': {'our_int_range': {'type': ['null', 'object']},
'our_tstz_range': {'type': ['null', 'object']}},
'type': 'object'})


class TestUUIDTables(unittest.TestCase):
maxDiff = None
Expand Down
15 changes: 12 additions & 3 deletions tests/integration/test_logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUpClass(cls) -> None:
{"name": 'colour', "type": "character varying"},
{"name": 'timestamp_ntz', "type": "timestamp without time zone"},
{"name": 'timestamp_tz', "type": "timestamp with time zone"},
{"name": 'int_range', "type": "int4range"},
],
"name": cls.table_name}

Expand Down Expand Up @@ -53,19 +54,22 @@ def test_logical_replication(self):
'name': 'betty',
'colour': 'blue',
'timestamp_ntz': '2020-09-01 10:40:59',
'timestamp_tz': '2020-09-01 00:50:59+02'
'timestamp_tz': '2020-09-01 00:50:59+02',
'int_range': '[1,2)',
},
{
'name': 'smelly',
'colour': 'brown',
'timestamp_ntz': '2020-09-01 10:40:59 BC',
'timestamp_tz': '2020-09-01 00:50:59+02 BC'
'timestamp_tz': '2020-09-01 00:50:59+02 BC',
'int_range': '[2,5)',
},
{
'name': 'pooper',
'colour': 'green',
'timestamp_ntz': '30000-09-01 10:40:59',
'timestamp_tz': '10000-09-01 00:50:59+02'
'timestamp_tz': '10000-09-01 00:50:59+02',
'int_range': '[100,)',
}
]

Expand Down Expand Up @@ -101,6 +105,7 @@ def test_logical_replication(self):
'name': 'betty',
'timestamp_ntz': '2020-09-01T10:40:59+00:00',
'timestamp_tz': '2020-08-31T22:50:59+00:00',
'int_range': {'lower': 1, 'upper': 2, 'bounds': '[)'},
},
'time_extracted': unittest.mock.ANY,
'version': unittest.mock.ANY
Expand All @@ -114,6 +119,7 @@ def test_logical_replication(self):
'name': 'smelly',
'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00',
'timestamp_tz': '9999-12-31T23:59:59.999000+00:00',
'int_range': {'lower': 2, 'upper': 5, 'bounds': '[)'},
},
'time_extracted': unittest.mock.ANY,
'version': unittest.mock.ANY
Expand All @@ -127,6 +133,7 @@ def test_logical_replication(self):
'name': 'pooper',
'timestamp_ntz': '9999-12-31T23:59:59.999000+00:00',
'timestamp_tz': '9999-12-31T23:59:59.999000+00:00',
'int_range': {'lower': 100, 'upper': None, 'bounds': '[)'},
},
'time_extracted': unittest.mock.ANY,
'version': unittest.mock.ANY
Expand Down Expand Up @@ -175,6 +182,7 @@ def test_logical_replication(self):
'name': 'betty',
'timestamp_ntz': '2020-09-01T10:40:59+00:00',
'timestamp_tz': '2020-08-31T22:50:59+00:00',
'int_range': '[1,2)',
},
'time_extracted': unittest.mock.ANY,
'version': unittest.mock.ANY,
Expand All @@ -190,6 +198,7 @@ def test_logical_replication(self):
'nice_flag': False,
'timestamp_ntz': '2022-09-01T10:40:59+00:00',
'timestamp_tz': '9999-12-31T23:59:59.999+00:00',
'int_range': None,
},
'time_extracted': unittest.mock.ANY,
'version': unittest.mock.ANY,
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/test_unsupported_pk.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def setUp(self):
{"name": "circle_col", "type": "circle"},
{"name": "xml_col", "type": "xml"},
{"name": "composite_col", "type": "person_composite"},
{"name": "int_range_col", "type": "int4range"},
],
"name": Unsupported.table_name}
with get_test_connection() as conn:
Expand All @@ -53,7 +52,6 @@ def test_catalog(self):
('properties', 'bit_string_col'): {'sql-datatype': 'bit(5)', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'line_col'): {'sql-datatype': 'line', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'xml_col'): {'sql-datatype': 'xml', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'int_range_col'): {'sql-datatype': 'int4range', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'circle_col'): {'sql-datatype': 'circle', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'polygon_col'): {'sql-datatype': 'polygon', 'selected-by-default': False, 'inclusion': 'unsupported'},
('properties', 'box_col'): {'sql-datatype': 'box', 'selected-by-default': False, 'inclusion': 'unsupported'},
Expand Down
13 changes: 12 additions & 1 deletion tests/unit/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest

import datetime

import psycopg2.extras
from tap_postgres import db


Expand Down Expand Up @@ -171,6 +171,17 @@ def test_selected_value_to_singer_value_impl_with_non_empty_jsonb_returns_equiva
'key2': [{'kk': 'yo'}, {}]
}, output)

def test_selected_value_to_singer_value_impl_with_intrange(self):
output = db.selected_value_to_singer_value_impl(psycopg2.extras.NumericRange(1,4,'[)'), 'int4range')

self.assertEqual({'lower': 1, 'upper': 4, 'bounds': '[)'}, output)

def test_selected_value_to_singer_value_impl_with_tstzrange(self):
output = db.selected_value_to_singer_value_impl(psycopg2.extras.DateTimeTZRange(datetime.datetime(2020,1,2,3,4,5,tzinfo=datetime.timezone.utc),None,'[)'), 'tstzrange')

self.assertEqual({'lower': '2020-01-02T03:04:05+00:00', 'upper': None, 'bounds': '[)'}, output)


def test_fully_qualified_column_name(self):
schema = 'foo_schema'
table = 'foo_table'
Expand Down

0 comments on commit 76851a8

Please sign in to comment.