From 4a9b68258b2704c00c985c0543292f218f498f4c Mon Sep 17 00:00:00 2001
From: Rushikesh Todkar <98420315+RushiT0122@users.noreply.github.com>
Date: Wed, 8 May 2024 18:18:12 +0530
Subject: [PATCH] Fix conversations stream bookmarking (#73)

* Fix conversations stream bookmarking

* Refactoring changes

---------

Co-authored-by: RushiT0122
---
 tap_intercom/streams.py | 152 ++++++++++++++++++++++++++++++++--------
 1 file changed, 123 insertions(+), 29 deletions(-)

diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py
index 5fa16c1..fe365a5 100644
--- a/tap_intercom/streams.py
+++ b/tap_intercom/streams.py
@@ -126,7 +126,7 @@ def sync_substream(self, parent_id, stream_schema, stream_metadata, parent_repli
         return state
 
 
-# pylint: disable=abstract-method
+# pylint: disable=abstract-method,unused-argument
 class IncrementalStream(BaseStream):
     """
     A child class of a base stream used to represent streams that use the
@@ -136,6 +136,36 @@ class IncrementalStream(BaseStream):
     """
     replication_method = 'INCREMENTAL'
     to_write_intermediate_bookmark = False
+    last_processed = None
+    last_sync_started_at = None
+    skipped_parent_ids = []
+
+    def set_last_processed(self, state):
+        self.last_processed = None
+
+    def get_last_sync_started_at(self, state):
+        self.last_sync_started_at = None
+
+    def set_last_sync_started_at(self, state):
+        pass
+
+    def skip_records(self, record):
+        return False
+
+    def write_bookmark(self, state, bookmark_value):
+        return singer.write_bookmark(state,
+                                     self.tap_stream_id,
+                                     self.replication_key,
+                                     bookmark_value)
+
+    def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
+        if self.to_write_intermediate_bookmark:
+            # Write bookmark and state after every page of records
+            state = singer.write_bookmark(state,
+                                          self.tap_stream_id,
+                                          self.replication_key,
+                                          singer.utils.strftime(bookmark_value))
+            singer.write_state(state)
 
     # Disabled `unused-argument` as it causing pylint error.
     # Method which call this `sync` method is passing unused argument.So, removing argument would not work.
@@ -165,6 +195,8 @@ def sync(self,
         parent_bookmark = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date'])
         parent_bookmark_utc = singer.utils.strptime_to_utc(parent_bookmark)
         sync_start_date = parent_bookmark_utc
+        self.set_last_processed(state)
+        self.set_last_sync_started_at(state)
 
         is_parent_selected = True
         is_child_selected = False
@@ -209,47 +241,46 @@ def sync(self,
 
         with metrics.record_counter(self.tap_stream_id) as counter:
             for record in self.get_records(sync_start_date, stream_metadata=stream_metadata):
+                # In case of interrupted sync, skip conversations already synced in the last run
+
                 transform_times(record, schema_datetimes)
 
                 record_datetime = singer.utils.strptime_to_utc(
                     self.epoch_milliseconds_to_dt_str(
                         record[self.replication_key])
-                    )
+                )
 
                 # Write the record if the parent is selected
                 if is_parent_selected and record_datetime >= parent_bookmark_utc:
                     record_counter += 1
                     transformed_record = transform(record,
-                                                       stream_schema,
-                                                       integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
-                                                       metadata=stream_metadata)
+                                                   stream_schema,
+                                                   integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
+                                                   metadata=stream_metadata)
 
                     # Write record if a parent is selected
                     singer.write_record(self.tap_stream_id, transformed_record, time_extracted=singer.utils.now())
                     counter.increment()
                     max_datetime = max(record_datetime, max_datetime)
 
-                if self.to_write_intermediate_bookmark and record_counter == MAX_PAGE_SIZE:
-                    # Write bookmark and state after every page of records
-                    state = singer.write_bookmark(state,
-                                                  self.tap_stream_id,
-                                                  self.replication_key,
-                                                  singer.utils.strftime(max_datetime))
-                    singer.write_state(state)
-                    # Reset counter
-                    record_counter = 0
-
                 # Sync child stream, if the child is selected and if we have records greater than the child stream bookmark
                 if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts):
+                    # Long-running jobs may starve the child stream extraction, so skip the parent ids synced in the last sync
+                    # Store the parent ids so that later we can resync child records of skipped parent ids
+                    if self.skip_records(record):
+                        self.skipped_parent_ids.append((record.get('id'), record[self.replication_key]))
+                        continue
                     state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata,
                                                             record[self.replication_key], state)
 
+                if record_counter == MAX_PAGE_SIZE:
+                    self.write_intermediate_bookmark(state, record.get("id"), max_datetime)
+                    # Reset counter
+                    record_counter = 0
+
             bookmark_date = singer.utils.strftime(max_datetime)
             LOGGER.info("FINISHED Syncing: {}, total_records: {}.".format(self.tap_stream_id, counter.value))
             LOGGER.info("Stream: {}, writing final bookmark".format(self.tap_stream_id))
-            state = singer.write_bookmark(state,
-                                          self.tap_stream_id,
-                                          self.replication_key,
-                                          bookmark_date)
+            self.write_bookmark(state, bookmark_date)
 
             return state
@@ -508,6 +539,50 @@ class Conversations(IncrementalStream):
     per_page = MAX_PAGE_SIZE
     child = 'conversation_parts'
 
+    def set_last_processed(self, state):
+        self.last_processed = singer.get_bookmark(
+            state, self.tap_stream_id, "last_processed")
+
+    def set_last_sync_started_at(self, state):
+        last_sync_started_at = singer.get_bookmark(
+            state, self.tap_stream_id, "last_sync_started_at")
+        self.last_sync_started_at = last_sync_started_at or singer.utils.strftime(singer.utils.now())
+
+    def skip_records(self, record):
+        # If a last processed id exists, check whether the current record id is less than or equal to it
+        return self.last_processed and record.get("id") <= self.last_processed
+
+    def write_bookmark(self, state, bookmark_value):
+        # Set the last successful sync start time as the new bookmark and delete intermediate bookmarks
+        if "last_sync_started_at" in state.get("bookmarks", {}).get(self.tap_stream_id, {}):
+            bookmark_value = singer.get_bookmark(state,
+                                                 self.tap_stream_id,
+                                                 "last_sync_started_at")
+
+            del state["bookmarks"][self.tap_stream_id]["last_sync_started_at"]
+
+        if "last_processed" in state.get("bookmarks", {}).get(self.tap_stream_id, {}):
+            del state["bookmarks"][self.tap_stream_id]["last_processed"]
+
+        return singer.write_bookmark(state,
+                                     self.tap_stream_id,
+                                     self.replication_key,
+                                     bookmark_value)
+
+    def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
+        # In scenarios where the sync is interrupted, we should resume from the last id processed
+        state = singer.write_bookmark(state,
+                                      self.tap_stream_id,
+                                      "last_processed",
+                                      last_processed)
+
+        # This should be set as the new bookmark once all conversation records are synced
+        state = singer.write_bookmark(state,
+                                      self.tap_stream_id,
+                                      "last_sync_started_at",
+                                      self.last_sync_started_at)
+        singer.write_state(state)
+
     def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]:
         paging = True
         starting_after = None
@@ -516,20 +591,39 @@ def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=N
                 'per_page': self.per_page
             },
             'query': {
-                'operator': 'OR',
-                'value': [{
-                    'field': self.replication_key,
-                    'operator': '>',
-                    'value': self.dt_to_epoch_seconds(bookmark_datetime)
+                'operator': 'AND',
+                'value': [
+                    {
+                        'operator': 'OR',
+                        'value': [
+                            {
+                                'field': 'id',
+                                'operator': '>',
+                                'value': self.last_processed or ""
+                            },
+                            {
+                                'field': 'id',
+                                'operator': '=',
+                                'value': self.last_processed or ""
+                            }]
                 }, {
-                    'field': self.replication_key,
-                    'operator': '=',
-                    'value': self.dt_to_epoch_seconds(bookmark_datetime)
+                        'operator': 'OR',
+                        'value': [
+                            {
+                                'field': self.replication_key,
+                                'operator': '>',
+                                'value': self.dt_to_epoch_seconds(bookmark_datetime)
+                            },
+                            {
+                                'field': self.replication_key,
+                                'operator': '=',
+                                'value': self.dt_to_epoch_seconds(bookmark_datetime)
+                            }]
                 }]
-            },
+                },
             "sort": {
-                "field": self.replication_key,
+                "field": "id",
                 "order": "ascending"
             }
         }
 
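
Note on the resulting state shape: the sketch below illustrates how the bookmarks introduced by
this patch are expected to evolve. The stream name "conversations", the replication key
"updated_at", and every id/timestamp value are illustrative assumptions, not taken from a real
sync; the method names are the ones added in the diff above.

    # Mid-sync: Conversations.write_intermediate_bookmark() persists the id of the last
    # conversation whose child records were synced, plus the time this sync attempt started.
    state = {
        "bookmarks": {
            "conversations": {
                "updated_at": "2024-04-01T00:00:00.000000Z",          # bookmark from the previous completed sync
                "last_processed": "205149948",                         # hypothetical conversation id
                "last_sync_started_at": "2024-05-08T12:00:00.000000Z"
            }
        }
    }

    # If the run is interrupted here, the next run resumes from last_processed:
    # get_records() queries id >= last_processed AND updated_at >= the old bookmark, sorted by
    # id ascending, and skip_records() avoids re-syncing conversation_parts for conversations
    # that were already processed.

    # On successful completion, Conversations.write_bookmark() promotes last_sync_started_at to
    # the replication-key bookmark and removes both intermediate keys:
    state = {
        "bookmarks": {
            "conversations": {
                "updated_at": "2024-05-08T12:00:00.000000Z"
            }
        }
    }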