import asyncio
import inspect
import unittest
+ import threading
from unittest.mock import sentinel, DEFAULT, ANY
from unittest.mock import patch, call, Mock, MagicMock, PropertyMock, AsyncMock
+ from unittest.mock import ThreadingMock
from unittest.mock import mock_open
:class:`Mock` is a flexible mock object intended to replace the use of stubs and
[call('foo'), call('bar')]
+.. class:: ThreadingMock(spec=None, side_effect=None, return_value=DEFAULT, wraps=None, name=None, spec_set=None, unsafe=False, *, timeout=UNSET, **kwargs)
+
+ A version of :class:`MagicMock` for multithreading tests. The
+ :class:`ThreadingMock` object provides extra methods to wait for a call to
+ be invoked, rather than assert on it immediately.
+
+ The default timeout is specified by the ``timeout`` argument, or if unset by the
+ :attr:`ThreadingMock.DEFAULT_TIMEOUT` attribute, which defaults to blocking (``None``).
+
+ You can configure the global default timeout by setting :attr:`ThreadingMock.DEFAULT_TIMEOUT`.
+
+ .. method:: wait_until_called(*, timeout=UNSET)
+
+ Waits until the mock is called.
+
+ If a timeout was passed at the creation of the mock or if a timeout
+ argument is passed to this function, the function raises an
+ :exc:`AssertionError` if the call is not performed in time.
+
+ >>> mock = ThreadingMock()
+ >>> thread = threading.Thread(target=mock)
+ >>> thread.start()
+ >>> mock.wait_until_called(timeout=1)
+ >>> thread.join()
+
+ .. method:: wait_until_any_call(*args, **kwargs)
+
+ Waits until the the mock is called with the specified arguments.
+
+ If a timeout was passed at the creation of the mock
+ the function raises an :exc:`AssertionError` if the call is not performed in time.
+
+ >>> mock = ThreadingMock()
+ >>> thread = threading.Thread(target=mock, args=("arg1", "arg2",), kwargs={"arg": "thing"})
+ >>> thread.start()
+ >>> mock.wait_until_any_call("arg1", "arg2", arg="thing")
+ >>> thread.join()
+
+ .. attribute:: DEFAULT_TIMEOUT
+
+ Global default timeout in seconds to create instances of :class:`ThreadingMock`.
+
+ .. versionadded:: 3.13
+
+
Calling
~~~~~~~
--- /dev/null
+import time
+import unittest
+import concurrent.futures
+
+from unittest.mock import patch, ThreadingMock, call
+
+
+class Something:
+ def method_1(self):
+ pass
+
+ def method_2(self):
+ pass
+
+
+class TestThreadingMock(unittest.TestCase):
+ def _call_after_delay(self, func, /, *args, **kwargs):
+ time.sleep(kwargs.pop("delay"))
+ func(*args, **kwargs)
+
+ def setUp(self):
+ self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
+
+ def tearDown(self):
+ self._executor.shutdown()
+
+ def run_async(self, func, /, *args, delay=0, **kwargs):
+ self._executor.submit(
+ self._call_after_delay, func, *args, **kwargs, delay=delay
+ )
+
+ def _make_mock(self, *args, **kwargs):
+ return ThreadingMock(*args, **kwargs)
+
+ def test_spec(self):
+ waitable_mock = self._make_mock(spec=Something)
+
+ with patch(f"{__name__}.Something", waitable_mock) as m:
+ something = m()
+
+ self.assertIsInstance(something.method_1, ThreadingMock)
+ self.assertIsInstance(something.method_1().method_2(), ThreadingMock)
+
+ with self.assertRaises(AttributeError):
+ m.test
+
+ def test_side_effect(self):
+ waitable_mock = self._make_mock()
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ something.method_1.side_effect = [1]
+
+ self.assertEqual(something.method_1(), 1)
+
+ def test_instance_check(self):
+ waitable_mock = self._make_mock()
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+
+ self.assertIsInstance(something.method_1, ThreadingMock)
+ self.assertIsInstance(something.method_1().method_2(), ThreadingMock)
+
+ def test_dynamic_child_mocks_are_threading_mocks(self):
+ waitable_mock = self._make_mock()
+ self.assertIsInstance(waitable_mock.child, ThreadingMock)
+
+ def test_dynamic_child_mocks_inherit_timeout(self):
+ mock1 = self._make_mock()
+ self.assertIs(mock1._mock_wait_timeout, None)
+ mock2 = self._make_mock(timeout=2)
+ self.assertEqual(mock2._mock_wait_timeout, 2)
+ mock3 = self._make_mock(timeout=3)
+ self.assertEqual(mock3._mock_wait_timeout, 3)
+
+ self.assertIs(mock1.child._mock_wait_timeout, None)
+ self.assertEqual(mock2.child._mock_wait_timeout, 2)
+ self.assertEqual(mock3.child._mock_wait_timeout, 3)
+
+ self.assertEqual(mock2.really().__mul__().complex._mock_wait_timeout, 2)
+
+ def test_no_name_clash(self):
+ waitable_mock = self._make_mock()
+ waitable_mock._event = "myevent"
+ waitable_mock.event = "myevent"
+ waitable_mock.timeout = "mytimeout"
+ waitable_mock("works")
+ waitable_mock.wait_until_called()
+ waitable_mock.wait_until_any_call("works")
+
+ def test_wait_success(self):
+ waitable_mock = self._make_mock(spec=Something)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, delay=0.01)
+ something.method_1.wait_until_called()
+ something.method_1.wait_until_any_call()
+ something.method_1.assert_called()
+
+ def test_wait_success_with_instance_timeout(self):
+ waitable_mock = self._make_mock(timeout=1)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, delay=0.01)
+ something.method_1.wait_until_called()
+ something.method_1.wait_until_any_call()
+ something.method_1.assert_called()
+
+ def test_wait_failed_with_instance_timeout(self):
+ waitable_mock = self._make_mock(timeout=0.01)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, delay=0.5)
+ self.assertRaises(AssertionError, waitable_mock.method_1.wait_until_called)
+ self.assertRaises(
+ AssertionError, waitable_mock.method_1.wait_until_any_call
+ )
+
+ def test_wait_success_with_timeout_override(self):
+ waitable_mock = self._make_mock(timeout=0.01)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, delay=0.05)
+ something.method_1.wait_until_called(timeout=1)
+
+ def test_wait_failed_with_timeout_override(self):
+ waitable_mock = self._make_mock(timeout=1)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, delay=0.1)
+ with self.assertRaises(AssertionError):
+ something.method_1.wait_until_called(timeout=0.05)
+ with self.assertRaises(AssertionError):
+ something.method_1.wait_until_any_call(timeout=0.05)
+
+ def test_wait_success_called_before(self):
+ waitable_mock = self._make_mock()
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ something.method_1()
+ something.method_1.wait_until_called()
+ something.method_1.wait_until_any_call()
+ something.method_1.assert_called()
+
+ def test_wait_magic_method(self):
+ waitable_mock = self._make_mock()
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1.__str__, delay=0.01)
+ something.method_1.__str__.wait_until_called()
+ something.method_1.__str__.assert_called()
+
+ def test_wait_until_any_call_positional(self):
+ waitable_mock = self._make_mock(spec=Something)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, 1, delay=0.1)
+ self.run_async(something.method_1, 2, delay=0.2)
+ self.run_async(something.method_1, 3, delay=0.3)
+ self.assertNotIn(call(1), something.method_1.mock_calls)
+
+ something.method_1.wait_until_any_call(1)
+ something.method_1.assert_called_with(1)
+ self.assertNotIn(call(2), something.method_1.mock_calls)
+ self.assertNotIn(call(3), something.method_1.mock_calls)
+
+ something.method_1.wait_until_any_call(3)
+ self.assertIn(call(2), something.method_1.mock_calls)
+ something.method_1.wait_until_any_call(2)
+
+ def test_wait_until_any_call_keywords(self):
+ waitable_mock = self._make_mock(spec=Something)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ self.run_async(something.method_1, a=1, delay=0.1)
+ self.run_async(something.method_1, b=2, delay=0.2)
+ self.run_async(something.method_1, c=3, delay=0.3)
+ self.assertNotIn(call(a=1), something.method_1.mock_calls)
+
+ something.method_1.wait_until_any_call(a=1)
+ something.method_1.assert_called_with(a=1)
+ self.assertNotIn(call(b=2), something.method_1.mock_calls)
+ self.assertNotIn(call(c=3), something.method_1.mock_calls)
+
+ something.method_1.wait_until_any_call(c=3)
+ self.assertIn(call(b=2), something.method_1.mock_calls)
+ something.method_1.wait_until_any_call(b=2)
+
+ def test_wait_until_any_call_no_argument_fails_when_called_with_arg(self):
+ waitable_mock = self._make_mock(timeout=0.01)
+
+ with patch(f"{__name__}.Something", waitable_mock):
+ something = Something()
+ something.method_1(1)
+
+ something.method_1.assert_called_with(1)
+ with self.assertRaises(AssertionError):
+ something.method_1.wait_until_any_call()
+
+ something.method_1()
+ something.method_1.wait_until_any_call()
+
+ def test_wait_until_any_call_global_default(self):
+ with patch.object(ThreadingMock, "DEFAULT_TIMEOUT"):
+ ThreadingMock.DEFAULT_TIMEOUT = 0.01
+ m = self._make_mock()
+ with self.assertRaises(AssertionError):
+ m.wait_until_any_call()
+ with self.assertRaises(AssertionError):
+ m.wait_until_called()
+
+ m()
+ m.wait_until_any_call()
+ assert ThreadingMock.DEFAULT_TIMEOUT != 0.01
+
+ def test_wait_until_any_call_change_global_and_override(self):
+ with patch.object(ThreadingMock, "DEFAULT_TIMEOUT"):
+ ThreadingMock.DEFAULT_TIMEOUT = 0.01
+
+ m1 = self._make_mock()
+ self.run_async(m1, delay=0.1)
+ with self.assertRaises(AssertionError):
+ m1.wait_until_called()
+
+ m2 = self._make_mock(timeout=1)
+ self.run_async(m2, delay=0.1)
+ m2.wait_until_called()
+
+ m3 = self._make_mock()
+ self.run_async(m3, delay=0.1)
+ m3.wait_until_called(timeout=1)
+
+ m4 = self._make_mock()
+ self.run_async(m4, delay=0.1)
+ m4.wait_until_called(timeout=None)
+
+ m5 = self._make_mock(timeout=None)
+ self.run_async(m5, delay=0.1)
+ m5.wait_until_called()
+
+ assert ThreadingMock.DEFAULT_TIMEOUT != 0.01
+
+ def test_reset_mock_resets_wait(self):
+ m = self._make_mock(timeout=0.01)
+
+ with self.assertRaises(AssertionError):
+ m.wait_until_called()
+ with self.assertRaises(AssertionError):
+ m.wait_until_any_call()
+ m()
+ m.wait_until_called()
+ m.wait_until_any_call()
+ m.assert_called_once()
+
+ m.reset_mock()
+
+ with self.assertRaises(AssertionError):
+ m.wait_until_called()
+ with self.assertRaises(AssertionError):
+ m.wait_until_any_call()
+ m()
+ m.wait_until_called()
+ m.wait_until_any_call()
+ m.assert_called_once()
+
+
+if __name__ == "__main__":
+ unittest.main()
'call',
'create_autospec',
'AsyncMock',
+ 'ThreadingMock',
'FILTER_DIR',
'NonCallableMock',
'NonCallableMagicMock',
import builtins
import pkgutil
from asyncio import iscoroutinefunction
+import threading
from types import CodeType, ModuleType, MethodType
from unittest.util import safe_repr
from functools import wraps, partial
self(val)
+_timeout_unset = sentinel.TIMEOUT_UNSET
+
+class ThreadingMixin(Base):
+
+ DEFAULT_TIMEOUT = None
+
+ def _get_child_mock(self, /, **kw):
+ if "timeout" in kw:
+ kw["timeout"] = kw.pop("timeout")
+ elif isinstance(kw.get("parent"), ThreadingMixin):
+ kw["timeout"] = kw["parent"]._mock_wait_timeout
+ elif isinstance(kw.get("_new_parent"), ThreadingMixin):
+ kw["timeout"] = kw["_new_parent"]._mock_wait_timeout
+ return super()._get_child_mock(**kw)
+
+ def __init__(self, *args, timeout=_timeout_unset, **kwargs):
+ super().__init__(*args, **kwargs)
+ if timeout is _timeout_unset:
+ timeout = self.DEFAULT_TIMEOUT
+ self.__dict__["_mock_event"] = threading.Event() # Event for any call
+ self.__dict__["_mock_calls_events"] = [] # Events for each of the calls
+ self.__dict__["_mock_calls_events_lock"] = threading.Lock()
+ self.__dict__["_mock_wait_timeout"] = timeout
+
+ def reset_mock(self, /, *args, **kwargs):
+ """
+ See :func:`.Mock.reset_mock()`
+ """
+ super().reset_mock(*args, **kwargs)
+ self.__dict__["_mock_event"] = threading.Event()
+ self.__dict__["_mock_calls_events"] = []
+
+ def __get_event(self, expected_args, expected_kwargs):
+ with self._mock_calls_events_lock:
+ for args, kwargs, event in self._mock_calls_events:
+ if (args, kwargs) == (expected_args, expected_kwargs):
+ return event
+ new_event = threading.Event()
+ self._mock_calls_events.append((expected_args, expected_kwargs, new_event))
+ return new_event
+
+ def _mock_call(self, *args, **kwargs):
+ ret_value = super()._mock_call(*args, **kwargs)
+
+ call_event = self.__get_event(args, kwargs)
+ call_event.set()
+
+ self._mock_event.set()
+
+ return ret_value
+
+ def wait_until_called(self, *, timeout=_timeout_unset):
+ """Wait until the mock object is called.
+
+ `timeout` - time to wait for in seconds, waits forever otherwise.
+ Defaults to the constructor provided timeout.
+ Use None to block undefinetively.
+ """
+ if timeout is _timeout_unset:
+ timeout = self._mock_wait_timeout
+ if not self._mock_event.wait(timeout=timeout):
+ msg = (f"{self._mock_name or 'mock'} was not called before"
+ f" timeout({timeout}).")
+ raise AssertionError(msg)
+
+ def wait_until_any_call(self, *args, **kwargs):
+ """Wait until the mock object is called with given args.
+
+ Waits for the timeout in seconds provided in the constructor.
+ """
+ event = self.__get_event(args, kwargs)
+ if not event.wait(timeout=self._mock_wait_timeout):
+ expected_string = self._format_mock_call_signature(args, kwargs)
+ raise AssertionError(f'{expected_string} call not found')
+
+
+class ThreadingMock(ThreadingMixin, MagicMixin, Mock):
+ """
+ A mock that can be used to wait until on calls happening
+ in a different thread.
+
+ The constructor can take a `timeout` argument which
+ controls the timeout in seconds for all `wait` calls of the mock.
+
+ You can change the default timeout of all instances via the
+ `ThreadingMock.DEFAULT_TIMEOUT` attribute.
+
+ If no timeout is set, it will block undefinetively.
+ """
+ pass
+
+
def seal(mock):
"""Disable the automatic generation of child mocks.