Skip to content

Commit

Permalink
Fix change stream handling in MongoDB log-based replication mode
Browse files Browse the repository at this point in the history
  • Loading branch information
menzenski committed Feb 16, 2024
1 parent b84a1e0 commit cf875b8
Show file tree
Hide file tree
Showing 11 changed files with 1,446 additions and 1,209 deletions.
20 changes: 15 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,54 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
rev: v4.5.0
hooks:
- id: check-added-large-files
- id: check-toml
- id: check-vcs-permalinks
- id: detect-private-key
- id: end-of-file-fixer
exclude: mongodb-replicaset.key
- id: name-tests-test
exclude: tests/utilities/
- id: no-commit-to-branch
args:
- --branch
- main
- id: pretty-format-json
args:
- --autofix
- --indent
- '2'
- id: trailing-whitespace
- repo: https://github.com/lyz-code/yamlfix.git
rev: 1.9.0
rev: 1.16.0
hooks:
- id: yamlfix
- repo: https://github.com/adrienverge/yamllint.git
rev: v1.28.0
rev: v1.35.0
hooks:
- id: yamllint
args:
- --format
- parsable
- --strict
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.252
rev: v0.2.1
hooks:
- id: ruff
args:
- --fix
- --exit-non-zero-on-fix
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 24.2.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
hooks:
- id: mypy
- repo: local
hooks:
- id: pylint
Expand Down
12 changes: 12 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ plugins:
- insert
- replace
- update
- name: change_stream_resume_strategy
kind: string
description: |
Only used when the tap is run in log-based replication mode. This setting specifies how the tap creates a
new change stream on runs after the first. The default is `resume_after` (see
https://www.mongodb.com/docs/manual/changeStreams/#resumeafter-for-change-streams), which has been available
since MongoDB 3.6, when the ChangeStream API was introduced. The `start_after` setting (see
https://www.mongodb.com/docs/manual/changeStreams/#startafter-for-change-streams) requires MongoDB
version 4.2 or greater. You may freely switch back and forth between the `resume_after` and `start_after`
settings, provided the MongoDB version is at least 4.2. If the value provided to this setting
is not compatible with the MongoDB version in use, the tap falls back to `resume_after`.
value: resume_after
- name: stream_maps
kind: object
description: |
Expand Down
2,080 changes: 1,088 additions & 992 deletions poetry.lock

Large diffs are not rendered by default.

21 changes: 13 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-mongodb"
version = "2.5.0"
version = "2.6.0"
description = "`tap-mongodb` is a Singer tap for MongoDB and AWS DocumentDB, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Matt Menzenski"]
Expand All @@ -14,11 +14,12 @@ packages = [
]

[tool.poetry.dependencies]
python = "<3.12,>=3.7.2"
singer-sdk = { version = "^0.31.1" }
python = "<3.12,>=3.8"
singer-sdk = { version = "^0.35.0" }
fs-s3fs = { version = "^1.1.1", optional = true }
pymongo = "^4.4.1"
"backports.cached-property" = { version = "^1.0.2" }
strenum = "^0.4.15"
typing-extensions = {version = "^4.9.0", python = "<3.10"}

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
Expand All @@ -28,8 +29,9 @@ black = "^23.1.0"
pyupgrade = "^3.3.1"
mypy = "^1.0.0"
isort = "^5.11.5"
singer-sdk = { version = "^0.31.1", extras = ["testing"] }
singer-sdk = { version = "^0.35.0", extras = ["testing"] }
pylint = "^3.0.0a6"
loguru = "^0.7.2"

[tool.poetry.extras]
s3 = ["fs-s3fs"]
Expand All @@ -49,6 +51,7 @@ target-version = ['py311']
[tool.poetry.scripts]
# CLI declaration
tap-mongodb = 'tap_mongodb.tap:TapMongoDB.cli'
seed = "tests.utilities.seed:main"

[tool.yamlfix]
comments_min_spaces_from_content = 2
Expand All @@ -63,6 +66,10 @@ quote_basic_values = false
quote_keys_and_basic_values = false

[tool.ruff]
line-length = 120
target-version = "py311"

[tool.ruff.lint]
select = [
"E",
"F",
Expand All @@ -74,10 +81,8 @@ exclude = [
".secrets",
"output",
]
line-length = 120
target-version = "py311"

[tool.ruff.isort]
[tool.ruff.lint.isort]
known-third-party = [
"pymongo",
"singer_sdk",
Expand Down
16 changes: 4 additions & 12 deletions tap_mongodb/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,22 @@

import sys
from logging import Logger, getLogger
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional

from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import PyMongoError
from singer_sdk._singerlib.catalog import CatalogEntry, MetadataMapping, Schema

from tap_mongodb.schema import SCHEMA
from tap_mongodb.types import MongoVersion

if sys.version_info[:2] < (3, 8):
from backports.cached_property import cached_property
else:
from functools import cached_property


try:
from typing import TypeAlias # pylint: disable=ungrouped-imports

MongoVersion: TypeAlias = Tuple[int, int]
except ImportError:
TypeAlias = None
MongoVersion = Tuple[int, int]


class MongoDBConnector:
"""MongoDB/DocumentDB connector class"""

Expand All @@ -43,7 +35,7 @@ def __init__( # pylint: disable=too-many-arguments
self._datetime_conversion: str = datetime_conversion.upper()
self._prefix: Optional[str] = prefix
self._logger: Logger = getLogger(__name__)
self._version: Optional[MongoVersion] = None
self._version: MongoVersion

@cached_property
def mongo_client(self) -> MongoClient:
Expand All @@ -66,7 +58,7 @@ def database(self) -> Database:
return self.mongo_client[self._db_name]

@property
def version(self) -> Optional[MongoVersion]:
def version(self) -> MongoVersion:
"""Returns the MongoVersion that is being used."""
return self._version

Expand Down
Loading

0 comments on commit cf875b8

Please sign in to comment.