Allow overrides from catalog in refresh_streams_schema
Fixes transferwise#128

- Moves the bulk of the code out of the context manager
  (open_connection), since it doesn't need the open connection.
- Creates private functions for merging the existing metadata and schema
  dicts with those in `new_discovery`.
- Aliases `metadata` (from singer) to `metadata_util`. While developing I
  kept accidentally shadowing `metadata` with a local variable and
  breaking things.
deanmorin committed Oct 7, 2021
1 parent: b1d5b55 · commit: cb914eb
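The override merge this commit introduces can be illustrated standalone. A minimal sketch of the behaviour, using hypothetical stream dicts rather than anything from the repo:

    import copy

    # Catalog stream: the user overrode json_name to be emitted as a string.
    catalog_schema = {'properties': {'json_name': {'type': ['null', 'string']}}}

    # Discovered stream: discovery reports the column as a generic object.
    discovered_schema = {'properties': {'id': {'type': ['integer']},
                                        'json_name': {'type': ['null', 'object']}}}

    merged = copy.deepcopy(discovered_schema)
    for column, column_schema in catalog_schema['properties'].items():
        # The catalog wins for any column the user explicitly overrode.
        if column in merged['properties'] and column_schema != merged['properties'][column]:
            merged['properties'][column].update(copy.deepcopy(column_schema))

    print(merged['properties']['json_name'])  # {'type': ['null', 'string']}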
Showing 2 changed files with 58 additions and 25 deletions.
tap_postgres/stream_utils.py (42 additions & 21 deletions)
@@ -4,7 +4,7 @@
 import singer
 
 from typing import List, Dict
-from singer import metadata
+from singer import metadata as metadata_util
 
 from tap_postgres.db import open_connection
 from tap_postgres.discovery_utils import discover_db
@@ -29,7 +29,7 @@ def is_selected_via_metadata(stream: Dict) -> bool:
     Returns: True if selected, False otherwise.
     """
-    table_md = metadata.to_map(stream['metadata']).get((), {})
+    table_md = metadata_util.to_map(stream['metadata']).get((), {})
     return table_md.get('selected', False)
 
 
@@ -72,37 +72,58 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
             for stream in discover_db(conn, conn_config.get('filter_schemas'), [st['table_name'] for st in streams])
         }
 
-        LOGGER.debug('New discovery schemas %s', new_discovery)
+    LOGGER.debug('New discovery schemas %s', new_discovery)
 
-        # For every stream dictionary, update the schema and metadata from the new discovery
-        for idx, stream in enumerate(streams):
-            # update schema
-            streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
+    # For every stream, update the schema and metadata from the corresponding discovered stream
+    for idx, stream in enumerate(streams):
+        discovered_stream = new_discovery[stream['tap_stream_id']]
+        streams[idx]['schema'] = _merge_stream_schema(stream, discovered_stream)
+        streams[idx]['metadata'] = _merge_stream_metadata(stream, discovered_stream)
 
-            # Update metadata
-            #
-            # 1st step: new discovery doesn't contain non-discoverable metadata: e.g replication method & key, selected
-            # so let's copy those from the original stream object
-            md_map = metadata.to_map(stream['metadata'])
-            meta = md_map.get(())
+    LOGGER.debug('Updated streams schemas %s', streams)
+
+
+def _merge_stream_schema(stream, discovered_stream):
+    """
+    A discovered stream doesn't include any schema overrides from the catalog
+    file. Merges overrides from the catalog file into the discovered schema.
+    """
+    discovered_schema = copy.deepcopy(discovered_stream['schema'])
 
-            for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
-                if not metadatum['breadcrumb']:
-                    meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
-                    new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
+    for column, column_schema in stream['schema']['properties'].items():
+        if column in discovered_schema['properties'] and column_schema != discovered_schema['properties'][column]:
+            override = copy.deepcopy(stream['schema']['properties'][column])
+            LOGGER.debug('Overriding schema for %s with %s', column, override)
+            discovered_schema['properties'][column].update(override)
 
-            # 2nd step: now copy all the metadata from the updated new discovery to the original stream
-            streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
+    return discovered_schema
 
-    LOGGER.debug('Updated streams schemas %s', streams)
+
+def _merge_stream_metadata(stream, discovered_stream):
+    """
+    Discovered metadata for a stream doesn't contain non-discoverable
+    keys/values such as replication method, key, selected, or any other
+    arbitrary overridden metadata from the catalog file. Merges the discovered
+    metadata into the metadata from the catalog file.
+    """
+    stream_md = metadata_util.to_map(stream['metadata'])
+    discovery_md = metadata_util.to_map(discovered_stream['metadata'])
+
+    for breadcrumb, metadata in discovery_md.items():
+        if breadcrumb in stream_md:
+            stream_md[breadcrumb].update(metadata)
+        else:
+            stream_md[breadcrumb] = metadata
+
+    return copy.deepcopy(metadata_util.to_list(stream_md))
 
 
 def any_logical_streams(streams, default_replication_method):
     """
     Checks if streams list contains any stream with log_based method
     """
     for stream in streams:
-        stream_metadata = metadata.to_map(stream['metadata'])
+        stream_metadata = metadata_util.to_map(stream['metadata'])
         replication_method = stream_metadata.get((), {}).get('replication-method', default_replication_method)
         if replication_method == 'LOG_BASED':
             return True
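The metadata merge above leans on singer-python's metadata.to_map/to_list round trip. A self-contained sketch of the same merge rule, with simplified stand-ins for those helpers (not the real singer implementations):

    def to_map(md_list):
        # Stand-in for singer.metadata.to_map: key entries by breadcrumb tuple.
        return {tuple(e['breadcrumb']): e['metadata'] for e in md_list}

    def to_list(md_map):
        # Stand-in for singer.metadata.to_list: back to the list-of-dicts shape.
        return [{'breadcrumb': list(bc), 'metadata': md} for bc, md in md_map.items()]

    stream_md = to_map([{'breadcrumb': [],
                         'metadata': {'replication-method': 'FULL_TABLE', 'selected': True}}])
    discovery_md = to_map([{'breadcrumb': [], 'metadata': {'table-key-properties': ['id']}},
                           {'breadcrumb': ['properties', 'id'],
                            'metadata': {'sql-datatype': 'integer'}}])

    # Discovered values win per breadcrumb; catalog-only keys (selected,
    # replication-method) survive, and new breadcrumbs are added as-is.
    for breadcrumb, md in discovery_md.items():
        if breadcrumb in stream_md:
            stream_md[breadcrumb].update(md)
        else:
            stream_md[breadcrumb] = md

    print(to_list(stream_md))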
tests/test_streams_utils.py (16 additions & 4 deletions)
@@ -29,7 +29,8 @@ def setUp(self):
                                   {"name": '"character-varying_name"', "type": "character varying"},
                                   {"name": '"varchar-name"', "type": "varchar(28)"},
                                   {"name": 'char_name', "type": "char(10)"},
-                                  {"name": '"text-name"', "type": "text"}],
+                                  {"name": '"text-name"', "type": "text"},
+                                  {"name": "json_name", "type": "jsonb"}],
                       "name": self.table_name}
 
         ensure_test_table(table_spec)
@@ -42,7 +43,7 @@ def test_refresh_streams_schema(self):
                 'table_name': self.table_name,
                 'stream': self.table_name,
                 'tap_stream_id': f'public-{self.table_name}',
-                'schema': [],
+                'schema': {'properties': {'json_name': {'type': ['null', 'string']}}},
                 'metadata': [
                     {
                         'breadcrumb': [],
@@ -51,6 +52,12 @@ def test_refresh_streams_schema(self):
                             'table-key-properties': ['some_id'],
                             'row-count': 1000,
                         }
+                    },
+                    {
+                        'breadcrumb': ['properties', 'char_name'],
+                        'metadata': {
+                            'arbitrary_field_metadata': 'should be preserved'
+                        }
                     }
                 ]
             }
@@ -86,14 +93,19 @@ def test_refresh_streams_schema(self):
                                            'selected-by-default': True},
              ('properties', 'char_name'): {'selected-by-default': True,
                                            'inclusion': 'available',
-                                           'sql-datatype': 'character'}})
+                                           'sql-datatype': 'character',
+                                           'arbitrary_field_metadata': 'should be preserved'},
+             ('properties', 'json_name'): {'selected-by-default': True,
+                                           'inclusion': 'available',
+                                           'sql-datatype': 'jsonb'}})
 
         self.assertEqual({'properties': {'id': {'type': ['integer'],
                                                 'maximum': 2147483647,
                                                 'minimum': -2147483648},
                                          'character-varying_name': {'type': ['null', 'string']},
                                          'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
                                          'char_name': {'type': ['null', 'string'], 'maxLength': 10},
-                                         'text-name': {'type': ['null', 'string']}},
+                                         'text-name': {'type': ['null', 'string']},
+                                         'json_name': {'type': ['null', 'string']}},
                           'type': 'object',
                           'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))