diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index c627297..5f11282 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -384,30 +384,40 @@ def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state, config: Dict LOGGER.debug('Difference between event and schema: %s', diff) LOGGER.info('Running discovery ... ') - #run discovery for the current table only - catalog_entry = discover_catalog(mysql_conn, + # run discovery for the current table only + new_catalog_entry = discover_catalog(mysql_conn, config.get('filter_dbs'), catalog_entry.table).streams[0] - selected = {k for k, v in catalog_entry.schema.properties.items() - if common.property_is_selected(catalog_entry, k)} + selected = {k for k, v in new_catalog_entry.schema.properties.items() + if common.property_is_selected(new_catalog_entry, k)} # the new catalog has "stream" property = table name, we need to update that to make it the same as # the result of the "resolve_catalog" function - catalog_entry.stream = tap_stream_id + new_catalog_entry.stream = tap_stream_id # These are the columns we need to select - columns = list(desired_columns(selected, catalog_entry.schema)) + new_columns = desired_columns(selected, new_catalog_entry.schema) + + cols = set(new_catalog_entry.schema.properties.keys()) + + # drop unsupported properties from schema + for col in cols: + if col not in new_columns: + new_catalog_entry.schema.properties.pop(col, None) # Add the _sdc_deleted_at col - add_automatic_properties(catalog_entry, columns) + new_columns = add_automatic_properties(new_catalog_entry, list(new_columns)) - # update this dictionary while we're at it - binlog_streams_map[tap_stream_id]['catalog_entry'] = catalog_entry - binlog_streams_map[tap_stream_id]['desired_columns'] = columns + # send the new scheme to target if we have a new schema + if new_catalog_entry.schema.properties != catalog_entry.schema.properties: + write_schema_message(catalog_entry=new_catalog_entry) + catalog_entry = new_catalog_entry - # send the new scheme to target - write_schema_message(catalog_entry=catalog_entry) + # update this dictionary while we're at it + binlog_streams_map[tap_stream_id]['catalog_entry'] = new_catalog_entry + binlog_streams_map[tap_stream_id]['desired_columns'] = new_columns + columns = new_columns if isinstance(binlog_event, WriteRowsEvent): rows_saved = handle_write_rows_event(binlog_event, diff --git a/tests/test_tap_mysql.py b/tests/test_tap_mysql.py index e651b82..b3258e3 100644 --- a/tests/test_tap_mysql.py +++ b/tests/test_tap_mysql.py @@ -751,6 +751,13 @@ def test_binlog_stream(self): config['server_id'] = "100" tap_mysql.do_sync(self.conn, config, self.catalog, self.state) + + schema_messages = list(filter(lambda m: isinstance(m, singer.SchemaMessage), SINGER_MESSAGES)) + + for schema_msg in schema_messages: + for prop, val in schema_msg.schema['properties'].items(): + self.assertIn('type', val, f'property "{prop}" has no "type" in stream "{schema_msg.stream}"') + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) message_types = [type(m) for m in SINGER_MESSAGES] @@ -815,6 +822,12 @@ def test_binlog_stream_with_alteration(self): tap_mysql.do_sync(self.conn, config, self.catalog, self.state) + schema_messages = list(filter(lambda m: isinstance(m, singer.SchemaMessage), SINGER_MESSAGES)) + + for schema_msg in schema_messages: + for prop, val in schema_msg.schema['properties'].items(): + self.assertIn('type', val, f'property "{prop}" has no "type" in stream "{schema_msg.stream}"') + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) message_types = [type(m) for m in SINGER_MESSAGES]