diff --git a/.evergreen/scripts/setup_tests.py b/.evergreen/scripts/setup_tests.py index 13444fe9ca..a4ff77ab71 100644 --- a/.evergreen/scripts/setup_tests.py +++ b/.evergreen/scripts/setup_tests.py @@ -162,10 +162,6 @@ def handle_test_env() -> None: write_env("PIP_PREFER_BINARY") # Prefer binary dists by default. write_env("UV_FROZEN") # Do not modify lock files. - # Skip CSOT tests on non-linux platforms. - if PLATFORM != "linux": - write_env("SKIP_CSOT_TESTS") - # Set an environment variable for the test name and sub test name. write_env(f"TEST_{test_name.upper()}") write_env("TEST_NAME", test_name) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5a2bf4d913..34dd269690 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -404,6 +404,13 @@ If you are running one of the `no-responder` tests, omit the `run-server` step. - Regenerate the test variants and tasks using `pre-commit run --all-files generate-config`. - Make sure to add instructions for running the test suite to `CONTRIBUTING.md`. +## Handling flaky tests + +We have a custom `flaky` decorator in [test/asynchronous/utils.py](test/asynchronous/utils.py) that can be used for +tests that are `flaky`. By default the decorator only applies when not running on CPython on Linux, since other +runtimes tend to have more variation. When using the `flaky` decorator, open a corresponding ticket and +use the ticket number as the "reason" parameter to the decorator, e.g. `@flaky(reason="PYTHON-1234")`. 
+ ## Specification Tests The MongoDB [specifications repository](https://github.com/mongodb/specifications) diff --git a/test/__init__.py b/test/__init__.py index e0646ce894..bfca61b20c 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -32,6 +32,7 @@ import warnings from asyncio import iscoroutinefunction +from pymongo.errors import AutoReconnect from pymongo.synchronous.uri_parser import parse_uri try: @@ -1219,12 +1220,17 @@ def teardown(): c = client_context.client if c: if not client_context.is_data_lake: - c.drop_database("pymongo-pooling-tests") - c.drop_database("pymongo_test") - c.drop_database("pymongo_test1") - c.drop_database("pymongo_test2") - c.drop_database("pymongo_test_mike") - c.drop_database("pymongo_test_bernie") + try: + c.drop_database("pymongo-pooling-tests") + c.drop_database("pymongo_test") + c.drop_database("pymongo_test1") + c.drop_database("pymongo_test2") + c.drop_database("pymongo_test_mike") + c.drop_database("pymongo_test_bernie") + except AutoReconnect: + # PYTHON-4982 + if sys.implementation.name.lower() != "pypy": + raise c.close() print_running_clients() diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 48c9dc2920..6ba2290875 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -33,6 +33,7 @@ from asyncio import iscoroutinefunction from pymongo.asynchronous.uri_parser import parse_uri +from pymongo.errors import AutoReconnect try: import ipaddress @@ -1235,12 +1236,17 @@ async def async_teardown(): c = async_client_context.client if c: if not async_client_context.is_data_lake: - await c.drop_database("pymongo-pooling-tests") - await c.drop_database("pymongo_test") - await c.drop_database("pymongo_test1") - await c.drop_database("pymongo_test2") - await c.drop_database("pymongo_test_mike") - await c.drop_database("pymongo_test_bernie") + try: + await c.drop_database("pymongo-pooling-tests") + await c.drop_database("pymongo_test") + await 
c.drop_database("pymongo_test1") + await c.drop_database("pymongo_test2") + await c.drop_database("pymongo_test_mike") + await c.drop_database("pymongo_test_bernie") + except AutoReconnect: + # PYTHON-4982 + if sys.implementation.name.lower() != "pypy": + raise await c.close() print_running_clients() diff --git a/test/asynchronous/test_client_bulk_write.py b/test/asynchronous/test_client_bulk_write.py index 2f48466af8..7f85df5467 100644 --- a/test/asynchronous/test_client_bulk_write.py +++ b/test/asynchronous/test_client_bulk_write.py @@ -25,6 +25,7 @@ async_client_context, unittest, ) +from test.asynchronous.utils import flaky from test.utils_shared import ( OvertCommandListener, ) @@ -619,8 +620,6 @@ async def test_15_unacknowledged_write_across_batches(self): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(AsyncIntegrationTest): async def asyncSetUp(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") await super().asyncSetUp() self.max_write_batch_size = await async_client_context.max_write_batch_size self.max_bson_object_size = await async_client_context.max_bson_size @@ -628,7 +627,10 @@ async def asyncSetUp(self): @async_client_context.require_version_min(8, 0, 0, -24) @async_client_context.require_failCommand_fail_point + @flaky(reason="PYTHON-5290", max_runs=3, affects_cpython_linux=True) async def test_timeout_in_multi_batch_bulk_write(self): + if sys.platform != "linux": + self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows and MacOS") _OVERHEAD = 500 internal_client = await self.async_rs_or_single_client(timeoutMS=None) diff --git a/test/asynchronous/test_csot.py b/test/asynchronous/test_csot.py index 46c97ce6d3..a978d1ccc0 100644 --- a/test/asynchronous/test_csot.py +++ b/test/asynchronous/test_csot.py @@ -23,6 +23,7 @@ from test.asynchronous import 
AsyncIntegrationTest, async_client_context, unittest from test.asynchronous.unified_format import generate_test_classes +from test.asynchronous.utils import flaky import pymongo from pymongo import _csot @@ -43,9 +44,8 @@ class TestCSOT(AsyncIntegrationTest): RUN_ON_LOAD_BALANCER = True + @flaky(reason="PYTHON-3522") async def test_timeout_nested(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") coll = self.db.coll self.assertEqual(_csot.get_timeout(), None) self.assertEqual(_csot.get_deadline(), float("inf")) @@ -82,9 +82,8 @@ async def test_timeout_nested(self): self.assertEqual(_csot.get_rtt(), 0.0) @async_client_context.require_change_streams + @flaky(reason="PYTHON-3522") async def test_change_stream_can_resume_after_timeouts(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") coll = self.db.test await coll.insert_one({}) async with await coll.watch() as stream: diff --git a/test/asynchronous/test_cursor.py b/test/asynchronous/test_cursor.py index de836dbf80..984545438e 100644 --- a/test/asynchronous/test_cursor.py +++ b/test/asynchronous/test_cursor.py @@ -31,6 +31,7 @@ sys.path[0:0] = [""] from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.asynchronous.utils import flaky from test.utils_shared import ( AllowListEventListener, EventListener, @@ -1406,9 +1407,8 @@ async def test_to_list_length(self): docs = await c.to_list(3) self.assertEqual(len(docs), 2) + @flaky(reason="PYTHON-3522") async def test_to_list_csot_applied(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") client = await self.async_single_client(timeoutMS=500, w=1) coll = client.pymongo.test # Initialize the client with a larger timeout to help make test less flakey @@ -1449,9 +1449,8 @@ async def test_command_cursor_to_list_length(self): 
self.assertEqual(len(await result.to_list(1)), 1) @async_client_context.require_failCommand_blockConnection + @flaky(reason="PYTHON-3522") async def test_command_cursor_to_list_csot_applied(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") client = await self.async_single_client(timeoutMS=500, w=1) coll = client.pymongo.test # Initialize the client with a larger timeout to help make test less flakey diff --git a/test/asynchronous/test_encryption.py b/test/asynchronous/test_encryption.py index a766e63915..c2ef7c7e33 100644 --- a/test/asynchronous/test_encryption.py +++ b/test/asynchronous/test_encryption.py @@ -32,6 +32,7 @@ import warnings from test.asynchronous import AsyncIntegrationTest, AsyncPyMongoTestCase, async_client_context from test.asynchronous.test_bulk import AsyncBulkTestBase +from test.asynchronous.utils import flaky from test.asynchronous.utils_spec_runner import AsyncSpecRunner, AsyncSpecTestCreator from threading import Thread from typing import Any, Dict, Mapping, Optional @@ -3247,6 +3248,7 @@ async def test_kms_retry(self): class TestAutomaticDecryptionKeys(AsyncEncryptionIntegrationTest): @async_client_context.require_no_standalone @async_client_context.require_version_min(7, 0, -1) + @flaky(reason="PYTHON-4982") async def asyncSetUp(self): await super().asyncSetUp() self.key1_document = json_data("etc", "data", "keys", "key1-document.json") @@ -3489,6 +3491,8 @@ async def test_implicit_session_ignored_when_unsupported(self): self.assertNotIn("lsid", self.listener.started_events[1].command) + await self.mongocryptd_client.close() + async def test_explicit_session_errors_when_unsupported(self): self.listener.reset() async with self.mongocryptd_client.start_session() as s: @@ -3501,6 +3505,8 @@ async def test_explicit_session_errors_when_unsupported(self): ): await self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s) + await self.mongocryptd_client.close() + if __name__ 
== "__main__": unittest.main() diff --git a/test/asynchronous/test_retryable_writes.py b/test/asynchronous/test_retryable_writes.py index 4fe5e5e37f..ddb1d39eb7 100644 --- a/test/asynchronous/test_retryable_writes.py +++ b/test/asynchronous/test_retryable_writes.py @@ -20,7 +20,7 @@ import pprint import sys import threading -from test.asynchronous.utils import async_set_fail_point +from test.asynchronous.utils import async_set_fail_point, flaky sys.path[0:0] = [""] @@ -466,6 +466,7 @@ class TestPoolPausedError(AsyncIntegrationTest): @async_client_context.require_failCommand_blockConnection @async_client_context.require_retryable_writes @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) + @flaky(reason="PYTHON-5291") async def test_pool_paused_error_is_retryable(self): cmap_listener = CMAPListener() cmd_listener = OvertCommandListener() diff --git a/test/asynchronous/test_server_selection_in_window.py b/test/asynchronous/test_server_selection_in_window.py index 3fe448d4dd..dd0ff734f7 100644 --- a/test/asynchronous/test_server_selection_in_window.py +++ b/test/asynchronous/test_server_selection_in_window.py @@ -21,6 +21,7 @@ from pathlib import Path from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest from test.asynchronous.helpers import ConcurrentRunner +from test.asynchronous.utils import flaky from test.asynchronous.utils_selection_tests import create_topology from test.asynchronous.utils_spec_runner import AsyncSpecTestCreator from test.utils_shared import ( @@ -137,6 +138,7 @@ async def frequencies(self, client, listener, n_finds=10): @async_client_context.require_failCommand_appName @async_client_context.require_multiple_mongoses + @flaky(reason="PYTHON-3689") async def test_load_balancing(self): listener = OvertCommandListener() cmap_listener = CMAPListener() diff --git a/test/asynchronous/test_srv_polling.py b/test/asynchronous/test_srv_polling.py index 3ba50e77a8..bc167432c3 100644 --- 
a/test/asynchronous/test_srv_polling.py +++ b/test/asynchronous/test_srv_polling.py @@ -18,6 +18,7 @@ import asyncio import sys import time +from test.asynchronous.utils import flaky from test.utils_shared import FunctionCallRecorder from typing import Any @@ -254,6 +255,7 @@ def final_callback(): # Nodelist should reflect new valid DNS resolver response. await self.assert_nodelist_change(response_final, client) + @flaky(reason="PYTHON-5315") async def test_recover_from_initially_empty_seedlist(self): def empty_seedlist(): return [] diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index 5f6468b952..61a294dece 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -35,12 +35,11 @@ client_knobs, unittest, ) -from test.asynchronous.utils import async_get_pool +from test.asynchronous.utils import async_get_pool, flaky from test.asynchronous.utils_spec_runner import SpecRunnerTask from test.unified_format_shared import ( KMS_TLS_OPTS, PLACEHOLDER_MAP, - SKIP_CSOT_TESTS, EventListenerUtil, MatchEvaluatorUtil, coerce_result, @@ -518,20 +517,30 @@ def maybe_skip_test(self, spec): self.skipTest("Implement PYTHON-1894") if "timeoutMS applied to entire download" in spec["description"]: self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime") - if ( - "Error returned from connection pool clear with interruptInUseConnections=true is retryable" - in spec["description"] - and not _IS_SYNC - ): - self.skipTest("PYTHON-5170 tests are flakey") - if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC: - self.skipTest("PYTHON-5174 tests are flakey") class_name = self.__class__.__name__.lower() description = spec["description"].lower() if "csot" in class_name: - if "gridfs" in class_name and sys.platform == "win32": - self.skipTest("PYTHON-3522 CSOT GridFS tests are flaky on Windows") + # Skip tests that are too slow to run on a given platform. 
+ slow_macos = [ + "operation fails after two consecutive socket timeouts.*", + "operation succeeds after one socket timeout.*", + "Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset", + ] + slow_win32 = [ + *slow_macos, + "maxTimeMS value in the command is less than timeoutMS", + ] + if sys.platform == "win32" and "gridfs" in class_name: + self.skipTest("PYTHON-3522 CSOT GridFS test runs too slow on Windows") + if sys.platform == "win32": + for pat in slow_win32: + if re.match(pat.lower(), description): + self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows") + if sys.platform == "darwin": + for pat in slow_macos: + if re.match(pat.lower(), description): + self.skipTest("PYTHON-3522 CSOT test runs too slow on MacOS") if "change" in description or "change" in class_name: self.skipTest("CSOT not implemented for watch()") if "cursors" in class_name: @@ -1347,38 +1356,31 @@ async def verify_outcome(self, spec): self.assertListEqual(sorted_expected_documents, actual_documents) async def run_scenario(self, spec, uri=None): - if "csot" in self.id().lower() and SKIP_CSOT_TESTS: - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") - # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. await self.kill_all_sessions() - if "csot" in self.id().lower(): - # Retry CSOT tests up to 2 times to deal with flakey tests. 
- attempts = 3 - for i in range(attempts): - try: - return await self._run_scenario(spec, uri) - except (AssertionError, OperationFailure) as exc: - if isinstance(exc, OperationFailure) and ( - _IS_SYNC or "failpoint" not in exc._message - ): - raise - if i < attempts - 1: - print( - f"Retrying after attempt {i+1} of {self.id()} failed with:\n" - f"{traceback.format_exc()}", - file=sys.stderr, - ) - await self.asyncSetUp() - continue - raise - return None - else: - await self._run_scenario(spec, uri) - return None + # Handle flaky tests. + flaky_tests = [ + ("PYTHON-5170", ".*test_discovery_and_monitoring.*"), + ("PYTHON-5174", ".*Driver_extends_timeout_while_streaming"), + ("PYTHON-5315", ".*TestSrvPolling.test_recover_from_initially_.*"), + ("PYTHON-4987", ".*UnknownTransactionCommitResult_labels_to_connection_errors"), + ("PYTHON-3689", ".*TestProse.test_load_balancing"), + ("PYTHON-3522", ".*csot.*"), + ] + for reason, flaky_test in flaky_tests: + if re.match(flaky_test.lower(), self.id().lower()) is not None: + func_name = self.id() + options = dict(reason=reason, reset_func=self.asyncSetUp, func_name=func_name) + if "csot" in func_name.lower(): + options["max_runs"] = 3 + options["affects_cpython_linux"] = True + decorator = flaky(**options) + await decorator(self._run_scenario)(spec, uri) + return + await self._run_scenario(spec, uri) async def _run_scenario(self, spec, uri=None): # maybe skip test manually diff --git a/test/asynchronous/utils.py b/test/asynchronous/utils.py index ca80d1f6dd..982564a0a5 100644 --- a/test/asynchronous/utils.py +++ b/test/asynchronous/utils.py @@ -17,10 +17,14 @@ import asyncio import contextlib +import os import random +import sys import threading # Used in the synchronized version of this file import time +import traceback from asyncio import iscoroutinefunction +from functools import wraps from bson.son import SON from pymongo import AsyncMongoClient @@ -154,6 +158,62 @@ async def async_joinall(tasks): await 
asyncio.wait([t.task for t in tasks if t is not None], timeout=300) +def flaky( + *, + reason=None, + max_runs=2, + min_passes=1, + delay=1, + affects_cpython_linux=False, + func_name=None, + reset_func=None, +): + """Decorate a test as flaky. + + :param reason: the reason why the test is flaky + :param max_runs: the maximum number of runs before raising an error + :param min_passes: the minimum number of passing runs + :param delay: the delay in seconds between retries + :param affects_cpython_linux: whether the test is flaky on CPython on Linux + :param func_name: the name of the function, used for the retry message + :param reset_func: a function to call before retrying + + """ + if reason is None: + raise ValueError("flaky requires a reason input") + is_cpython_linux = sys.platform == "linux" and sys.implementation.name == "cpython" + disable_flaky = "DISABLE_FLAKY" in os.environ + if disable_flaky or (is_cpython_linux and not affects_cpython_linux): + max_runs = 1 + min_passes = 1 + + def decorator(target_func): + @wraps(target_func) + async def wrapper(*args, **kwargs): + passes = 0 + for i in range(max_runs): + try: + result = await target_func(*args, **kwargs) + passes += 1 + if passes == min_passes: + return result + except Exception as e: + if i == max_runs - 1: + raise e + print( + f"Retrying after attempt {i+1} of {func_name or target_func.__name__} failed with ({reason})):\n" + f"{traceback.format_exc()}", + file=sys.stderr, + ) + await asyncio.sleep(delay) + if reset_func: + await reset_func() + + return wrapper + + return decorator + + class AsyncMockConnection: def __init__(self): self.cancel_context = _CancellationContext() diff --git a/test/test_client_bulk_write.py b/test/test_client_bulk_write.py index 84313c5be0..4cc4ab4122 100644 --- a/test/test_client_bulk_write.py +++ b/test/test_client_bulk_write.py @@ -25,6 +25,7 @@ client_context, unittest, ) +from test.utils import flaky from test.utils_shared import ( OvertCommandListener, ) @@ -615,8 
+616,6 @@ def test_15_unacknowledged_write_across_batches(self): # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(IntegrationTest): def setUp(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") super().setUp() self.max_write_batch_size = client_context.max_write_batch_size self.max_bson_object_size = client_context.max_bson_size @@ -624,7 +623,10 @@ def setUp(self): @client_context.require_version_min(8, 0, 0, -24) @client_context.require_failCommand_fail_point + @flaky(reason="PYTHON-5290", max_runs=3, affects_cpython_linux=True) def test_timeout_in_multi_batch_bulk_write(self): + if sys.platform != "linux": + self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows and MacOS") _OVERHEAD = 500 internal_client = self.rs_or_single_client(timeoutMS=None) diff --git a/test/test_csot.py b/test/test_csot.py index ff907cc9c5..981af1ed03 100644 --- a/test/test_csot.py +++ b/test/test_csot.py @@ -23,6 +23,7 @@ from test import IntegrationTest, client_context, unittest from test.unified_format import generate_test_classes +from test.utils import flaky import pymongo from pymongo import _csot @@ -43,9 +44,8 @@ class TestCSOT(IntegrationTest): RUN_ON_LOAD_BALANCER = True + @flaky(reason="PYTHON-3522") def test_timeout_nested(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") coll = self.db.coll self.assertEqual(_csot.get_timeout(), None) self.assertEqual(_csot.get_deadline(), float("inf")) @@ -82,9 +82,8 @@ def test_timeout_nested(self): self.assertEqual(_csot.get_rtt(), 0.0) @client_context.require_change_streams + @flaky(reason="PYTHON-3522") def test_change_stream_can_resume_after_timeouts(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") coll = self.db.test 
coll.insert_one({}) with coll.watch() as stream: diff --git a/test/test_cursor.py b/test/test_cursor.py index 83f2b79316..74fe429269 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -31,6 +31,7 @@ sys.path[0:0] = [""] from test import IntegrationTest, client_context, unittest +from test.utils import flaky from test.utils_shared import ( AllowListEventListener, EventListener, @@ -1397,9 +1398,8 @@ def test_to_list_length(self): docs = c.to_list(3) self.assertEqual(len(docs), 2) + @flaky(reason="PYTHON-3522") def test_to_list_csot_applied(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") client = self.single_client(timeoutMS=500, w=1) coll = client.pymongo.test # Initialize the client with a larger timeout to help make test less flakey @@ -1440,9 +1440,8 @@ def test_command_cursor_to_list_length(self): self.assertEqual(len(result.to_list(1)), 1) @client_context.require_failCommand_blockConnection + @flaky(reason="PYTHON-3522") def test_command_cursor_to_list_csot_applied(self): - if os.environ.get("SKIP_CSOT_TESTS", ""): - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") client = self.single_client(timeoutMS=500, w=1) coll = client.pymongo.test # Initialize the client with a larger timeout to help make test less flakey diff --git a/test/test_encryption.py b/test/test_encryption.py index baaefa1e73..68b24f1729 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -32,6 +32,7 @@ import warnings from test import IntegrationTest, PyMongoTestCase, client_context from test.test_bulk import BulkTestBase +from test.utils import flaky from test.utils_spec_runner import SpecRunner, SpecTestCreator from threading import Thread from typing import Any, Dict, Mapping, Optional @@ -3229,6 +3230,7 @@ def test_kms_retry(self): class TestAutomaticDecryptionKeys(EncryptionIntegrationTest): @client_context.require_no_standalone @client_context.require_version_min(7, 0, -1) + 
@flaky(reason="PYTHON-4982") def setUp(self): super().setUp() self.key1_document = json_data("etc", "data", "keys", "key1-document.json") @@ -3471,6 +3473,8 @@ def test_implicit_session_ignored_when_unsupported(self): self.assertNotIn("lsid", self.listener.started_events[1].command) + self.mongocryptd_client.close() + def test_explicit_session_errors_when_unsupported(self): self.listener.reset() with self.mongocryptd_client.start_session() as s: @@ -3483,6 +3487,8 @@ def test_explicit_session_errors_when_unsupported(self): ): self.mongocryptd_client.db.test.insert_one({"x": 1}, session=s) + self.mongocryptd_client.close() + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 2ac08691cf..a74a3e8030 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -20,7 +20,7 @@ import pprint import sys import threading -from test.utils import set_fail_point +from test.utils import flaky, set_fail_point sys.path[0:0] = [""] @@ -464,6 +464,7 @@ class TestPoolPausedError(IntegrationTest): @client_context.require_failCommand_blockConnection @client_context.require_retryable_writes @client_knobs(heartbeat_frequency=0.05, min_heartbeat_interval=0.05) + @flaky(reason="PYTHON-5291") def test_pool_paused_error_is_retryable(self): cmap_listener = CMAPListener() cmd_listener = OvertCommandListener() diff --git a/test/test_server_selection_in_window.py b/test/test_server_selection_in_window.py index 4aad34050c..fcf2cce0e0 100644 --- a/test/test_server_selection_in_window.py +++ b/test/test_server_selection_in_window.py @@ -21,6 +21,7 @@ from pathlib import Path from test import IntegrationTest, client_context, unittest from test.helpers import ConcurrentRunner +from test.utils import flaky from test.utils_selection_tests import create_topology from test.utils_shared import ( CMAPListener, @@ -137,6 +138,7 @@ def frequencies(self, client, listener, n_finds=10): 
@client_context.require_failCommand_appName @client_context.require_multiple_mongoses + @flaky(reason="PYTHON-3689") def test_load_balancing(self): listener = OvertCommandListener() cmap_listener = CMAPListener() diff --git a/test/test_srv_polling.py b/test/test_srv_polling.py index 971c3bad50..3ab1a514c1 100644 --- a/test/test_srv_polling.py +++ b/test/test_srv_polling.py @@ -18,6 +18,7 @@ import asyncio import sys import time +from test.utils import flaky from test.utils_shared import FunctionCallRecorder from typing import Any @@ -254,6 +255,7 @@ def final_callback(): # Nodelist should reflect new valid DNS resolver response. self.assert_nodelist_change(response_final, client) + @flaky(reason="PYTHON-5315") def test_recover_from_initially_empty_seedlist(self): def empty_seedlist(): return [] diff --git a/test/test_topology.py b/test/test_topology.py index 530cecd1f7..837cf25c62 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -23,7 +23,7 @@ from test import client_knobs, unittest from test.pymongo_mocks import DummyMonitor -from test.utils import MockPool +from test.utils import MockPool, flaky from test.utils_shared import wait_until from bson.objectid import ObjectId @@ -750,6 +750,7 @@ def get_primary(): class TestTopologyErrors(TopologyTest): # Errors when calling hello. + @flaky(reason="PYTHON-5366") def test_pool_reset(self): # hello succeeds at first, then always raises socket error. 
hello_count = [0] diff --git a/test/unified_format.py b/test/unified_format.py index e45922819d..1b2275c5d2 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -38,7 +38,6 @@ from test.unified_format_shared import ( KMS_TLS_OPTS, PLACEHOLDER_MAP, - SKIP_CSOT_TESTS, EventListenerUtil, MatchEvaluatorUtil, coerce_result, @@ -48,7 +47,7 @@ parse_collection_or_database_options, with_metaclass, ) -from test.utils import get_pool +from test.utils import flaky, get_pool from test.utils_shared import ( camel_to_snake, camel_to_snake_args, @@ -517,20 +516,30 @@ def maybe_skip_test(self, spec): self.skipTest("Implement PYTHON-1894") if "timeoutMS applied to entire download" in spec["description"]: self.skipTest("PyMongo's open_download_stream does not cap the stream's lifetime") - if ( - "Error returned from connection pool clear with interruptInUseConnections=true is retryable" - in spec["description"] - and not _IS_SYNC - ): - self.skipTest("PYTHON-5170 tests are flakey") - if "Driver extends timeout while streaming" in spec["description"] and not _IS_SYNC: - self.skipTest("PYTHON-5174 tests are flakey") class_name = self.__class__.__name__.lower() description = spec["description"].lower() if "csot" in class_name: - if "gridfs" in class_name and sys.platform == "win32": - self.skipTest("PYTHON-3522 CSOT GridFS tests are flaky on Windows") + # Skip tests that are too slow to run on a given platform. 
+ slow_macos = [ + "operation fails after two consecutive socket timeouts.*", + "operation succeeds after one socket timeout.*", + "Non-tailable cursor lifetime remaining timeoutMS applied to getMore if timeoutMode is unset", + ] + slow_win32 = [ + *slow_macos, + "maxTimeMS value in the command is less than timeoutMS", + ] + if sys.platform == "win32" and "gridfs" in class_name: + self.skipTest("PYTHON-3522 CSOT GridFS test runs too slow on Windows") + if sys.platform == "win32": + for pat in slow_win32: + if re.match(pat.lower(), description): + self.skipTest("PYTHON-3522 CSOT test runs too slow on Windows") + if sys.platform == "darwin": + for pat in slow_macos: + if re.match(pat.lower(), description): + self.skipTest("PYTHON-3522 CSOT test runs too slow on MacOS") if "change" in description or "change" in class_name: self.skipTest("CSOT not implemented for watch()") if "cursors" in class_name: @@ -1334,38 +1343,31 @@ def verify_outcome(self, spec): self.assertListEqual(sorted_expected_documents, actual_documents) def run_scenario(self, spec, uri=None): - if "csot" in self.id().lower() and SKIP_CSOT_TESTS: - raise unittest.SkipTest("SKIP_CSOT_TESTS is set, skipping...") - # Kill all sessions before and after each test to prevent an open # transaction (from a test failure) from blocking collection/database # operations during test set up and tear down. self.kill_all_sessions() - if "csot" in self.id().lower(): - # Retry CSOT tests up to 2 times to deal with flakey tests. 
- attempts = 3 - for i in range(attempts): - try: - return self._run_scenario(spec, uri) - except (AssertionError, OperationFailure) as exc: - if isinstance(exc, OperationFailure) and ( - _IS_SYNC or "failpoint" not in exc._message - ): - raise - if i < attempts - 1: - print( - f"Retrying after attempt {i+1} of {self.id()} failed with:\n" - f"{traceback.format_exc()}", - file=sys.stderr, - ) - self.setUp() - continue - raise - return None - else: - self._run_scenario(spec, uri) - return None + # Handle flaky tests. + flaky_tests = [ + ("PYTHON-5170", ".*test_discovery_and_monitoring.*"), + ("PYTHON-5174", ".*Driver_extends_timeout_while_streaming"), + ("PYTHON-5315", ".*TestSrvPolling.test_recover_from_initially_.*"), + ("PYTHON-4987", ".*UnknownTransactionCommitResult_labels_to_connection_errors"), + ("PYTHON-3689", ".*TestProse.test_load_balancing"), + ("PYTHON-3522", ".*csot.*"), + ] + for reason, flaky_test in flaky_tests: + if re.match(flaky_test.lower(), self.id().lower()) is not None: + func_name = self.id() + options = dict(reason=reason, reset_func=self.setUp, func_name=func_name) + if "csot" in func_name.lower(): + options["max_runs"] = 3 + options["affects_cpython_linux"] = True + decorator = flaky(**options) + decorator(self._run_scenario)(spec, uri) + return + self._run_scenario(spec, uri) def _run_scenario(self, spec, uri=None): # maybe skip test manually diff --git a/test/unified_format_shared.py b/test/unified_format_shared.py index ea0f2f233e..17dd73ec8c 100644 --- a/test/unified_format_shared.py +++ b/test/unified_format_shared.py @@ -91,8 +91,6 @@ from pymongo.server_description import ServerDescription from pymongo.topology_description import TopologyDescription -SKIP_CSOT_TESTS = os.getenv("SKIP_CSOT_TESTS") - JSON_OPTS = json_util.JSONOptions(tz_aware=False) IS_INTERRUPTED = False diff --git a/test/utils.py b/test/utils.py index 25d95d1d3c..855f453c8a 100644 --- a/test/utils.py +++ b/test/utils.py @@ -17,10 +17,14 @@ import asyncio import 
contextlib +import os import random +import sys import threading # Used in the synchronized version of this file import time +import traceback from asyncio import iscoroutinefunction +from functools import wraps from bson.son import SON from pymongo import MongoClient @@ -152,6 +156,62 @@ def joinall(tasks): asyncio.wait([t.task for t in tasks if t is not None], timeout=300) +def flaky( + *, + reason=None, + max_runs=2, + min_passes=1, + delay=1, + affects_cpython_linux=False, + func_name=None, + reset_func=None, +): + """Decorate a test as flaky. + + :param reason: the reason why the test is flaky + :param max_runs: the maximum number of runs before raising an error + :param min_passes: the minimum number of passing runs + :param delay: the delay in seconds between retries + :param affects_cpython_linux: whether the test is flaky on CPython on Linux + :param func_name: the name of the function, used for the retry message + :param reset_func: a function to call before retrying + + """ + if reason is None: + raise ValueError("flaky requires a reason input") + is_cpython_linux = sys.platform == "linux" and sys.implementation.name == "cpython" + disable_flaky = "DISABLE_FLAKY" in os.environ + if disable_flaky or (is_cpython_linux and not affects_cpython_linux): + max_runs = 1 + min_passes = 1 + + def decorator(target_func): + @wraps(target_func) + def wrapper(*args, **kwargs): + passes = 0 + for i in range(max_runs): + try: + result = target_func(*args, **kwargs) + passes += 1 + if passes == min_passes: + return result + except Exception as e: + if i == max_runs - 1: + raise e + print( + f"Retrying after attempt {i+1} of {func_name or target_func.__name__} failed with ({reason})):\n" + f"{traceback.format_exc()}", + file=sys.stderr, + ) + time.sleep(delay) + if reset_func: + reset_func() + + return wrapper + + return decorator + + class MockConnection: def __init__(self): self.cancel_context = _CancellationContext()