Skip to content

Commit

Permalink
add collection name to log messages and more explicitly handle the du…
Browse files Browse the repository at this point in the history
…mmy record scenario
  • Loading branch information
menzenski committed Apr 23, 2024
1 parent 0207c76 commit 95a88b3
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions tap_mongodb/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ def _get_records_log_based(
logger.info(f"strategy is RESUME_AFTER, bookmark is {bookmark}")
change_stream_options["resume_after"] = {"_data": bookmark}
operation_types_allowlist: set = set(self.config.get("operation_types"))
has_seen_a_record: bool = False
has_seen_a_real_record: bool = False
keep_open: bool = True

try:
change_stream = collection.watch(**change_stream_options)
logger.info("Opened change stream")
logger.info(f"Opened change stream on collection {collection.name}")
except OperationFailure as operation_failure:
if (
operation_failure.code == 136
Expand All @@ -271,15 +271,21 @@ def _get_records_log_based(
) from operation_failure
elif operation_failure.code == 286:
logger.opt(exception=operation_failure).error(
"Unable to resume (open) change stream from resume token. Resetting resume token."
f"Unable to resume (open) change stream on collection {collection.name} from resume token. "
"Resetting resume token."
)
change_stream_options.pop("resume_after", None)
change_stream_options.pop("start_after", None)
change_stream = collection.watch(**change_stream_options)
else:
logger.opt(exception=operation_failure).error("OperationFailure on collection.watch()")
logger.opt(exception=operation_failure).error(
f"OperationFailure on collection.watch() in collection {collection.name}"
)
raise operation_failure
except Exception as exception:
logger.opt(exception=exception).error("Unhandled exception on collection.watch()")
logger.opt(exception=exception).error(
f"Unhandled exception on collection.watch() in collection {collection.name}"
)
raise exception

while change_stream.alive and keep_open:
Expand All @@ -289,16 +295,20 @@ def _get_records_log_based(
except OperationFailure as operation_failure:
if operation_failure.code == 286:
logger.opt(exception=operation_failure).error(
"Unable to resume (try_next) change stream from resume token. Resetting resume token."
f"Unable to resume (try_next) change stream from resume token in collection {collection.name}. "
"Resetting resume token."
)
change_stream_options.pop("resume_after", None)
change_stream_options.pop("start_after", None)
change_stream = collection.watch(**change_stream_options)
record = None
else:
logger.opt(exception=operation_failure).error("OperationFailure on try_next")
logger.opt(exception=operation_failure).error(
f"OperationFailure on try_next in collection {collection.name}"
)
raise operation_failure
except Exception as exception:
logger.opt(exception=exception).error("Unhandled exception on try_next")
logger.opt(exception=exception).error(f"Unhandled exception on try_next in {collection.name}")
raise exception
# if we have processed any records, a None record means that we've caught up to the end of the
# stream - set keep_open to False so that the change stream is closed and the tap exits.
Expand All @@ -311,13 +321,17 @@ def _get_records_log_based(
# then yield that record (whose _id is set to the change stream's resume token, so that the
# change stream can be resumed from this point by a later running of the tap).
# - If a change stream is opened and there is at least one record, yield all records
if record is None and not has_seen_a_record and change_stream.resume_token is not None:
if record is None and not has_seen_a_real_record and change_stream.resume_token is not None:
# if we're in this block, we're in MongoDB specifically - DocumentDB will have a None resume
# token here. If we take no action, the tap will remain open and idle until a message appears
# in the change stream, then it will yield that record and close. That's not ideal because it
# doesn't need to wait around for activity. It can just yield a "dummy" record with the resume
# token from the change stream, exit immediately, and then pick up processing the change stream
# from this point the next time the tap is run. So that's what we do.
resume_token = change_stream.resume_token["_data"]
logger.info(
f"Yielding 'dummy' record for collection {collection.name} with resume token {resume_token}"
)
yield {
"replication_key": change_stream.resume_token["_data"],
"object_id": None,
Expand All @@ -328,9 +342,10 @@ def _get_records_log_based(
"namespace": None,
"to": None,
}
has_seen_a_record = True
keep_open = False
# has_seen_a_real_record = True

if record is None and has_seen_a_record:
if record is None and has_seen_a_real_record:
logger.info("Reached the end of the change stream after consuming at least one record, closing it.")
keep_open = False
if record is not None:
Expand Down Expand Up @@ -379,7 +394,7 @@ def _get_records_log_based(
if operation_type == "delete":
parsed_record["_sdc_deleted_at"] = cluster_time
yield parsed_record
has_seen_a_record = True
has_seen_a_real_record = True

def get_records(self, context: Dict | None) -> Iterable[Dict]:
"""Return a generator of record-type dictionary objects."""
Expand Down

0 comments on commit 95a88b3

Please sign in to comment.