_current_process = self
_parent_process = _ParentProcess(
self._parent_name, self._parent_pid, parent_sentinel)
+ if threading._HAVE_THREAD_NATIVE_ID:
+ threading.main_thread()._set_native_id()
try:
util._finalizer_registry.clear()
util._run_after_forkers()
self.assertNotIn(p, self.active_children())
close_queue(q)
+ @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
+ def test_process_mainthread_native_id(self):
+ if self.TYPE == 'threads':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ current_mainthread_native_id = threading.main_thread().native_id
+
+ q = self.Queue(1)
+ p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
+ p.start()
+
+ child_mainthread_native_id = q.get()
+ p.join()
+ close_queue(q)
+
+ self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)
+
+ @classmethod
+ def _test_process_mainthread_native_id(cls, q):
+ mainthread_native_id = threading.main_thread().native_id
+ q.put(mainthread_native_id)
+
@classmethod
def _sleep_some(cls):
time.sleep(100)