If factory is None the default task factory will be set.
If factory is a callable, it should have a signature matching
- '(loop, coro, **kwargs)', where 'loop' will be a reference to the active
- event loop, 'coro' will be a coroutine object, and **kwargs will be
- arbitrary keyword arguments that should be passed on to Task.
- The callable must return a Task.
+ '(loop, coro, **kwargs)', where 'loop' will be a reference to the
+ active event loop, 'coro' will be a coroutine object, and **kwargs
+ will be arbitrary keyword arguments that should be passed on to
+ Task. The callable must return a Task.
"""
if factory is not None and not callable(factory):
raise TypeError('task factory must be a callable or None')
def stop(self):
"""Stop running the event loop.
- Every callback already scheduled will still run. This simply informs
- run_forever to stop looping after a complete iteration.
+ Every callback already scheduled will still run. This simply
+ informs run_forever to stop looping after a complete iteration.
"""
self._stopping = True
Create a streaming transport connection to a given internet host and
port: socket family AF_INET or socket.AF_INET6 depending on host (or
- family if specified), socket type SOCK_STREAM. protocol_factory must be
- a callable returning a protocol instance.
+ family if specified), socket type SOCK_STREAM. protocol_factory must
+ be a callable returning a protocol instance.
- This method is a coroutine which will try to establish the connection
- in the background. When successful, the coroutine returns a
- (transport, protocol) pair.
+ This method is a coroutine which will try to establish the
+ connection in the background. When successful, the coroutine
+ returns a (transport, protocol) pair.
"""
if server_hostname is not None and not ssl:
raise ValueError('server_hostname is only meaningful with ssl')
The host parameter can be a string, in that case the TCP server is
bound to host and port.
- The host parameter can also be a sequence of strings and in that case
- the TCP server is bound to all hosts of the sequence. If a host
- appears multiple times (possibly indirectly e.g. when hostnames
- resolve to the same IP address), the server is only bound once to that
- host.
+ The host parameter can also be a sequence of strings and in that
+ case the TCP server is bound to all hosts of the sequence. If
+ a host appears multiple times (possibly indirectly e.g. when
+ hostnames resolve to the same IP address), the server is only bound
+ once to that host.
Return a Server object which can be used to stop the service.
If host is an empty string or None all interfaces are assumed
and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6). The host parameter can also be
- a sequence (e.g. list) of hosts to bind to.
+ one for IPv4 and another one for IPv6). The host parameter can also
+ be a sequence (e.g. list) of hosts to bind to.
family can be set to either AF_INET or AF_INET6 to force the
socket to use IPv4 or IPv6. If not set it will be determined
start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
- the user should await Server.start_serving() or Server.serve_forever()
- to make the server to start accepting connections.
+ the user should await Server.start_serving() or
+ Server.serve_forever() to make the server to start accepting
+ connections.
"""
raise NotImplementedError
start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
- the user should await Server.start_serving() or Server.serve_forever()
- to make the server to start accepting connections.
+ the user should await Server.start_serving() or
+ Server.serve_forever() to make the server to start accepting
+ connections.
"""
raise NotImplementedError
protocol_factory must be a callable returning a protocol instance.
- socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
- host (or family if specified), socket type SOCK_DGRAM.
+ socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending
+ on host (or family if specified), socket type SOCK_DGRAM.
reuse_address tells the kernel to reuse a local socket in
TIME_WAIT state, without waiting for its natural timeout to
async def connect_write_pipe(self, protocol_factory, pipe):
"""Register write pipe in event loop.
- protocol_factory should instantiate object with BaseProtocol interface.
+ protocol_factory should instantiate object with BaseProtocol
+ interface.
Pipe is file-like object already switched to nonblocking.
Return pair (transport, protocol), where transport support
WriteTransport interface."""
optional keyword-only 'depth' argument can be used to skip the specified
number of frames from top of the stack.
- If the optional keyword-only 'limit' argument is provided, each call stack
- in the resulting graph is truncated to include at most ``abs(limit)``
- entries. If 'limit' is positive, the entries left are the closest to
- the invocation point. If 'limit' is negative, the topmost entries are
- left. If 'limit' is omitted or None, all entries are present.
- If 'limit' is 0, the call stack is not captured at all, only
- "awaited by" information is present.
+ If the optional keyword-only 'limit' argument is provided, each call
+ stack in the resulting graph is truncated to include at most
+ ``abs(limit)`` entries. If 'limit' is positive, the entries left are
+ the closest to the invocation point. If 'limit' is negative, the
+ topmost entries are left. If 'limit' is omitted or None, all entries
+ are present. If 'limit' is 0, the call stack is not captured at all,
+ only "awaited by" information is present.
"""
loop = events._get_running_loop()
class Event(mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Event.
- Class implementing event objects. An event manages a flag that can be set
- to true with the set() method and reset to false with the clear() method.
- The wait() method blocks until the flag is true. The flag is initially
- false.
+ Class implementing event objects. An event manages a flag that can be
+ set to true with the set() method and reset to false with the clear()
+ method. The wait() method blocks until the flag is true. The flag is
+ initially false.
"""
def __init__(self):
"""A Semaphore implementation.
A semaphore manages an internal counter which is decremented by each
- acquire() call and incremented by each release() call. The counter
- can never go below zero; when acquire() finds that it is zero, it blocks,
- waiting until some other thread calls release().
+ acquire() call and incremented by each release() call. The counter
+ can never go below zero; when acquire() finds that it is zero, it
+ blocks, waiting until some other thread calls release().
Semaphores also support the context management protocol.
async def wait(self):
"""Wait for the barrier.
- When the specified number of tasks have started waiting, they are all
- simultaneously awoken.
+ When the specified number of tasks have started waiting, they are
+ all simultaneously awoken.
Returns an unique and individual index number from 0 to 'parties-1'.
"""
async with self._cond:
class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
- If maxsize is less than or equal to zero, the queue size is infinite. If it
- is an integer greater than 0, then "await put()" will block when the
- queue reaches maxsize, until an item is removed by get().
+ If maxsize is less than or equal to zero, the queue size is infinite.
+ If it is an integer greater than 0, then "await put()" will block when
+ the queue reaches maxsize, until an item is removed by get().
Unlike queue.Queue, you can reliably know this Queue's size
with qsize(), since your single-threaded asyncio application won't be
If queue is empty, wait until an item is available.
- Raises QueueShutDown if the queue has been shut down and is empty, or
- if the queue has been shut down immediately.
+ Raises QueueShutDown if the queue has been shut down and is empty,
+ or if the queue has been shut down immediately.
"""
while self.empty():
if self._is_shutdown and self.empty():
def get_nowait(self):
"""Remove and return an item from the queue.
- Return an item if one is immediately available, else raise QueueEmpty.
+ Return an item if one is immediately available, else raise
+ QueueEmpty.
- Raises QueueShutDown if the queue has been shut down and is empty, or
- if the queue has been shut down immediately.
+ Raises QueueShutDown if the queue has been shut down and is empty,
+ or if the queue has been shut down immediately.
"""
if self.empty():
if self._is_shutdown:
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
- If a join() is currently blocking, it will resume when all items have
- been processed (meaning that a task_done() call was received for every
- item that had been put() into the queue).
+ If a join() is currently blocking, it will resume when all items
+ have been processed (meaning that a task_done() call was received
+ for every item that had been put() into the queue).
- Raises ValueError if called more times than there were items placed in
- the queue.
+ Raises ValueError if called more times than there were items placed
+ in the queue.
"""
if self._unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
async def join(self):
"""Block until all items in the queue have been gotten and processed.
- The count of unfinished tasks goes up whenever an item is added to the
- queue. The count goes down whenever a consumer calls task_done() to
- indicate that the item was retrieved and all work on it is complete.
- When the count of unfinished tasks drops to zero, join() unblocks.
+ The count of unfinished tasks goes up whenever an item is added to
+ the queue. The count goes down whenever a consumer calls
+ task_done() to indicate that the item was retrieved and all work on
+ it is complete. When the count of unfinished tasks drops to zero,
+ join() unblocks.
"""
if self._unfinished_tasks > 0:
await self._finished.wait()
with asyncio.Runner(debug=True) as runner:
runner.run(main())
- The run() method can be called multiple times within the runner's context.
+ The run() method can be called multiple times within the runner's
+ context.
This can be useful for interactive console (e.g. IPython),
unittest runners, console tools, -- everywhere when async code
async def sock_sendall(self, sock, data):
"""Send data to the socket.
- The socket must be connected to a remote socket. This method continues
- to send data from data until either all data has been sent or an
- error occurs. None is returned on success. On error, an exception is
- raised, and there is no way to determine how much data, if any, was
- successfully processed by the receiving end of the connection.
+ The socket must be connected to a remote socket. This method
+ continues to send data from data until either all data has been
+ sent or an error occurs. None is returned on success. On error,
+ an exception is raised, and there is no way to determine how much
+ data, if any, was successfully processed by the receiving end of
+ the connection.
"""
base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
async def sock_sendto(self, sock, data, address):
"""Send data to the socket.
- The socket must be connected to a remote socket. This method continues
- to send data from data until either all data has been sent or an
- error occurs. None is returned on success. On error, an exception is
- raised, and there is no way to determine how much data, if any, was
- successfully processed by the receiving end of the connection.
+ The socket must be connected to a remote socket. This method
+ continues to send data from data until either all data has been
+ sent or an error occurs. None is returned on success. On error,
+ an exception is raised, and there is no way to determine how much
+ data, if any, was successfully processed by the receiving end of
+ the connection.
"""
base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
async def sock_accept(self, sock):
"""Accept a connection.
- The socket must be bound to an address and listening for connections.
- The return value is a pair (conn, address) where conn is a new socket
- object usable to send and receive data on the connection, and address
- is the address bound to the socket on the other end of the connection.
+ The socket must be bound to an address and listening for
+ connections. The return value is a pair (conn, address) where
+ conn is a new socket object usable to send and receive data on the
+ connection, and address is the address bound to the socket on the
+ other end of the connection.
"""
base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
self._waiter = None
async def readline(self):
- """Read chunk of data from the stream until newline (b'\n') is found.
+ r"""Read chunk of data from the stream until newline (b'\n') is found.
- On success, return chunk that ends with newline. If only partial
+ On success, return chunk that ends with newline. If only partial
line can be read due to EOF, return incomplete line without
- terminating newline. When EOF was reached while no bytes read, empty
- bytes object is returned.
+ terminating newline. When EOF was reached while no bytes read,
+ empty bytes object is returned.
- If limit is reached, ValueError will be raised. In that case, if
+ If limit is reached, ValueError will be raised. In that case, if
newline was found, complete line including newline will be removed
- from internal buffer. Else, internal buffer will be cleared. Limit is
- compared against part of the line without newline.
+ from internal buffer. Else, internal buffer will be cleared.
+ Limit is compared against part of the line without newline.
If stream was paused, this function will automatically resume it if
needed.
"""Cancel the task group
`cancel()` will be called on any tasks in the group that aren't yet
- done, as well as the parent (body) of the group. This will cause the
- task group context manager to exit *without* `asyncio.CancelledError`
- being raised.
-
- If `cancel()` is called before entering the task group, the group will be
- cancelled upon entry. This is useful for patterns where one piece of
- code passes an unused TaskGroup instance to another in order to have
- the ability to cancel anything run within the group.
+ done, as well as the parent (body) of the group. This will cause
+ the task group context manager to exit *without*
+ `asyncio.CancelledError` being raised.
+
+ If `cancel()` is called before entering the task group, the group
+ will be cancelled upon entry. This is useful for patterns where
+ one piece of code passes an unused TaskGroup instance to another in
+ order to have the ability to cancel anything run within the group.
`cancel()` is idempotent and may be called after the task group has
already exited.
Run the supplied awaitables concurrently. The returned object can be
iterated to obtain the results of the awaitables as they finish.
- The object returned can be iterated as an asynchronous iterator or a plain
- iterator. When asynchronous iteration is used, the originally-supplied
- awaitables are yielded if they are tasks or futures. This makes it easy to
- correlate previously-scheduled tasks with their results:
+ The object returned can be iterated as an asynchronous iterator or
+ a plain iterator. When asynchronous iteration is used, the
+ originally-supplied awaitables are yielded if they are tasks or
+ futures. This makes it easy to correlate previously-scheduled tasks
+ with their results:
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
else:
print("IPv4 connection established.")
- During asynchronous iteration, implicitly-created tasks will be yielded for
- supplied awaitables that aren't tasks or futures.
+ During asynchronous iteration, implicitly-created tasks will be
+ yielded for supplied awaitables that aren't tasks or futures.
- When used as a plain iterator, each iteration yields a new coroutine that
- returns the result or raises the exception of the next completed awaitable.
- This pattern is compatible with Python versions older than 3.13:
+ When used as a plain iterator, each iteration yields a new coroutine
+ that returns the result or raises the exception of the next completed
+ awaitable. This pattern is compatible with Python versions older than
+ 3.13:
ipv4_connect = create_task(open_connection("127.0.0.1", 80))
ipv6_connect = create_task(open_connection("::1", 80))
tasks = [ipv4_connect, ipv6_connect]
for next_connect in as_completed(tasks):
- # next_connect is not one of the original task objects. It must be
- # awaited to obtain the result value or raise the exception of the
- # awaitable that finishes next.
+ # next_connect is not one of the original task objects. It must
+ # be awaited to obtain the result value or raise the exception
+ # of the awaitable that finishes next.
reader, writer = await next_connect
- A TimeoutError is raised if the timeout occurs before all awaitables are
- done. This is raised by the async for loop during asynchronous iteration or
- by the coroutines yielded during plain iteration.
+ A TimeoutError is raised if the timeout occurs before all awaitables
+ are done. This is raised by the async for loop during asynchronous
+ iteration or by the coroutines yielded during plain iteration.
"""
if inspect.isawaitable(fs):
raise TypeError(
def create_eager_task_factory(custom_task_constructor):
"""Create a function suitable for use as a task factory on an event-loop.
- Example usage:
+ Example usage:
- loop.set_task_factory(
- asyncio.create_eager_task_factory(my_task_constructor))
+ loop.set_task_factory(
+ asyncio.create_eager_task_factory(my_task_constructor))
- Now, tasks created will be started immediately (rather than being first
- scheduled to an event loop). The constructor argument can be any callable
- that returns a Task-compatible object and has a signature compatible
- with `Task.__init__`; it must have the `eager_start` keyword argument.
+ Now, tasks created will be started immediately (rather than being first
+ scheduled to an event loop). The constructor argument can be any
+ callable that returns a Task-compatible object and has a signature
+ compatible with `Task.__init__`; it must have the `eager_start`
+ keyword argument.
- Most applications will use `Task` for `custom_task_constructor` and in
- this case there's no need to call `create_eager_task_factory()`
- directly. Instead the global `eager_task_factory` instance can be
- used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
- """
+ Most applications will use `Task` for `custom_task_constructor` and in
+ this case there's no need to call `create_eager_task_factory()`
+ directly. Instead the global `eager_task_factory` instance can be
+ used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
+ """
def factory(loop, coro, *, eager_start=True, **kwargs):
return custom_task_constructor(
allowing context variables from the main thread to be accessed in the
separate thread.
- Return a coroutine that can be awaited to get the eventual result of *func*.
+ Return a coroutine that can be awaited to get the eventual result of
+ *func*.
"""
loop = events.get_running_loop()
ctx = contextvars.copy_context()
class Timeout:
"""Asynchronous context manager for cancelling overdue coroutines.
- Use `timeout()` or `timeout_at()` rather than instantiating this class directly.
+ Use `timeout()` or `timeout_at()` rather than instantiating this class
+ directly.
"""
def __init__(self, when: float | None) -> None:
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
"""Unix event loop.
- Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
+ Adds signal handling and UNIX Domain Socket support to
+ SelectorEventLoop.
"""
def __init__(self, selector=None):