diff --git a/tap_postgres/stream_utils.py b/tap_postgres/stream_utils.py
index 84a4f46e..aed15824 100644
--- a/tap_postgres/stream_utils.py
+++ b/tap_postgres/stream_utils.py
@@ -72,27 +72,27 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
             for stream in discover_db(conn, conn_config.get('filter_schemas'),
                                       [st['table_name'] for st in streams])
         }
 
-        LOGGER.debug('New discovery schemas %s', new_discovery)
-
-        # For every stream dictionary, update the schema and metadata from the new discovery
-        for idx, stream in enumerate(streams):
-            # update schema
-            streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
-
-            # Update metadata
-            #
-            # 1st step: new discovery doesn't contain non-discoverable metadata: e.g replication method & key, selected
-            # so let's copy those from the original stream object
-            md_map = metadata.to_map(stream['metadata'])
-            meta = md_map.get(())
-
-            for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
-                if not metadatum['breadcrumb']:
-                    meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
-                    new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
-
-            # 2nd step: now copy all the metadata from the updated new discovery to the original stream
-            streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
+        LOGGER.debug('New discovery schemas %s', new_discovery)
+
+        # For every stream, update the schema and metadata from the new discovery
+        for idx, stream in enumerate(streams):
+            # Update schema
+            streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
+
+            # Create updated metadata
+            original_stream_metadata_map = metadata.to_map(stream['metadata'])
+            new_discovery_metadata_map = metadata.to_map(new_discovery[stream['tap_stream_id']]['metadata'])
+
+            for metadata_element_key in new_discovery_metadata_map:
+                if metadata_element_key in original_stream_metadata_map:
+                    original_stream_metadata_map[metadata_element_key].update(new_discovery_metadata_map[metadata_element_key])
+                else:
+                    original_stream_metadata_map[metadata_element_key] = new_discovery_metadata_map[metadata_element_key]
+
+            updated_original_metadata_list = metadata.to_list(original_stream_metadata_map)
+
+            # Copy the updated metadata back into the original data structure that was passed in
+            streams[idx]['metadata'] = copy.deepcopy(updated_original_metadata_list)
 
     LOGGER.debug('Updated streams schemas %s', streams)
diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py
index 21bc0144..14b7cfca 100644
--- a/tests/test_full_table_interruption.py
+++ b/tests/test_full_table_interruption.py
@@ -99,7 +99,7 @@ def test_catalog(self):
 
         self.assertTrue(blew_up_on_cow)
 
-        self.assertEqual(7, len(CAUGHT_MESSAGES))
+        self.assertEqual(7, len(CAUGHT_MESSAGES), "Number of Caught Messages")
 
         self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA')
         self.assertIsInstance(CAUGHT_MESSAGES[1], singer.StateMessage)
diff --git a/tests/test_streams_utils.py b/tests/test_streams_utils.py
index 114ab5c2..0734c7e5 100644
--- a/tests/test_streams_utils.py
+++ b/tests/test_streams_utils.py
@@ -51,7 +51,14 @@ def test_refresh_streams_schema(self):
                         'table-key-properties': ['some_id'],
                         'row-count': 1000,
                     }
+                },
+                {
+                    'breadcrumb': ['properties', 'char_name'],
+                    'metadata': {
+                        'arbitrary_field_metadata': 'should be preserved'
+                    }
                 }
+            ]
             }
         ]
@@ -86,7 +93,8 @@ def test_refresh_streams_schema(self):
                                   'selected-by-default': True},
              ('properties', 'char_name'): {'selected-by-default': True,
                                            'inclusion': 'available',
-                                           'sql-datatype': 'character'}})
+                                           'sql-datatype': 'character',
+                                           'arbitrary_field_metadata': 'should be preserved'}})
 
         self.assertEqual({'properties': {'id': {'type': ['integer'],
                                                 'maximum': 2147483647,