self.release()
-class Lock(_ContextManagerMixin, mixins._LoopBoundedMixin):
+class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Primitive lock objects.
A primitive lock is a synchronization primitive that is not owned
"""
- def __init__(self):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = None
self._locked = False
fut.set_result(True)
-class Event(mixins._LoopBoundedMixin):
+class Event(mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Event.
Class implementing event objects. An event manages a flag that can be set
false.
"""
- def __init__(self):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = collections.deque()
self._value = False
self._waiters.remove(fut)
-class Condition(_ContextManagerMixin, mixins._LoopBoundedMixin):
+class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Condition.
This class implements condition variable objects. A condition variable
A new Lock object is created and used as the underlying lock.
"""
- def __init__(self, lock=None):
+ def __init__(self, lock=None, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if lock is None:
lock = Lock()
elif lock._loop is not self._get_loop():
self.notify(len(self._waiters))
-class Semaphore(_ContextManagerMixin, mixins._LoopBoundedMixin):
+class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
"""A Semaphore implementation.
A semaphore manages an internal counter which is decremented by each
ValueError is raised.
"""
- def __init__(self, value=1):
+ def __init__(self, value=1, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
above the initial value.
"""
- def __init__(self, value=1):
+ def __init__(self, value=1, *, loop=mixins._marker):
self._bound_value = value
- super().__init__(value)
+ super().__init__(value, loop=loop)
def release(self):
if self._value >= self._bound_value:
_global_lock = threading.Lock()
+# Used as a sentinel for loop parameter
+_marker = object()
-class _LoopBoundedMixin:
+
+class _LoopBoundMixin:
_loop = None
+ def __init__(self, *, loop=_marker):
+ if loop is not _marker:
+ raise TypeError(
+ f'As of 3.10, the *loop* parameter was removed from '
+ f'{type(self).__name__}() since it is no longer necessary'
+ )
+
def _get_loop(self):
loop = events._get_running_loop()
if self._loop is None:
self._loop = loop
if loop is not self._loop:
- raise RuntimeError(f'{type(self).__name__} have already bounded to another loop')
+ raise RuntimeError(f'{self!r} is bound to a different event loop')
return loop
pass
-class Queue(mixins._LoopBoundedMixin):
+class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it
interrupted between calling qsize() and doing an operation on the Queue.
"""
- def __init__(self, maxsize=0):
+ def __init__(self, maxsize=0, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._maxsize = maxsize
# Futures.
self.assertFalse(lock.locked())
+ def test_lock_doesnt_accept_loop_parameter(self):
+ primitives_cls = [
+ asyncio.Lock,
+ asyncio.Condition,
+ asyncio.Event,
+ asyncio.Semaphore,
+ asyncio.BoundedSemaphore,
+ ]
+
+ for cls in primitives_cls:
+ with self.assertRaisesRegex(
+ TypeError,
+ rf'As of 3.10, the \*loop\* parameter was removed from '
+ rf'{cls.__name__}\(\) since it is no longer necessary'
+ ):
+ cls(loop=self.loop)
+
def test_lock_by_with_statement(self):
loop = asyncio.new_event_loop() # don't use TestLoop quirks
self.set_event_loop(loop)
self.set_event_loop(loop)
return loop
- def unpatch_get_running_loop(self):
- events._get_running_loop = self._get_running_loop
-
def setUp(self):
- self._get_running_loop = events._get_running_loop
-
- def _get_running_loop():
- frame = sys._getframe(1)
-
- if frame.f_globals['__name__'] == 'asyncio.mixins':
- # When we called from LoopBoundedMixin we should
- # fallback to default implementation of get_running_loop
- try:
- return events.get_running_loop()
- except RuntimeError:
- return None
-
- return None
-
- events._get_running_loop = _get_running_loop
self._thread_cleanup = threading_helper.threading_setup()
def tearDown(self):
- self.unpatch_get_running_loop()
-
events.set_event_loop(None)
# Detect CPython bug #23353: ensure that yield/yield-from is not used