diff --git a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py index 7dafa9b34..d26e168c7 100644 --- a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py +++ b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py @@ -77,10 +77,15 @@ def execute_mock(query): # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock + cursor_mock.__enter__.return_value.fetchone.return_value = ( + 'pipelinewise_test_database_test_tap', '242FC/BA84A740', '00000009-0192A151-1', 'wal2json' + ) type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0) # mock PG connection instance with ability to open cursor pg_con = Mock() + pg_con.__enter__ = lambda *_: pg_con + pg_con.__exit__ = lambda *_: None pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con @@ -88,8 +93,10 @@ def execute_mock(query): self.postgres.create_replication_slot() assert self.postgres.executed_queries_primary_host == [ "SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_test_database';", - "SELECT * FROM pg_create_logical_replication_slot('pipelinewise_test_database_test_tap', 'wal2json')", ] + cursor_mock.__enter__.return_value.create_replication_slot.assert_called_once_with( + 'pipelinewise_test_database_test_tap', output_plugin='wal2json' + ) def test_create_replication_slot_2(self): """ @@ -103,10 +110,15 @@ def execute_mock(query): # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock + cursor_mock.__enter__.return_value.fetchone.return_value = ( + 'pipelinewise_test_database_test_tap', '242FC/BA84A740', '00000009-0192A151-1', 'wal2json' + ) type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1) # mock PG connection instance with ability to open cursor pg_con = Mock() + pg_con.__enter__ = lambda *_: pg_con + pg_con.__exit__ = lambda *_: None pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con @@ -114,8 +126,10 @@ def execute_mock(query): self.postgres.create_replication_slot() assert self.postgres.executed_queries_primary_host == [ "SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_test_database';", - "SELECT * FROM pg_create_logical_replication_slot('pipelinewise_test_database', 'wal2json')", ] + cursor_mock.__enter__.return_value.create_replication_slot.assert_called_once_with( + 'pipelinewise_test_database', output_plugin='wal2json' + ) @patch('pipelinewise.fastsync.commons.tap_postgres.psycopg2.connect') def test_get_connection_to_primary(self, connect_mock): @@ -253,6 +267,8 @@ def execute_mock(query): # mock PG connection instance with ability to open cursor pg_con = Mock() + pg_con.__enter__ = lambda *_: pg_con + pg_con.__exit__ = lambda *_: None pg_con.cursor.return_value = cursor_mock connect_mock.return_value = pg_con @@ -261,8 +277,7 @@ def execute_mock(query): assert self.postgres.executed_queries_primary_host == [ "SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_my_db';", - 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE ' - "slot_name = 'pipelinewise_my_db';", + 'DROP_REPLICATION_SLOT "pipelinewise_my_db"', ] @patch('pipelinewise.fastsync.commons.tap_postgres.psycopg2.connect') @@ -294,6 +309,8 @@ def execute_mock(query): # mock PG connection instance with ability to open cursor pg_con = Mock() + pg_con.__enter__ = lambda *_: pg_con + pg_con.__exit__ = lambda *_: None pg_con.cursor.return_value = cursor_mock connect_mock.return_value = pg_con @@ -302,8 +319,7 @@ def execute_mock(query): assert self.postgres.executed_queries_primary_host == [ "SELECT * FROM pg_replication_slots WHERE slot_name = 'pipelinewise_my_db';", - 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE ' - "slot_name = 'pipelinewise_my_db_tap_test';", + 'DROP_REPLICATION_SLOT "pipelinewise_my_db_tap_test"', ] def test_fetch_current_incremental_key_pos_empty_result_expect_exception(self):