This repository has been archived by the owner on May 27, 2024. It is now read-only.

Comparing changes

base repository: ansible/pytest-mp (base: master)
head repository: vinissimus/pytest-mp (compare: master)

Able to merge. These branches can be automatically merged.
  • 6 commits
  • 3 files changed
  • 2 contributors

Commits on Apr 16, 2021

  1. Custom run strategy
     masipcat committed Apr 16, 2021 · c52bde5
  2. Some changes
     masipcat committed Apr 16, 2021 · d94bee1

Commits on Apr 19, 2021

  1. 5aa3a4d

Commits on Aug 16, 2021

  1. Support pytest 6 (#2)
     masipcat authored Aug 16, 2021 · f387da8

Commits on Aug 17, 2021

  1. 3191536

Commits on May 11, 2022

  1. 19d1e7f

Showing with 33 additions and 64 deletions.
  1. +29 −48 pytest_mp/plugin.py
  2. +3 −15 pytest_mp/terminal.py
  3. +1 −1 setup.py
77 changes: 29 additions & 48 deletions pytest_mp/plugin.py
@@ -1,4 +1,5 @@
from contextlib import contextmanager
from contextvars import ContextVar
import multiprocessing
import collections

@@ -27,11 +28,13 @@ def pytest_addoption(parser):
help="show failures and errors instantly as they occur (disabled by default).")


# https://stackoverflow.com/a/45351044
fixture_lock = ContextVar("Lock")

manager = multiprocessing.Manager()
# Used for "global" synchronization access.
synchronization = dict(manager=manager)
synchronization['fixture_message_board'] = manager.dict()
synchronization['fixture_lock'] = manager.Lock()

state_fixtures = dict(use_mp=False, num_processes=None)

@@ -53,7 +56,7 @@ def mp_message_board():

@pytest.fixture(scope='session')
def mp_lock():
return synchronization['fixture_lock']
return fixture_lock.get()


@pytest.fixture(scope='session')
@@ -66,7 +69,7 @@ def trail(name, state='start'):
raise Exception('mp_trail state must be "start" or "finish": {}'.format(state))

consumer_key = name + '__consumers__'
with synchronization['fixture_lock']:
with fixture_lock.get():
if state == 'start':
if consumer_key not in message_board:
message_board[consumer_key] = 1
@@ -76,10 +79,9 @@ def trail(name, state='start'):
yield False
else:
message_board[consumer_key] -= 1
if message_board[consumer_key]:
if message_board[consumer_key] > 0:
yield False
else:
del message_board[consumer_key]
yield True

return trail
@@ -210,15 +212,16 @@ def submit_test_to_process(test, session):

def submit_batch_to_process(batch, session):

def run_batch(tests, finished_signal):
def run_batch(lock, tests, finished_signal):
fixture_lock.set(lock)
for i, test in enumerate(tests):
next_test = tests[i + 1] if i + 1 < len(tests) else None
test.config.hook.pytest_runtest_protocol(item=test, nextitem=next_test)
if session.shouldstop:
raise session.Interrupted(session.shouldstop)
finished_signal.set()

proc = multiprocessing.Process(target=run_batch, args=(batch['tests'], synchronization['trigger_process_loop']))
proc = multiprocessing.Process(target=run_batch, args=(fixture_lock.get(), batch['tests'], synchronization['trigger_process_loop']))
with synchronization['processes_lock']:
proc.start()
pid = proc.pid
@@ -254,44 +257,20 @@ def wait_until_can_submit(num_processes):


def run_batched_tests(batches, session, num_processes):
sorting = dict(free=3, serial=2, isolated_free=1, isolated_serial=0)
if not num_processes:
num_processes = 1

batch_names = sorted(batches.keys(), key=lambda x: sorting.get(batches[x]['strategy'], 4))
batch_of_tests = {i: [] for i in range(num_processes)}
for batch in batches.values():
for i, test in enumerate(batch['tests']):
batch_of_tests[i % num_processes] += [test]

if not num_processes:
for i, batch in enumerate(batch_names):
next_test = batches[batch_names[i + 1]]['tests'][0] if i + 1 < len(batch_names) else None
run_isolated_serial_batch(batches[batch], next_test, session)
return

for batch in batch_names:
strategy = batches[batch]['strategy']
if strategy == 'free':
for test in batches[batch]['tests']:
wait_until_can_submit(num_processes)
submit_test_to_process(test, session)
reap_finished_processes()
elif strategy == 'serial':
wait_until_can_submit(num_processes)
submit_batch_to_process(batches[batch], session)
reap_finished_processes()
elif strategy == 'isolated_free':
wait_until_no_running()
for test in batches[batch]['tests']:
wait_until_can_submit(num_processes)
submit_test_to_process(test, session)
reap_finished_processes()
wait_until_no_running()
elif strategy == 'isolated_serial':
wait_until_no_running()
submit_batch_to_process(batches[batch], session)
reap_finished_processes()
wait_until_no_running()
else:
raise Exception('Unknown strategy {}'.format(strategy))
for _, tests in batch_of_tests.items():
submit_batch_to_process({"tests": tests}, session)

wait_until_no_running()
# All process finished
reap_finished_processes()
wait_until_can_submit(num_processes)


def process_loop(num_processes):
@@ -336,6 +315,8 @@ def pytest_runtestloop(session):

batches = batch_tests(session)

fixture_lock.set(multiprocessing.Lock())

if not use_mp or not num_processes:
return main.pytest_runtestloop(session)

@@ -371,8 +352,8 @@ def pytest_runtest_logreport(report):
if 'stats' in synchronization:
with synchronization['stats_lock']:
if report.failed and not synchronization['stats']['failed']:
if report.when == 'call':
synchronization['stats']['failed'] = True
# if report.when == 'call':
synchronization['stats']['failed'] = True


@pytest.mark.trylast
@@ -382,21 +363,21 @@ def pytest_configure(config):
"grouped w/ desired strategy: 'free' (default), 'serial', "
"'isolated_free', or 'isolated_serial'.")

if config.option.use_mp is None:
if not config.getini('mp'):
return

standard_reporter = config.pluginmanager.get_plugin('terminalreporter')
if standard_reporter:
from pytest_mp.terminal import MPTerminalReporter
mp_reporter = MPTerminalReporter(standard_reporter, manager)
config.pluginmanager.unregister(standard_reporter)
config.pluginmanager.register(mp_reporter, 'terminalreporter')

if config.option.use_mp is None:
if not config.getini('mp'):
return

if config.option.xmlpath is not None:
from pytest_mp.junitxml import MPLogXML
synchronization['node_reporters'] = manager.list()
synchronization['node_reporters_lock'] = manager.Lock()
synchronization['node_reporters_lock'] = multiprocessing.Lock()
xmlpath = config.option.xmlpath
config.pluginmanager.unregister(config._xml)
config._xml = MPLogXML(xmlpath, config.option.junitprefix, config.getini("junit_suite_name"), manager)
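
The plugin.py diff above amounts to two patterns: the shared fixture lock moves out of the Manager-backed synchronization dict into a ContextVar that each worker process re-populates from a lock passed in as a Process argument, and run_batched_tests drops the per-strategy scheduling in favour of a plain round-robin split of all tests across num_processes batches. Below is a minimal standalone sketch of those two patterns under a stock multiprocessing setup; run_batched, items, and the printed output are illustrative assumptions, not the plugin's actual API.

# Sketch only: mirrors the ContextVar-lock and round-robin ideas from the diff.
from contextvars import ContextVar
import multiprocessing

# Set in the parent, re-set inside each child from the Process argument.
fixture_lock = ContextVar("Lock")


def run_batch(lock, items):
    # Under "spawn" the parent's ContextVar value is not inherited, so the lock
    # arrives as an argument and is stored back for any code that reads the var.
    fixture_lock.set(lock)
    for item in items:
        with fixture_lock.get():
            print(f"{multiprocessing.current_process().name}: {item}")


def run_batched(items, num_processes):
    # Round-robin split: item i goes to batch i % num_processes,
    # as run_batched_tests now does with the collected tests.
    batches = {i: [] for i in range(num_processes)}
    for i, item in enumerate(items):
        batches[i % num_processes].append(item)

    fixture_lock.set(multiprocessing.Lock())

    procs = [
        multiprocessing.Process(target=run_batch, args=(fixture_lock.get(), batch))
        for batch in batches.values()
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == "__main__":
    run_batched([f"test_{i}" for i in range(8)], num_processes=3)
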
18 changes: 3 additions & 15 deletions pytest_mp/terminal.py
@@ -1,4 +1,5 @@
from _pytest.terminal import TerminalReporter
import multiprocessing


# Taken from pytest/_pytest/terminal.py
@@ -14,13 +15,13 @@ class MPTerminalReporter(TerminalReporter):

def __init__(self, reporter, manager):
TerminalReporter.__init__(self, reporter.config)
self._tw = self.writer = reporter.writer # some monkeypatching needed to access existing writer
self._tw = reporter._tw
self.manager = manager
self.stats = dict()
self.stat_keys = ['passed', 'failed', 'error', 'skipped', 'warnings', 'xpassed', 'xfailed', '']
for key in self.stat_keys:
self.stats[key] = manager.list()
self.stats_lock = manager.Lock()
self.stats_lock = multiprocessing.Lock()
self._progress_items_reported_proxy = manager.Value('i', 0)

def pytest_collectreport(self, report):
@@ -116,19 +117,6 @@ def print_failure(self, report):
if not self.config.getvalue("usepdb"):
self._outrep_summary(report)

def _write_progress_if_past_edge(self):
if not self._show_progress_info:
return
last_item = self._progress_items_reported_proxy.value == self._session.testscollected
if last_item:
self._write_progress_information_filling_space()
return

past_edge = self._tw.chars_on_current_line + self._PROGRESS_LENGTH + 1 >= self._screen_width
if past_edge:
msg = self._get_progress_information_message()
self._tw.write(msg + '\n', cyan=True)

def _get_progress_information_message(self):
collected = self._session.testscollected
if collected:
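
One functional change in terminal.py is that stats_lock becomes a plain multiprocessing.Lock() rather than a manager.Lock(). Both synchronise across processes, but the Manager variant is a proxy whose every acquire and release round-trips through the manager's server process, while the native lock is shared directly with the workers (passed through Process args below, which also keeps the sketch spawn-safe). A small standalone sketch, with bump and the counter being illustrative assumptions rather than anything from the plugin:

# Sketch only: both lock flavours guard a shared counter; only the transport differs.
import multiprocessing


def bump(lock, counter):
    # counter is a Manager-backed dict, visible to every process; the lock
    # only guards the read-modify-write so increments are not lost.
    with lock:
        counter["n"] = counter["n"] + 1


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    counter = manager.dict(n=0)

    # First the native lock (what the diff switches stats_lock to),
    # then the Manager proxy lock (what it replaces).
    for lock in (multiprocessing.Lock(), manager.Lock()):
        counter["n"] = 0
        procs = [
            multiprocessing.Process(target=bump, args=(lock, counter))
            for _ in range(4)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(counter["n"])  # expected: 4 for both lock types
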
2 changes: 1 addition & 1 deletion setup.py
@@ -33,7 +33,7 @@ def run_tests(self):
py_modules=['pytest_mp'],
packages=find_packages(),
install_requires=['pytest', 'psutil'],
setup_requires=['setuptools-markdown'],
setup_requires=[''],
tests_require=['pytest', 'tox'],
classifiers=['Development Status :: 4 - Beta',
'Framework :: Pytest',