Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Perform logical replication after initial sync #144

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
17 changes: 17 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM docker.io/bitnami/postgresql:12

# Installing packages and running `make install` below is done as root.
USER root

# Git SHA of v2.4
ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0

# Compile the plugins from sources and install.
# The sources are cloned to an explicit absolute path so that the clone, the
# build and the cleanup all refer to the same directory, whatever the image's
# working directory is. `curl -f` avoids piping an HTTP error page into gpg.
RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \
    curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \
    echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
    install_packages postgresql-server-dev-12 && \
    git clone https://github.com/eulerto/wal2json -b master --single-branch /tmp/wal2json && \
    (cd /tmp/wal2json && git checkout "$WAL2JSON_COMMIT_ID" && make && make install) && \
    rm -rf /tmp/wal2json

# Switch back to the non-root UID for everything that follows.
USER 1001
13 changes: 6 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ version: "3.3"

services:
db_primary:
image: "docker.io/bitnami/postgresql:12"
build: .
container_name: "primary"
ports:
- "5432:5432"
environment:
- POSTGRESQL_REPLICATION_MODE=master
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRES_USER=test_user
- POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_REPLICATION_USER=test_user
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_DATABASE=tap_postgres_test
- POSTGRESQL_WAL_LEVEL=logical
- ALLOW_EMPTY_PASSWORD=yes
db_replica:
image: "docker.io/bitnami/postgresql:12"
Expand All @@ -24,8 +23,8 @@ services:
- db_primary
environment:
- POSTGRESQL_REPLICATION_MODE=slave
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRESQL_REPLICATION_USER=test_user
- POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd
- POSTGRESQL_MASTER_HOST=db_primary
- POSTGRESQL_MASTER_PORT_NUMBER=5432
- ALLOW_EMPTY_PASSWORD=yes
4 changes: 4 additions & 0 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
Expand All @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after initial sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication
Expand Down
3 changes: 2 additions & 1 deletion tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
LOGGER.debug('Unable to open and parse %s', state_file)
finally:
lsn_comitted = min(
[get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
get_bookmark(state_comitted, s['tap_stream_id'], 'lsn', start_lsn) for s in logical_streams
)
if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush):
lsn_to_flush = lsn_comitted
LOGGER.info('Confirming write up to %s, flush to %s',
Expand Down
65 changes: 55 additions & 10 deletions tests/test_full_table_interruption.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import re
import psycopg2
import unittest.mock
import pytest
import tap_postgres
from tap_postgres.sync_strategies import logical_replication
import tap_postgres.sync_strategies.full_table as full_table
import tap_postgres.sync_strategies.common as pg_common
import singer
Expand Down Expand Up @@ -49,10 +49,29 @@ def do_not_dump_catalog(catalog):
full_table.UPDATE_BOOKMARK_PERIOD = 1

@pytest.mark.parametrize('use_secondary', [False, True])
@unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams)
@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect)
class TestLogicalInterruption:
maxDiff = None

def setup_class(self):
    """Create the logical replication slot used by the tests in this class.

    The slot name is derived from the test connection config ('dbname' and
    'tap_id') through ``logical_replication.generate_replication_slot_name``.
    The slot is created with the ``wal2json`` output plugin over a superuser
    connection.
    """
    conn_config = get_test_connection_config()
    slot_name = logical_replication.generate_replication_slot_name(
        dbname=conn_config['dbname'], tap_id=conn_config['tap_id']
    )
    with get_test_connection(superuser=True) as conn:
        with conn.cursor() as cur:
            # Bind the slot name as a query parameter rather than formatting
            # it into the SQL text, so quoting is handled by the driver.
            # NOTE(review): assumes a psycopg2-style cursor ('%s' paramstyle).
            cur.execute(
                "SELECT * FROM pg_create_logical_replication_slot(%s, 'wal2json')",
                (slot_name,),
            )

def teardown_class(self):
    """Drop the logical replication slot named after the test connection config.

    The slot name is computed from the config's 'dbname' and 'tap_id' through
    ``logical_replication.generate_replication_slot_name``, and the slot is
    dropped over a superuser connection.
    """
    conn_config = get_test_connection_config()
    slot_name = logical_replication.generate_replication_slot_name(
        dbname=conn_config['dbname'], tap_id=conn_config['tap_id']
    )
    with get_test_connection(superuser=True) as conn:
        with conn.cursor() as cur:
            # Bind the slot name as a query parameter rather than formatting
            # it into the SQL text, so quoting is handled by the driver.
            # NOTE(review): assumes a psycopg2-style cursor ('%s' paramstyle).
            cur.execute(
                "SELECT * FROM pg_drop_replication_slot(%s)",
                (slot_name,),
            )

def setup_method(self):
table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True},
{"name" : 'name', "type": "character varying"},
Expand All @@ -67,15 +86,24 @@ def setup_method(self):
global CAUGHT_MESSAGES
CAUGHT_MESSAGES.clear()

def test_catalog(self, mock_connect, use_secondary):
def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary):
singer.write_message = singer_write_message_no_cow
pg_common.write_schema_message = singer_write_message_ok

conn_config = get_test_connection_config(use_secondary=use_secondary)
streams = tap_postgres.do_discovery(conn_config)

# Assert that we connected to the correct database
expected_connection = {
primary_connection = {
'application_name': unittest.mock.ANY,
'dbname': unittest.mock.ANY,
'user': unittest.mock.ANY,
'password': unittest.mock.ANY,
'connect_timeout':unittest.mock.ANY,
'host': conn_config['host'],
'port': conn_config['port'],
}
secondary_connection = {
'application_name': unittest.mock.ANY,
'dbname': unittest.mock.ANY,
'user': unittest.mock.ANY,
Expand All @@ -84,8 +112,8 @@ def test_catalog(self, mock_connect, use_secondary):
'host': conn_config['secondary_host'] if use_secondary else conn_config['host'],
'port': conn_config['secondary_port'] if use_secondary else conn_config['port'],
}
mock_connect.assert_called_once_with(**expected_connection)
mock_connect.reset_mock()

mock_connect.assert_called_once_with(**secondary_connection)

cow_stream = [s for s in streams if s['table_name'] == 'COW'][0]
assert cow_stream is not None
Expand Down Expand Up @@ -114,15 +142,20 @@ def test_catalog(self, mock_connect, use_secondary):
state = {}
#the initial phase of cows logical replication will be a full table.
#it will sync the first record and then blow up on the 2nd record
mock_connect.reset_mock()
try:
tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state)
except Exception:
blew_up_on_cow = True

assert blew_up_on_cow is True

mock_connect.assert_called_with(**expected_connection)
mock_connect.reset_mock()
mock_sync_logical_streams.assert_not_called()

mock_connect.assert_has_calls(
[unittest.mock.call(**primary_connection)] * 2 + \
[unittest.mock.call(**secondary_connection)] * 4
)

assert 7 == len(CAUGHT_MESSAGES)

Expand Down Expand Up @@ -171,12 +204,17 @@ def test_catalog(self, mock_connect, use_secondary):
global COW_RECORD_COUNT
COW_RECORD_COUNT = 0
CAUGHT_MESSAGES.clear()
mock_connect.reset_mock()
tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state)

mock_connect.assert_called_with(**expected_connection)
mock_connect.reset_mock()
mock_sync_logical_streams.assert_called_once()

mock_connect.assert_has_calls(
[unittest.mock.call(**primary_connection)] * 2 + \
[unittest.mock.call(**secondary_connection)] * 4
)

assert 8 == len(CAUGHT_MESSAGES)
assert 10 == len(CAUGHT_MESSAGES)

assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA'

Expand Down Expand Up @@ -225,6 +263,13 @@ def test_catalog(self, mock_connect, use_secondary):
assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn
assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version

assert CAUGHT_MESSAGES[8]['type'] == 'SCHEMA'

assert isinstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage)
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('xmin') is None
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('lsn') == end_lsn
assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('version') == new_version

Comment on lines +266 to +272
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use TestCase assert* methods instead?

Copy link

@josescuderoh josescuderoh Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Samira-El I'm facing this issue and planning on contributing the fix. Is your feedback related to line 268 only? That is change it to assertIsInstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage). Asking this since I see all other assert statements don't use TestCase either.

@pytest.mark.parametrize('use_secondary', [False, True])
@unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect)
class TestFullTableInterruption:
Expand Down
6 changes: 5 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ def __init__(self, *args, **kwargs):

def get_test_connection_config(target_db='postgres', use_secondary=False):
try:
conn_config = {'host': os.environ['TAP_POSTGRES_HOST'],
conn_config = {'tap_id': 'test-postgres',
'max_run_seconds': 5,
'break_at_end_lsn': True,
'logical_poll_total_seconds': 2,
'host': os.environ['TAP_POSTGRES_HOST'],
'user': os.environ['TAP_POSTGRES_USER'],
'password': os.environ['TAP_POSTGRES_PASSWORD'],
'port': os.environ['TAP_POSTGRES_PORT'],
Expand Down