fix: Fix the resuming of MongoDB change streams in log-based replication mode #31

Draft: wants to merge 43 commits into base: main

Conversation

menzenski
Contributor

@menzenski menzenski commented Feb 16, 2024

Fixes #27
Fixes #29
Fixes #28
Fixes #30

@menzenski menzenski self-assigned this Feb 16, 2024
@@ -179,7 +170,7 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess
Record message objects.
"""
extracted_at: DateTime = record.pop("_sdc_extracted_at", utc_now())
- pop_deselected_record_properties(record, self.schema, self.mask, self.logger)
+ pop_deselected_record_properties(record, self.schema, self.mask)
Contributor Author

Updating the SDK version requires this change.

Member

Sorry about this change, I was confident this was in practice a private API 😞

@@ -197,174 +188,190 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess
)
yield record_message

def get_records(self, context: dict | None) -> Iterable[dict]:
"""Return a generator of record-type dictionary objects."""
def _get_records_incremental(
Contributor Author

Refactored slightly to split up the giant method: there are now separate incremental and log-based get_records implementation methods.
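
A rough sketch of what such a split could look like. Only `get_records` and `_get_records_incremental` appear in this diff; the class name `SketchStream`, the method name `_get_records_log_based`, and the replication-method strings are assumptions made for illustration, not the PR's actual code.

```python
from typing import Iterable, Optional


class SketchStream:
    """Hypothetical stand-in for the tap's stream class."""

    def __init__(self, replication_method: str) -> None:
        self.replication_method = replication_method

    def _get_records_incremental(self, context: Optional[dict]) -> Iterable[dict]:
        # Placeholder: the real method would query the collection by replication key.
        yield {"source": "incremental"}

    def _get_records_log_based(self, context: Optional[dict]) -> Iterable[dict]:
        # Placeholder: the real method would read from a change stream.
        yield {"source": "log_based"}

    def get_records(self, context: Optional[dict]) -> Iterable[dict]:
        """Dispatch to the implementation for the configured replication method."""
        if self.replication_method == "INCREMENTAL":
            yield from self._get_records_incremental(context)
        elif self.replication_method == "LOG_BASED":
            yield from self._get_records_log_based(context)
        else:
            raise ValueError(f"Unsupported replication method: {self.replication_method}")
```

Keeping `get_records` as a thin dispatcher means each replication mode can be read and tested on its own.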

self.logger.critical(exception)
raise exception

while change_stream.alive and keep_open:
Contributor Author

It's hard to see in this diff alongside the other refactors, but note that the `with change_stream` line at https://github.com/MeltanoLabs/tap-mongodb/pull/31/files#diff-e276f9374d5b67980ff79f28e37aa6417f4f456c8bdfbf49d709acd30ebb3f7aL289 is absent here.

) from operation_failure
elif (
- self._connector.version < (4, 2)
+ resume_strategy == ResumeStrategy.RESUME_AFTER
and operation_failure.code == 286
and "as the resume point may no longer be in the oplog." in operation_failure.details["errmsg"]
):
self.logger.warning("Unable to resume change stream from resume token. Resetting resume token.")
change_stream_options.pop("resume_after", None)
Contributor Author

The refactor makes this hard to see in the diff, but lines 291–293 here are new. These three lines, combined with the removal of the `with change_stream` line (see the comment above), are the meat of this fix: they allow the tap to gracefully re-open a change stream if the resume token from the state is not valid.
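
The fallback described here can be sketched in isolation roughly as follows. This is a simplified illustration, not the tap's code: `open_change_stream` and its `watch` callable are hypothetical, and the local `OperationFailure` class is a stand-in for `pymongo.errors.OperationFailure` (which exposes `code` and `details`) so that the sketch runs without a MongoDB server.

```python
from typing import Any, Callable, Dict


class OperationFailure(Exception):
    """Stand-in for pymongo.errors.OperationFailure (exposes code and details)."""

    def __init__(self, message: str, code: int, details: Dict[str, Any]) -> None:
        super().__init__(message)
        self.code = code
        self.details = details


STALE_TOKEN_MESSAGE = "as the resume point may no longer be in the oplog."


def open_change_stream(watch: Callable[..., Any], options: Dict[str, Any]) -> Any:
    """Open a change stream, retrying once without the resume token if it is stale."""
    try:
        return watch(**options)
    except OperationFailure as failure:
        stale = failure.code == 286 and STALE_TOKEN_MESSAGE in failure.details.get("errmsg", "")
        if not stale or "resume_after" not in options:
            raise
        # Drop the invalid token and open a fresh stream instead of failing.
        options.pop("resume_after", None)
        return watch(**options)
```

In real use, `watch` would be something like a collection's `watch` method. Any other failure, or a stale-token error when no `resume_after` was supplied, is re-raised unchanged.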

# fullDocument key is not present on delete events - if it is missing, fall back to documentKey
# instead. If that is missing, pass None/null to avoid raising an error.
document = record.get("fullDocument", record.get("documentKey", None))
object_id: Optional[ObjectId] = document["_id"] if document and "_id" in document else None
Contributor Author

Issue #27 is fixed by the `if document` check. Compare:

object_id: Optional[ObjectId] = document["_id"] if "_id" in document else None

to

object_id: Optional[ObjectId] = document["_id"] if document and "_id" in document else None
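
To make the difference concrete, here is a small self-contained sketch of the guarded lookup. The helper name `extract_object_id` and the sample records are made up for illustration, and plain values stand in for `ObjectId` so that it runs without pymongo.

```python
from typing import Any, Optional


def extract_object_id(record: dict) -> Optional[Any]:
    """Return the document's _id, or None when the event carries no document."""
    # Same fallback chain as in the diff: fullDocument, then documentKey, then None.
    document = record.get("fullDocument", record.get("documentKey", None))
    # The "document and" guard avoids evaluating '"_id" in None', which raises TypeError.
    return document["_id"] if document and "_id" in document else None
```

With the unguarded version, a record that has neither `fullDocument` nor `documentKey` would raise a `TypeError`, because `"_id" in None` is not a valid membership test.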

@menzenski menzenski changed the title Fix the resuming of MongoDB change streams in log-based replication mode fix: Fix the resuming of MongoDB change streams in log-based replication mode Feb 16, 2024
@menzenski menzenski requested review from WillDaSilva, edgarrmondragon and a team February 16, 2024 16:11
pyproject.toml Outdated
@@ -28,7 +29,7 @@ black = "^23.1.0"
pyupgrade = "^3.3.1"
mypy = "^1.0.0"
isort = "^5.11.5"
- singer-sdk = { version = "^0.31.1", extras = ["testing"] }
+ singer-sdk = { version = "^0.35.0", extras = ["testing"] }
Contributor Author

Fixes #27

@@ -43,7 +35,7 @@ def __init__( # pylint: disable=too-many-arguments
self._datetime_conversion: str = datetime_conversion.upper()
self._prefix: Optional[str] = prefix
self._logger: Logger = getLogger(__name__)
- self._version: Optional[MongoVersion] = None
+ self._version: MongoVersion
Member

It might make sense to move this annotation to the class level, i.e.:

class MongoDBConnector:

    _version: MongoVersion


from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import PyMongoError
from singer_sdk._singerlib.catalog import CatalogEntry, MetadataMapping, Schema

from tap_mongodb.schema import SCHEMA
from tap_mongodb.types import MongoVersion

if sys.version_info[:2] < (3, 8):
    from backports.cached_property import cached_property
else:
    from functools import cached_property
Member

Since we're dropping support for Python 3.7, I think this can be simplified.
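
One possible simplification, assuming the minimum supported version becomes Python 3.8 or later: `functools.cached_property` has been in the standard library since 3.8, so the version check and the backport can go. The `Example` class below is purely illustrative.

```python
from functools import cached_property  # in the standard library since Python 3.8


class Example:
    """Illustrative class; not part of tap-mongodb."""

    def __init__(self) -> None:
        self.calls = 0

    @cached_property
    def version(self) -> tuple:
        # Runs once per instance; later accesses reuse the cached value.
        self.calls += 1
        return (4, 2)
```

If the `sys` import is used only for this version check, it could presumably be removed as well.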

@@ -102,7 +96,6 @@ def primary_keys(self) -> Optional[list[str]]:
def primary_keys(self, new_value: list[str]) -> None:
"""Set primary keys for the stream."""
self._primary_keys = new_value
- return self
Member

👍

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
@menzenski menzenski marked this pull request as draft February 20, 2024 11:46
@menzenski
Contributor Author

Moving this PR to Draft status - I'm still working on it.

@menzenski menzenski marked this pull request as ready for review February 20, 2024 13:12
@menzenski menzenski marked this pull request as draft February 20, 2024 13:12
@edgarrmondragon
Member

> Moving this PR to Draft status - I'm still working on it.

👍 @menzenski thanks for this PR! Ping me when it's ready :)

@menzenski
Contributor Author

@edgarrmondragon will do!

I think I have a way to run a MongoDB container in Docker as a replica set, which will hopefully let me add real tests of the log-based behavior.
