self.assertNotIn("<native>", output)
+@requires_subprocess()
+@skip_if_not_supported
class TestProcessPoolExecutorSupport(unittest.TestCase):
"""
Test that ProcessPoolExecutor works correctly with profiling.sampling.
"-d", "5",
"-i", "100000",
script,
+ stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) as proc:
- proc.wait(timeout=SHORT_TIMEOUT)
- stdout = proc.stdout.read()
- stderr = proc.stderr.read()
+ try:
+ stdout, stderr = proc.communicate(timeout=SHORT_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ stdout, stderr = proc.communicate()
if "PermissionError" in stderr:
self.skipTest("Insufficient permissions for remote profiling")