BaseBackgroundManager,
BaseEvent,
BasePoolSemaphore,
- BaseQueue,
BaseSocketStream,
ConcurrencyBackend,
TimeoutFlag,
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
return PoolSemaphore(limits)
- def create_queue(self, max_size: int) -> BaseQueue:
- return typing.cast(BaseQueue, asyncio.Queue(maxsize=max_size))
-
def create_event(self) -> BaseEvent:
return typing.cast(BaseEvent, asyncio.Event())
raise NotImplementedError() # pragma: no cover
-class BaseQueue:
- """
- A FIFO queue. Abstracts away any asyncio-specific interfaces.
- """
-
- async def get(self) -> typing.Any:
- raise NotImplementedError() # pragma: no cover
-
- async def put(self, value: typing.Any) -> None:
- raise NotImplementedError() # pragma: no cover
-
-
class BaseEvent:
"""
An event object. Abstracts away any asyncio-specific interfaces.
except StopAsyncIteration:
break
- def create_queue(self, max_size: int) -> BaseQueue:
- raise NotImplementedError() # pragma: no cover
-
def create_event(self) -> BaseEvent:
raise NotImplementedError() # pragma: no cover
import functools
-import math
import ssl
import typing
from types import TracebackType
BaseBackgroundManager,
BaseEvent,
BasePoolSemaphore,
- BaseQueue,
BaseSocketStream,
ConcurrencyBackend,
TimeoutFlag,
def get_semaphore(self, limits: PoolLimits) -> BasePoolSemaphore:
return PoolSemaphore(limits)
- def create_queue(self, max_size: int) -> BaseQueue:
- return Queue(max_size=max_size)
-
def create_event(self) -> BaseEvent:
return Event()
return BackgroundManager(coroutine, *args)
-class Queue(BaseQueue):
- def __init__(self, max_size: int) -> None:
- self.send_channel, self.receive_channel = trio.open_memory_channel(math.inf)
-
- async def get(self) -> typing.Any:
- return await self.receive_channel.receive()
-
- async def put(self, value: typing.Any) -> None:
- await self.send_channel.send(value)
-
-
class Event(BaseEvent):
def __init__(self) -> None:
self._event = trio.Event()