''')
assert_python_ok("-c", script)
+ @skip_unless_reliable_fork
+ def test_native_id_after_fork(self):
+ script = """if True:
+ import threading
+ import os
+ from test import support
+
+ parent_thread_native_id = threading.current_thread().native_id
+ print(parent_thread_native_id, flush=True)
+ assert parent_thread_native_id == threading.get_native_id()
+ childpid = os.fork()
+ if childpid == 0:
+ print(threading.current_thread().native_id, flush=True)
+ assert threading.current_thread().native_id == threading.get_native_id()
+ else:
+ try:
+ assert parent_thread_native_id == threading.current_thread().native_id
+ assert parent_thread_native_id == threading.get_native_id()
+ finally:
+ support.wait_process(childpid, exitcode=0)
+ """
+ rc, out, err = assert_python_ok('-c', script)
+ self.assertEqual(rc, 0)
+ self.assertEqual(err, b"")
+ native_ids = out.strip().splitlines()
+ self.assertEqual(len(native_ids), 2)
+ self.assertNotEqual(native_ids[0], native_ids[1])
+
class ThreadJoinOnShutdown(BaseTestCase):
def _run_and_join(self, script):
# This thread is alive.
self._ident = new_ident
assert self._os_thread_handle.ident == new_ident
+ if _HAVE_THREAD_NATIVE_ID:
+ self._set_native_id()
else:
# Otherwise, the thread is dead, Jim. _PyThread_AfterFork()
# already marked our handle done.