Skip to content

Commit

Permalink
Update current_log_version on full sync (singer-io#10)
Browse files Browse the repository at this point in the history
* Current_log_version to None on full sync

* Update current_log_instance to None on full sync

* removed unwanted space

* test

* working

* formatting reverted

* Added logic to factor in None values for self.current_log_version

* Update current_log_version to be min_valid_version if full sync

* moved the current_log_version = min_valid_version to the if statement above so that it only changes if current_log_version is actually None

* added comment

* changed to _get_current_log_version for current_log_version

* push for testing

* logging

* logger 2

* more logging

* updated logger

* fixed logger

* one more logging change

* one more

* New log_based_initial_full_table function

* fixed none type again

* get current version back

* removed logging from init.py

* One more change to loook at Nones

* removed logs
  • Loading branch information
brose7230 authored Dec 11, 2021
1 parent 31e3763 commit 3ad059f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
1 change: 1 addition & 0 deletions tap_mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ def do_sync_log_based_table(mssql_conn, config, catalog_entry, state, columns):
state = singer.write_bookmark(
state, catalog_entry.tap_stream_id, "initial_full_table_complete", True
)

state = singer.write_bookmark(
state,
catalog_entry.tap_stream_id,
Expand Down
31 changes: 16 additions & 15 deletions tap_mssql/sync_strategies/logical.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def _get_current_log_version(self):
sql_query = "SELECT current_version = CHANGE_TRACKING_CURRENT_VERSION()"

current_log_version = self._get_single_result(sql_query, "current_version")

return current_log_version

def _get_non_key_properties(self, key_properties):
Expand All @@ -187,21 +187,22 @@ def log_based_initial_full_table(self):

min_valid_version = self._get_min_valid_version()

min_version_out_of_date = min_valid_version > self.current_log_version

if self.initial_full_table_complete == False:
self.logger.info("No initial load found, executing a full table sync.")
return True

elif (
self.initial_full_table_complete == True and min_version_out_of_date == True
):
self.logger.info(
"CHANGE_TRACKING_MIN_VALID_VERSION has reported a value greater than current-log-version. Executing a full table sync."
)
if self.current_log_version is None: # prevents the operator in the else statement from erroring if None
self.current_log_version = self._get_current_log_version()
self.logger.info("executing a full table sync.")
return True
else:
return False
min_version_out_of_date = min_valid_version > self.current_log_version

if (
self.initial_full_table_complete == True and min_version_out_of_date == True
):
self.logger.info(
"CHANGE_TRACKING_MIN_VALID_VERSION has reported a value greater than current-log-version. Executing a full table sync."
)
return True
else:
return False

def execute_log_based_sync(self):
"Confirm we have state and run a log based query. This will be larger."
Expand Down Expand Up @@ -342,5 +343,5 @@ def _get_single_result(self, sql_query, column):
row = results.fetchone()

single_result = row[column]

return single_result

0 comments on commit 3ad059f

Please sign in to comment.