Skip to content

Commit

Permalink
Handle timestamp (nothing to do with datetime -- SQL Server rowversio…
Browse files Browse the repository at this point in the history
…n) incremental replication
  • Loading branch information
mjsqu committed Oct 10, 2024
1 parent b82bafb commit c7aead9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 12 deletions.
6 changes: 5 additions & 1 deletion tap_mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

# SQL type names that are mapped as decimal numbers.
DECIMAL_TYPES = {"decimal", "number", "money", "smallmoney", "numeric"}

# Earlier definition, which still listed "timestamp"; it is overridden by the
# assignment that follows.
DATETIME_TYPES = {"datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"}
# Effective definition: "timestamp" is no longer treated as a datetime type.
DATETIME_TYPES = {"datetime2", "datetime", "datetimeoffset", "smalldatetime"}

# SQL type names that are mapped as calendar dates.
DATE_TYPES = {"date"}

Expand Down Expand Up @@ -101,6 +101,10 @@ def schema_for_column(c, config):

if data_type == "bit":
result.type = ["null", "boolean"]

elif data_type in ["timestamp", "rowversion"]:
result.type = ["null", "string"]
result.format = "rowversion"

elif data_type in BYTES_FOR_INTEGER_TYPE:
result.type = ["null", "integer"]
Expand Down
14 changes: 11 additions & 3 deletions tap_mssql/sync_strategies/incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns):
replication_key_value = datetime.fromtimestamp(
pendulum.parse(replication_key_value).timestamp()
)
# Handle timestamp incremental (timestamp)
if catalog_entry.schema.properties[replication_key_metadata].format == 'rowversion':
select_sql += """ WHERE CAST("{}" AS BIGINT) >=
convert(bigint, convert (varbinary(8), '0x{}', 1))
ORDER BY "{}" ASC""".format(
replication_key_metadata, replication_key_value, replication_key_metadata
)

else:
select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
replication_key_metadata, replication_key_metadata
)

select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
replication_key_metadata, replication_key_metadata
)

params["replication_key_value"] = replication_key_value
elif replication_key_metadata is not None:
Expand Down
93 changes: 85 additions & 8 deletions tests/test_tap_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,15 @@ def test_with_no_state(self):

(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)

self.assertEqual(
[
"ActivateVersionMessage",
"RecordMessage",
],
sorted(list(set(message_types))),
)

self.assertTrue(isinstance(versions[0], int))
self.assertEqual(versions[0], versions[1])
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']

self.assertEqual(len(incremental_record_messages),3)
self.assertEqual(len(integer_incremental_record_messages),3)

def test_with_state(self):
state = {
Expand Down Expand Up @@ -602,7 +602,14 @@ def test_with_state(self):
)
self.assertTrue(isinstance(versions[0], int))
self.assertEqual(versions[0], versions[1])
self.assertEqual(versions[1], 12345)

# Based on state values provided check the number of record messages emitted
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']

self.assertEqual(len(incremental_record_messages),2)
self.assertEqual(len(integer_incremental_record_messages),1)


class TestViews(unittest.TestCase):
Expand Down Expand Up @@ -650,6 +657,76 @@ def test_do_not_discover_key_properties_for_view(self):

self.assertEqual(primary_keys, {"a_table": ["id"], "a_view": []})

class TestTimestampIncrementalReplication(unittest.TestCase):
    """INCREMENTAL replication keyed on a SQL Server ``timestamp`` column.

    The ``updated`` column is declared as ``timestamp`` and is never written
    explicitly; the server assigns its value on insert. The bookmark used in
    these tests is a 16-character hex string.
    """

    def setUp(self):
        """Recreate the ``incremental`` table with three rows and select it."""
        self.conn = test_utils.get_test_connection()

        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                try:
                    cursor.execute("drop table incremental")
                except Exception:
                    # Best effort: the table may not exist yet (e.g. first
                    # run). Catch Exception rather than using a bare except
                    # so KeyboardInterrupt/SystemExit still propagate.
                    pass
                cursor.execute("CREATE TABLE incremental (val int, updated timestamp)")
                # Trailing comments give the value each row is expected to
                # receive in ``updated``.
                # NOTE(review): these depend on the database's current counter
                # -- confirm the test database is fresh before relying on them.
                cursor.execute("INSERT INTO incremental (val) VALUES (1)")  # 00000000000007d1
                cursor.execute("INSERT INTO incremental (val) VALUES (2)")  # 00000000000007d2
                cursor.execute("INSERT INTO incremental (val) VALUES (3)")  # 00000000000007d3

        self.catalog = test_utils.discover_catalog(self.conn, {})

        for stream in self.catalog.streams:
            stream.metadata = [
                {
                    "breadcrumb": (),
                    "metadata": {
                        "selected": True,
                        "table-key-properties": [],
                        "database-name": "dbo",
                    },
                },
                {"breadcrumb": ("properties", "val"), "metadata": {"selected": True}},
            ]

            stream.stream = stream.table
            test_utils.set_replication_method_and_key(stream, "INCREMENTAL", "updated")

    def _sync_record_messages(self, state):
        """Run a sync from ``state`` and return the RECORD messages it emitted."""
        # Mutating the shared list in place needs no ``global`` declaration.
        SINGER_MESSAGES.clear()

        tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)

        # Result is unused here; the call is kept from the original test.
        # NOTE(review): presumably a pure helper -- confirm and drop if so.
        message_types_and_versions(SINGER_MESSAGES)

        return [
            message
            for message in SINGER_MESSAGES
            if isinstance(message, singer.RecordMessage)
        ]

    def test_with_no_state(self):
        """Without a bookmark, all three inserted rows are emitted."""
        record_messages = self._sync_record_messages({})

        self.assertEqual(len(record_messages), 3)

    def test_with_state(self):
        """With a bookmark at the second row's value, two rows are emitted."""
        state = {
            "bookmarks": {
                "dbo-incremental": {
                    "version": 1,
                    "replication_key_value": '00000000000007d2',
                    "replication_key": "updated",
                },
            }
        }

        # Given the state value supplied, there should only be two RECORD messages
        record_messages = self._sync_record_messages(state)

        self.assertEqual(len(record_messages), 2)


if __name__ == "__main__":
# test1 = TestBinlogReplication()
Expand Down

0 comments on commit c7aead9

Please sign in to comment.