]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Issue11551 - Increase the test coverage of _dummy_thread module to 100%.
authorSenthil Kumaran <senthil@uthcode.com>
Thu, 8 Sep 2016 09:46:22 +0000 (02:46 -0700)
committerSenthil Kumaran <senthil@uthcode.com>
Thu, 8 Sep 2016 09:46:22 +0000 (02:46 -0700)
Initial patch contributed by Denver Coneybeare.

Lib/test/test_dummy_thread.py

index a6a2856511937694e98d0e7d625e1613605347ff..bb4bfe7a4d39c9311be38ee84c98fcb59f0e8a5d 100644 (file)
@@ -1,19 +1,13 @@
-"""Generic thread tests.
-
-Meant to be used by dummy_thread and thread.  To allow for different modules
-to be used, test_main() can be called with the module to use as the thread
-implementation as its sole argument.
-
-"""
 import _dummy_thread as _thread
 import time
 import queue
 import random
 import unittest
 from test import support
+from unittest import mock
+
+DELAY = 0
 
-DELAY = 0 # Set > 0 when testing a module other than _dummy_thread, such as
-          # the '_thread' module.
 
 class LockTests(unittest.TestCase):
     """Test lock objects."""
@@ -34,6 +28,12 @@ class LockTests(unittest.TestCase):
         self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")
 
+    def test_LockType_context_manager(self):
+        with _thread.LockType():
+            pass
+        self.assertFalse(self.lock.locked(),
+                         "Acquired Lock was not released")
+
     def test_improper_release(self):
         #Make sure release of an unlocked thread raises RuntimeError
         self.assertRaises(RuntimeError, self.lock.release)
@@ -83,39 +83,72 @@ class LockTests(unittest.TestCase):
         self.assertGreaterEqual(end_time - start_time, DELAY,
                         "Blocking by unconditional acquiring failed.")
 
+    @mock.patch('time.sleep')
+    def test_acquire_timeout(self, mock_sleep):
+        """Test invoking acquire() with a positive timeout when the lock is
+        already acquired. Ensure that time.sleep() is invoked with the given
+        timeout and that False is returned."""
+
+        self.lock.acquire()
+        retval = self.lock.acquire(waitflag=0, timeout=1)
+        self.assertTrue(mock_sleep.called)
+        mock_sleep.assert_called_once_with(1)
+        self.assertEqual(retval, False)
+
+    def test_lock_representation(self):
+        self.lock.acquire()
+        self.assertIn("locked", repr(self.lock))
+        self.lock.release()
+        self.assertIn("unlocked", repr(self.lock))
+
+
 class MiscTests(unittest.TestCase):
     """Miscellaneous tests."""
 
     def test_exit(self):
-        #Make sure _thread.exit() raises SystemExit
         self.assertRaises(SystemExit, _thread.exit)
 
     def test_ident(self):
-        #Test sanity of _thread.get_ident()
         self.assertIsInstance(_thread.get_ident(), int,
                               "_thread.get_ident() returned a non-integer")
         self.assertNotEqual(_thread.get_ident(), 0,
                         "_thread.get_ident() returned 0")
 
     def test_LockType(self):
-        #Make sure _thread.LockType is the same type as _thread.allocate_locke()
         self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
                               "_thread.LockType is not an instance of what "
                               "is returned by _thread.allocate_lock()")
 
+    def test_set_sentinel(self):
+        self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
+                              "_thread._set_sentinel() did not return a "
+                              "LockType instance.")
+
     def test_interrupt_main(self):
         #Calling start_new_thread with a function that executes interrupt_main
         # should raise KeyboardInterrupt upon completion.
         def call_interrupt():
             _thread.interrupt_main()
-        self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
-                              call_interrupt, tuple())
+
+        self.assertRaises(KeyboardInterrupt,
+                          _thread.start_new_thread,
+                          call_interrupt,
+                          tuple())
 
     def test_interrupt_in_main(self):
-        # Make sure that if interrupt_main is called in main threat that
-        # KeyboardInterrupt is raised instantly.
         self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
 
+    def test_stack_size_None(self):
+        retval = _thread.stack_size(None)
+        self.assertEqual(retval, 0)
+
+    def test_stack_size_not_None(self):
+        with self.assertRaises(_thread.error) as cm:
+            _thread.stack_size("")
+        self.assertEqual(cm.exception.args[0],
+                         "setting thread stack size not supported")
+
+
 class ThreadTests(unittest.TestCase):
     """Test thread creation."""
 
@@ -129,31 +162,43 @@ class ThreadTests(unittest.TestCase):
         _thread.start_new_thread(arg_tester, (testing_queue, True, True))
         result = testing_queue.get()
         self.assertTrue(result[0] and result[1],
-                        "Argument passing for thread creation using tuple failed")
-        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
-                                                       'arg1':True, 'arg2':True})
+                        "Argument passing for thread creation "
+                        "using tuple failed")
+
+        _thread.start_new_thread(
+                arg_tester,
+                tuple(),
+                {'queue':testing_queue, 'arg1':True, 'arg2':True})
+
         result = testing_queue.get()
         self.assertTrue(result[0] and result[1],
-                        "Argument passing for thread creation using kwargs failed")
-        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
+                        "Argument passing for thread creation "
+                        "using kwargs failed")
+
+        _thread.start_new_thread(
+                arg_tester,
+                (testing_queue, True),
+                {'arg2':True})
+
         result = testing_queue.get()
         self.assertTrue(result[0] and result[1],
                         "Argument passing for thread creation using both tuple"
                         " and kwargs failed")
 
-    def test_multi_creation(self):
-        #Make sure multiple threads can be created.
+    def test_multi_thread_creation(self):
         def queue_mark(queue, delay):
-            """Wait for ``delay`` seconds and then put something into ``queue``"""
             time.sleep(delay)
             queue.put(_thread.get_ident())
 
         thread_count = 5
         testing_queue = queue.Queue(thread_count)
+
         if support.verbose:
             print()
-            print("*** Testing multiple thread creation "\
-            "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
+            print("*** Testing multiple thread creation "
+                  "(will take approx. %s to %s sec.) ***" % (
+                    DELAY, thread_count))
+
         for count in range(thread_count):
             if DELAY:
                 local_delay = round(random.random(), 1)
@@ -165,18 +210,47 @@ class ThreadTests(unittest.TestCase):
         if support.verbose:
             print('done')
         self.assertEqual(testing_queue.qsize(), thread_count,
-                        "Not all %s threads executed properly after %s sec." %
-                        (thread_count, DELAY))
-
-def test_main(imported_module=None):
-    global _thread, DELAY
-    if imported_module:
-        _thread = imported_module
-        DELAY = 2
-    if support.verbose:
-        print()
-        print("*** Using %s as _thread module ***" % _thread)
-    support.run_unittest(LockTests, MiscTests, ThreadTests)
-
-if __name__ == '__main__':
-    test_main()
+                         "Not all %s threads executed properly "
+                         "after %s sec." % (thread_count, DELAY))
+
+    def test_args_not_tuple(self):
+        """
+        Test invoking start_new_thread() with a non-tuple value for "args".
+        Expect TypeError with a meaningful error message to be raised.
+        """
+        with self.assertRaises(TypeError) as cm:
+            _thread.start_new_thread(mock.Mock(), [])
+        self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
+
+    def test_kwargs_not_dict(self):
+        """
+        Test invoking start_new_thread() with a non-dict value for "kwargs".
+        Expect TypeError with a meaningful error message to be raised.
+        """
+        with self.assertRaises(TypeError) as cm:
+            _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
+        self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
+
+    def test_SystemExit(self):
+        """
+        Test invoking start_new_thread() with a function that raises
+        SystemExit.
+        The exception should be discarded.
+        """
+        func = mock.Mock(side_effect=SystemExit())
+        try:
+            _thread.start_new_thread(func, tuple())
+        except SystemExit:
+            self.fail("start_new_thread raised SystemExit.")
+
+    @mock.patch('traceback.print_exc')
+    def test_RaiseException(self, mock_print_exc):
+        """
+        Test invoking start_new_thread() with a function that raises exception.
+
+        The exception should be discarded and the traceback should be printed
+        via traceback.print_exc()
+        """
+        func = mock.Mock(side_effect=Exception)
+        _thread.start_new_thread(func, tuple())
+        self.assertTrue(mock_print_exc.called)