Skip to content

Commit

Permalink
Use should_sync_field from singer
Browse files Browse the repository at this point in the history
  • Loading branch information
John Miller committed Oct 18, 2018
1 parent 0a55ba0 commit 7808fbb
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 61 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
install_requires=[
"singer-python==5.0.4",
"singer-python==5.3.1",
"pyodbc>4,<5",
],
entry_points="""
Expand Down
60 changes: 1 addition & 59 deletions tap_db2/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,6 @@
LOGGER = singer.get_logger()


def _desired_columns(selected, table_schema):

'''Return the set of column names we need to include in the SELECT.
selected - set of column names marked as selected in the input catalog
table_schema - the most recently discovered Schema for the table
'''
all_columns = set()
available = set()
automatic = set()
unsupported = set()

for column, column_schema in table_schema.properties.items():
all_columns.add(column)
inclusion = column_schema.inclusion
if inclusion == 'automatic':
automatic.add(column)
elif inclusion == 'available':
available.add(column)
elif inclusion == 'unsupported':
unsupported.add(column)
else:
raise Exception('Unknown inclusion ' + inclusion)

selected_but_unsupported = selected.intersection(unsupported)
if selected_but_unsupported:
LOGGER.warning(
'Columns %s were selected but are not supported. Skipping them.',
selected_but_unsupported)

selected_but_nonexistent = selected.difference(all_columns)
if selected_but_nonexistent:
LOGGER.warning(
'Columns %s were selected but do not exist.',
selected_but_nonexistent)

not_selected_but_automatic = automatic.difference(selected)
if not_selected_but_automatic:
LOGGER.warning(
'Columns %s are primary keys but were not selected. Adding them.',
not_selected_but_automatic)

return selected.intersection(available).union(automatic)


def resolve_catalog(catalog, discovered, state):
'''Returns the Catalog of data we're going to sync.
Takes the Catalog we read from the input file and turns it into a
Expand All @@ -66,13 +22,8 @@ def resolve_catalog(catalog, discovered, state):
instance. The result may differ from the input Catalog in the
following ways:
* It will only include streams marked as "selected".
* We will remove any streams and columns that were selected but do
not actually exist in the database right now.
* If the state has a currently_syncing, we will skip to that stream and
drop all streams appearing before it in the catalog.
* We will add any columns that were not selected but should be
automatically included. For example, primary key columns and
columns used as replication keys.
'''

# Filter catalog to include only selected streams
Expand All @@ -97,11 +48,6 @@ def resolve_catalog(catalog, discovered, state):
LOGGER.warning('Database %s table %s was selected but does not exist',
catalog_entry.database, catalog_entry.table)
continue
selected = set([k for k, v in catalog_entry.schema.properties.items()
if v.selected or k == replication_key])

# These are the columns we need to select
columns = _desired_columns(selected, discovered_table.schema)

result.streams.append(CatalogEntry(
tap_stream_id=catalog_entry.tap_stream_id,
Expand All @@ -110,11 +56,7 @@ def resolve_catalog(catalog, discovered, state):
database=catalog_entry.database,
table=catalog_entry.table,
is_view=catalog_entry.is_view,
schema=Schema(
type='object',
properties={col: discovered_table.schema.properties[col]
for col in columns}
)
schema=discovered_table.schema,
))

return result
Expand Down
13 changes: 12 additions & 1 deletion tap_db2/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,19 @@ def _maybe_activate_after_sync(state, catalog_entry, rep_key, stream_version):
return state


def _sync_field(md_map, field_name):
field_metadata = md_map.get(("properties", field_name), {})
return singer.should_sync_field(field_metadata.get('inclusion'),
field_metadata.get('selected'),
True)


def _sync_table(config, state, catalog_entry):
columns = list(catalog_entry.schema.properties)
md_map = metadata.to_map(catalog_entry.metadata)
columns = [
field_name for field_name in catalog_entry.schema.properties.keys()
if _sync_field(md_map, field_name)
]
if not columns:
LOGGER.warning(
"There are no columns selected for table %s, skipping it",
Expand Down

0 comments on commit 7808fbb

Please sign in to comment.