This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Trick Postgres into more efficient use of index (#189)
louis-pie authored Sep 9, 2022
1 parent e7db01e commit 4c29e65
Showing 3 changed files with 18 additions and 9 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,8 @@
+1.8.4 (2022-09-08)
+-------------------
+**Changes**
+- INCREMENTAL: Use sub-query to trick PostgreSQL into more efficient use of index.
+
 1.8.3 (2022-01-18)
 -------------------
 **Fixes**
@@ -48,7 +53,7 @@
 Fix data loss issue when running `LOG_BASED` due to the tap not sending new SCHEMA singer messages when source tables change structure, mainly new/renamed columns, which causes the target to not be up to date with the stream structure.
 The tap now:
 * Runs discovery for selected stream at the beginning of sync to send up to date SCHEMA singer messages
-* When new columns are detected in WAL payloads, then run discovery for the stream and send new SCHEMA message.
+* When new columns are detected in WAL payloads, then run discovery for the stream and send new SCHEMA message.
 
 1.6.2 (2020-05-18)
 -------------------
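The new 1.8.4 entry above refers to wrapping the incremental SELECT in a sub-query. As a rough illustration of the query shapes involved (the table, columns, and bookmark value below are hypothetical placeholders, not taken from this commit), the change rewrites the first form into the second:

# Illustrative sketch only: hypothetical table "public"."orders" with
# replication key "updated_at"; all identifiers and values are placeholders.

# Shape of the incremental query before this commit:
OLD_SQL = """
SELECT "id","updated_at","status"
FROM "public"."orders"
WHERE "updated_at" >= '2022-09-01 00:00:00'::timestamp without time zone
ORDER BY "updated_at" ASC
"""

# Shape after this commit: the filter and ORDER BY move into a sub-query,
# which the commit reports helps PostgreSQL make better use of the
# replication-key index.
NEW_SQL = """
SELECT "id","updated_at","status"
FROM (
    SELECT *
    FROM "public"."orders"
    WHERE "updated_at" >= '2022-09-01 00:00:00'::timestamp without time zone
    ORDER BY "updated_at" ASC
) pg_speedup_trick
"""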
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name='pipelinewise-tap-postgres',
-      version='1.8.3',
+      version='1.8.4',
       description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
18 changes: 11 additions & 7 deletions tap_postgres/sync_strategies/incremental.py
@@ -129,13 +129,17 @@ def _get_select_sql(params):
     stream = params['stream']
     if replication_key_value:
         select_sql = f"""
-                        SELECT {','.join(escaped_columns)}
-                        FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
-                        WHERE {post_db.prepare_columns_sql(replication_key)} >= '{replication_key_value}'::{replication_key_sql_datatype}
-                        ORDER BY {post_db.prepare_columns_sql(replication_key)} ASC"""
+                        SELECT {','.join(escaped_columns)}
+                        FROM (
+                            SELECT *
+                            FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
+                            WHERE {post_db.prepare_columns_sql(replication_key)} >= '{replication_key_value}'::{replication_key_sql_datatype}
+                            ORDER BY {post_db.prepare_columns_sql(replication_key)} ASC
+                        ) pg_speedup_trick"""
     else:
         # if not replication_key_value
-        select_sql = f"""SELECT {','.join(escaped_columns)}
-                        FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
-                        ORDER BY {post_db.prepare_columns_sql(replication_key)} ASC"""
+        select_sql = f"""
+                        SELECT {','.join(escaped_columns)}
+                        FROM {post_db.fully_qualified_table_name(schema_name, stream['table_name'])}
+                        """
     return select_sql
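
One way to check that the wrapped form changes the execution plan is to EXPLAIN it against a real database. The sketch below is not part of the tap: it assumes psycopg2 is installed and uses a placeholder DSN, table, and columns; swap in the unwrapped query to compare plans.

import psycopg2  # assumption: psycopg2 is available in your environment

DSN = "postgresql://user:password@localhost:5432/exampledb"  # placeholder

# Same sub-query shape as the new _get_select_sql() branch, with
# hypothetical identifiers and bookmark value.
WRAPPED_SQL = """
SELECT "id","updated_at"
FROM (
    SELECT *
    FROM "public"."orders"
    WHERE "updated_at" >= '2022-09-01 00:00:00'::timestamp without time zone
    ORDER BY "updated_at" ASC
) pg_speedup_trick
"""

with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
    # EXPLAIN prints the planner's chosen strategy, so you can see whether
    # the replication-key index is being used for this query shape.
    cur.execute("EXPLAIN " + WRAPPED_SQL)
    for (line,) in cur.fetchall():
        print(line)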
