return PoolSemaphore(max_value)
def create_event(self) -> BaseEvent:
- return typing.cast(BaseEvent, asyncio.Event())
+ return Event()
+
+
+class Event(BaseEvent):
+ def __init__(self) -> None:
+ self._event = asyncio.Event()
+
+ def set(self) -> None:
+ self._event.set()
+
+ async def wait(self) -> None:
+ await self._event.wait()
def set(self) -> None:
raise NotImplementedError() # pragma: no cover
- def is_set(self) -> bool:
- raise NotImplementedError() # pragma: no cover
-
- def clear(self) -> None:
- raise NotImplementedError() # pragma: no cover
-
async def wait(self) -> None:
raise NotImplementedError() # pragma: no cover
def set(self) -> None:
self._event.set()
- def is_set(self) -> bool:
- return self._event.is_set()
-
async def wait(self) -> None:
await self._event.wait()
-
- def clear(self) -> None:
- # trio.Event.clear() was deprecated in Trio 0.12.
- # https://github.com/python-trio/trio/issues/637
- self._event = trio.Event()