blob: 1c4f03f37e5bd7f9eb399867d711600097d1ae18 [file] [log] [blame]
Simon Glass11ae93e2018-10-01 21:12:47 -06001#!/usr/bin/env python
2# SPDX-License-Identifier: GPL-2.0+
3#
4# Modified by: Corey Goldberg, 2013
5#
6# Original code from:
7# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
8# Copyright (C) 2005-2011 Canonical Ltd
9
10"""Python testtools extension for running unittest suites concurrently.
11
12The `testtools` project provides a ConcurrentTestSuite class, but does
13not provide a `make_tests` implementation needed to use it.
14
15This allows you to parallelize a test run across a configurable number
16of worker processes. While this can speed up CPU-bound test runs, it is
17mainly useful for IO-bound tests that spend most of their time waiting for
data to arrive from someplace else and can benefit from concurrency.
19
20Unix only.
21"""
22
23import os
24import sys
25import traceback
26import unittest
27from itertools import cycle
28from multiprocessing import cpu_count
29
30from subunit import ProtocolTestCase, TestProtocolClient
31from subunit.test_results import AutoTimingTestResultDecorator
32
33from testtools import ConcurrentTestSuite, iterate_tests
Alper Nebi Yasakebcaafc2022-04-02 20:06:08 +030034from testtools.content import TracebackContent, text_content
Simon Glass11ae93e2018-10-01 21:12:47 -060035
36
# Public API of this module. The name must be spelt ``__all__`` (two leading
# underscores) for Python to honour it when the module is star-imported.
__all__ = [
    'BufferingTestProtocolClient',
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Default number of worker processes, as returned by
# multiprocessing.cpu_count() when this module is imported.
CPU_COUNT = cpu_count()
45
46
class BufferingTestProtocolClient(TestProtocolClient):
    """A TestProtocolClient which can include buffered test output

    When buffering is enabled, the text currently held by sys.stdout and
    sys.stderr (read through their getvalue() method) is added to the
    details of every outcome written to the subunit stream. This class
    only reads the streams; it expects sys.stdout / sys.stderr to already
    have been replaced with StringIO-like objects while a test runs.

    Args:
        stream: A file-like object to write a subunit stream to
        buffer (bool): True to include the captured stdout/stderr text
            in the test details
    """
    def __init__(self, stream, buffer=True):
        super().__init__(stream)
        self.buffer = buffer

    def _addOutcome(self, outcome, test, error=None, details=None,
                    error_permitted=True):
        """Report a test outcome to the subunit stream

        The parent class uses this function as a common implementation
        for various methods that report successes, errors, failures, etc.

        This version automatically upgrades the error tracebacks to the
        new 'details' format by wrapping them in a Content object, so
        that we can include the captured test output in the test result
        details.

        Args:
            outcome: A string describing the outcome - used as the
                event name in the subunit stream.
            test: The test case whose outcome is to be reported
            error: Standard unittest positional argument form - an
                exc_info tuple.
            details: New Testing-in-python drafted API; a dict from
                string to subunit.Content objects. It is copied, never
                modified in place.
            error_permitted: If True then one and only one of error or
                details must be supplied. If False then error must not
                be supplied and details is still optional.

        Returns:
            Whatever the parent class's _addOutcome() returns
        """
        # Work on a private copy so that the extra entries added below do
        # not leak into a dict owned by the caller.
        details = {} if details is None else dict(details)

        # Parent will raise an exception if error_permitted is False but
        # error is not None. We want that exception in that case, so
        # don't touch error when error_permitted is explicitly False.
        if error_permitted and error is not None:
            # Parent class prefers error over details, so move the
            # traceback into details and clear error.
            details['traceback'] = TracebackContent(error, test)
            error_permitted = False
            error = None

        if self.buffer:
            for name in ('stdout', 'stderr'):
                # Only StringIO-like streams expose getvalue(); skip a
                # stream that is not being captured instead of failing
                # with AttributeError while reporting the outcome.
                getvalue = getattr(getattr(sys, name), 'getvalue', None)
                if getvalue is None:
                    continue
                captured = getvalue()
                if captured:
                    details[name] = text_content(captured)

        return super()._addOutcome(outcome, test, error=error,
                                   details=details,
                                   error_permitted=error_permitted)
110
111
def fork_for_tests(concurrency_num=CPU_COUNT, buffer=False):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.
    :param buffer: if true, each child reports its results through
        BufferingTestProtocolClient (which adds captured stdout/stderr
        text to the test details); otherwise through TestProtocolClient.

    :return: a function which takes a TestSuite, forks the worker
        processes and returns one test-case-like object per worker.
    """
    if buffer:
        test_protocol_client_class = BufferingTestProtocolClient
    else:
        test_protocol_client_class = TestProtocolClient

    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: run this partition and report over the pipe.
                try:
                    stream = os.fdopen(c2pwrite, 'wb')
                    os.close(c2pread)
                    # Leave stderr and stdout open so we can see test noise
                    # Close stdin so that the child goes away if it decides to
                    # read from stdin (otherwise its a roulette to see what
                    # child actually gets keystrokes for pdb etc).
                    sys.stdin.close()
                    subunit_result = AutoTimingTestResultDecorator(
                        test_protocol_client_class(stream)
                    )
                    process_suite.run(subunit_result)
                    # os._exit() does not flush stdio buffers, so push any
                    # pending result data into the pipe before leaving.
                    stream.flush()
                except BaseException:
                    # Deliberately catch everything (SystemExit and
                    # KeyboardInterrupt included): the child must always
                    # leave through os._exit() and never return into the
                    # parent's code.
                    #
                    # Try and report traceback on stream, but exit with error
                    # even if stream couldn't be created or something else
                    # goes wrong. The traceback is formatted to a string and
                    # written in one go to avoid interleaving lines from
                    # multiple failing children.
                    try:
                        # The stream is binary, so the text must be encoded
                        # before it can be written.
                        stream.write(
                            traceback.format_exc().encode('utf-8', 'replace'))
                        stream.flush()
                    finally:
                        os._exit(1)
                os._exit(0)
            else:
                # Parent: keep only the read end of the pipe.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                # If we don't pass the second argument here, it defaults
                # to sys.stdout.buffer down the line. But if we don't
                # pass it *now*, it may be resolved after sys.stdout is
                # replaced with a StringIO (to capture tests' outputs)
                # which doesn't have a buffer attribute and can end up
                # occasionally causing a 'broken-runner' error.
                test = ProtocolTestCase(stream, sys.stdout.buffer)
                result.append(test)
        return result
    return do_fork
177
178
def partition_tests(suite, count):
    """Deal the tests of suite out into count lists, one test at a time.

    :param suite: TestSuite (or anything iterate_tests() accepts).
    :param count: number of lists to produce.

    :return: a list of count lists; list i holds tests i, i + count,
        i + 2 * count, ... in their original order.
    """
    # Dealing tests out one by one separates blocks of related tests that
    # might have run faster together thanks to shared resources, but it also
    # keeps a block of slow tests from all landing in a single partition, so
    # the slowest partition should not be much slower than the fastest one.
    all_tests = list(iterate_tests(suite))
    return [all_tests[offset::count] for offset in range(count)]
191
192
if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Placeholder tests for the demo; each one only sleeps."""

        # How long every sample test sleeps, in seconds
        NAP_SECONDS = 0.5

        def test_me_1(self):
            time.sleep(self.NAP_SECONDS)

        def test_me_2(self):
            time.sleep(self.NAP_SECONDS)

        def test_me_3(self):
            time.sleep(self.NAP_SECONDS)

        def test_me_4(self):
            time.sleep(self.NAP_SECONDS)

    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner()

    # First pass: the sample tests one after another in this process
    runner.run(loader.loadTestsFromTestCase(SampleTestCase))

    # Second pass: a freshly loaded copy of the same tests, spread over
    # 4 forked worker processes
    runner.run(
        ConcurrentTestSuite(
            loader.loadTestsFromTestCase(SampleTestCase),
            fork_for_tests(4),
        )
    )