diff --git a/tap_postgres/stream_utils.py b/tap_postgres/stream_utils.py
index 84a4f46e..c881be61 100644
--- a/tap_postgres/stream_utils.py
+++ b/tap_postgres/stream_utils.py
@@ -4,7 +4,7 @@
 import singer
 
 from typing import List, Dict
-from singer import metadata
+from singer import metadata as metadata_util
 from tap_postgres.db import open_connection
 from tap_postgres.discovery_utils import discover_db
 
@@ -29,7 +29,7 @@ def is_selected_via_metadata(stream: Dict) -> bool:
 
     Returns: True if selected, False otherwise.
     """
-    table_md = metadata.to_map(stream['metadata']).get((), {})
+    table_md = metadata_util.to_map(stream['metadata']).get((), {})
     return table_md.get('selected', False)
 
 
@@ -72,29 +72,50 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
             for stream in discover_db(conn, conn_config.get('filter_schemas'), [st['table_name'] for st in streams])
         }
 
-        LOGGER.debug('New discovery schemas %s', new_discovery)
+    LOGGER.debug('New discovery schemas %s', new_discovery)
 
-        # For every stream dictionary, update the schema and metadata from the new discovery
-        for idx, stream in enumerate(streams):
-            # update schema
-            streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
+    # For every stream, update the schema and metadata from the corresponding discovered stream
+    for idx, stream in enumerate(streams):
+        discovered_stream = new_discovery[stream['tap_stream_id']]
+        streams[idx]['schema'] = _merge_stream_schema(stream, discovered_stream)
+        streams[idx]['metadata'] = _merge_stream_metadata(stream, discovered_stream)
 
-            # Update metadata
-            #
-            # 1st step: new discovery doesn't contain non-discoverable metadata: e.g replication method & key, selected
-            # so let's copy those from the original stream object
-            md_map = metadata.to_map(stream['metadata'])
-            meta = md_map.get(())
+    LOGGER.debug('Updated streams schemas %s', streams)
+
+
+def _merge_stream_schema(stream, discovered_stream):
+    """
+    A discovered stream doesn't include any schema overrides from the catalog
+    file. Merges overrides from the catalog file into the discovered schema.
+    """
+    discovered_schema = copy.deepcopy(discovered_stream['schema'])
 
-            for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
-                if not metadatum['breadcrumb']:
-                    meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
-                    new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
+    for column, column_schema in stream['schema']['properties'].items():
+        if column in discovered_schema['properties'] and column_schema != discovered_schema['properties'][column]:
+            override = copy.deepcopy(stream['schema']['properties'][column])
+            LOGGER.debug('Overriding schema for %s with %s', column, override)
+            discovered_schema['properties'][column].update(override)
 
-            # 2nd step: now copy all the metadata from the updated new discovery to the original stream
-            streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
+    return discovered_schema
 
-        LOGGER.debug('Updated streams schemas %s', streams)
+
+def _merge_stream_metadata(stream, discovered_stream):
+    """
+    Discovered metadata for a stream doesn't contain non-discoverable
+    keys/values such as replication method, key, selected, or any other
+    arbitrary overridden metadata from the catalog file. Merges the discovered
+    metadata into the metadata from the catalog file.
+    """
+    stream_md = metadata_util.to_map(stream['metadata'])
+    discovery_md = metadata_util.to_map(discovered_stream['metadata'])
+
+    for breadcrumb, metadata in discovery_md.items():
+        if breadcrumb in stream_md:
+            stream_md[breadcrumb].update(metadata)
+        else:
+            stream_md[breadcrumb] = metadata
+
+    return copy.deepcopy(metadata_util.to_list(stream_md))
 
 
 def any_logical_streams(streams, default_replication_method):
@@ -102,7 +123,7 @@ def any_logical_streams(streams, default_replication_method):
     Checks if streams list contains any stream with log_based method
     """
     for stream in streams:
-        stream_metadata = metadata.to_map(stream['metadata'])
+        stream_metadata = metadata_util.to_map(stream['metadata'])
         replication_method = stream_metadata.get((), {}).get('replication-method', default_replication_method)
         if replication_method == 'LOG_BASED':
             return True
diff --git a/tests/test_streams_utils.py b/tests/test_streams_utils.py
index 417c437c..2743e0c1 100644
--- a/tests/test_streams_utils.py
+++ b/tests/test_streams_utils.py
@@ -29,7 +29,8 @@ def setUp(self):
                                   {"name": '"character-varying_name"', "type": "character varying"},
                                   {"name": '"varchar-name"', "type": "varchar(28)"},
                                   {"name": 'char_name', "type": "char(10)"},
-                                  {"name": '"text-name"', "type": "text"}],
+                                  {"name": '"text-name"', "type": "text"},
+                                  {"name": "json_name", "type": "jsonb"}],
                      "name": self.table_name}
 
         ensure_test_table(table_spec)
@@ -42,7 +43,7 @@ def test_refresh_streams_schema(self):
                 'table_name': self.table_name,
                 'stream': self.table_name,
                 'tap_stream_id': f'public-{self.table_name}',
-                'schema': [],
+                'schema': {'properties': {'json_name': {'type': ['null', 'string']}}},
                 'metadata': [
                     {
                         'breadcrumb': [],
@@ -51,6 +52,12 @@ def test_refresh_streams_schema(self):
                         'metadata': {
                            'replication-method': 'LOG_BASED',
                            'table-key-properties': ['some_id'],
                            'row-count': 1000,
                        }
+                    },
+                    {
+                        'breadcrumb': ['properties', 'char_name'],
+                        'metadata': {
+                            'arbitrary_field_metadata': 'should be preserved'
+                        }
                     }
                 ]
             }
@@ -86,7 +93,11 @@ def test_refresh_streams_schema(self):
                                                 'selected-by-default': True},
                          ('properties', 'char_name'): {'selected-by-default': True,
                                                        'inclusion': 'available',
-                                                       'sql-datatype': 'character'}})
+                                                       'sql-datatype': 'character',
+                                                       'arbitrary_field_metadata': 'should be preserved'},
+                         ('properties', 'json_name'): {'selected-by-default': True,
+                                                       'inclusion': 'available',
+                                                       'sql-datatype': 'jsonb'}})
 
         self.assertEqual({'properties': {'id': {'type': ['integer'],
                                                 'maximum': 2147483647,
@@ -94,6 +105,7 @@ def test_refresh_streams_schema(self):
                                                 'minimum': -2147483648},
                                          'character-varying_name': {'type': ['null', 'string']},
                                          'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
                                          'char_name': {'type': ['null', 'string'], 'maxLength': 10},
-                                         'text-name': {'type': ['null', 'string']}},
+                                         'text-name': {'type': ['null', 'string']},
+                                         'json_name': {'type': ['null', 'string']}},
                          'type': 'object',
                          'definitions': BASE_RECURSIVE_SCHEMAS},
                         streams[0].get('schema'))