From 95a88b3d433eedcf2623e6858f078b36147a6fba Mon Sep 17 00:00:00 2001 From: Matt Menzenski Date: Tue, 23 Apr 2024 15:41:39 -0500 Subject: [PATCH] add collection name to log messages and more explicitly handle the dummy record scenario --- tap_mongodb/streams.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py index 2a72c3b..de898e8 100644 --- a/tap_mongodb/streams.py +++ b/tap_mongodb/streams.py @@ -244,12 +244,12 @@ def _get_records_log_based( logger.info(f"strategy is RESUME_AFTER, bookmark is {bookmark}") change_stream_options["resume_after"] = {"_data": bookmark} operation_types_allowlist: set = set(self.config.get("operation_types")) - has_seen_a_record: bool = False + has_seen_a_real_record: bool = False keep_open: bool = True try: change_stream = collection.watch(**change_stream_options) - logger.info("Opened change stream") + logger.info(f"Opened change stream on collection {collection.name}") except OperationFailure as operation_failure: if ( operation_failure.code == 136 @@ -271,15 +271,21 @@ def _get_records_log_based( ) from operation_failure elif operation_failure.code == 286: logger.opt(exception=operation_failure).error( - "Unable to resume (open) change stream from resume token. Resetting resume token." + f"Unable to resume (open) change stream on collection {collection.name} from resume token. " + "Resetting resume token." ) change_stream_options.pop("resume_after", None) + change_stream_options.pop("start_after", None) change_stream = collection.watch(**change_stream_options) else: - logger.opt(exception=operation_failure).error("OperationFailure on collection.watch()") + logger.opt(exception=operation_failure).error( + f"OperationFailure on collection.watch() in collection {collection.name}" + ) raise operation_failure except Exception as exception: - logger.opt(exception=exception).error("Unhandled exception on collection.watch()") + logger.opt(exception=exception).error( + f"Unhandled exception on collection.watch() in collection {collection.name}" + ) raise exception while change_stream.alive and keep_open: @@ -289,16 +295,20 @@ def _get_records_log_based( except OperationFailure as operation_failure: if operation_failure.code == 286: logger.opt(exception=operation_failure).error( - "Unable to resume (try_next) change stream from resume token. Resetting resume token." + f"Unable to resume (try_next) change stream from resume token in collection {collection.name}. " + "Resetting resume token." ) change_stream_options.pop("resume_after", None) + change_stream_options.pop("start_after", None) change_stream = collection.watch(**change_stream_options) record = None else: - logger.opt(exception=operation_failure).error("OperationFailure on try_next") + logger.opt(exception=operation_failure).error( + f"OperationFailure on try_next in collection {collection.name}" + ) raise operation_failure except Exception as exception: - logger.opt(exception=exception).error("Unhandled exception on try_next") + logger.opt(exception=exception).error(f"Unhandled exception on try_next in {collection.name}") raise exception # if we have processed any records, a None record means that we've caught up to the end of the # stream - set keep_open to False so that the change stream is closed and the tap exits. @@ -311,13 +321,17 @@ def _get_records_log_based( # then yield that record (whose _id is set to the change stream's resume token, so that the # change stream can be resumed from this point by a later running of the tap). # - If a change stream is opened and there is at least one record, yield all records - if record is None and not has_seen_a_record and change_stream.resume_token is not None: + if record is None and not has_seen_a_real_record and change_stream.resume_token is not None: # if we're in this block, we're in MongoDB specifically - DocumentDB will have a None resume # token here. If we take no action, the tap will remain open and idle until a message appears # in the change stream, then it will yield that record and close. That's not ideal because it # doesn't need to wait around for activity. It can just yield a "dummy" record with the resume # token from the change stream, exit immediately, and then pick up processing the change stream # from this point the next time the tap is run. So that's what we do. + resume_token = change_stream.resume_token["_data"] + logger.info( + f"Yielding 'dummy' record for collection {collection.name} with resume token {resume_token}" + ) yield { "replication_key": change_stream.resume_token["_data"], "object_id": None, @@ -328,9 +342,10 @@ def _get_records_log_based( "namespace": None, "to": None, } - has_seen_a_record = True + keep_open = False + # has_seen_a_real_record = True - if record is None and has_seen_a_record: + if record is None and has_seen_a_real_record: logger.info("Reached the end of the change stream after consuming at least one record, closing it.") keep_open = False if record is not None: @@ -379,7 +394,7 @@ def _get_records_log_based( if operation_type == "delete": parsed_record["_sdc_deleted_at"] = cluster_time yield parsed_record - has_seen_a_record = True + has_seen_a_real_record = True def get_records(self, context: Dict | None) -> Iterable[Dict]: """Return a generator of record-type dictionary objects."""