iotests: check: multiprocessing support

Add -j <JOBS> parameter, to run tests in several jobs simultaneously.
For realization - simply utilize multiprocessing.Pool class.

Notes:

1. Of course, tests can't run simultaneously in same TEST_DIR. So,
   use subdirectories TEST_DIR/testname/ and SOCK_DIR/testname/
   instead of simply TEST_DIR and SOCK_DIR

2. multiprocessing.Pool.starmap function doesn't support passing
   context managers, so we can't simply pass "self". Happily, we need
   self only for read-only access, and it just works if it is defined
   in global space. So, add a temporary link TestRunner.shared_self
   during run_tests().

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Message-Id: <20211203122223.2780098-4-vsementsov@virtuozzo.com>
Reviewed-by: John Snow <jsnow@redhat.com>
Tested-by: John Snow <jsnow@redhat.com>
Signed-off-by: Hanna Reitz <hreitz@redhat.com>
This commit is contained in:
Vladimir Sementsov-Ogievskiy 2021-12-03 13:22:23 +01:00 committed by Hanna Reitz
parent 1f257b70d1
commit 722f87df25
2 changed files with 64 additions and 9 deletions

View File

@ -34,6 +34,8 @@ def make_argparser() -> argparse.ArgumentParser:
help='show me, do not run tests') help='show me, do not run tests')
p.add_argument('-makecheck', action='store_true', p.add_argument('-makecheck', action='store_true',
help='pretty print output for make check') help='pretty print output for make check')
p.add_argument('-j', dest='jobs', type=int, default=1,
help='run tests in multiple parallel jobs')
p.add_argument('-d', dest='debug', action='store_true', help='debug') p.add_argument('-d', dest='debug', action='store_true', help='debug')
p.add_argument('-p', dest='print', action='store_true', p.add_argument('-p', dest='print', action='store_true',
@ -165,6 +167,6 @@ if __name__ == '__main__':
with TestRunner(env, makecheck=args.makecheck, with TestRunner(env, makecheck=args.makecheck,
color=args.color) as tr: color=args.color) as tr:
paths = [os.path.join(env.source_iotests, t) for t in tests] paths = [os.path.join(env.source_iotests, t) for t in tests]
ok = tr.run_tests(paths) ok = tr.run_tests(paths, args.jobs)
if not ok: if not ok:
sys.exit(1) sys.exit(1)

View File

@ -26,6 +26,7 @@ import contextlib
import json import json
import termios import termios
import sys import sys
from multiprocessing import Pool
from contextlib import contextmanager from contextlib import contextmanager
from typing import List, Optional, Iterator, Any, Sequence, Dict, \ from typing import List, Optional, Iterator, Any, Sequence, Dict, \
ContextManager ContextManager
@ -126,6 +127,31 @@ class TestResult:
class TestRunner(ContextManager['TestRunner']): class TestRunner(ContextManager['TestRunner']):
shared_self = None
@staticmethod
def proc_run_test(test: str, test_field_width: int) -> TestResult:
# We are in a subprocess, we can't change the runner object!
runner = TestRunner.shared_self
assert runner is not None
return runner.run_test(test, test_field_width, mp=True)
def run_tests_pool(self, tests: List[str],
test_field_width: int, jobs: int) -> List[TestResult]:
# passing self directly to Pool.starmap() just doesn't work, because
# it's a context manager.
assert TestRunner.shared_self is None
TestRunner.shared_self = self
with Pool(jobs) as p:
results = p.starmap(self.proc_run_test,
zip(tests, [test_field_width] * len(tests)))
TestRunner.shared_self = None
return results
def __init__(self, env: TestEnv, makecheck: bool = False, def __init__(self, env: TestEnv, makecheck: bool = False,
color: str = 'auto') -> None: color: str = 'auto') -> None:
self.env = env self.env = env
@ -219,11 +245,16 @@ class TestRunner(ContextManager['TestRunner']):
return f'{test}.out' return f'{test}.out'
def do_run_test(self, test: str) -> TestResult: def do_run_test(self, test: str, mp: bool) -> TestResult:
""" """
Run one test Run one test
:param test: test file path :param test: test file path
:param mp: if true, we are in a multiprocessing environment, use
personal subdirectories for test run
Note: this method may be called from subprocess, so it does not
change ``self`` object in any way!
""" """
f_test = Path(test) f_test = Path(test)
@ -249,6 +280,12 @@ class TestRunner(ContextManager['TestRunner']):
args = [str(f_test.resolve())] args = [str(f_test.resolve())]
env = self.env.prepare_subprocess(args) env = self.env.prepare_subprocess(args)
if mp:
# Split test directories, so that tests running in parallel don't
# break each other.
for d in ['TEST_DIR', 'SOCK_DIR']:
env[d] = os.path.join(env[d], f_test.name)
Path(env[d]).mkdir(parents=True, exist_ok=True)
t0 = time.time() t0 = time.time()
with f_bad.open('w', encoding="utf-8") as f: with f_bad.open('w', encoding="utf-8") as f:
@ -291,23 +328,32 @@ class TestRunner(ContextManager['TestRunner']):
casenotrun=casenotrun) casenotrun=casenotrun)
def run_test(self, test: str, def run_test(self, test: str,
test_field_width: Optional[int] = None) -> TestResult: test_field_width: Optional[int] = None,
mp: bool = False) -> TestResult:
""" """
Run one test and print short status Run one test and print short status
:param test: test file path :param test: test file path
:param test_field_width: width for first field of status format :param test_field_width: width for first field of status format
:param mp: if true, we are in a multiprocessing environment, don't try
to rewrite things in stdout
Note: this method may be called from subprocess, so it does not
change ``self`` object in any way!
""" """
last_el = self.last_elapsed.get(test) last_el = self.last_elapsed.get(test)
start = datetime.datetime.now().strftime('%H:%M:%S') start = datetime.datetime.now().strftime('%H:%M:%S')
if not self.makecheck: if not self.makecheck:
self.test_print_one_line(test=test, starttime=start, self.test_print_one_line(test=test,
lasttime=last_el, end='\r', status = 'started' if mp else '...',
starttime=start,
lasttime=last_el,
end = '\n' if mp else '\r',
test_field_width=test_field_width) test_field_width=test_field_width)
res = self.do_run_test(test) res = self.do_run_test(test, mp)
end = datetime.datetime.now().strftime('%H:%M:%S') end = datetime.datetime.now().strftime('%H:%M:%S')
self.test_print_one_line(test=test, status=res.status, self.test_print_one_line(test=test, status=res.status,
@ -321,7 +367,7 @@ class TestRunner(ContextManager['TestRunner']):
return res return res
def run_tests(self, tests: List[str]) -> bool: def run_tests(self, tests: List[str], jobs: int = 1) -> bool:
n_run = 0 n_run = 0
failed = [] failed = []
notrun = [] notrun = []
@ -332,9 +378,16 @@ class TestRunner(ContextManager['TestRunner']):
test_field_width = max(len(os.path.basename(t)) for t in tests) + 2 test_field_width = max(len(os.path.basename(t)) for t in tests) + 2
for t in tests: if jobs > 1:
results = self.run_tests_pool(tests, test_field_width, jobs)
for i, t in enumerate(tests):
name = os.path.basename(t) name = os.path.basename(t)
res = self.run_test(t, test_field_width=test_field_width)
if jobs > 1:
res = results[i]
else:
res = self.run_test(t, test_field_width)
assert res.status in ('pass', 'fail', 'not run') assert res.status in ('pass', 'fail', 'not run')