Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix conversations stream bookmarking #73

Merged
merged 31 commits into from
May 8, 2024
Merged
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cb8dc00
bookmark after each page of conversation records
dsprayberry Jan 8, 2024
2af8e9f
Fix conversations stream bookmarking
Mar 15, 2024
aab6a2d
Minor fix
dsprayberry Mar 15, 2024
56ff5c4
Update bookmarking
dsprayberry Apr 3, 2024
51a1bcc
Refactoring changes
Apr 15, 2024
f301303
Fix unittest failures
Apr 15, 2024
1fd5437
Fix the pagination integration test
RushiT0122 Apr 16, 2024
2adcb11
Fixed All fields test.
shantanu73 Apr 22, 2024
ea85c61
Fixed Automatic Fields test.
shantanu73 Apr 22, 2024
813416b
Fixed Start Date test.
shantanu73 Apr 22, 2024
41d832a
Fixed Bookmarks test.
shantanu73 Apr 23, 2024
9c31d05
Fixed Pagination test.
shantanu73 Apr 23, 2024
dc60686
X-Fail Parent Child test due to insufficient data.
shantanu73 Apr 24, 2024
309d3ee
Changes:
shantanu73 Apr 25, 2024
60a793e
Added contacts stream to bookmark & start_date tests by modifying the…
shantanu73 Apr 25, 2024
bbd5761
Reverted the changes related to TDL-17035 as the issue is not yet res…
shantanu73 Apr 29, 2024
30de8bd
Removed comment related to TDL-19860 as it is resolved.
shantanu73 Apr 29, 2024
1683d61
Added more testable streams in interrupted sync.
shantanu73 Apr 29, 2024
52219e9
Fixed Bookmark test to include more streams.
shantanu73 Apr 29, 2024
8385a2e
Update conversations stream search query
May 1, 2024
56fc918
Merge branch 'TDL-25075-fix-bookmarking-conversations' of github.com:…
May 1, 2024
6580dc2
bookmark after each page of conversation records
dsprayberry Jan 8, 2024
bd056c0
Fix conversations stream bookmarking
Mar 15, 2024
4ca37aa
Minor fix
dsprayberry Mar 15, 2024
60d40e1
Update bookmarking
dsprayberry Apr 3, 2024
3569c3e
Refactoring changes
Apr 15, 2024
6242064
Fix unittest failures
Apr 15, 2024
1f98b2e
Update conversations stream search query
May 1, 2024
936ef63
Fix the pagination integration test
RushiT0122 Apr 16, 2024
0070cb8
Merge branch 'TDL-25075-fix-bookmarking-conversations' of github.com:…
May 1, 2024
05c2de0
Merge branch 'master' into TDL-25075-fix-bookmarking-conversations
RushiT0122 May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 123 additions & 29 deletions tap_intercom/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def sync_substream(self, parent_id, stream_schema, stream_metadata, parent_repli

return state

# pylint: disable=abstract-method
# pylint: disable=abstract-method,unused-argument
class IncrementalStream(BaseStream):
"""
A child class of a base stream used to represent streams that use the
Expand All @@ -136,6 +136,36 @@ class IncrementalStream(BaseStream):
"""
replication_method = 'INCREMENTAL'
to_write_intermediate_bookmark = False
last_processed = None
last_sync_started_at = None
skipped_parent_ids = []

def set_last_processed(self, state):
    """Reset the resume point; streams that support resuming an interrupted
    sync (e.g. Conversations) override this to read it from `state`."""
    self.last_processed = None

def get_last_sync_started_at(self, state):
    # NOTE(review): despite the `get_` prefix this neither reads `state` nor
    # returns anything — it only clears the attribute.  The sync flow calls
    # set_last_sync_started_at() instead, so this looks vestigial; confirm
    # nothing else calls it before removing.
    self.last_sync_started_at = None

def set_last_sync_started_at(self, state):
    """Record when the current sync run started.  No-op in the base class;
    Conversations overrides this to persist a resumable start time."""
    pass

def skip_records(self, record):
    """Hook for resumable streams: return True to skip `record` because it
    was already synced.  Base streams never skip."""
    return False

def write_bookmark(self, state, bookmark_value):
    """Persist `bookmark_value` as this stream's replication-key bookmark.

    Default behaviour: delegate directly to singer.write_bookmark and
    return the updated state dict.  Streams with extra bookkeeping
    (e.g. Conversations) override this.
    """
    return singer.write_bookmark(
        state, self.tap_stream_id, self.replication_key, bookmark_value)

def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
    """Checkpoint progress part-way through a sync.

    No-op unless the stream opts in via `to_write_intermediate_bookmark`.
    `last_processed` is unused here; resumable subclasses use it.
    `bookmark_value` is expected to be a datetime (it is formatted with
    singer.utils.strftime before being stored).
    """
    if not self.to_write_intermediate_bookmark:
        return
    # Write the bookmark and emit STATE after every page of records.
    state = singer.write_bookmark(
        state,
        self.tap_stream_id,
        self.replication_key,
        singer.utils.strftime(bookmark_value))
    singer.write_state(state)

# Disabled `unused-argument` as it is causing a pylint error.
# The method that calls this `sync` method passes unused arguments, so removing the arguments would not work.
Expand Down Expand Up @@ -165,6 +195,8 @@ def sync(self,
parent_bookmark = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date'])
parent_bookmark_utc = singer.utils.strptime_to_utc(parent_bookmark)
sync_start_date = parent_bookmark_utc
self.set_last_processed(state)
self.set_last_sync_started_at(state)

is_parent_selected = True
is_child_selected = False
Expand Down Expand Up @@ -209,47 +241,46 @@ def sync(self,

with metrics.record_counter(self.tap_stream_id) as counter:
for record in self.get_records(sync_start_date, stream_metadata=stream_metadata):
# In case of interrupted sync, skip records last synced conversations

transform_times(record, schema_datetimes)

record_datetime = singer.utils.strptime_to_utc(
self.epoch_milliseconds_to_dt_str(
record[self.replication_key])
)
)

# Write the record if the parent is selected
if is_parent_selected and record_datetime >= parent_bookmark_utc:
record_counter += 1
transformed_record = transform(record,
stream_schema,
integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
metadata=stream_metadata)
stream_schema,
integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
metadata=stream_metadata)
# Write record if a parent is selected
singer.write_record(self.tap_stream_id, transformed_record, time_extracted=singer.utils.now())
counter.increment()
max_datetime = max(record_datetime, max_datetime)

if self.to_write_intermediate_bookmark and record_counter == MAX_PAGE_SIZE:
# Write bookmark and state after every page of records
state = singer.write_bookmark(state,
self.tap_stream_id,
self.replication_key,
singer.utils.strftime(max_datetime))
singer.write_state(state)
# Reset counter
record_counter = 0

# Sync child stream, if the child is selected and if we have records greater than the child stream bookmark
if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts):
# Long running jobs may starve the child stream extraction so skip the parent ids synced in last sync
# Store the parent ids so that later we can resync child records of skipped parent ids
if self.skip_records(record):
self.skipped_parent_ids.append((record.get('id'), record[self.replication_key]))
continue
state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state)

if record_counter == MAX_PAGE_SIZE:
self.write_intermediate_bookmark(state, record.get("id"), max_datetime)
# Reset counter
record_counter = 0

bookmark_date = singer.utils.strftime(max_datetime)
LOGGER.info("FINISHED Syncing: {}, total_records: {}.".format(self.tap_stream_id, counter.value))

LOGGER.info("Stream: {}, writing final bookmark".format(self.tap_stream_id))
state = singer.write_bookmark(state,
self.tap_stream_id,
self.replication_key,
bookmark_date)
self.write_bookmark(state, bookmark_date)
return state


Expand Down Expand Up @@ -508,6 +539,50 @@ class Conversations(IncrementalStream):
per_page = MAX_PAGE_SIZE
child = 'conversation_parts'

def set_last_processed(self, state):
    """Load the conversation id saved by a previously interrupted sync
    (None when the last sync completed cleanly)."""
    self.last_processed = singer.get_bookmark(
        state, self.tap_stream_id, "last_processed")

def set_last_sync_started_at(self, state):
    """Remember when this sync run started.

    Reuse the start time persisted by an interrupted run if present,
    otherwise stamp the current UTC time.  This value later becomes the
    stream's final bookmark (see write_bookmark).
    """
    started_at = singer.get_bookmark(
        state, self.tap_stream_id, "last_sync_started_at")
    if not started_at:
        started_at = singer.utils.strftime(singer.utils.now())
    self.last_sync_started_at = started_at

def skip_records(self, record):
# If last processed id exists then check if current record id is less than last processed id
return self.last_processed and record.get("id") <= self.last_processed

def write_bookmark(self, state, bookmark_value):
    """Write the final bookmark for a completed conversations sync.

    If an interrupted-sync start time was checkpointed, promote it to the
    replication-key bookmark (conversations updated after that moment will
    be picked up next run) and delete the intermediate bookmarks so the
    next sync starts fresh.  Mutates `state` in place and returns it.
    """
    # Set the last successful sync start time as the new bookmark and
    # delete the intermediate bookmarks.
    if "last_sync_started_at" in state.get("bookmarks", {}).get(self.tap_stream_id, {}):
        bookmark_value = singer.get_bookmark(state,
                                             self.tap_stream_id,
                                             "last_sync_started_at")

        del state["bookmarks"][self.tap_stream_id]["last_sync_started_at"]

    if "last_processed" in state.get("bookmarks", {}).get(self.tap_stream_id, {}):
        del state["bookmarks"][self.tap_stream_id]["last_processed"]

    return singer.write_bookmark(state,
                                 self.tap_stream_id,
                                 self.replication_key,
                                 bookmark_value)

def write_intermediate_bookmark(self, state, last_processed, bookmark_value):
    """Checkpoint after each page so an interrupted sync can resume.

    Saves the id of the last conversation handled (`last_processed`) and
    preserves this run's start time (`last_sync_started_at`); the latter is
    promoted to the real replication-key bookmark once the whole sync
    finishes (see write_bookmark).  `bookmark_value` is unused here.
    """
    # Resume point: if the sync is interrupted, restart from this id.
    state = singer.write_bookmark(
        state, self.tap_stream_id, "last_processed", last_processed)

    # Becomes the new bookmark once all conversation records are synced.
    state = singer.write_bookmark(
        state, self.tap_stream_id, "last_sync_started_at",
        self.last_sync_started_at)
    singer.write_state(state)

def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]:
paging = True
starting_after = None
Expand All @@ -516,20 +591,39 @@ def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=N
'per_page': self.per_page
},
'query': {
'operator': 'OR',
'value': [{
'field': self.replication_key,
'operator': '>',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
'operator': 'AND',
'value': [
{
'operator': 'OR',
'value': [
{
'field': 'id',
'operator': '>',
'value': self.last_processed or ""
},
{
'field': 'id',
'operator': '=',
'value': self.last_processed or ""
}]
},
{
'field': self.replication_key,
'operator': '=',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
'operator': 'OR',
'value': [
{
'field': self.replication_key,
'operator': '>',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
},
{
'field': self.replication_key,
'operator': '=',
'value': self.dt_to_epoch_seconds(bookmark_datetime)
}]
}]
},
},
"sort": {
"field": self.replication_key,
"field": "id",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

field is hardcoded to id, is it working for all the streams?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for the Conversations stream.

"order": "ascending"
}
}
Expand Down