Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Perform logical replication after initial sync
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Jan 18, 2022
1 parent 6dc8394 commit b2857b5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
4 changes: 4 additions & 0 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# finishing previously interrupted full-table (first stage of logical replication)
lookup[stream['tap_stream_id']] = 'logical_initial_interrupted'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

# inconsistent state
elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \
Expand All @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method):
# initial full-table phase of logical replication
lookup[stream['tap_stream_id']] = 'logical_initial'
traditional_steams.append(stream)
# do any required logical replication after inital sync is complete
logical_streams.append(stream)

else: # no xmin but we have an lsn
# initial stage of logical replication(full-table) has been completed. moving onto pure logical replication
Expand Down
14 changes: 10 additions & 4 deletions tests/test_full_table_interruption.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
import unittest.mock
import tap_postgres
import tap_postgres.sync_strategies.full_table as full_table
import tap_postgres.sync_strategies.common as pg_common
Expand Down Expand Up @@ -45,6 +46,7 @@ def do_not_dump_catalog(catalog):
tap_postgres.dump_catalog = do_not_dump_catalog
full_table.UPDATE_BOOKMARK_PERIOD = 1

@unittest.mock.patch('tap_postgres.sync_logical_streams')
class LogicalInterruption(unittest.TestCase):
maxDiff = None

Expand All @@ -62,7 +64,7 @@ def setUp(self):
global CAUGHT_MESSAGES
CAUGHT_MESSAGES.clear()

def test_catalog(self):
def test_catalog(self, mock_sync_logical_streams):
singer.write_message = singer_write_message_no_cow
pg_common.write_schema_message = singer_write_message_ok

Expand Down Expand Up @@ -90,7 +92,7 @@ def test_catalog(self):
insert_record(cur, 'COW', cow_rec)

conn.close()

blew_up_on_cow = False
state = {}
#the initial phase of cows logical replication will be a full table.
Expand All @@ -102,6 +104,8 @@ def test_catalog(self):

self.assertTrue(blew_up_on_cow)

mock_sync_logical_streams.assert_not_called()

self.assertEqual(7, len(CAUGHT_MESSAGES))

self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA')
Expand Down Expand Up @@ -151,6 +155,8 @@ def test_catalog(self):
CAUGHT_MESSAGES.clear()
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, old_state)

mock_sync_logical_streams.assert_called_once()

self.assertEqual(8, len(CAUGHT_MESSAGES))

self.assertEqual(CAUGHT_MESSAGES[0]['type'], 'SCHEMA')
Expand Down Expand Up @@ -238,7 +244,7 @@ def test_catalog(self):

conn = get_test_connection()
conn.autocommit = True

with conn.cursor() as cur:
cow_rec = {'name': 'betty', 'colour': 'blue'}
insert_record(cur, 'COW', {'name': 'betty', 'colour': 'blue'})
Expand All @@ -256,7 +262,7 @@ def test_catalog(self):

state = {}
blew_up_on_cow = False

#this will sync the CHICKEN but then blow up on the COW
try:
tap_postgres.do_sync(get_test_connection_config(), {'streams' : streams}, None, state)
Expand Down

0 comments on commit b2857b5

Please sign in to comment.