This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Cannot add more tables to existing postgres->snowflake log based extract #107

Open
@deanmorin

Description


Describe the bug

I'm using log based replication with:

  • pipelinewise-tap-postgres==1.8.0
  • pipelinewise-target-snowflake==1.13.1

Initially I selected just a few tables in postgres, and this worked as expected. Later, I selected additional postgres tables. As expected, this triggered a full table sync (logical_initial) for the new tables. However, the state for these new tables is never emitted, so every subsequent run performs the full table sync again. The new tables never switch to log based replication.

I found this bug when running with Meltano, but confirmed it with a standalone Singer job.

To Reproduce

Steps to reproduce the behavior:

  1. Create a test postgres database with a couple of tables:

    CREATE TABLE a (a integer PRIMARY KEY);
    CREATE TABLE b (a integer PRIMARY KEY);
    
    INSERT INTO a VALUES (1), (2), (3);
    INSERT INTO b VALUES (1), (2), (3);
    
  2. Create config files for the tap and target, for example:

    tap_config.json

    {
      "host": "127.0.0.1",
      "port": 5432,
      "user": "myuser",
      "pass loword": "mypass",
      "dbname": "test",
      "filter_schemas": "public",
      "logical_poll_total_seconds": 60
    }

    target_config.json

    {
      "account": "myaccount",
      "user": "TEST_ROBOT",
      "password": "mypass",
      "warehouse": "TEST",
      "dbname": "TEST",
      "role": "TEST",
      "file_format": "FILE_FORMATS.CSV",
      "default_target_schema": "singer"
    }

  3. Install the tap and create catalog.json

    $ mkvirtualenv tap-postgres
    $ pip install pipelinewise-tap-postgres==1.8.0
    $ tap-postgres --config tap_config.json --discover > catalog.json
    # Add to the "public-a" section of the catalog:
    #             "selected": true,
    #             "replication-method": "LOG_BASED",
    $ deactivate

  4. Install the target

    $ mkvirtualenv target-snowflake
    $ pip install pipelinewise-target-snowflake==1.13.1
    $ deactivate

  5. Do the initial run

    $ ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json

  6. Second run to show that "public-a" is now using log based replication

    $ tail -n 1 state.json > state.json.tmp && mv state.json.tmp state.json \
        && ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --state state.json \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json

  7. Add a new table to the selected tables and run again

    # Add to the "public-b" section of the catalog:
    #             "selected": true,
    #             "replication-method": "LOG_BASED",
    $ tail -n 1 state.json > state.json.tmp && mv state.json.tmp state.json \
        && ~/.virtualenvs/tap-postgres/bin/tap-postgres \
          --state state.json \
          --config tap_config.json \
          --properties catalog.json \
        | ~/.virtualenvs/target-snowflake/bin/target-snowflake \
          --config target_config.json \
        >> state.json
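
The manual catalog edits in steps 3 and 7 can also be scripted, which makes the reproduction easier to repeat. This is a minimal sketch that assumes the discovered catalog follows the usual Singer layout: a `streams` list whose entries carry a `tap_stream_id` and a `metadata` list, with table-level settings stored under the entry whose `breadcrumb` is `[]`. `select_log_based` and `select_in_file` are hypothetical helpers, not part of tap-postgres.

```python
import json
from typing import Any, Dict


def select_log_based(catalog: Dict[str, Any], stream_id: str) -> None:
    """Mark one stream as selected with LOG_BASED replication, in place.

    Hypothetical helper: assumes the Singer catalog layout, where table-level
    settings live in the metadata entry whose breadcrumb is [].
    """
    for stream in catalog["streams"]:
        if stream.get("tap_stream_id") != stream_id:
            continue
        for entry in stream.get("metadata", []):
            if entry.get("breadcrumb") == []:
                table_md = entry.setdefault("metadata", {})
                table_md["selected"] = True
                table_md["replication-method"] = "LOG_BASED"
                return
        raise ValueError(f"stream {stream_id} has no breadcrumb [] metadata entry")
    raise KeyError(f"stream {stream_id} not found in catalog")


def select_in_file(path: str, stream_id: str) -> None:
    """Load a catalog file, select `stream_id`, and write the catalog back."""
    with open(path) as f:
        catalog = json.load(f)
    select_log_based(catalog, stream_id)
    with open(path, "w") as f:
        json.dump(catalog, f, indent=2)
```

For step 7, `select_in_file("catalog.json", "public-b")` replaces the hand edit. Only the table-level entry is touched, so column-level metadata and the settings already present for `public-a` are left as they were.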

You'll see this in the logs:

Beginning sync of stream(public-b) with sync method(logical_initial)
Performing initial full table sync 

Later, the target will show the expected state:

Setting state to {'currently_syncing': 'public-b', 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}, 'public-b': {'last_replication_method': 'LOG_BASED', 'lsn': 37570744, 'version': 1626583810479}}}
...
Setting state to {'currently_syncing': None, 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}, 'public-b': {'last_replication_method': 'LOG_BASED', 'lsn': 37570744, 'version': 1626583810479, 'xmin': None}}}

However, it then moves on to reading from the replication slot for the existing table(s), and once that finishes, the state has "lost" the new table:

Setting state to {'currently_syncing': None, 'bookmarks': {'public-a': {'last_replication_method': 'LOG_BASED', 'lsn': 37570216, 'version': 1626456439935, 'xmin': None}}}

Unfortunately, this is the state that gets emitted:

Emitting state {"currently_syncing": null, "bookmarks": {"public-a": {"last_replication_method": "LOG_BASED", "lsn": 37570216, "version": 1626456439935, "xmin": null}}}

Because of this, the next run starts with the "logical_initial" full table sync for the new table again, and so does every run after that.
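
The log lines above are consistent with the final state being replaced by a state that only knows about `public-a` (note that its `lsn` is unchanged from before the run) instead of being merged with the newer bookmarks. This is a hypothesis drawn from the log output, not a confirmed reading of the tap or target code. The sketch below reproduces that pattern with plain dictionaries, using the states copied from the logs, and contrasts it with a bookmark merge that keeps `public-b`. `replace_state` and `merge_state` are hypothetical names for illustration, not functions in either project.

```python
import copy
from typing import Any, Dict

State = Dict[str, Any]


def replace_state(current: State, from_log_phase: State) -> State:
    """The pattern the logs suggest: the log-based phase's state wins outright,
    and `current` is ignored."""
    return copy.deepcopy(from_log_phase)


def merge_state(current: State, from_log_phase: State) -> State:
    """Keep every bookmark in `current`; streams present in `from_log_phase`
    overwrite only the keys they carry."""
    merged = copy.deepcopy(current)
    merged["currently_syncing"] = from_log_phase.get("currently_syncing")
    bookmarks = merged.setdefault("bookmarks", {})
    for stream_id, bookmark in from_log_phase.get("bookmarks", {}).items():
        bookmarks.setdefault(stream_id, {}).update(bookmark)
    return merged


# States copied from the "Setting state to" log lines above.
after_initial_sync = {
    "currently_syncing": None,
    "bookmarks": {
        "public-a": {"last_replication_method": "LOG_BASED", "lsn": 37570216,
                     "version": 1626456439935, "xmin": None},
        "public-b": {"last_replication_method": "LOG_BASED", "lsn": 37570744,
                     "version": 1626583810479, "xmin": None},
    },
}
after_log_phase = {
    "currently_syncing": None,
    "bookmarks": {
        "public-a": {"last_replication_method": "LOG_BASED", "lsn": 37570216,
                     "version": 1626456439935, "xmin": None},
    },
}

print(sorted(replace_state(after_initial_sync, after_log_phase)["bookmarks"]))
# → ['public-a']
print(sorted(merge_state(after_initial_sync, after_log_phase)["bookmarks"]))
# → ['public-a', 'public-b']
```

With replacement, `public-b` disappears exactly as in the emitted state; with a per-stream merge, its `LOG_BASED` bookmark and `lsn` survive into the next run.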

Expected behavior
The state for public-b should be emitted by the target.
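
To check for this after step 7, one can inspect the last state line the target appended to `state.json` (the same line `tail` picks up before the next run). This is a minimal sketch that assumes each non-empty line of `state.json` is one JSON state object, as produced by the `>> state.json` redirection above. `last_state` and `has_log_based_bookmark` are hypothetical helpers.

```python
import json
from typing import Any, Dict, Optional


def last_state(path: str) -> Optional[Dict[str, Any]]:
    """Return the last non-empty line of a state file parsed as JSON, or None."""
    last = None
    with open(path) as f:
        for line in f:
            if line.strip():
                last = line
    return json.loads(last) if last is not None else None


def has_log_based_bookmark(state: Optional[Dict[str, Any]], stream_id: str) -> bool:
    """True if `state` holds a LOG_BASED bookmark with an LSN for `stream_id`."""
    if not state:
        return False
    bookmark = state.get("bookmarks", {}).get(stream_id, {})
    return (bookmark.get("last_replication_method") == "LOG_BASED"
            and bookmark.get("lsn") is not None)
```

With the emitted state shown above, `has_log_based_bookmark(last_state("state.json"), "public-b")` is `False`; the expected behavior is for it to be `True`.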


Your environment

  • Version of target: [e.g. 1.13.1]
  • Version of python [e.g. 3.8.9]

Additional context
From this Slack thread, it sounds like this may not occur when running through PipelineWise, since it uses FastSync for the initial sync.

Labels: bug (Something isn't working)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions