Skip to content

Commit

Permalink
Fix conversations stream bookmarking (#73)
Browse files Browse the repository at this point in the history
* Fix conversations stream bookmarking
* Refactoring changes


---------

Co-authored-by: RushiT0122 <[email protected]>
  • Loading branch information
RushiT0122 and RushiT0122 authored May 8, 2024
1 parent 8c6bbc5 commit 4a9b682
Showing 1 changed file with 123 additions and 29 deletions.
152 changes: 123 additions & 29 deletions tap_intercom/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def sync_substream(self, parent_id, stream_schema, stream_metadata, parent_repli

return state

# pylint: disable=abstract-method
# pylint: disable=abstract-method,unused-argument
class IncrementalStream(BaseStream):
"""
A child class of a base stream used to represent streams that use the
Expand All @@ -136,6 +136,36 @@ class IncrementalStream(BaseStream):
"""
replication_method = 'INCREMENTAL'
to_write_intermediate_bookmark = False
last_processed = None
last_sync_started_at = None
skipped_parent_ids = []

def set_last_processed(self, state):
    """
    Reset the resume marker for this stream.

    The base implementation does not read `state`; it only clears
    `self.last_processed`.
    """
    self.last_processed = None

def get_last_sync_started_at(self, state):
    """
    Clear `self.last_sync_started_at`.

    Despite the `get_` prefix, nothing is read from `state` and the method
    returns None.
    """
    self.last_sync_started_at = None

def set_last_sync_started_at(self, state):
    """
    No-op hook: the base stream records no sync start time.

    `state` is accepted but not used.
    """
    return None

def skip_records(self, record):
    """
    Tell `sync` whether the child-stream sync should be skipped for `record`.

    The base implementation never skips, so it always returns False.
    """
    return False

def write_bookmark(self, state, bookmark_value):
    """
    Record `bookmark_value` under this stream's replication key.

    Returns the result of `singer.write_bookmark`.
    """
    stream_id = self.tap_stream_id
    key = self.replication_key
    return singer.write_bookmark(state, stream_id, key, bookmark_value)

def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
    """
    Write the bookmark and emit state part-way through a sync.

    Does nothing unless `to_write_intermediate_bookmark` is truthy.
    `last_processed` is not used by this base implementation.
    """
    if not self.to_write_intermediate_bookmark:
        return

    # Write bookmark and state after every page of records
    updated_state = singer.write_bookmark(
        state,
        self.tap_stream_id,
        self.replication_key,
        singer.utils.strftime(bookmark_value),
    )
    singer.write_state(updated_state)

# Disabled `unused-argument` because it was causing a pylint error.
# The method that calls this `sync` method passes an argument that is unused here, so removing the argument would not work.
Expand Down Expand Up @@ -165,6 +195,8 @@ def sync(self,
parent_bookmark = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date'])
parent_bookmark_utc = singer.utils.strptime_to_utc(parent_bookmark)
sync_start_date = parent_bookmark_utc
self.set_last_processed(state)
self.set_last_sync_started_at(state)

is_parent_selected = True
is_child_selected = False
Expand Down Expand Up @@ -209,47 +241,46 @@ def sync(self,

with metrics.record_counter(self.tap_stream_id) as counter:
for record in self.get_records(sync_start_date, stream_metadata=stream_metadata):
# In case of interrupted sync, skip records last synced conversations

transform_times(record, schema_datetimes)

record_datetime = singer.utils.strptime_to_utc(
self.epoch_milliseconds_to_dt_str(
record[self.replication_key])
)
)

# Write the record if the parent is selected
if is_parent_selected and record_datetime >= parent_bookmark_utc:
record_counter += 1
transformed_record = transform(record,
stream_schema,
integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
metadata=stream_metadata)
stream_schema,
integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
metadata=stream_metadata)
# Write record if a parent is selected
singer.write_record(self.tap_stream_id, transformed_record, time_extracted=singer.utils.now())
counter.increment()
max_datetime = max(record_datetime, max_datetime)

if self.to_write_intermediate_bookmark and record_counter == MAX_PAGE_SIZE:
# Write bookmark and state after every page of records
state = singer.write_bookmark(state,
self.tap_stream_id,
self.replication_key,
singer.utils.strftime(max_datetime))
singer.write_state(state)
# Reset counter
record_counter = 0

# Sync child stream, if the child is selected and if we have records greater than the child stream bookmark
if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts):
# Long running jobs may starve the child stream extraction so skip the parent ids synced in last sync
# Store the parent ids so that later we can resync child records of skipped parent ids
if self.skip_records(record):
self.skipped_parent_ids.append((record.get('id'), record[self.replication_key]))
continue
state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state)

if record_counter == MAX_PAGE_SIZE:
self.write_intermediate_bookmark(state, record.get("id"), max_datetime)
# Reset counter
record_counter = 0

bookmark_date = singer.utils.strftime(max_datetime)
LOGGER.info("FINISHED Syncing: {}, total_records: {}.".format(self.tap_stream_id, counter.value))

LOGGER.info("Stream: {}, writing final bookmark".format(self.tap_stream_id))
state = singer.write_bookmark(state,
self.tap_stream_id,
self.replication_key,
bookmark_date)
self.write_bookmark(state, bookmark_date)
return state


Expand Down Expand Up @@ -508,6 +539,50 @@ class Conversations(IncrementalStream):
per_page = MAX_PAGE_SIZE
child = 'conversation_parts'

def set_last_processed(self, state):
    """
    Load this stream's "last_processed" bookmark from `state` into
    `self.last_processed`.
    """
    resume_marker = singer.get_bookmark(state, self.tap_stream_id, "last_processed")
    self.last_processed = resume_marker

def set_last_sync_started_at(self, state):
    """
    Set `self.last_sync_started_at` from the "last_sync_started_at" bookmark.

    If that bookmark is missing or falsy, the current time formatted by
    `singer.utils.strftime` is used instead.
    """
    saved_start = singer.get_bookmark(
        state, self.tap_stream_id, "last_sync_started_at")
    if saved_start:
        self.last_sync_started_at = saved_start
    else:
        self.last_sync_started_at = singer.utils.strftime(singer.utils.now())

def skip_records(self, record):
    """
    Decide whether `record` was already handled by an interrupted sync.

    Returns True when a last processed id is set and the record's id is
    less than or equal to it. Returns False otherwise, including when no
    last processed id is set or the record has no "id".
    """
    record_id = record.get("id")
    # Without a resume marker or a record id there is nothing to compare,
    # so return a plain False instead of leaking None/"" or raising TypeError.
    if not self.last_processed or record_id is None:
        return False
    # NOTE(review): ids are compared as-is; if both are strings the ordering is
    # lexicographic ("9" > "10") - confirm this matches the API's sort on "id".
    return record_id <= self.last_processed

def write_bookmark(self, state, bookmark_value):
    """
    Write the final bookmark for this stream and drop resume markers.

    If state holds a "last_sync_started_at" entry for this stream, that
    value replaces `bookmark_value` and the entry is removed. Any
    "last_processed" entry is removed as well. Returns the result of
    `singer.write_bookmark`.
    """
    stream_bookmarks = state.get("bookmarks", {}).get(self.tap_stream_id, {})

    # Set last successful sync start time as new bookmark and delete intermediate bookmarks
    if "last_sync_started_at" in stream_bookmarks:
        bookmark_value = stream_bookmarks.pop("last_sync_started_at")

    stream_bookmarks.pop("last_processed", None)

    return singer.write_bookmark(
        state, self.tap_stream_id, self.replication_key, bookmark_value)

def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
    """
    Persist resume markers for this stream and emit state.

    Writes two bookmarks, in this order, then calls `singer.write_state`:
      * "last_processed" - the id to resume from if the sync is interrupted
      * "last_sync_started_at" - to be set as the new bookmark once all
        conversation records are synced

    `bookmark_value` is not used by this override.
    """
    resume_markers = (
        ("last_processed", last_processed),
        ("last_sync_started_at", self.last_sync_started_at),
    )
    for marker_key, marker_value in resume_markers:
        state = singer.write_bookmark(
            state, self.tap_stream_id, marker_key, marker_value)
    singer.write_state(state)

def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]:
paging = True
starting_after = None
Expand All @@ -516,20 +591,39 @@ def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=N
'per_page': self.per_page
},
'query': {
'operator': 'OR',
'value': [{
'field': self.replication_key,
'operator': '>',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
'operator': 'AND',
'value': [
{
'operator': 'OR',
'value': [
{
'field': 'id',
'operator': '>',
'value': self.last_processed or ""
},
{
'field': 'id',
'operator': '=',
'value': self.last_processed or ""
}]
},
{
'field': self.replication_key,
'operator': '=',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
'operator': 'OR',
'value': [
{
'field': self.replication_key,
'operator': '>',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
},
{
'field': self.replication_key,
'operator': '=',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
}]
}]
},
},
"sort": {
"field": self.replication_key,
"field": "id",
"order": "ascending"
}
}
Expand Down

0 comments on commit 4a9b682

Please sign in to comment.