+import atexit
import errno
import os
import selectors
selector.close()
unused_fds = [alive_r, child_w, sig_r, sig_w]
unused_fds.extend(pid_to_fd.values())
+ atexit._clear()
+ atexit.register(util._exit_function)
code = _serve_one(child_r, fds,
unused_fds,
old_handlers)
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
finally:
+ atexit._run_exitfuncs()
os._exit(code)
else:
# Send pid to client process
+import atexit
import os
import signal
self.pid = os.fork()
if self.pid == 0:
try:
+ atexit._clear()
+ atexit.register(util._exit_function)
os.close(parent_r)
os.close(parent_w)
code = process_obj._bootstrap(parent_sentinel=child_r)
finally:
+ atexit._run_exitfuncs()
os._exit(code)
else:
os.close(child_w)
self.assertFalse(err, msg=err.decode('utf-8'))
+class _TestAtExit(BaseTestCase):
+
+ ALLOWED_TYPES = ('processes',)
+
+ @classmethod
+ def _write_file_at_exit(self, output_path):
+ import atexit
+ def exit_handler():
+ with open(output_path, 'w') as f:
+ f.write("deadbeef")
+ atexit.register(exit_handler)
+
+ def test_atexit(self):
+ # gh-83856
+ with os_helper.temp_dir() as temp_dir:
+ output_path = os.path.join(temp_dir, 'output.txt')
+ p = self.Process(target=self._write_file_at_exit, args=(output_path,))
+ p.start()
+ p.join()
+ with open(output_path) as f:
+ self.assertEqual(f.read(), 'deadbeef')
+
+
class MiscTestCase(unittest.TestCase):
def test__all__(self):
# Just make sure names in not_exported are excluded