| #!/usr/bin/env python |
| # SPDX-License-Identifier: GPL-2.0+ |
| # |
| # Modified by: Corey Goldberg, 2013 |
| # |
| # Original code from: |
| # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) |
| # Copyright (C) 2005-2011 Canonical Ltd |
| |
| """Python testtools extension for running unittest suites concurrently. |
| |
| The `testtools` project provides a ConcurrentTestSuite class, but does |
| not provide a `make_tests` implementation needed to use it. |
| |
| This allows you to parallelize a test run across a configurable number |
| of worker processes. While this can speed up CPU-bound test runs, it is |
| mainly useful for IO-bound tests that spend most of their time waiting for |
| data to arrive from someplace else and can benefit from concurrency. |
| |
| Unix only. |
| """ |
| |
| import os |
| import sys |
| import traceback |
| import unittest |
| from itertools import cycle |
| from multiprocessing import cpu_count |
| |
| from subunit import ProtocolTestCase, TestProtocolClient |
| from subunit.test_results import AutoTimingTestResultDecorator |
| |
| from testtools import ConcurrentTestSuite, iterate_tests |
| from testtools.content import TracebackContent, text_content |
| |
| |
# Names exported by `from <this module> import *`. The list must be bound
# to the dunder name `__all__` for the import machinery to honour it.
# ConcurrentTestSuite is imported from testtools above and re-exported here.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Default number of worker processes used by fork_for_tests(): the CPU
# count reported by multiprocessing, evaluated once at import time.
CPU_COUNT = cpu_count()
| |
| |
class BufferingTestProtocolClient(TestProtocolClient):
    """A TestProtocolClient which can buffer the test outputs

    When buffering is enabled, the current contents of sys.stdout and
    sys.stderr are read each time an outcome is reported and are included
    in the subunit stream as additional details.

    NOTE(review): this class only reads the streams; it does not replace
    them itself. Presumably the `buffer` attribute makes the underlying
    unittest result machinery swap sys.stdout/sys.stderr for in-memory
    streams while a test runs - confirm against the parent classes.

    Args:
        stream: A file-like object to write a subunit stream to
        buffer (bool): True to capture test stdout/stderr outputs and
            include them in the test details
    """
    def __init__(self, stream, buffer=True):
        super().__init__(stream)
        self.buffer = buffer

    def _addOutcome(self, outcome, test, error=None, details=None,
                    error_permitted=True):
        """Report a test outcome to the subunit stream

        The parent class uses this function as a common implementation
        for various methods that report successes, errors, failures, etc.

        This version automatically upgrades the error tracebacks to the
        new 'details' format by wrapping them in a Content object, so
        that we can include the captured test output in the test result
        details.

        Args:
            outcome: A string describing the outcome - used as the
                event name in the subunit stream.
            test: The test case whose outcome is to be reported
            error: Standard unittest positional argument form - an
                exc_info tuple.
            details: New Testing-in-python drafted API; a dict from
                string to subunit.Content objects. The dict passed in is
                not modified; extra entries are added to a copy.
            error_permitted: If True then one and only one of error or
                details must be supplied. If False then error must not
                be supplied and details is still optional.

        Returns:
            Whatever the parent class's _addOutcome() returns.
        """
        # Work on a private copy so that a dict supplied by the caller does
        # not silently gain 'traceback' / 'stdout' / 'stderr' entries.
        details = {} if details is None else dict(details)

        # Parent will raise an exception if error_permitted is False but
        # error is not None. We want that exception in that case, so
        # don't touch error when error_permitted is explicitly False.
        if error_permitted and error is not None:
            # Parent class prefers error over details
            details['traceback'] = TracebackContent(error, test)
            error_permitted = False
            error = None

        if self.buffer:
            for key, captured in (('stdout', sys.stdout),
                                  ('stderr', sys.stderr)):
                # Only in-memory streams (such as io.StringIO) provide
                # getvalue(). If the stream is an ordinary file, or None,
                # there is nothing captured to report, and raising
                # AttributeError here would lose the outcome itself.
                getvalue = getattr(captured, 'getvalue', None)
                if getvalue is None:
                    continue
                text = getvalue()
                if text:
                    details[key] = text_content(text)

        return super()._addOutcome(outcome, test, error=error,
                                   details=details,
                                   error_permitted=error_permitted)
| |
| |
def fork_for_tests(concurrency_num=CPU_COUNT, buffer=False):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.
    :param buffer: if True, each child reports through
        BufferingTestProtocolClient, which adds captured stdout/stderr to
        the test details; otherwise the plain TestProtocolClient is used.

    :return: A callable that takes a TestSuite, forks the worker
        processes and returns a list of ProtocolTestCase objects, one per
        forked child.
    """
    if buffer:
        test_protocol_client_class = BufferingTestProtocolClient
    else:
        test_protocol_client_class = TestProtocolClient

    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            if not process_tests:
                # There are fewer tests than workers: a child forked for
                # this partition would have nothing to run, so skip it.
                continue
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child process. It must never return into the caller's
                # code, so every path below ends in os._exit().
                stream = None
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise it's a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        test_protocol_client_class(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() does not flush buffered data, so push out
                    # anything still held in the stream before exiting.
                    stream.flush()
                except BaseException:
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        if stream is not None:
                            # The stream is binary, so the text must be
                            # encoded; it must also be flushed by hand
                            # because os._exit() skips all cleanup.
                            stream.write(
                                traceback.format_exc().encode(
                                    'utf-8', 'replace'))
                            stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent process: keep only the read end of the pipe.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                # If we don't pass the second argument here, it defaults
                # to sys.stdout.buffer down the line. But if we don't
                # pass it *now*, it may be resolved after sys.stdout is
                # replaced with a StringIO (to capture tests' outputs)
                # which doesn't have a buffer attribute and can end up
                # occasionally causing a 'broken-runner' error.
                test = ProtocolTestCase(stream, sys.stdout.buffer)
                result.append(test)
        return result
    return do_fork
| |
| |
def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: TestSuite (or other test container accepted by
        testtools' iterate_tests) whose individual tests are distributed.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of exactly `count` lists of tests. Some lists are
        empty when there are fewer tests than partitions.

    :raises ValueError: if count is less than 1. Without this check an
        empty partition list would be returned and every test would be
        silently dropped.
    """
    if count < 1:
        raise ValueError('count must be at least 1, got %r' % (count,))

    # This just assigns tests in a round-robin fashion. On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition. So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    for partition, test in zip(cycle(partitions), iterate_tests(suite)):
        partition.append(test)
    return partitions
| |
| |
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Four placeholder tests that only sleep, used by the demo below."""

        # Seconds each placeholder test spends sleeping
        PAUSE = 0.5

        def test_me_1(self):
            time.sleep(self.PAUSE)

        def test_me_2(self):
            time.sleep(self.PAUSE)

        def test_me_3(self):
            time.sleep(self.PAUSE)

        def test_me_4(self):
            time.sleep(self.PAUSE)

    def load_sample_suite():
        """Return a freshly loaded suite of all the SampleTestCase tests."""
        return unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)

    # First pass: run the sample tests one after another
    sequential_suite = load_sample_suite()
    text_runner = unittest.TextTestRunner()
    text_runner.run(sequential_suite)

    # Second pass: a freshly loaded copy of the same tests, spread across
    # 4 forked worker processes
    text_runner.run(
        ConcurrentTestSuite(load_sample_suite(), fork_for_tests(4))
    )