diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..95fbeb42 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +FROM docker.io/bitnami/postgresql:12 + +USER root + +# Git SHA of v2.4 +ENV WAL2JSON_COMMIT_ID=36fbee6cbb7e4bc1bf9ee4f55842ab51393e3ac0 + +# Compile the plugins from sources and install +RUN install_packages ca-certificates curl gcc git gnupg make pkgconf && \ + curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/apt.postgresql.org.gpg >/dev/null && \ + echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ + install_packages postgresql-server-dev-12 && \ + git clone https://github.com/eulerto/wal2json -b master --single-branch && \ + (cd /wal2json && git checkout $WAL2JSON_COMMIT_ID && make && make install) && \ + rm -rf wal2json + +USER 1001 diff --git a/docker-compose.yml b/docker-compose.yml index c416b85b..9503fe51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,18 +2,17 @@ version: "3.3" services: db_primary: - image: "docker.io/bitnami/postgresql:12" + build: . 
container_name: "primary" ports: - "5432:5432" environment: - POSTGRESQL_REPLICATION_MODE=master - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password - - POSTGRES_USER=test_user - - POSTGRES_PASSWORD=my-secret-passwd + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd - POSTGRESQL_DATABASE=tap_postgres_test + - POSTGRESQL_WAL_LEVEL=logical - ALLOW_EMPTY_PASSWORD=yes db_replica: image: "docker.io/bitnami/postgresql:12" @@ -24,8 +23,8 @@ services: - db_primary environment: - POSTGRESQL_REPLICATION_MODE=slave - - POSTGRESQL_REPLICATION_USER=repl_user - - POSTGRESQL_REPLICATION_PASSWORD=repl_password + - POSTGRESQL_REPLICATION_USER=test_user + - POSTGRESQL_REPLICATION_PASSWORD=my-secret-passwd - POSTGRESQL_MASTER_HOST=db_primary - POSTGRESQL_MASTER_PORT_NUMBER=5432 - ALLOW_EMPTY_PASSWORD=yes diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 5c7c2ac1..2cafea11 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after initial sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. 
moving onto pure logical replication diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 8056617a..319d7c78 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -680,7 +680,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): LOGGER.debug('Unable to open and parse %s', state_file) finally: lsn_comitted = min( - [get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams]) + get_bookmark(state_comitted, s['tap_stream_id'], 'lsn', start_lsn) for s in logical_streams + ) if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush): lsn_to_flush = lsn_comitted LOGGER.info('Confirming write up to %s, flush to %s', diff --git a/tests/test_full_table_interruption.py b/tests/test_full_table_interruption.py index c1417468..fce78146 100644 --- a/tests/test_full_table_interruption.py +++ b/tests/test_full_table_interruption.py @@ -1,8 +1,8 @@ -import re import psycopg2 import unittest.mock import pytest import tap_postgres +from tap_postgres.sync_strategies import logical_replication import tap_postgres.sync_strategies.full_table as full_table import tap_postgres.sync_strategies.common as pg_common import singer @@ -49,10 +49,29 @@ def do_not_dump_catalog(catalog): full_table.UPDATE_BOOKMARK_PERIOD = 1 @pytest.mark.parametrize('use_secondary', [False, True]) +@unittest.mock.patch('tap_postgres.sync_logical_streams', wraps=tap_postgres.sync_logical_streams) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestLogicalInterruption: maxDiff = None + def setup_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * 
FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')") + + def teardown_class(self): + conn_config = get_test_connection_config() + slot_name = logical_replication.generate_replication_slot_name( + dbname=conn_config['dbname'], tap_id=conn_config['tap_id'] + ) + with get_test_connection(superuser=True) as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT * FROM pg_drop_replication_slot('{slot_name}')") + def setup_method(self): table_spec_1 = {"columns": [{"name": "id", "type" : "serial", "primary_key" : True}, {"name" : 'name', "type": "character varying"}, @@ -67,7 +86,7 @@ def setup_method(self): global CAUGHT_MESSAGES CAUGHT_MESSAGES.clear() - def test_catalog(self, mock_connect, use_secondary): + def test_catalog(self, mock_connect, mock_sync_logical_streams, use_secondary): singer.write_message = singer_write_message_no_cow pg_common.write_schema_message = singer_write_message_ok @@ -75,7 +94,16 @@ def test_catalog(self, mock_connect, use_secondary): streams = tap_postgres.do_discovery(conn_config) # Assert that we connected to the correct database - expected_connection = { + primary_connection = { + 'application_name': unittest.mock.ANY, + 'dbname': unittest.mock.ANY, + 'user': unittest.mock.ANY, + 'password': unittest.mock.ANY, + 'connect_timeout':unittest.mock.ANY, + 'host': conn_config['host'], + 'port': conn_config['port'], + } + secondary_connection = { 'application_name': unittest.mock.ANY, 'dbname': unittest.mock.ANY, 'user': unittest.mock.ANY, @@ -84,8 +112,8 @@ def test_catalog(self, mock_connect, use_secondary): 'host': conn_config['secondary_host'] if use_secondary else conn_config['host'], 'port': conn_config['secondary_port'] if use_secondary else conn_config['port'], } - mock_connect.assert_called_once_with(**expected_connection) - mock_connect.reset_mock() + + mock_connect.assert_called_once_with(**secondary_connection) cow_stream = [s for s in streams if s['table_name'] == 'COW'][0] assert cow_stream is not None @@ 
-114,6 +142,7 @@ def test_catalog(self, mock_connect, use_secondary): state = {} #the initial phase of cows logical replication will be a full table. #it will sync the first record and then blow up on the 2nd record + mock_connect.reset_mock() try: tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, state) except Exception: @@ -121,8 +150,12 @@ def test_catalog(self, mock_connect, use_secondary): assert blew_up_on_cow is True - mock_connect.assert_called_with(**expected_connection) - mock_connect.reset_mock() + mock_sync_logical_streams.assert_not_called() + + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) assert 7 == len(CAUGHT_MESSAGES) @@ -171,12 +204,17 @@ def test_catalog(self, mock_connect, use_secondary): global COW_RECORD_COUNT COW_RECORD_COUNT = 0 CAUGHT_MESSAGES.clear() + mock_connect.reset_mock() tap_postgres.do_sync(get_test_connection_config(use_secondary=use_secondary), {'streams' : streams}, None, old_state) - mock_connect.assert_called_with(**expected_connection) - mock_connect.reset_mock() + mock_sync_logical_streams.assert_called_once() + + mock_connect.assert_has_calls( + [unittest.mock.call(**primary_connection)] * 2 + \ + [unittest.mock.call(**secondary_connection)] * 4 + ) - assert 8 == len(CAUGHT_MESSAGES) + assert 10 == len(CAUGHT_MESSAGES) assert CAUGHT_MESSAGES[0]['type'] == 'SCHEMA' @@ -225,6 +263,13 @@ def test_catalog(self, mock_connect, use_secondary): assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('lsn') == end_lsn assert CAUGHT_MESSAGES[7].value['bookmarks']['public-COW'].get('version') == new_version + assert CAUGHT_MESSAGES[8]['type'] == 'SCHEMA' + + assert isinstance(CAUGHT_MESSAGES[9], singer.messages.StateMessage) + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('xmin') is None + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('lsn') == 
end_lsn + assert CAUGHT_MESSAGES[9].value['bookmarks']['public-COW'].get('version') == new_version + @pytest.mark.parametrize('use_secondary', [False, True]) @unittest.mock.patch('psycopg2.connect', wraps=psycopg2.connect) class TestFullTableInterruption: diff --git a/tests/utils.py b/tests/utils.py index 19d78060..78de5ee0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -53,7 +53,11 @@ def __init__(self, *args, **kwargs): def get_test_connection_config(target_db='postgres', use_secondary=False): try: - conn_config = {'host': os.environ['TAP_POSTGRES_HOST'], + conn_config = {'tap_id': 'test-postgres', + 'max_run_seconds': 5, + 'break_at_end_lsn': True, + 'logical_poll_total_seconds': 2, + 'host': os.environ['TAP_POSTGRES_HOST'], 'user': os.environ['TAP_POSTGRES_USER'], 'password': os.environ['TAP_POSTGRES_PASSWORD'], 'port': os.environ['TAP_POSTGRES_PORT'],