Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use should_sync_field from singer #10

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
install_requires=[
"singer-python==5.0.4",
"singer-python==5.3.1",
"pyodbc>4,<5",
],
entry_points="""
Expand Down
60 changes: 1 addition & 59 deletions tap_db2/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,6 @@
LOGGER = singer.get_logger()


def _desired_columns(selected, table_schema):

'''Return the set of column names we need to include in the SELECT.
selected - set of column names marked as selected in the input catalog
table_schema - the most recently discovered Schema for the table
'''
all_columns = set()
available = set()
automatic = set()
unsupported = set()

for column, column_schema in table_schema.properties.items():
all_columns.add(column)
inclusion = column_schema.inclusion
if inclusion == 'automatic':
automatic.add(column)
elif inclusion == 'available':
available.add(column)
elif inclusion == 'unsupported':
unsupported.add(column)
else:
raise Exception('Unknown inclusion ' + inclusion)

selected_but_unsupported = selected.intersection(unsupported)
if selected_but_unsupported:
LOGGER.warning(
'Columns %s were selected but are not supported. Skipping them.',
selected_but_unsupported)

selected_but_nonexistent = selected.difference(all_columns)
if selected_but_nonexistent:
LOGGER.warning(
'Columns %s were selected but do not exist.',
selected_but_nonexistent)

not_selected_but_automatic = automatic.difference(selected)
if not_selected_but_automatic:
LOGGER.warning(
'Columns %s are primary keys but were not selected. Adding them.',
not_selected_but_automatic)

return selected.intersection(available).union(automatic)


def resolve_catalog(catalog, discovered, state):
'''Returns the Catalog of data we're going to sync.
Takes the Catalog we read from the input file and turns it into a
Expand All @@ -66,13 +22,8 @@ def resolve_catalog(catalog, discovered, state):
instance. The result may differ from the input Catalog in the
following ways:
* It will only include streams marked as "selected".
* We will remove any streams and columns that were selected but do
not actually exist in the database right now.
* If the state has a currently_syncing, we will skip to that stream and
drop all streams appearing before it in the catalog.
* We will add any columns that were not selected but should be
automatically included. For example, primary key columns and
columns used as replication keys.
'''

# Filter catalog to include only selected streams
Expand All @@ -97,11 +48,6 @@ def resolve_catalog(catalog, discovered, state):
LOGGER.warning('Database %s table %s was selected but does not exist',
catalog_entry.database, catalog_entry.table)
continue
selected = set([k for k, v in catalog_entry.schema.properties.items()
if v.selected or k == replication_key])

# These are the columns we need to select
columns = _desired_columns(selected, discovered_table.schema)

result.streams.append(CatalogEntry(
tap_stream_id=catalog_entry.tap_stream_id,
Expand All @@ -110,11 +56,7 @@ def resolve_catalog(catalog, discovered, state):
database=catalog_entry.database,
table=catalog_entry.table,
is_view=catalog_entry.is_view,
schema=Schema(
type='object',
properties={col: discovered_table.schema.properties[col]
for col in columns}
)
schema=discovered_table.schema,
))

return result
Expand Down
13 changes: 12 additions & 1 deletion tap_db2/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,19 @@ def _maybe_activate_after_sync(state, catalog_entry, rep_key, stream_version):
return state


def _sync_field(md_map, field_name):
field_metadata = md_map.get(("properties", field_name), {})
return singer.should_sync_field(field_metadata.get('inclusion'),
field_metadata.get('selected'),
True)


def _sync_table(config, state, catalog_entry):
columns = list(catalog_entry.schema.properties)
md_map = metadata.to_map(catalog_entry.metadata)
columns = [
field_name for field_name in catalog_entry.schema.properties.keys()
if _sync_field(md_map, field_name)
]
if not columns:
LOGGER.warning(
"There are no columns selected for table %s, skipping it",
Expand Down