1 """Event loop using a selector and related classes.
3 A selector is a "notify-when-ready" multiplexer. For a subclass which
4 also includes support for signal handling, see the unix_events sub-module.
7 __all__
= 'BaseSelectorEventLoop',
20 except ImportError: # pragma: no cover
23 from . import base_events
24 from . import constants
27 from . import protocols
28 from . import sslproto
29 from . import transports
31 from .log
import logger
33 _HAS_SENDMSG
= hasattr(socket
.socket
, 'sendmsg')
37 SC_IOV_MAX
= os
.sysconf('SC_IOV_MAX')
42 def _test_selector_event(selector
, fd
, event
):
43 # Test if the selector is monitoring 'event' events
44 # for the file descriptor 'fd'.
46 key
= selector
.get_key(fd
)
50 return bool(key
.events
& event
)
53 class BaseSelectorEventLoop(base_events
.BaseEventLoop
):
54 """Selector event loop.
56 See events.EventLoop for API specification.
59 def __init__(self
, selector
=None):
63 selector
= selectors
.DefaultSelector()
64 logger
.debug('Using selector: %s', selector
.__class
__.__name
__)
65 self
._selector
= selector
66 self
._make
_self
_pipe
()
67 self
._transports
= weakref
.WeakValueDictionary()
69 def _make_socket_transport(self
, sock
, protocol
, waiter
=None, *,
70 extra
=None, server
=None):
71 self
._ensure
_fd
_no
_transport
(sock
)
72 return _SelectorSocketTransport(self
, sock
, protocol
, waiter
,
75 def _make_ssl_transport(
76 self
, rawsock
, protocol
, sslcontext
, waiter
=None,
77 *, server_side
=False, server_hostname
=None,
78 extra
=None, server
=None,
79 ssl_handshake_timeout
=constants
.SSL_HANDSHAKE_TIMEOUT
,
80 ssl_shutdown_timeout
=constants
.SSL_SHUTDOWN_TIMEOUT
,
82 self
._ensure
_fd
_no
_transport
(rawsock
)
83 ssl_protocol
= sslproto
.SSLProtocol(
84 self
, protocol
, sslcontext
, waiter
,
85 server_side
, server_hostname
,
86 ssl_handshake_timeout
=ssl_handshake_timeout
,
87 ssl_shutdown_timeout
=ssl_shutdown_timeout
89 _SelectorSocketTransport(self
, rawsock
, ssl_protocol
,
90 extra
=extra
, server
=server
)
91 return ssl_protocol
._app
_transport
93 def _make_datagram_transport(self
, sock
, protocol
,
94 address
=None, waiter
=None, extra
=None):
95 self
._ensure
_fd
_no
_transport
(sock
)
96 return _SelectorDatagramTransport(self
, sock
, protocol
,
97 address
, waiter
, extra
)
100 if self
.is_running():
101 raise RuntimeError("Cannot close a running event loop")
104 self
._close
_self
_pipe
()
106 if self
._selector
is not None:
107 self
._selector
.close()
108 self
._selector
= None
110 def _close_self_pipe(self
):
111 self
._remove
_reader
(self
._ssock
.fileno())
116 self
._internal
_fds
-= 1
118 def _make_self_pipe(self
):
119 # A self-socket, really. :-)
120 self
._ssock
, self
._csock
= socket
.socketpair()
121 self
._ssock
.setblocking(False)
122 self
._csock
.setblocking(False)
123 self
._internal
_fds
+= 1
124 self
._add
_reader
(self
._ssock
.fileno(), self
._read
_from
_self
)
126 def _process_self_data(self
, data
):
129 def _read_from_self(self
):
132 data
= self
._ssock
.recv(4096)
135 self
._process
_self
_data
(data
)
136 except InterruptedError
:
138 except BlockingIOError
:
141 def _write_to_self(self
):
142 # This may be called from a different thread, possibly after
143 # _close_self_pipe() has been called or even while it is
144 # running. Guard for self._csock being None or closed. When
145 # a socket is closed, send() raises OSError (with errno set to
146 # EBADF, but let's not rely on the exact error code).
155 logger
.debug("Fail to write a null byte into the "
159 def _start_serving(self
, protocol_factory
, sock
,
160 sslcontext
=None, server
=None, backlog
=100,
161 ssl_handshake_timeout
=constants
.SSL_HANDSHAKE_TIMEOUT
,
162 ssl_shutdown_timeout
=constants
.SSL_SHUTDOWN_TIMEOUT
):
163 self
._add
_reader
(sock
.fileno(), self
._accept
_connection
,
164 protocol_factory
, sock
, sslcontext
, server
, backlog
,
165 ssl_handshake_timeout
, ssl_shutdown_timeout
)
167 def _accept_connection(
168 self
, protocol_factory
, sock
,
169 sslcontext
=None, server
=None, backlog
=100,
170 ssl_handshake_timeout
=constants
.SSL_HANDSHAKE_TIMEOUT
,
171 ssl_shutdown_timeout
=constants
.SSL_SHUTDOWN_TIMEOUT
):
172 # This method is only called once for each event loop tick where the
173 # listening socket has triggered an EVENT_READ. There may be multiple
174 # connections waiting for an .accept() so it is called in a loop.
175 # See https://bugs.python.org/issue27906 for more details.
176 for _
in range(backlog
+ 1):
178 conn
, addr
= sock
.accept()
180 logger
.debug("%r got a new connection from %r: %r",
182 conn
.setblocking(False)
183 except ConnectionAbortedError
:
184 # Discard connections that were aborted before accept().
186 except (BlockingIOError
, InterruptedError
):
187 # Early exit because of a signal or
188 # the socket accept buffer is empty.
190 except OSError as exc
:
191 # There's nowhere to send the error, so just log it.
192 if exc
.errno
in (errno
.EMFILE
, errno
.ENFILE
,
193 errno
.ENOBUFS
, errno
.ENOMEM
):
194 # Some platforms (e.g. Linux keep reporting the FD as
195 # ready, so we remove the read handler temporarily.
196 # We'll try again in a while.
197 self
.call_exception_handler({
198 'message': 'socket.accept() out of system resource',
200 'socket': trsock
.TransportSocket(sock
),
202 self
._remove
_reader
(sock
.fileno())
203 self
.call_later(constants
.ACCEPT_RETRY_DELAY
,
205 protocol_factory
, sock
, sslcontext
, server
,
206 backlog
, ssl_handshake_timeout
,
207 ssl_shutdown_timeout
)
209 raise # The event loop will catch, log and ignore it.
211 extra
= {'peername': addr
}
212 accept
= self
._accept
_connection
2(
213 protocol_factory
, conn
, extra
, sslcontext
, server
,
214 ssl_handshake_timeout
, ssl_shutdown_timeout
)
215 self
.create_task(accept
)
217 async def _accept_connection2(
218 self
, protocol_factory
, conn
, extra
,
219 sslcontext
=None, server
=None,
220 ssl_handshake_timeout
=constants
.SSL_HANDSHAKE_TIMEOUT
,
221 ssl_shutdown_timeout
=constants
.SSL_SHUTDOWN_TIMEOUT
):
225 protocol
= protocol_factory()
226 waiter
= self
.create_future()
228 transport
= self
._make
_ssl
_transport
(
229 conn
, protocol
, sslcontext
, waiter
=waiter
,
230 server_side
=True, extra
=extra
, server
=server
,
231 ssl_handshake_timeout
=ssl_handshake_timeout
,
232 ssl_shutdown_timeout
=ssl_shutdown_timeout
)
234 transport
= self
._make
_socket
_transport
(
235 conn
, protocol
, waiter
=waiter
, extra
=extra
,
240 except BaseException
:
242 # gh-109534: When an exception is raised by the SSLProtocol object the
243 # exception set in this future can keep the protocol object alive and
244 # cause a reference cycle.
247 # It's now up to the protocol to handle the connection.
249 except (SystemExit, KeyboardInterrupt):
251 except BaseException
as exc
:
255 'Error on transport creation for incoming connection',
258 if protocol
is not None:
259 context
['protocol'] = protocol
260 if transport
is not None:
261 context
['transport'] = transport
262 self
.call_exception_handler(context
)
264 def _ensure_fd_no_transport(self
, fd
):
266 if not isinstance(fileno
, int):
268 fileno
= int(fileno
.fileno())
269 except (AttributeError, TypeError, ValueError):
270 # This code matches selectors._fileobj_to_fd function.
271 raise ValueError(f
"Invalid file object: {fd!r}") from None
272 transport
= self
._transports
.get(fileno
)
273 if transport
and not transport
.is_closing():
275 f
'File descriptor {fd!r} is used by transport '
278 def _add_reader(self
, fd
, callback
, *args
):
280 handle
= events
.Handle(callback
, args
, self
, None)
281 key
= self
._selector
.get_map().get(fd
)
283 self
._selector
.register(fd
, selectors
.EVENT_READ
,
286 mask
, (reader
, writer
) = key
.events
, key
.data
287 self
._selector
.modify(fd
, mask | selectors
.EVENT_READ
,
289 if reader
is not None:
293 def _remove_reader(self
, fd
):
296 key
= self
._selector
.get_map().get(fd
)
299 mask
, (reader
, writer
) = key
.events
, key
.data
300 mask
&= ~selectors
.EVENT_READ
302 self
._selector
.unregister(fd
)
304 self
._selector
.modify(fd
, mask
, (None, writer
))
306 if reader
is not None:
312 def _add_writer(self
, fd
, callback
, *args
):
314 handle
= events
.Handle(callback
, args
, self
, None)
315 key
= self
._selector
.get_map().get(fd
)
317 self
._selector
.register(fd
, selectors
.EVENT_WRITE
,
320 mask
, (reader
, writer
) = key
.events
, key
.data
321 self
._selector
.modify(fd
, mask | selectors
.EVENT_WRITE
,
323 if writer
is not None:
327 def _remove_writer(self
, fd
):
328 """Remove a writer callback."""
331 key
= self
._selector
.get_map().get(fd
)
334 mask
, (reader
, writer
) = key
.events
, key
.data
335 # Remove both writer and connector.
336 mask
&= ~selectors
.EVENT_WRITE
338 self
._selector
.unregister(fd
)
340 self
._selector
.modify(fd
, mask
, (reader
, None))
342 if writer
is not None:
348 def add_reader(self
, fd
, callback
, *args
):
349 """Add a reader callback."""
350 self
._ensure
_fd
_no
_transport
(fd
)
351 self
._add
_reader
(fd
, callback
, *args
)
353 def remove_reader(self
, fd
):
354 """Remove a reader callback."""
355 self
._ensure
_fd
_no
_transport
(fd
)
356 return self
._remove
_reader
(fd
)
358 def add_writer(self
, fd
, callback
, *args
):
359 """Add a writer callback.."""
360 self
._ensure
_fd
_no
_transport
(fd
)
361 self
._add
_writer
(fd
, callback
, *args
)
363 def remove_writer(self
, fd
):
364 """Remove a writer callback."""
365 self
._ensure
_fd
_no
_transport
(fd
)
366 return self
._remove
_writer
(fd
)
368 async def sock_recv(self
, sock
, n
):
369 """Receive data from the socket.
371 The return value is a bytes object representing the data received.
372 The maximum amount of data to be received at once is specified by
375 base_events
._check
_ssl
_socket
(sock
)
376 if self
._debug
and sock
.gettimeout() != 0:
377 raise ValueError("the socket must be non-blocking")
380 except (BlockingIOError
, InterruptedError
):
382 fut
= self
.create_future()
384 self
._ensure
_fd
_no
_transport
(fd
)
385 handle
= self
._add
_reader
(fd
, self
._sock
_recv
, fut
, sock
, n
)
386 fut
.add_done_callback(
387 functools
.partial(self
._sock
_read
_done
, fd
, handle
=handle
))
390 def _sock_read_done(self
, fd
, fut
, handle
=None):
391 if handle
is None or not handle
.cancelled():
392 self
.remove_reader(fd
)
394 def _sock_recv(self
, fut
, sock
, n
):
395 # _sock_recv() can add itself as an I/O callback if the operation can't
396 # be done immediately. Don't use it directly, call sock_recv().
401 except (BlockingIOError
, InterruptedError
):
402 return # try again next time
403 except (SystemExit, KeyboardInterrupt):
405 except BaseException
as exc
:
406 fut
.set_exception(exc
)
410 async def sock_recv_into(self
, sock
, buf
):
411 """Receive data from the socket.
413 The received data is written into *buf* (a writable buffer).
414 The return value is the number of bytes written.
416 base_events
._check
_ssl
_socket
(sock
)
417 if self
._debug
and sock
.gettimeout() != 0:
418 raise ValueError("the socket must be non-blocking")
420 return sock
.recv_into(buf
)
421 except (BlockingIOError
, InterruptedError
):
423 fut
= self
.create_future()
425 self
._ensure
_fd
_no
_transport
(fd
)
426 handle
= self
._add
_reader
(fd
, self
._sock
_recv
_into
, fut
, sock
, buf
)
427 fut
.add_done_callback(
428 functools
.partial(self
._sock
_read
_done
, fd
, handle
=handle
))
431 def _sock_recv_into(self
, fut
, sock
, buf
):
432 # _sock_recv_into() can add itself as an I/O callback if the operation
433 # can't be done immediately. Don't use it directly, call
438 nbytes
= sock
.recv_into(buf
)
439 except (BlockingIOError
, InterruptedError
):
440 return # try again next time
441 except (SystemExit, KeyboardInterrupt):
443 except BaseException
as exc
:
444 fut
.set_exception(exc
)
446 fut
.set_result(nbytes
)
448 async def sock_recvfrom(self
, sock
, bufsize
):
449 """Receive a datagram from a datagram socket.
451 The return value is a tuple of (bytes, address) representing the
452 datagram received and the address it came from.
453 The maximum amount of data to be received at once is specified by
456 base_events
._check
_ssl
_socket
(sock
)
457 if self
._debug
and sock
.gettimeout() != 0:
458 raise ValueError("the socket must be non-blocking")
460 return sock
.recvfrom(bufsize
)
461 except (BlockingIOError
, InterruptedError
):
463 fut
= self
.create_future()
465 self
._ensure
_fd
_no
_transport
(fd
)
466 handle
= self
._add
_reader
(fd
, self
._sock
_recvfrom
, fut
, sock
, bufsize
)
467 fut
.add_done_callback(
468 functools
.partial(self
._sock
_read
_done
, fd
, handle
=handle
))
471 def _sock_recvfrom(self
, fut
, sock
, bufsize
):
472 # _sock_recvfrom() can add itself as an I/O callback if the operation
473 # can't be done immediately. Don't use it directly, call
478 result
= sock
.recvfrom(bufsize
)
479 except (BlockingIOError
, InterruptedError
):
480 return # try again next time
481 except (SystemExit, KeyboardInterrupt):
483 except BaseException
as exc
:
484 fut
.set_exception(exc
)
486 fut
.set_result(result
)
488 async def sock_recvfrom_into(self
, sock
, buf
, nbytes
=0):
489 """Receive data from the socket.
491 The received data is written into *buf* (a writable buffer).
492 The return value is a tuple of (number of bytes written, address).
494 base_events
._check
_ssl
_socket
(sock
)
495 if self
._debug
and sock
.gettimeout() != 0:
496 raise ValueError("the socket must be non-blocking")
501 return sock
.recvfrom_into(buf
, nbytes
)
502 except (BlockingIOError
, InterruptedError
):
504 fut
= self
.create_future()
506 self
._ensure
_fd
_no
_transport
(fd
)
507 handle
= self
._add
_reader
(fd
, self
._sock
_recvfrom
_into
, fut
, sock
, buf
,
509 fut
.add_done_callback(
510 functools
.partial(self
._sock
_read
_done
, fd
, handle
=handle
))
513 def _sock_recvfrom_into(self
, fut
, sock
, buf
, bufsize
):
514 # _sock_recv_into() can add itself as an I/O callback if the operation
515 # can't be done immediately. Don't use it directly, call
520 result
= sock
.recvfrom_into(buf
, bufsize
)
521 except (BlockingIOError
, InterruptedError
):
522 return # try again next time
523 except (SystemExit, KeyboardInterrupt):
525 except BaseException
as exc
:
526 fut
.set_exception(exc
)
528 fut
.set_result(result
)
530 async def sock_sendall(self
, sock
, data
):
531 """Send data to the socket.
533 The socket must be connected to a remote socket. This method continues
534 to send data from data until either all data has been sent or an
535 error occurs. None is returned on success. On error, an exception is
536 raised, and there is no way to determine how much data, if any, was
537 successfully processed by the receiving end of the connection.
539 base_events
._check
_ssl
_socket
(sock
)
540 if self
._debug
and sock
.gettimeout() != 0:
541 raise ValueError("the socket must be non-blocking")
544 except (BlockingIOError
, InterruptedError
):
551 fut
= self
.create_future()
553 self
._ensure
_fd
_no
_transport
(fd
)
554 # use a trick with a list in closure to store a mutable state
555 handle
= self
._add
_writer
(fd
, self
._sock
_sendall
, fut
, sock
,
556 memoryview(data
), [n
])
557 fut
.add_done_callback(
558 functools
.partial(self
._sock
_write
_done
, fd
, handle
=handle
))
561 def _sock_sendall(self
, fut
, sock
, view
, pos
):
563 # Future cancellation can be scheduled on previous loop iteration
567 n
= sock
.send(view
[start
:])
568 except (BlockingIOError
, InterruptedError
):
570 except (SystemExit, KeyboardInterrupt):
572 except BaseException
as exc
:
573 fut
.set_exception(exc
)
578 if start
== len(view
):
583 async def sock_sendto(self
, sock
, data
, address
):
584 """Send data to the socket.
586 The socket must be connected to a remote socket. This method continues
587 to send data from data until either all data has been sent or an
588 error occurs. None is returned on success. On error, an exception is
589 raised, and there is no way to determine how much data, if any, was
590 successfully processed by the receiving end of the connection.
592 base_events
._check
_ssl
_socket
(sock
)
593 if self
._debug
and sock
.gettimeout() != 0:
594 raise ValueError("the socket must be non-blocking")
596 return sock
.sendto(data
, address
)
597 except (BlockingIOError
, InterruptedError
):
600 fut
= self
.create_future()
602 self
._ensure
_fd
_no
_transport
(fd
)
603 # use a trick with a list in closure to store a mutable state
604 handle
= self
._add
_writer
(fd
, self
._sock
_sendto
, fut
, sock
, data
,
606 fut
.add_done_callback(
607 functools
.partial(self
._sock
_write
_done
, fd
, handle
=handle
))
610 def _sock_sendto(self
, fut
, sock
, data
, address
):
612 # Future cancellation can be scheduled on previous loop iteration
615 n
= sock
.sendto(data
, 0, address
)
616 except (BlockingIOError
, InterruptedError
):
618 except (SystemExit, KeyboardInterrupt):
620 except BaseException
as exc
:
621 fut
.set_exception(exc
)
625 async def sock_connect(self
, sock
, address
):
626 """Connect to a remote socket at address.
628 This method is a coroutine.
630 base_events
._check
_ssl
_socket
(sock
)
631 if self
._debug
and sock
.gettimeout() != 0:
632 raise ValueError("the socket must be non-blocking")
634 if sock
.family
== socket
.AF_INET
or (
635 base_events
._HAS
_IPv
6 and sock
.family
== socket
.AF_INET6
):
636 resolved
= await self
._ensure
_resolved
(
637 address
, family
=sock
.family
, type=sock
.type, proto
=sock
.proto
,
640 _
, _
, _
, _
, address
= resolved
[0]
642 fut
= self
.create_future()
643 self
._sock
_connect
(fut
, sock
, address
)
647 # Needed to break cycles when an exception occurs.
650 def _sock_connect(self
, fut
, sock
, address
):
653 sock
.connect(address
)
654 except (BlockingIOError
, InterruptedError
):
655 # Issue #23618: When the C function connect() fails with EINTR, the
656 # connection runs in background. We have to wait until the socket
657 # becomes writable to be notified when the connection succeed or
659 self
._ensure
_fd
_no
_transport
(fd
)
660 handle
= self
._add
_writer
(
661 fd
, self
._sock
_connect
_cb
, fut
, sock
, address
)
662 fut
.add_done_callback(
663 functools
.partial(self
._sock
_write
_done
, fd
, handle
=handle
))
664 except (SystemExit, KeyboardInterrupt):
666 except BaseException
as exc
:
667 fut
.set_exception(exc
)
673 def _sock_write_done(self
, fd
, fut
, handle
=None):
674 if handle
is None or not handle
.cancelled():
675 self
.remove_writer(fd
)
677 def _sock_connect_cb(self
, fut
, sock
, address
):
682 err
= sock
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
684 # Jump to any except clause below.
685 raise OSError(err
, f
'Connect call failed {address}')
686 except (BlockingIOError
, InterruptedError
):
687 # socket is still registered, the callback will be retried later
689 except (SystemExit, KeyboardInterrupt):
691 except BaseException
as exc
:
692 fut
.set_exception(exc
)
698 async def sock_accept(self
, sock
):
699 """Accept a connection.
701 The socket must be bound to an address and listening for connections.
702 The return value is a pair (conn, address) where conn is a new socket
703 object usable to send and receive data on the connection, and address
704 is the address bound to the socket on the other end of the connection.
706 base_events
._check
_ssl
_socket
(sock
)
707 if self
._debug
and sock
.gettimeout() != 0:
708 raise ValueError("the socket must be non-blocking")
709 fut
= self
.create_future()
710 self
._sock
_accept
(fut
, sock
)
713 def _sock_accept(self
, fut
, sock
):
716 conn
, address
= sock
.accept()
717 conn
.setblocking(False)
718 except (BlockingIOError
, InterruptedError
):
719 self
._ensure
_fd
_no
_transport
(fd
)
720 handle
= self
._add
_reader
(fd
, self
._sock
_accept
, fut
, sock
)
721 fut
.add_done_callback(
722 functools
.partial(self
._sock
_read
_done
, fd
, handle
=handle
))
723 except (SystemExit, KeyboardInterrupt):
725 except BaseException
as exc
:
726 fut
.set_exception(exc
)
728 fut
.set_result((conn
, address
))
730 async def _sendfile_native(self
, transp
, file, offset
, count
):
731 del self
._transports
[transp
._sock
_fd
]
732 resume_reading
= transp
.is_reading()
733 transp
.pause_reading()
734 await transp
._make
_empty
_waiter
()
736 return await self
.sock_sendfile(transp
._sock
, file, offset
, count
,
739 transp
._reset
_empty
_waiter
()
741 transp
.resume_reading()
742 self
._transports
[transp
._sock
_fd
] = transp
744 def _process_events(self
, event_list
):
745 for key
, mask
in event_list
:
746 fileobj
, (reader
, writer
) = key
.fileobj
, key
.data
747 if mask
& selectors
.EVENT_READ
and reader
is not None:
748 if reader
._cancelled
:
749 self
._remove
_reader
(fileobj
)
751 self
._add
_callback
(reader
)
752 if mask
& selectors
.EVENT_WRITE
and writer
is not None:
753 if writer
._cancelled
:
754 self
._remove
_writer
(fileobj
)
756 self
._add
_callback
(writer
)
758 def _stop_serving(self
, sock
):
759 self
._remove
_reader
(sock
.fileno())
763 class _SelectorTransport(transports
._FlowControlMixin
,
764 transports
.Transport
):
766 max_size
= 256 * 1024 # Buffer size passed to recv().
768 # Attribute used in the destructor: it must be set even if the constructor
769 # is not called (see _SelectorSslTransport which may start by raising an
773 def __init__(self
, loop
, sock
, protocol
, extra
=None, server
=None):
774 super().__init
__(extra
, loop
)
775 self
._extra
['socket'] = trsock
.TransportSocket(sock
)
777 self
._extra
['sockname'] = sock
.getsockname()
779 self
._extra
['sockname'] = None
780 if 'peername' not in self
._extra
:
782 self
._extra
['peername'] = sock
.getpeername()
784 self
._extra
['peername'] = None
786 self
._sock
_fd
= sock
.fileno()
788 self
._protocol
_connected
= False
789 self
.set_protocol(protocol
)
791 self
._server
= server
792 self
._buffer
= collections
.deque()
793 self
._conn
_lost
= 0 # Set when call to connection_lost scheduled.
794 self
._closing
= False # Set when close() called.
795 self
._paused
= False # Set when pause_reading() called
797 if self
._server
is not None:
798 self
._server
._attach
(self
)
799 loop
._transports
[self
._sock
_fd
] = self
802 info
= [self
.__class
__.__name
__]
803 if self
._sock
is None:
804 info
.append('closed')
806 info
.append('closing')
807 info
.append(f
'fd={self._sock_fd}')
808 # test if the transport was closed
809 if self
._loop
is not None and not self
._loop
.is_closed():
810 polling
= _test_selector_event(self
._loop
._selector
,
811 self
._sock
_fd
, selectors
.EVENT_READ
)
813 info
.append('read=polling')
815 info
.append('read=idle')
817 polling
= _test_selector_event(self
._loop
._selector
,
819 selectors
.EVENT_WRITE
)
825 bufsize
= self
.get_write_buffer_size()
826 info
.append(f
'write=<{state}, bufsize={bufsize}>')
827 return '<{}>'.format(' '.join(info
))
830 self
._force
_close
(None)
832 def set_protocol(self
, protocol
):
833 self
._protocol
= protocol
834 self
._protocol
_connected
= True
836 def get_protocol(self
):
837 return self
._protocol
839 def is_closing(self
):
842 def is_reading(self
):
843 return not self
.is_closing() and not self
._paused
845 def pause_reading(self
):
846 if not self
.is_reading():
849 self
._loop
._remove
_reader
(self
._sock
_fd
)
850 if self
._loop
.get_debug():
851 logger
.debug("%r pauses reading", self
)
853 def resume_reading(self
):
854 if self
._closing
or not self
._paused
:
857 self
._add
_reader
(self
._sock
_fd
, self
._read
_ready
)
858 if self
._loop
.get_debug():
859 logger
.debug("%r resumes reading", self
)
865 self
._loop
._remove
_reader
(self
._sock
_fd
)
868 self
._loop
._remove
_writer
(self
._sock
_fd
)
869 self
._loop
.call_soon(self
._call
_connection
_lost
, None)
871 def __del__(self
, _warn
=warnings
.warn
):
872 if self
._sock
is not None:
873 _warn(f
"unclosed transport {self!r}", ResourceWarning
, source
=self
)
875 if self
._server
is not None:
876 self
._server
._detach
(self
)
878 def _fatal_error(self
, exc
, message
='Fatal error on transport'):
879 # Should be called from exception handler only.
880 if isinstance(exc
, OSError):
881 if self
._loop
.get_debug():
882 logger
.debug("%r: %s", self
, message
, exc_info
=True)
884 self
._loop
.call_exception_handler({
888 'protocol': self
._protocol
,
890 self
._force
_close
(exc
)
892 def _force_close(self
, exc
):
897 self
._loop
._remove
_writer
(self
._sock
_fd
)
898 if not self
._closing
:
900 self
._loop
._remove
_reader
(self
._sock
_fd
)
902 self
._loop
.call_soon(self
._call
_connection
_lost
, exc
)
904 def _call_connection_lost(self
, exc
):
906 if self
._protocol
_connected
:
907 self
._protocol
.connection_lost(exc
)
911 self
._protocol
= None
913 server
= self
._server
914 if server
is not None:
918 def get_write_buffer_size(self
):
919 return sum(map(len, self
._buffer
))
921 def _add_reader(self
, fd
, callback
, *args
):
922 if not self
.is_reading():
924 self
._loop
._add
_reader
(fd
, callback
, *args
)
927 class _SelectorSocketTransport(_SelectorTransport
):
929 _start_tls_compatible
= True
930 _sendfile_compatible
= constants
._SendfileMode
.TRY_NATIVE
932 def __init__(self
, loop
, sock
, protocol
, waiter
=None,
933 extra
=None, server
=None):
935 self
._read
_ready
_cb
= None
936 super().__init
__(loop
, sock
, protocol
, extra
, server
)
938 self
._empty
_waiter
= None
940 self
._write
_ready
= self
._write
_sendmsg
942 self
._write
_ready
= self
._write
_send
943 # Disable the Nagle algorithm -- small writes will be
944 # sent without waiting for the TCP ACK. This generally
945 # decreases the latency (in some cases significantly.)
946 base_events
._set
_nodelay
(self
._sock
)
948 self
._loop
.call_soon(self
._protocol
.connection_made
, self
)
949 # only start reading when connection_made() has been called
950 self
._loop
.call_soon(self
._add
_reader
,
951 self
._sock
_fd
, self
._read
_ready
)
952 if waiter
is not None:
953 # only wake up the waiter when connection_made() has been called
954 self
._loop
.call_soon(futures
._set
_result
_unless
_cancelled
,
957 def set_protocol(self
, protocol
):
958 if isinstance(protocol
, protocols
.BufferedProtocol
):
959 self
._read
_ready
_cb
= self
._read
_ready
__get
_buffer
961 self
._read
_ready
_cb
= self
._read
_ready
__data
_received
963 super().set_protocol(protocol
)
965 def _read_ready(self
):
966 self
._read
_ready
_cb
()
968 def _read_ready__get_buffer(self
):
973 buf
= self
._protocol
.get_buffer(-1)
975 raise RuntimeError('get_buffer() returned an empty buffer')
976 except (SystemExit, KeyboardInterrupt):
978 except BaseException
as exc
:
980 exc
, 'Fatal error: protocol.get_buffer() call failed.')
984 nbytes
= self
._sock
.recv_into(buf
)
985 except (BlockingIOError
, InterruptedError
):
987 except (SystemExit, KeyboardInterrupt):
989 except BaseException
as exc
:
990 self
._fatal
_error
(exc
, 'Fatal read error on socket transport')
994 self
._read
_ready
__on
_eof
()
998 self
._protocol
.buffer_updated(nbytes
)
999 except (SystemExit, KeyboardInterrupt):
1001 except BaseException
as exc
:
1003 exc
, 'Fatal error: protocol.buffer_updated() call failed.')
1005 def _read_ready__data_received(self
):
1009 data
= self
._sock
.recv(self
.max_size
)
1010 except (BlockingIOError
, InterruptedError
):
1012 except (SystemExit, KeyboardInterrupt):
1014 except BaseException
as exc
:
1015 self
._fatal
_error
(exc
, 'Fatal read error on socket transport')
1019 self
._read
_ready
__on
_eof
()
1023 self
._protocol
.data_received(data
)
1024 except (SystemExit, KeyboardInterrupt):
1026 except BaseException
as exc
:
1028 exc
, 'Fatal error: protocol.data_received() call failed.')
1030 def _read_ready__on_eof(self
):
1031 if self
._loop
.get_debug():
1032 logger
.debug("%r received EOF", self
)
1035 keep_open
= self
._protocol
.eof_received()
1036 except (SystemExit, KeyboardInterrupt):
1038 except BaseException
as exc
:
1040 exc
, 'Fatal error: protocol.eof_received() call failed.')
1044 # We're keeping the connection open so the
1045 # protocol can write more, but we still can't
1046 # receive more, so remove the reader callback.
1047 self
._loop
._remove
_reader
(self
._sock
_fd
)
1051 def write(self
, data
):
1052 if not isinstance(data
, (bytes
, bytearray
, memoryview
)):
1053 raise TypeError(f
'data argument must be a bytes-like object, '
1054 f
'not {type(data).__name__!r}')
1056 raise RuntimeError('Cannot call write() after write_eof()')
1057 if self
._empty
_waiter
is not None:
1058 raise RuntimeError('unable to write; sendfile is in progress')
1063 if self
._conn
_lost
>= constants
.LOG_THRESHOLD_FOR_CONNLOST_WRITES
:
1064 logger
.warning('socket.send() raised exception.')
1065 self
._conn
_lost
+= 1
1068 if not self
._buffer
:
1069 # Optimization: try to send now.
1071 n
= self
._sock
.send(data
)
1072 except (BlockingIOError
, InterruptedError
):
1074 except (SystemExit, KeyboardInterrupt):
1076 except BaseException
as exc
:
1077 self
._fatal
_error
(exc
, 'Fatal write error on socket transport')
1080 data
= memoryview(data
)[n
:]
1083 # Not all was written; register write handler.
1084 self
._loop
._add
_writer
(self
._sock
_fd
, self
._write
_ready
)
1086 # Add it to the buffer.
1087 self
._buffer
.append(data
)
1088 self
._maybe
_pause
_protocol
()
1090 def _get_sendmsg_buffer(self
):
1091 return itertools
.islice(self
._buffer
, SC_IOV_MAX
)
1093 def _write_sendmsg(self
):
1094 assert self
._buffer
, 'Data should not be empty'
1098 nbytes
= self
._sock
.sendmsg(self
._get
_sendmsg
_buffer
())
1099 self
._adjust
_leftover
_buffer
(nbytes
)
1100 except (BlockingIOError
, InterruptedError
):
1102 except (SystemExit, KeyboardInterrupt):
1104 except BaseException
as exc
:
1105 self
._loop
._remove
_writer
(self
._sock
_fd
)
1106 self
._buffer
.clear()
1107 self
._fatal
_error
(exc
, 'Fatal write error on socket transport')
1108 if self
._empty
_waiter
is not None:
1109 self
._empty
_waiter
.set_exception(exc
)
1111 self
._maybe
_resume
_protocol
() # May append to buffer.
1112 if not self
._buffer
:
1113 self
._loop
._remove
_writer
(self
._sock
_fd
)
1114 if self
._empty
_waiter
is not None:
1115 self
._empty
_waiter
.set_result(None)
1117 self
._call
_connection
_lost
(None)
1119 self
._sock
.shutdown(socket
.SHUT_WR
)
1121 def _adjust_leftover_buffer(self
, nbytes
: int) -> None:
1122 buffer = self
._buffer
1124 b
= buffer.popleft()
1129 buffer.appendleft(b
[nbytes
:])
1132 def _write_send(self
):
1133 assert self
._buffer
, 'Data should not be empty'
1137 buffer = self
._buffer
.popleft()
1138 n
= self
._sock
.send(buffer)
1139 if n
!= len(buffer):
1140 # Not all data was written
1141 self
._buffer
.appendleft(buffer[n
:])
1142 except (BlockingIOError
, InterruptedError
):
1144 except (SystemExit, KeyboardInterrupt):
1146 except BaseException
as exc
:
1147 self
._loop
._remove
_writer
(self
._sock
_fd
)
1148 self
._buffer
.clear()
1149 self
._fatal
_error
(exc
, 'Fatal write error on socket transport')
1150 if self
._empty
_waiter
is not None:
1151 self
._empty
_waiter
.set_exception(exc
)
1153 self
._maybe
_resume
_protocol
() # May append to buffer.
1154 if not self
._buffer
:
1155 self
._loop
._remove
_writer
(self
._sock
_fd
)
1156 if self
._empty
_waiter
is not None:
1157 self
._empty
_waiter
.set_result(None)
1159 self
._call
_connection
_lost
(None)
1161 self
._sock
.shutdown(socket
.SHUT_WR
)
1163 def write_eof(self
):
1164 if self
._closing
or self
._eof
:
1167 if not self
._buffer
:
1168 self
._sock
.shutdown(socket
.SHUT_WR
)
1170 def writelines(self
, list_of_data
):
1172 raise RuntimeError('Cannot call writelines() after write_eof()')
1173 if self
._empty
_waiter
is not None:
1174 raise RuntimeError('unable to writelines; sendfile is in progress')
1175 if not list_of_data
:
1177 self
._buffer
.extend([memoryview(data
) for data
in list_of_data
])
1179 # If the entire buffer couldn't be written, register a write handler
1181 self
._loop
._add
_writer
(self
._sock
_fd
, self
._write
_ready
)
1182 self
._maybe
_pause
_protocol
()
1184 def can_write_eof(self
):
1187 def _call_connection_lost(self
, exc
):
1189 super()._call
_connection
_lost
(exc
)
1191 self
._write
_ready
= None
1192 if self
._empty
_waiter
is not None:
1193 self
._empty
_waiter
.set_exception(
1194 ConnectionError("Connection is closed by peer"))
1196 def _make_empty_waiter(self
):
1197 if self
._empty
_waiter
is not None:
1198 raise RuntimeError("Empty waiter is already set")
1199 self
._empty
_waiter
= self
._loop
.create_future()
1200 if not self
._buffer
:
1201 self
._empty
_waiter
.set_result(None)
1202 return self
._empty
_waiter
1204 def _reset_empty_waiter(self
):
1205 self
._empty
_waiter
= None
1208 self
._read
_ready
_cb
= None
1212 class _SelectorDatagramTransport(_SelectorTransport
, transports
.DatagramTransport
):
1214 _buffer_factory
= collections
.deque
1216 def __init__(self
, loop
, sock
, protocol
, address
=None,
1217 waiter
=None, extra
=None):
1218 super().__init
__(loop
, sock
, protocol
, extra
)
1219 self
._address
= address
1220 self
._buffer
_size
= 0
1221 self
._loop
.call_soon(self
._protocol
.connection_made
, self
)
1222 # only start reading when connection_made() has been called
1223 self
._loop
.call_soon(self
._add
_reader
,
1224 self
._sock
_fd
, self
._read
_ready
)
1225 if waiter
is not None:
1226 # only wake up the waiter when connection_made() has been called
1227 self
._loop
.call_soon(futures
._set
_result
_unless
_cancelled
,
1230 def get_write_buffer_size(self
):
1231 return self
._buffer
_size
1233 def _read_ready(self
):
1237 data
, addr
= self
._sock
.recvfrom(self
.max_size
)
1238 except (BlockingIOError
, InterruptedError
):
1240 except OSError as exc
:
1241 self
._protocol
.error_received(exc
)
1242 except (SystemExit, KeyboardInterrupt):
1244 except BaseException
as exc
:
1245 self
._fatal
_error
(exc
, 'Fatal read error on datagram transport')
1247 self
._protocol
.datagram_received(data
, addr
)
1249 def sendto(self
, data
, addr
=None):
1250 if not isinstance(data
, (bytes
, bytearray
, memoryview
)):
1251 raise TypeError(f
'data argument must be a bytes-like object, '
1252 f
'not {type(data).__name__!r}')
1255 if addr
not in (None, self
._address
):
1257 f
'Invalid address: must be None or {self._address}')
1258 addr
= self
._address
1260 if self
._conn
_lost
and self
._address
:
1261 if self
._conn
_lost
>= constants
.LOG_THRESHOLD_FOR_CONNLOST_WRITES
:
1262 logger
.warning('socket.send() raised exception.')
1263 self
._conn
_lost
+= 1
1266 if not self
._buffer
:
1267 # Attempt to send it right away first.
1269 if self
._extra
['peername']:
1270 self
._sock
.send(data
)
1272 self
._sock
.sendto(data
, addr
)
1274 except (BlockingIOError
, InterruptedError
):
1275 self
._loop
._add
_writer
(self
._sock
_fd
, self
._sendto
_ready
)
1276 except OSError as exc
:
1277 self
._protocol
.error_received(exc
)
1279 except (SystemExit, KeyboardInterrupt):
1281 except BaseException
as exc
:
1283 exc
, 'Fatal write error on datagram transport')
1286 # Ensure that what we buffer is immutable.
1287 self
._buffer
.append((bytes(data
), addr
))
1288 self
._buffer
_size
+= len(data
) + 8 # include header bytes
1289 self
._maybe
_pause
_protocol
()
1291 def _sendto_ready(self
):
1293 data
, addr
= self
._buffer
.popleft()
1294 self
._buffer
_size
-= len(data
)
1296 if self
._extra
['peername']:
1297 self
._sock
.send(data
)
1299 self
._sock
.sendto(data
, addr
)
1300 except (BlockingIOError
, InterruptedError
):
1301 self
._buffer
.appendleft((data
, addr
)) # Try again later.
1302 self
._buffer
_size
+= len(data
)
1304 except OSError as exc
:
1305 self
._protocol
.error_received(exc
)
1307 except (SystemExit, KeyboardInterrupt):
1309 except BaseException
as exc
:
1311 exc
, 'Fatal write error on datagram transport')
1314 self
._maybe
_resume
_protocol
() # May append to buffer.
1315 if not self
._buffer
:
1316 self
._loop
._remove
_writer
(self
._sock
_fd
)
1318 self
._call
_connection
_lost
(None)