This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Refresh streams schema #129

Open
wants to merge 3 commits into
base: master
from

Conversation

deanmorin
Contributor

@deanmorin deanmorin commented Oct 7, 2021

Problem

Cannot override discovered schema with changes to the catalog file. Fixes #128, as well as #101.

Proposed changes

Describe the big picture of your changes here to communicate to the maintainers why we should accept this pull request.
If it fixes a bug or resolves a feature request, be sure to link to that issue.

Discovery is run on each and every sync, and squashes any desired modifications to the stream schema provided in the catalog. This instead considers differences from the catalog and merges them into the discovered stream schema.
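
A minimal sketch of the merge idea described above, not the code from this PR. The helper name `merge_stream_schema` and the sample schemas are hypothetical; it assumes a stream schema is a plain dict with a `properties` mapping, and that the catalog definition should win for any column that still exists in the source.

```python
from copy import deepcopy


def merge_stream_schema(discovered_schema, catalog_schema):
    """Return the discovered schema with catalog property overrides applied.

    Hypothetical helper: columns come from discovery, so new source columns
    are kept, and a catalog definition replaces the discovered one only
    where the two differ.
    """
    merged = deepcopy(discovered_schema)
    catalog_props = catalog_schema.get("properties", {})
    for column in list(merged.get("properties", {})):
        override = catalog_props.get(column)
        if override is not None and override != merged["properties"][column]:
            merged["properties"][column] = deepcopy(override)
    return merged


# Discovery reports the jsonb column as object/array; the catalog wants a string.
discovered = {
    "type": "object",
    "properties": {
        "id": {"type": ["integer"]},
        "payload": {"type": ["null", "object", "array"]},
        "added_later": {"type": ["null", "string"]},
    },
}
catalog = {
    "type": "object",
    "properties": {
        "id": {"type": ["integer"]},
        "payload": {"type": ["null", "string"]},
    },
}
merged = merge_stream_schema(discovered, catalog)
```

Here `payload` takes the catalog's string type, while `added_later`, which only discovery knows about, is preserved.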

I found this when running meltano, with tap-postgres and target-snowflake (both transferwise). I was trying to treat a jsonb as a string rather than an object/array.

A similar issue exists with stream metadata. While working on this I also played around with the metadata and ended up finding #102. I stole the test from that PR and tweaked my fix based on it. I ended up not needing the metadata fix in the end, so let me know if you'd prefer this fix be handled separately.

Types of changes

What types of changes does your code introduce to PipelineWise?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation Update (if none of the other choices apply)

Checklist

  • Description above provides context of the change
  • I have added tests that prove my fix is effective or that my feature works
  • Unit tests for changes (not needed for documentation changes)
  • CI checks pass with my changes
  • Bumping version in setup.py is an individual PR and not mixed with feature or bugfix PRs
  • Commit message/PR title starts with [AP-NNNN] (if applicable. AP-NNNN = JIRA ID)
  • Branch name starts with AP-NNN (if applicable. AP-NNN = JIRA ID)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions

A later commit modifies tests/test_streams_utils.py, and I noticed it
was the only file really using tabs in this repo. Changed it to spaces,
and replaced a few other random ones.

Fixes transferwise#128

- Moves the bulk of the code out of the context manager
  (open_connection) since it isn't needed.
- Creates private functions for merging existing metadata and schema to
  dicts with those in `new_discovery`.
- Aliases `metadata` (from singer) to `metadata_util`. I kept using
  `metadata` as a local var when developing by accident and breaking
  things.

Changed from debug -> info level. I deployed the changes from this PR,
and I wanted to make sure things were working as expected. It was in an
environment with sensitive data, so I wasn't able to simply switch to
debug level since that would leak the sensitive data to the logs. Also
added the column being affected to the message.

@mattbdc

mattbdc commented Feb 17, 2022

Would love to see this merged. Most of our tables contain a text[] column, and this is breaking for us as we can't override the field type.

@cdchan

cdchan commented Apr 14, 2022

Agree that this would be great to get merged. There's no current way to select individual columns to sync.

Just tagging some of the Wise people: @Samira-El @amofakhar @jeet-parekh-wise anything we could do to get this accepted?

@astrojuanlu

For reference, this change was introduced in #58

The current logic seems to be: "pick the selected streams (tables) from the catalog, then refresh the schema of each stream":

def do_sync(conn_config, catalog, default_replication_method, state, state_file=None):
    """
    Orchestrates sync of all streams
    """
    currently_syncing = singer.get_currently_syncing(state)
    streams = list(filter(is_selected_via_metadata, catalog['streams']))
    streams.sort(key=lambda s: s['tap_stream_id'])
    LOGGER.info("Selected streams: %s ", [s['tap_stream_id'] for s in streams])
    if any_logical_streams(streams, default_replication_method):
        # Use of logical replication requires fetching an lsn
        end_lsn = logical_replication.fetch_current_lsn(conn_config)
        LOGGER.debug("end_lsn = %s ", end_lsn)
    else:
        end_lsn = None

    refresh_streams_schema(conn_config, streams)

I arrived here after a long debugging session because my "selected": false metadata items were not being respected (essentially #101). This is, in my opinion, very counterintuitive to say the least (from my very short experience with Singer taps).
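
To make the expectation concrete, here is a rough sketch of how column-level selection is usually read from Singer catalog metadata, as I understand the convention. The function name `selected_columns` and the sample stream are made up for illustration; this is not code from tap-postgres.

```python
def selected_columns(stream):
    """List the columns a sync would be expected to extract.

    Assumed convention: "automatic" columns are always included,
    "unsupported" ones never, and otherwise an explicit "selected"
    flag wins over "selected-by-default".
    """
    by_column = {}
    for entry in stream.get("metadata", []):
        breadcrumb = entry.get("breadcrumb", [])
        if len(breadcrumb) == 2 and breadcrumb[0] == "properties":
            by_column[breadcrumb[1]] = entry.get("metadata", {})

    columns = []
    for column in stream["schema"]["properties"]:
        md = by_column.get(column, {})
        inclusion = md.get("inclusion")
        if inclusion == "unsupported":
            continue
        if inclusion == "automatic":
            columns.append(column)
            continue
        selected = md.get("selected")
        if selected is None:
            selected = md.get("selected-by-default", False)
        if selected:
            columns.append(column)
    return columns


# Hypothetical stream: "payload" is explicitly deselected in the catalog.
stream = {
    "tap_stream_id": "public-orders",
    "schema": {"properties": {"id": {}, "payload": {}, "notes": {}}},
    "metadata": [
        {"breadcrumb": [], "metadata": {"selected": True}},
        {"breadcrumb": ["properties", "id"],
         "metadata": {"inclusion": "automatic"}},
        {"breadcrumb": ["properties", "payload"],
         "metadata": {"inclusion": "available",
                      "selected-by-default": True,
                      "selected": False}},
        {"breadcrumb": ["properties", "notes"],
         "metadata": {"inclusion": "available",
                      "selected-by-default": True}},
    ],
}
columns = selected_columns(stream)
```

With this catalog I would expect only `id` and `notes` to be synced. If discovery regenerates the metadata on every run, the explicit `"selected": False` on `payload` is lost.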

On the other hand, I think the catalog merging strategy proposed in this pull request might have its own set of problems as well. There is another proposal: in sync mode, if given a catalog, just do what the catalog says (and therefore, decouple the catalog merging problem).

I understand the reasoning behind the current behavior and #58: if the source schema changes, there is potential data loss. However, now tap-postgres is "too smart" about it and we can't even choose how we want to extract our data, which is annoying.

Would the maintainers accept a simpler PR that does not do a discovery phase if an explicit catalog is provided?
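
A sketch of what that simpler behavior could look like, under my own assumptions. The names `streams_to_sync` and the optional `refresh` callback are hypothetical and only stand in for the real `do_sync` and `refresh_streams_schema`; the point is that the catalog is used as given unless a refresh is explicitly requested.

```python
def is_selected(stream):
    """True when the stream-level metadata entry has selected=True."""
    for entry in stream.get("metadata", []):
        if not entry.get("breadcrumb"):  # empty breadcrumb = stream level
            return bool(entry.get("metadata", {}).get("selected"))
    return False


def streams_to_sync(catalog, refresh=None):
    """Pick the selected streams; re-discover only if a callback is passed.

    `refresh` is a hypothetical hook. When it is None, the schema and
    metadata from the provided catalog are left untouched.
    """
    streams = sorted(
        (s for s in catalog["streams"] if is_selected(s)),
        key=lambda s: s["tap_stream_id"],
    )
    if refresh is not None:
        refresh(streams)
    return streams


def stream(stream_id, selected):
    """Build a tiny stand-in catalog entry for the example."""
    return {
        "tap_stream_id": stream_id,
        "metadata": [{"breadcrumb": [], "metadata": {"selected": selected}}],
    }


catalog = {"streams": [stream("b", True), stream("a", True), stream("c", False)]}

# Explicit catalog, no discovery: the catalog is the source of truth.
as_given = [s["tap_stream_id"] for s in streams_to_sync(catalog)]

# Opt-in refresh: the callback receives the selected streams.
refreshed = []
streams_to_sync(catalog, refresh=refreshed.extend)
```

This keeps the merge question out of sync mode entirely, at the cost of not noticing source schema changes unless the user re-runs discovery.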

astrojuanlu added a commit to astrojuanlu/pipelinewise-tap-postgres that referenced this pull request May 11, 2022
@visch

visch commented Jul 27, 2022

Hit this today. I made https://github.com/MeltanoLabs/tap-postgres, which works for a lot of things (threw it together in literally 15 minutes with the Meltano SDK, so there are plenty of bugs, I'm sure).

@asmisha

asmisha commented Oct 13, 2022

Overriding the catalog is super counterintuitive and doesn't let me use this tap when combined with some loaders (s3-csv, for instance). I would like to see this merged as well. Is there a way to speed this up?

BTW thanks @deanmorin for the fix, I'll try using it.

Development

Successfully merging this pull request may close these issues.

Cannot override discovered schema
6 participants