]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Try doing a raw test of os.fork()/os.kill().
authorRichard Oudkerk <shibturn@gmail.com>
Thu, 17 Oct 2013 13:24:06 +0000 (14:24 +0100)
committerRichard Oudkerk <shibturn@gmail.com>
Thu, 17 Oct 2013 13:24:06 +0000 (14:24 +0100)
Lib/test/_test_multiprocessing.py

index daf53194114b7f9cdba7a7d933ceb84bda504a1e..d02106d164d6072d0f25b5c4b9154dc1b41e6f6c 100644 (file)
@@ -273,10 +273,11 @@ class _TestProcess(BaseTestCase):
 
     @classmethod
     def _test_terminate(cls):
-        print('signal.getsignal(SIGTERM) =', signal.getsignal(signal.SIGTERM))
-        print('starting sleep')
+        print('signal.getsignal(SIGTERM) =',
+              signal.getsignal(signal.SIGTERM), file=sys.stderr)
+        print('starting sleep', file=sys.stderr)
         time.sleep(100)
-        print('finished sleep')
+        print('finished sleep', file=sys.stderr)
 
     def test_terminate(self):
         if self.TYPE == 'threads':
@@ -314,11 +315,12 @@ class _TestProcess(BaseTestCase):
             try:
                 signal.alarm(10)
                 self.assertEqual(join(), None)
-                signal.alarm(0)
             except RuntimeError:
-                print('os.waitpid() =', os.waitpid(p.pid, os.WNOHANG))
+                print('os.waitpid() =',
+                      os.waitpid(p.pid, os.WNOHANG), file=sys.stderr)
                 raise
             finally:
+                signal.alarm(0)
                 signal.signal(signal.SIGALRM, old_handler)
         else:
             self.assertEqual(join(), None)
@@ -333,6 +335,35 @@ class _TestProcess(BaseTestCase):
         # XXX sometimes get p.exitcode == 0 on Windows ...
         #self.assertEqual(p.exitcode, -signal.SIGTERM)
 
+    @unittest.skipIf(WIN32, 'Unix only')
+    def test_sigterm(self):
+        # A test for the Gentoo build bot which does not directly use
+        # multiprocessing.  Start and terminate child processes.
+        if self.TYPE != 'processes':
+            return
+        for i in range(10):
+            pid = os.fork()
+            if pid == 0:
+                try:
+                    print('sleeping', file=sys.stderr)
+                    time.sleep(100)
+                    print('waking', file=sys.stderr)
+                finally:
+                    sys.stderr.flush()
+                    os._exit(0)
+            else:
+                os.kill(pid, signal.SIGTERM)
+                def handler(*args):
+                    raise RuntimeError('join took too long: %s' % p)
+                old_handler = signal.signal(signal.SIGALRM, handler)
+                try:
+                    signal.alarm(10)
+                    pid_status = os.waitpid(pid, 0)
+                    self.assertEqual(pid_status[0], pid)
+                finally:
+                    signal.alarm(0)
+                    signal.signal(signal.SIGALRM, old_handler)
+
     def test_cpu_count(self):
         try:
             cpus = multiprocessing.cpu_count()