diff --git a/pyproject.toml b/pyproject.toml
index 6395c88..30ab036 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "tap-mongodb"
-version = "2.3.3"
+version = "2.4.0"
 description = "`tap-mongodb` is a Singer tap for MongoDB and AWS DocumentDB, built with the Meltano Singer SDK."
 readme = "README.md"
 authors = ["Matt Menzenski"]
diff --git a/tap_mongodb/connector.py b/tap_mongodb/connector.py
index 366192a..627d141 100644
--- a/tap_mongodb/connector.py
+++ b/tap_mongodb/connector.py
@@ -2,7 +2,7 @@
 
 import sys
 from logging import Logger, getLogger
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple, TypeAlias
 
 from pymongo import MongoClient
 from pymongo.database import Database
@@ -17,6 +17,9 @@
     from functools import cached_property
 
+MongoVersion: TypeAlias = Tuple[int, int]
+
+
 
 class MongoDBConnector:
     """MongoDB/DocumentDB connector class"""
 
@@ -34,6 +37,7 @@ def __init__(  # pylint: disable=too-many-arguments
         self._datetime_conversion: str = datetime_conversion.upper()
         self._prefix: Optional[str] = prefix
         self._logger: Logger = getLogger(__name__)
+        self._version: Optional[MongoVersion] = None
 
     @cached_property
     def mongo_client(self) -> MongoClient:
@@ -42,7 +46,9 @@ def mongo_client(self) -> MongoClient:
             self._connection_string, datetime_conversion=self._datetime_conversion, **self._options
         )
         try:
-            client.server_info()
+            server_info: Dict[str, Any] = client.server_info()
+            version_array: List[int] = server_info["versionArray"]
+            self._version = (version_array[0], version_array[1])
         except Exception as exception:
             self._logger.exception("Could not connect to MongoDB")
             raise RuntimeError("Could not connect to MongoDB") from exception
@@ -53,6 +59,19 @@ def database(self) -> Database:
         """Provide a Database instance."""
         return self.mongo_client[self._db_name]
 
+    @property
+    def version(self) -> Optional[MongoVersion]:
+        """Return the (major, minor) server version, connecting first if it is not yet known.
+
+        Raises:
+            RuntimeError: propagated from `mongo_client` when its server_info() probe fails.
+        """
+        if self._version is None:
+            # _version is only recorded while mongo_client is being built; force that here so callers
+            # comparing the result against a tuple do not hit a TypeError on None.
+            _ = self.mongo_client
+        return self._version
+
     def get_fully_qualified_name(
         self,
         collection_name: str,
diff --git a/tap_mongodb/streams.py b/tap_mongodb/streams.py
index 2232920..21d0bf0 100644
--- a/tap_mongodb/streams.py
+++ b/tap_mongodb/streams.py
@@ -218,7 +218,15 @@ def get_records(self, context: dict | None) -> Iterable[dict]:
             change_stream_options = {"full_document": "updateLookup"}
             if bookmark is not None and bookmark != DEFAULT_START_DATE:
                 self.logger.debug(f"using bookmark: {bookmark}")
-                change_stream_options["resume_after"] = {"_data": bookmark}
+                # if on mongo version 4.2 or above, use start_after instead of resume_after, as the former will
+                # gracefully open a new change stream if the resume token's event is not present in the oplog, while
+                # the latter will error in that scenario.
+                # NOTE(review): MongoDB's change stream docs appear to require oplog history for start_after as
+                # well; confirm error 286 cannot occur on 4.2+ before relying on the version guards below.
+                if self._connector.version >= (4, 2):
+                    change_stream_options["start_after"] = {"_data": bookmark}
+                else:
+                    change_stream_options["resume_after"] = {"_data": bookmark}
             operation_types_allowlist: set = set(self.config.get("operation_types"))
             has_seen_a_record: bool = False
             keep_open: bool = True
@@ -245,7 +253,8 @@ def get_records(self, context: dict | None) -> Iterable[dict]:
                         f"Unable to enable change streams on collection {collection.name}"
                     ) from operation_failure
                 elif (
-                    operation_failure.code == 286
+                    self._connector.version < (4, 2)
+                    and operation_failure.code == 286
                     and "as the resume point may no longer be in the oplog." in operation_failure.details["errmsg"]
                 ):
                     self.logger.warning("Unable to resume change stream from resume token. Resetting resume token.")
@@ -265,7 +274,8 @@ def get_records(self, context: dict | None) -> Iterable[dict]:
                         record = change_stream.try_next()
                     except OperationFailure as operation_failure:
                         if (
-                            operation_failure.code == 286
+                            self._connector.version < (4, 2)
+                            and operation_failure.code == 286
                             and "as the resume point may no longer be in the oplog." in operation_failure.details["errmsg"]
                         ):