-
Notifications
You must be signed in to change notification settings - Fork 100
Conversation
Hey @mvgijssel, thanks for this PR! I more or less understand the need behind this PR, we do the same thing but in Pipelinewise FastSync. But just for my own understanding, the outcome you want to have is that for a data pipeline that has incremental, full table and log based streams:
is this correct? |
Didn't know about this! I'm definitely going to check this out
Yes! We want to bulk of the work to be on the replica and only the wal processing on the primary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Samira-El What do you think about these changes? They do seem useful. We are planning to deploy Pipelinewise to replicate from Postgres to BigQuery here at Thread. We are largely planning to use logical replication, however, we have some views which we would also like to replicate and being able to force those to use the replica would be good.
'replica_user': args.config['replica_user'], | ||
'replica_password': args.config['replica_password'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could these not default to the same credentials as the primary? Similar to: https://github.com/transferwise/pipelinewise/blob/206e75e630933d1a2b2ab9afb36261a55ccf0e4c/pipelinewise/fastsync/commons/tap_postgres.py#L161-L164
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that would be a great default!
@@ -38,14 +38,27 @@ def fully_qualified_table_name(schema, table): | |||
return '"{}"."{}"'.format(canonicalize_identifier(schema), canonicalize_identifier(table)) | |||
|
|||
|
|||
def open_connection(conn_config, logical_replication=False): | |||
def open_connection(conn_config, logical_replication=False, primary_connection=False): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use prioritize_primary
similar to: https://github.com/transferwise/pipelinewise/blob/206e75e630933d1a2b2ab9afb36261a55ccf0e4c/pipelinewise/fastsync/commons/tap_postgres.py#L129
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that's a good idea!
@@ -116,8 +116,7 @@ def sync_method_for_streams(streams, state, default_replication_method): | |||
continue | |||
|
|||
if replication_method == 'LOG_BASED' and stream_metadata.get((), {}).get('is-view'): | |||
raise Exception(f'Logical Replication is NOT supported for views. ' \ | |||
f'Please change the replication method for {stream["tap_stream_id"]}') | |||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the behaviour here changed? This exception is useful isn't it? Rather than failing silently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah sorry this is a specific hack for HackerOne, this shouldn't be part of this PR! Same for the changes related to TOAST'ed Postgres values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mvgijssel Does open_connection
not need changing elsewhere to use use_replica
to determine whether to connect to the replica or not rather than defaulting to the replica? Otherwise, what is the point in the use_replica
flag?
@mvgijssel I hope you don't mind that I've opened a separate PR with just the replica option changes + some documentation. I hope the change is more likely to get merged with documentation and as a standalone change. |
No worries! Happy this is being picked up ❤️ |
7516ab5
to
a073f5c
Compare
Hey @mvgijssel, this feature is now available thanks to Judah's PR #145 which was more minimal in changes. |
Problem
In our data architecture we don't want to extract data directly from the primary Postgres instance as this can have (big) impact on the live production system. Unfortunately, as stated in the README, logical replication doesn't work for Postgres read replicas.
Proposed changes
To benefit from log based syncs and not do the bulk of the syncing on the primary instance the proposed change is to add an otion for a read replica which will be used for all the traditional streams. This means that incremental and full table syncs will do to the read replica, which will happen for the initial sync, and the log based syncs go to the primary.
NOTE: This change includes the changes from #130 because HackerOne relies on those changes as well
Types of changes
What types of changes does your code introduce to PipelineWise?
Put an
x
in the boxes that applyChecklist
setup.py
is an individual PR and not mixed with feature or bugfix PRs[AP-NNNN]
(if applicable. AP-NNNN = JIRA ID)AP-NNN
(if applicable. AP-NNN = JIRA ID)