]> git.ipfire.org Git - thirdparty/Python/cpython.git/blame_incremental - Lib/asyncio/selector_events.py
gh-135773: have pyvenv.cfg without home key anchor a venv and deduce home (#135831)
[thirdparty/Python/cpython.git] / Lib / asyncio / selector_events.py
... / ...
CommitLineData
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import itertools
13import os
14import selectors
15import socket
16import warnings
17import weakref
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
22
23from . import base_events
24from . import constants
25from . import events
26from . import futures
27from . import protocols
28from . import sslproto
29from . import transports
30from . import trsock
31from .log import logger
32
# True when socket objects offer sendmsg(); _SelectorSocketTransport picks its
# sendmsg-based write path only in that case.
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        # NOTE(review): presumably the cap on how many buffers one sendmsg()
        # call may receive -- confirm against the sendmsg write path.
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except (AttributeError, ValueError, OSError):
        # os.sysconf() is unavailable, does not recognise the name, or the
        # host rejects it: fall back to send.
        _HAS_SENDMSG = False
    else:
        if SC_IOV_MAX <= 0:
            # sysconf() reports -1 for an indeterminate limit, which cannot
            # be used as a count: fall back to send.
            _HAS_SENDMSG = False
41
def _test_selector_event(selector, fd, event):
    """Return True if *selector* watches *fd* for any event bit in *event*.

    A file descriptor that is not registered yields False.
    """
    try:
        registration = selector.get_key(fd)
    except KeyError:
        return False
    watched = registration.events & event
    return bool(watched)
51
52
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
55
56 See events.EventLoop for API specification.
57 """
58
def __init__(self, selector=None):
    """Initialise the loop around *selector*.

    When *selector* is None a ``selectors.DefaultSelector()`` is created.
    The self-pipe is set up and the fd -> transport registry is created.
    """
    super().__init__()

    chosen = selectors.DefaultSelector() if selector is None else selector
    logger.debug('Using selector: %s', chosen.__class__.__name__)
    self._selector = chosen
    self._make_self_pipe()
    # Weak values: a transport disappears from the map once it is collected.
    self._transports = weakref.WeakValueDictionary()
68
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create and return a _SelectorSocketTransport wrapping *sock*.

    Raises RuntimeError (via _ensure_fd_no_transport) when the descriptor
    is already owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorSocketTransport(
        self, sock, protocol, waiter, extra, server)
    return transport
74
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
    """Layer an SSLProtocol over *rawsock* and return its app transport.

    A socket transport is created for *rawsock* with the SSLProtocol as its
    protocol; the value returned is ``ssl_protocol._app_transport``.
    """
    self._ensure_fd_no_transport(rawsock)
    timeouts = {
        'ssl_handshake_timeout': ssl_handshake_timeout,
        'ssl_shutdown_timeout': ssl_shutdown_timeout,
    }
    tls_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        **timeouts)
    # The raw transport is created for its side effects; no reference is
    # kept here.
    _SelectorSocketTransport(self, rawsock, tls_protocol,
                             extra=extra, server=server)
    return tls_protocol._app_transport
92
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create and return a _SelectorDatagramTransport wrapping *sock*.

    Raises RuntimeError (via _ensure_fd_no_transport) when the descriptor
    is already owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorDatagramTransport(
        self, sock, protocol, address, waiter, extra)
    return transport
98
def close(self):
    """Close the loop, its self-pipe and its selector.

    Raises RuntimeError if the loop is running; does nothing if the loop
    is already closed.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self.is_closed():
        return
    self._close_self_pipe()
    super().close()
    selector = self._selector
    if selector is None:
        return
    selector.close()
    self._selector = None
109
def _close_self_pipe(self):
    """Unregister and close both ends of the self-pipe socket pair."""
    self._remove_reader(self._ssock.fileno())
    # Close the reading end first, then the writing end, clearing each
    # attribute right after its socket is closed.
    for attr in ('_ssock', '_csock'):
        getattr(self, attr).close()
        setattr(self, attr, None)
    self._internal_fds -= 1
117
def _make_self_pipe(self):
    """Create the non-blocking socket pair used as the loop's self-pipe.

    The reading end is registered with _read_from_self as its reader.
    """
    # A self-socket, really. :-)
    self._ssock, self._csock = socket.socketpair()
    for end in (self._ssock, self._csock):
        end.setblocking(False)
    self._internal_fds += 1
    self._add_reader(self._ssock.fileno(), self._read_from_self)
125
def _process_self_data(self, data):
    """Hook called with each chunk read from the self-pipe; a no-op here."""
    return None
128
def _read_from_self(self):
    """Drain the reading end of the self-pipe.

    Each non-empty chunk is handed to _process_self_data().  Reading stops
    on an empty read or when the socket would block; an interrupted read
    is retried.
    """
    draining = True
    while draining:
        try:
            chunk = self._ssock.recv(4096)
            if chunk:
                self._process_self_data(chunk)
            else:
                draining = False
        except InterruptedError:
            # Retry the read.
            pass
        except BlockingIOError:
            draining = False
140
def _write_to_self(self):
    """Write one null byte to the self-pipe, ignoring a closed pipe.

    This may be called from a different thread, possibly after
    _close_self_pipe() has been called or even while it is running, so the
    socket is read once into a local and both "None" and "closed" are
    tolerated.  A closed socket makes send() raise OSError (with errno set
    to EBADF, but the exact code is not relied upon).
    """
    csock = self._csock
    if csock is not None:
        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)
158
159 def _start_serving(self, protocol_factory, sock,
160 sslcontext=None, server=None, backlog=100,
161 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
162 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
163 self._add_reader(sock.fileno(), self._accept_connection,
164 protocol_factory, sock, sslcontext, server, backlog,
165 ssl_handshake_timeout, ssl_shutdown_timeout)
166
167 def _accept_connection(
168 self, protocol_factory, sock,
169 sslcontext=None, server=None, backlog=100,
170 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
171 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
172 # This method is only called once for each event loop tick where the
173 # listening socket has triggered an EVENT_READ. There may be multiple
174 # connections waiting for an .accept() so it is called in a loop.
175 # See https://bugs.python.org/issue27906 for more details.
176 for _ in range(backlog + 1):
177 try:
178 conn, addr = sock.accept()
179 if self._debug:
180 logger.debug("%r got a new connection from %r: %r",
181 server, addr, conn)
182 conn.setblocking(False)
183 except ConnectionAbortedError:
184 # Discard connections that were aborted before accept().
185 continue
186 except (BlockingIOError, InterruptedError):
187 # Early exit because of a signal or
188 # the socket accept buffer is empty.
189 return
190 except OSError as exc:
191 # There's nowhere to send the error, so just log it.
192 if exc.errno in (errno.EMFILE, errno.ENFILE,
193 errno.ENOBUFS, errno.ENOMEM):
194 # Some platforms (e.g. Linux keep reporting the FD as
195 # ready, so we remove the read handler temporarily.
196 # We'll try again in a while.
197 self.call_exception_handler({
198 'message': 'socket.accept() out of system resource',
199 'exception': exc,
200 'socket': trsock.TransportSocket(sock),
201 })
202 self._remove_reader(sock.fileno())
203 self.call_later(constants.ACCEPT_RETRY_DELAY,
204 self._start_serving,
205 protocol_factory, sock, sslcontext, server,
206 backlog, ssl_handshake_timeout,
207 ssl_shutdown_timeout)
208 else:
209 raise # The event loop will catch, log and ignore it.
210 else:
211 extra = {'peername': addr}
212 accept = self._accept_connection2(
213 protocol_factory, conn, extra, sslcontext, server,
214 ssl_handshake_timeout, ssl_shutdown_timeout)
215 self.create_task(accept)
216
217 async def _accept_connection2(
218 self, protocol_factory, conn, extra,
219 sslcontext=None, server=None,
220 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
221 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
222 protocol = None
223 transport = None
224 try:
225 protocol = protocol_factory()
226 waiter = self.create_future()
227 if sslcontext:
228 transport = self._make_ssl_transport(
229 conn, protocol, sslcontext, waiter=waiter,
230 server_side=True, extra=extra, server=server,
231 ssl_handshake_timeout=ssl_handshake_timeout,
232 ssl_shutdown_timeout=ssl_shutdown_timeout)
233 else:
234 transport = self._make_socket_transport(
235 conn, protocol, waiter=waiter, extra=extra,
236 server=server)
237
238 try:
239 await waiter
240 except BaseException:
241 transport.close()
242 # gh-109534: When an exception is raised by the SSLProtocol object the
243 # exception set in this future can keep the protocol object alive and
244 # cause a reference cycle.
245 waiter = None
246 raise
247 # It's now up to the protocol to handle the connection.
248
249 except (SystemExit, KeyboardInterrupt):
250 raise
251 except BaseException as exc:
252 if self._debug:
253 context = {
254 'message':
255 'Error on transport creation for incoming connection',
256 'exception': exc,
257 }
258 if protocol is not None:
259 context['protocol'] = protocol
260 if transport is not None:
261 context['transport'] = transport
262 self.call_exception_handler(context)
263
def _ensure_fd_no_transport(self, fd):
    """Raise RuntimeError if *fd* belongs to a transport that is not closing.

    *fd* is an int or an object with a fileno() method; anything else
    raises ValueError.
    """
    if isinstance(fd, int):
        fileno = fd
    else:
        try:
            fileno = int(fd.fileno())
        except (AttributeError, TypeError, ValueError):
            # This code matches selectors._fileobj_to_fd function.
            raise ValueError(f"Invalid file object: {fd!r}") from None
    transport = self._transports.get(fileno)
    if not transport or transport.is_closing():
        return
    raise RuntimeError(
        f'File descriptor {fd!r} is used by transport '
        f'{transport!r}')
277
def _add_reader(self, fd, callback, *args):
    """Install *callback* as the read handler for *fd*; return its Handle.

    A reader already registered for *fd* is replaced and cancelled; an
    existing writer is left untouched.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    existing = selector.get_map().get(fd)
    if existing is None:
        selector.register(fd, selectors.EVENT_READ, (handle, None))
        return handle
    previous_reader, writer = existing.data
    selector.modify(fd, existing.events | selectors.EVENT_READ,
                    (handle, writer))
    if previous_reader is not None:
        previous_reader.cancel()
    return handle
292
def _remove_reader(self, fd):
    """Drop the read handler for *fd*.

    Returns True when a reader handle existed (it is cancelled), False
    when the loop is closed, *fd* is not registered, or no reader was set.
    """
    if self.is_closed():
        return False
    selector = self._selector
    key = selector.get_map().get(fd)
    if key is None:
        return False
    reader, writer = key.data
    remaining = key.events & ~selectors.EVENT_READ
    if remaining:
        selector.modify(fd, remaining, (None, writer))
    else:
        selector.unregister(fd)

    if reader is None:
        return False
    reader.cancel()
    return True
311
def _add_writer(self, fd, callback, *args):
    """Install *callback* as the write handler for *fd*; return its Handle.

    A writer already registered for *fd* is replaced and cancelled; an
    existing reader is left untouched.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    selector = self._selector
    existing = selector.get_map().get(fd)
    if existing is None:
        selector.register(fd, selectors.EVENT_WRITE, (None, handle))
        return handle
    reader, previous_writer = existing.data
    selector.modify(fd, existing.events | selectors.EVENT_WRITE,
                    (reader, handle))
    if previous_writer is not None:
        previous_writer.cancel()
    return handle
326
def _remove_writer(self, fd):
    """Drop the write handler for *fd*.

    Returns True when a writer handle existed (it is cancelled), False
    when the loop is closed, *fd* is not registered, or no writer was set.
    """
    if self.is_closed():
        return False
    selector = self._selector
    key = selector.get_map().get(fd)
    if key is None:
        return False
    reader, writer = key.data
    # Remove both writer and connector.
    remaining = key.events & ~selectors.EVENT_WRITE
    if remaining:
        selector.modify(fd, remaining, (reader, None))
    else:
        selector.unregister(fd)

    if writer is None:
        return False
    writer.cancel()
    return True
347
def add_reader(self, fd, callback, *args):
    """Register *callback* to be called with *args* when *fd* is readable.

    Raises RuntimeError if *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    self._add_reader(fd, callback, *args)
    return None
352
def remove_reader(self, fd):
    """Unregister the read callback for *fd*; return _remove_reader()'s result.

    Raises RuntimeError if *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_reader(fd)
    return removed
357
def add_writer(self, fd, callback, *args):
    """Register *callback* to be called with *args* when *fd* is writable.

    Raises RuntimeError if *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    self._add_writer(fd, callback, *args)
    return None
362
def remove_writer(self, fd):
    """Unregister the write callback for *fd*; return _remove_writer()'s result.

    Raises RuntimeError if *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_writer(fd)
    return removed
367
368 async def sock_recv(self, sock, n):
369 """Receive data from the socket.
370
371 The return value is a bytes object representing the data received.
372 The maximum amount of data to be received at once is specified by
373 nbytes.
374 """
375 base_events._check_ssl_socket(sock)
376 if self._debug and sock.gettimeout() != 0:
377 raise ValueError("the socket must be non-blocking")
378 try:
379 return sock.recv(n)
380 except (BlockingIOError, InterruptedError):
381 pass
382 fut = self.create_future()
383 fd = sock.fileno()
384 self._ensure_fd_no_transport(fd)
385 handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
386 fut.add_done_callback(
387 functools.partial(self._sock_read_done, fd, handle=handle))
388 return await fut
389
def _sock_read_done(self, fd, fut, handle=None):
    """Future done-callback: remove the reader for *fd* unless its handle
    was already cancelled."""
    if handle is not None and handle.cancelled():
        return
    self.remove_reader(fd)
393
def _sock_recv(self, fut, sock, n):
    """Reader callback for sock_recv(): complete *fut* from ``sock.recv(n)``.

    _sock_recv() is registered as an I/O callback when the operation can't
    be done immediately.  Don't use it directly, call sock_recv().
    """
    if fut.done():
        return
    try:
        chunk = sock.recv(n)
    except (BlockingIOError, InterruptedError):
        # Still not readable: keep the callback and try again next time.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(chunk)
409
410 async def sock_recv_into(self, sock, buf):
411 """Receive data from the socket.
412
413 The received data is written into *buf* (a writable buffer).
414 The return value is the number of bytes written.
415 """
416 base_events._check_ssl_socket(sock)
417 if self._debug and sock.gettimeout() != 0:
418 raise ValueError("the socket must be non-blocking")
419 try:
420 return sock.recv_into(buf)
421 except (BlockingIOError, InterruptedError):
422 pass
423 fut = self.create_future()
424 fd = sock.fileno()
425 self._ensure_fd_no_transport(fd)
426 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
427 fut.add_done_callback(
428 functools.partial(self._sock_read_done, fd, handle=handle))
429 return await fut
430
def _sock_recv_into(self, fut, sock, buf):
    """Reader callback for sock_recv_into(): complete *fut* with the count
    returned by ``sock.recv_into(buf)``.

    Registered as an I/O callback when the operation can't be done
    immediately.  Don't use it directly, call sock_recv_into().
    """
    if fut.done():
        return
    try:
        received = sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        # Still not readable: keep the callback and try again next time.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(received)
447
448 async def sock_recvfrom(self, sock, bufsize):
449 """Receive a datagram from a datagram socket.
450
451 The return value is a tuple of (bytes, address) representing the
452 datagram received and the address it came from.
453 The maximum amount of data to be received at once is specified by
454 nbytes.
455 """
456 base_events._check_ssl_socket(sock)
457 if self._debug and sock.gettimeout() != 0:
458 raise ValueError("the socket must be non-blocking")
459 try:
460 return sock.recvfrom(bufsize)
461 except (BlockingIOError, InterruptedError):
462 pass
463 fut = self.create_future()
464 fd = sock.fileno()
465 self._ensure_fd_no_transport(fd)
466 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
467 fut.add_done_callback(
468 functools.partial(self._sock_read_done, fd, handle=handle))
469 return await fut
470
def _sock_recvfrom(self, fut, sock, bufsize):
    """Reader callback for sock_recvfrom(): complete *fut* with the value
    returned by ``sock.recvfrom(bufsize)``.

    Registered as an I/O callback when the operation can't be done
    immediately.  Don't use it directly, call sock_recvfrom().
    """
    if fut.done():
        return
    try:
        received = sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        # Still not readable: keep the callback and try again next time.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(received)
487
488 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
489 """Receive data from the socket.
490
491 The received data is written into *buf* (a writable buffer).
492 The return value is a tuple of (number of bytes written, address).
493 """
494 base_events._check_ssl_socket(sock)
495 if self._debug and sock.gettimeout() != 0:
496 raise ValueError("the socket must be non-blocking")
497 if not nbytes:
498 nbytes = len(buf)
499
500 try:
501 return sock.recvfrom_into(buf, nbytes)
502 except (BlockingIOError, InterruptedError):
503 pass
504 fut = self.create_future()
505 fd = sock.fileno()
506 self._ensure_fd_no_transport(fd)
507 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
508 nbytes)
509 fut.add_done_callback(
510 functools.partial(self._sock_read_done, fd, handle=handle))
511 return await fut
512
def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
    """Reader callback for sock_recvfrom_into(): complete *fut* with the
    value returned by ``sock.recvfrom_into(buf, bufsize)``.

    Registered as an I/O callback when the operation can't be done
    immediately.  Don't use it directly, call sock_recvfrom_into().
    """
    if fut.done():
        return
    try:
        received = sock.recvfrom_into(buf, bufsize)
    except (BlockingIOError, InterruptedError):
        # Still not readable: keep the callback and try again next time.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(received)
529
530 async def sock_sendall(self, sock, data):
531 """Send data to the socket.
532
533 The socket must be connected to a remote socket. This method continues
534 to send data from data until either all data has been sent or an
535 error occurs. None is returned on success. On error, an exception is
536 raised, and there is no way to determine how much data, if any, was
537 successfully processed by the receiving end of the connection.
538 """
539 base_events._check_ssl_socket(sock)
540 if self._debug and sock.gettimeout() != 0:
541 raise ValueError("the socket must be non-blocking")
542 try:
543 n = sock.send(data)
544 except (BlockingIOError, InterruptedError):
545 n = 0
546
547 if n == len(data):
548 # all data sent
549 return
550
551 fut = self.create_future()
552 fd = sock.fileno()
553 self._ensure_fd_no_transport(fd)
554 # use a trick with a list in closure to store a mutable state
555 handle = self._add_writer(fd, self._sock_sendall, fut, sock,
556 memoryview(data), [n])
557 fut.add_done_callback(
558 functools.partial(self._sock_write_done, fd, handle=handle))
559 return await fut
560
def _sock_sendall(self, fut, sock, view, pos):
    """Writer callback for sock_sendall(): send the rest of *view*.

    ``pos[0]`` holds the offset of the first unsent byte and is advanced
    after a partial send; *fut* is resolved with None once the offset
    reaches ``len(view)``.
    """
    if fut.done():
        # Future cancellation can be scheduled on previous loop iteration
        return
    offset = pos[0]
    try:
        sent = sock.send(view[offset:])
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
        return

    offset += sent

    if offset != len(view):
        pos[0] = offset
    else:
        fut.set_result(None)
582
async def sock_sendto(self, sock, data, address):
    """Send *data* to *address* through *sock*.

    The return value is whatever ``sock.sendto()`` returns.  If the socket
    is not ready, a writer callback is registered and the coroutine waits
    until the send succeeds or fails.  In debug mode a socket with a
    non-zero timeout raises ValueError.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    try:
        return sock.sendto(data, address)
    except (BlockingIOError, InterruptedError):
        # Not writable yet: wait for the descriptor to become writable.
        pass

    pending = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    writer = self._add_writer(
        fileno, self._sock_sendto, pending, sock, data, address)
    on_done = functools.partial(self._sock_write_done, fileno, handle=writer)
    pending.add_done_callback(on_done)
    return await pending
609
def _sock_sendto(self, fut, sock, data, address):
    """Writer callback for sock_sendto(): complete *fut* with the value
    returned by ``sock.sendto(data, 0, address)``."""
    if fut.done():
        # Future cancellation can be scheduled on previous loop iteration
        return
    try:
        sent = sock.sendto(data, 0, address)
    except (BlockingIOError, InterruptedError):
        # Still not writable: keep the callback registered.
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(sent)
624
async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.  For AF_INET sockets (and AF_INET6 when
    IPv6 is available) *address* is first resolved with
    _ensure_resolved() and the first result is used.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    is_inet = sock.family == socket.AF_INET or (
        base_events._HAS_IPv6 and sock.family == socket.AF_INET6)
    if is_inet:
        resolved = await self._ensure_resolved(
            address, family=sock.family, type=sock.type, proto=sock.proto,
            loop=self,
        )
        _family, _type, _proto, _canonname, address = resolved[0]

    fut = self.create_future()
    self._sock_connect(fut, sock, address)
    try:
        return await fut
    finally:
        # Needed to break cycles when an exception occurs.
        fut = None
649
def _sock_connect(self, fut, sock, address):
    """Start connecting *sock* to *address*, reporting the outcome on *fut*.

    If the connection cannot complete immediately, _sock_connect_cb() is
    registered as the writer callback for the socket.
    """
    fileno = sock.fileno()
    try:
        sock.connect(address)
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        self._ensure_fd_no_transport(fileno)
        writer = self._add_writer(
            fileno, self._sock_connect_cb, fut, sock, address)
        on_done = functools.partial(
            self._sock_write_done, fileno, handle=writer)
        fut.add_done_callback(on_done)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference to the future.
        fut = None
672
def _sock_write_done(self, fd, fut, handle=None):
    """Future done-callback: remove the writer for *fd* unless its handle
    was already cancelled."""
    if handle is not None and handle.cancelled():
        return
    self.remove_writer(fd)
676
def _sock_connect_cb(self, fut, sock, address):
    """Writer callback for _sock_connect(): report the connect() outcome.

    The socket's SO_ERROR option is read; a non-zero value becomes an
    OSError set on *fut*, zero resolves *fut* with None.
    """
    if fut.done():
        return

    try:
        so_error = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if so_error != 0:
            # Jump to any except clause below.
            raise OSError(so_error, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference to the future.
        fut = None
697
async def sock_accept(self, sock):
    """Accept a connection.

    The socket must be bound to an address and listening for connections.
    The return value is a pair (conn, address) where conn is a new socket
    object usable to send and receive data on the connection, and address
    is the address bound to the socket on the other end of the connection.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    accepted = self.create_future()
    self._sock_accept(accepted, sock)
    return await accepted
712
def _sock_accept(self, fut, sock):
    """Try to accept on *sock* and resolve *fut* with (conn, address).

    The accepted socket is made non-blocking.  When no connection is
    pending, this method registers itself as the reader callback.
    """
    fileno = sock.fileno()
    try:
        conn, address = sock.accept()
        conn.setblocking(False)
    except (BlockingIOError, InterruptedError):
        self._ensure_fd_no_transport(fileno)
        reader = self._add_reader(fileno, self._sock_accept, fut, sock)
        on_done = functools.partial(
            self._sock_read_done, fileno, handle=reader)
        fut.add_done_callback(on_done)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        fut.set_exception(err)
    else:
        fut.set_result((conn, address))
729
async def _sendfile_native(self, transp, file, offset, count):
    """Send *file* over the socket of *transp* using sock_sendfile().

    The transport is taken out of the fd registry and its reading is
    paused for the duration of the call; both are restored afterwards.
    """
    sock_fd = transp._sock_fd
    del self._transports[sock_fd]
    was_reading = transp.is_reading()
    transp.pause_reading()
    await transp._make_empty_waiter()
    try:
        return await self.sock_sendfile(transp._sock, file, offset, count,
                                        fallback=False)
    finally:
        transp._reset_empty_waiter()
        if was_reading:
            transp.resume_reading()
        self._transports[transp._sock_fd] = transp
743
def _process_events(self, event_list):
    """Dispatch ready selector events.

    For each (key, mask) pair, a ready reader/writer handle is queued with
    _add_callback(); a handle that was cancelled is unregistered instead.
    """
    for key, mask in event_list:
        fileobj = key.fileobj
        reader, writer = key.data
        dispatch = (
            (selectors.EVENT_READ, reader, self._remove_reader),
            (selectors.EVENT_WRITE, writer, self._remove_writer),
        )
        for event, handle, unregister in dispatch:
            if not (mask & event) or handle is None:
                continue
            if handle._cancelled:
                unregister(fileobj)
            else:
                self._add_callback(handle)
757
def _stop_serving(self, sock):
    """Unregister the reader for listening *sock*, then close the socket."""
    fileno = sock.fileno()
    self._remove_reader(fileno)
    sock.close()
761
762
763class _SelectorTransport(transports._FlowControlMixin,
764 transports.Transport):
765
766 max_size = 256 * 1024 # Buffer size passed to recv().
767
768 # Attribute used in the destructor: it must be set even if the constructor
769 # is not called (see _SelectorSslTransport which may start by raising an
770 # exception)
771 _sock = None
772
def __init__(self, loop, sock, protocol, extra=None, server=None):
    """Bind the transport to *sock* and register it with *loop*.

    The 'socket', 'sockname' and (unless already supplied) 'peername'
    extra-info entries are filled in; a name the socket cannot report is
    stored as None.
    """
    super().__init__(extra, loop)
    info = self._extra
    info['socket'] = trsock.TransportSocket(sock)
    try:
        info['sockname'] = sock.getsockname()
    except OSError:
        info['sockname'] = None
    if 'peername' not in info:
        try:
            info['peername'] = sock.getpeername()
        except OSError:
            info['peername'] = None
    self._sock = sock
    self._sock_fd = sock.fileno()

    self._protocol_connected = False
    self.set_protocol(protocol)

    self._server = server
    self._buffer = collections.deque()
    self._conn_lost = 0  # Set when call to connection_lost scheduled.
    self._closing = False  # Set when close() called.
    self._paused = False  # Set when pause_reading() called

    if server is not None:
        server._attach(self)
    loop._transports[self._sock_fd] = self
800
def __repr__(self):
    """Describe the transport: class name, closed/closing flag, fd and,
    while the loop is open, whether reads/writes are being polled."""
    parts = [self.__class__.__name__]
    if self._sock is None:
        parts.append('closed')
    elif self._closing:
        parts.append('closing')
    parts.append(f'fd={self._sock_fd}')
    # test if the transport was closed
    loop = self._loop
    if loop is not None and not loop.is_closed():
        reading = _test_selector_event(loop._selector,
                                       self._sock_fd, selectors.EVENT_READ)
        parts.append('read=polling' if reading else 'read=idle')

        writing = _test_selector_event(self._loop._selector,
                                       self._sock_fd,
                                       selectors.EVENT_WRITE)
        state = 'polling' if writing else 'idle'

        bufsize = self.get_write_buffer_size()
        parts.append(f'write=<{state}, bufsize={bufsize}>')
    return '<{}>'.format(' '.join(parts))
828
def abort(self):
    """Close the transport immediately via _force_close(None)."""
    self._force_close(None)
    return None
831
def set_protocol(self, protocol):
    """Install *protocol* and mark the transport as having a protocol."""
    self._protocol, self._protocol_connected = protocol, True
835
def get_protocol(self):
    """Return the protocol currently attached to this transport."""
    current = self._protocol
    return current
838
def is_closing(self):
    """Return the closing flag, which close() and _force_close() set."""
    closing = self._closing
    return closing
841
def is_reading(self):
    """Return True if the transport is neither closing nor paused."""
    if self.is_closing():
        return False
    return not self._paused
844
def pause_reading(self):
    """Stop polling the socket for reads; a no-op when not reading."""
    if not self.is_reading():
        return
    self._paused = True
    loop = self._loop
    loop._remove_reader(self._sock_fd)
    if loop.get_debug():
        logger.debug("%r pauses reading", self)
852
def resume_reading(self):
    """Resume polling the socket for reads.

    Does nothing when the transport is closing or is not paused.
    """
    if self._closing or not self._paused:
        return
    self._paused = False
    self._add_reader(self._sock_fd, self._read_ready)
    debug = self._loop.get_debug()
    if debug:
        logger.debug("%r resumes reading", self)
860
def close(self):
    """Start closing the transport.

    Reading stops at once.  If the write buffer is empty the writer is
    removed and _call_connection_lost(None) is scheduled; otherwise only
    the closing flag is set here.
    """
    if self._closing:
        return
    self._closing = True
    loop = self._loop
    loop._remove_reader(self._sock_fd)
    if self._buffer:
        return
    self._conn_lost += 1
    loop._remove_writer(self._sock_fd)
    loop.call_soon(self._call_connection_lost, None)
870
def __del__(self, _warn=warnings.warn):
    """Warn about, and close, a socket left open when the transport is
    finalized; the transport is also detached from its server."""
    sock = self._sock
    if sock is None:
        return
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    sock.close()
    if self._server is not None:
        self._server._detach(self)
877
def _fatal_error(self, exc, message='Fatal error on transport'):
    """Report *exc* and force-close the transport.

    An OSError is only logged, and only in debug mode; any other exception
    goes to the loop's exception handler.  Should be called from an
    exception handler only.
    """
    loop = self._loop
    if not isinstance(exc, OSError):
        loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
    elif loop.get_debug():
        logger.debug("%r: %s", self, message, exc_info=True)
    self._force_close(exc)
891
def _force_close(self, exc):
    """Close the transport now, discarding any buffered output.

    Does nothing once connection loss has been recorded; otherwise
    schedules _call_connection_lost(exc).
    """
    if self._conn_lost:
        return
    loop = self._loop
    fileno = self._sock_fd
    pending = self._buffer
    if pending:
        pending.clear()
        loop._remove_writer(fileno)
    if not self._closing:
        self._closing = True
        loop._remove_reader(fileno)
    self._conn_lost += 1
    loop.call_soon(self._call_connection_lost, exc)
903
def _call_connection_lost(self, exc):
    """Notify the protocol that the connection is lost, then release
    the socket, protocol, loop and server references."""
    try:
        if self._protocol_connected:
            self._protocol.connection_lost(exc)
    finally:
        self._sock.close()
        self._sock = self._protocol = self._loop = None
        owner = self._server
        if owner is not None:
            owner._detach(self)
            self._server = None
917
def get_write_buffer_size(self):
    """Return the total length of all chunks held in the write buffer."""
    return sum(len(chunk) for chunk in self._buffer)
920
def _add_reader(self, fd, callback, *args):
    """Register a reader on the loop, but only while the transport reads."""
    if self.is_reading():
        self._loop._add_reader(fd, callback, *args)
925
926
927class _SelectorSocketTransport(_SelectorTransport):
928
929 _start_tls_compatible = True
930 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
931
def __init__(self, loop, sock, protocol, waiter=None,
             extra=None, server=None):
    """Create the transport and schedule its start-up callbacks.

    connection_made(), the reader registration and (if given) the waiter
    wake-up are queued with call_soon() in that order.
    """
    self._read_ready_cb = None
    super().__init__(loop, sock, protocol, extra, server)
    self._eof = False
    self._empty_waiter = None
    self._write_ready = (
        self._write_sendmsg if _HAS_SENDMSG else self._write_send)
    # Disable the Nagle algorithm -- small writes will be
    # sent without waiting for the TCP ACK.  This generally
    # decreases the latency (in some cases significantly.)
    base_events._set_nodelay(self._sock)

    schedule = self._loop.call_soon
    schedule(self._protocol.connection_made, self)
    # only start reading when connection_made() has been called
    schedule(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
        # only wake up the waiter when connection_made() has been called
        schedule(futures._set_result_unless_cancelled, waiter, None)
956
def set_protocol(self, protocol):
    """Install *protocol* and select the matching read-ready handler.

    A BufferedProtocol is served by _read_ready__get_buffer; any other
    protocol by _read_ready__data_received.
    """
    buffered = isinstance(protocol, protocols.BufferedProtocol)
    self._read_ready_cb = (
        self._read_ready__get_buffer if buffered
        else self._read_ready__data_received)

    super().set_protocol(protocol)
964
def _read_ready(self):
    """Run the read handler that set_protocol() selected."""
    handler = self._read_ready_cb
    handler()
967
def _read_ready__get_buffer(self):
    """Read handler for buffered protocols.

    Obtains a buffer from the protocol, receives into it and reports the
    byte count through buffer_updated().  A zero-byte read is handled as
    EOF; protocol or socket failures go through _fatal_error().
    """
    if self._conn_lost:
        return

    protocol = self._protocol
    try:
        target = protocol.get_buffer(-1)
        if not len(target):
            raise RuntimeError('get_buffer() returned an empty buffer')
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.get_buffer() call failed.')
        return

    try:
        received = self._sock.recv_into(target)
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not received:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.buffer_updated(received)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.buffer_updated() call failed.')
1004
def _read_ready__data_received(self):
    """Read handler for streaming protocols.

    Receives up to ``max_size`` bytes and passes them to data_received().
    An empty read is handled as EOF; socket or protocol failures go
    through _fatal_error().
    """
    if self._conn_lost:
        return
    try:
        chunk = self._sock.recv(self.max_size)
    except (BlockingIOError, InterruptedError):
        return
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(err, 'Fatal read error on socket transport')
        return

    if not chunk:
        self._read_ready__on_eof()
        return

    try:
        self._protocol.data_received(chunk)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as err:
        self._fatal_error(
            err, 'Fatal error: protocol.data_received() call failed.')
1029
1030 def _read_ready__on_eof(self):
1031 if self._loop.get_debug():
1032 logger.debug("%r received EOF", self)
1033
1034 try:
1035 keep_open = self._protocol.eof_received()
1036 except (SystemExit, KeyboardInterrupt):
1037 raise
1038 except BaseException as exc:
1039 self._fatal_error(
1040 exc, 'Fatal error: protocol.eof_received() call failed.')
1041 return
1042
1043 if keep_open:
1044 # We're keeping the connection open so the
1045 # protocol can write more, but we still can't
1046 # receive more, so remove the reader callback.
1047 self._loop._remove_reader(self._sock_fd)
1048 else:
1049 self.close()
1050
def write(self, data):
    """Send *data*, buffering whatever cannot be written immediately.

    Raises TypeError when *data* is not bytes, bytearray or memoryview,
    and RuntimeError after write_eof() or while an empty-waiter is set
    (sendfile in progress).  Empty data is ignored.  After the connection
    is lost the data is dropped and, past a threshold of such calls, a
    warning is logged.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(f'data argument must be a bytes-like object, '
                        f'not {type(data).__name__!r}')
    if self._eof:
        raise RuntimeError('Cannot call write() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to write; sendfile is in progress')
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Fast path: nothing is queued, so attempt the send right now.
        try:
            sent = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal write error on socket transport')
            return
        else:
            # Keep only the unsent tail, without copying it.
            data = memoryview(data)[sent:]
            if not data:
                return
        # Something is left over: have the loop call us when writable.
        self._loop._add_writer(self._sock_fd, self._write_ready)

    # Queue the (remaining) data and apply write flow control.
    self._buffer.append(data)
    self._maybe_pause_protocol()
1089
def _get_sendmsg_buffer(self):
    """Return an iterator over at most SC_IOV_MAX queued chunks.

    The result is what _write_sendmsg() passes to socket.sendmsg().
    """
    pending = self._buffer
    return itertools.islice(pending, SC_IOV_MAX)
1092
1093 def _write_sendmsg(self):
1094 assert self._buffer, 'Data should not be empty'
1095 if self._conn_lost:
1096 return
1097 try:
1098 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
1099 self._adjust_leftover_buffer(nbytes)
1100 except (BlockingIOError, InterruptedError):
1101 pass
1102 except (SystemExit, KeyboardInterrupt):
1103 raise
1104 except BaseException as exc:
1105 self._loop._remove_writer(self._sock_fd)
1106 self._buffer.clear()
1107 self._fatal_error(exc, 'Fatal write error on socket transport')
1108 if self._empty_waiter is not None:
1109 self._empty_waiter.set_exception(exc)
1110 else:
1111 self._maybe_resume_protocol() # May append to buffer.
1112 if not self._buffer:
1113 self._loop._remove_writer(self._sock_fd)
1114 if self._empty_waiter is not None:
1115 self._empty_waiter.set_result(None)
1116 if self._closing:
1117 self._call_connection_lost(None)
1118 elif self._eof:
1119 self._sock.shutdown(socket.SHUT_WR)
1120
1121 def _adjust_leftover_buffer(self, nbytes: int) -> None:
1122 buffer = self._buffer
1123 while nbytes:
1124 b = buffer.popleft()
1125 b_len = len(b)
1126 if b_len <= nbytes:
1127 nbytes -= b_len
1128 else:
1129 buffer.appendleft(b[nbytes:])
1130 break
1131
1132 def _write_send(self):
1133 assert self._buffer, 'Data should not be empty'
1134 if self._conn_lost:
1135 return
1136 try:
1137 buffer = self._buffer.popleft()
1138 n = self._sock.send(buffer)
1139 if n != len(buffer):
1140 # Not all data was written
1141 self._buffer.appendleft(buffer[n:])
1142 except (BlockingIOError, InterruptedError):
1143 pass
1144 except (SystemExit, KeyboardInterrupt):
1145 raise
1146 except BaseException as exc:
1147 self._loop._remove_writer(self._sock_fd)
1148 self._buffer.clear()
1149 self._fatal_error(exc, 'Fatal write error on socket transport')
1150 if self._empty_waiter is not None:
1151 self._empty_waiter.set_exception(exc)
1152 else:
1153 self._maybe_resume_protocol() # May append to buffer.
1154 if not self._buffer:
1155 self._loop._remove_writer(self._sock_fd)
1156 if self._empty_waiter is not None:
1157 self._empty_waiter.set_result(None)
1158 if self._closing:
1159 self._call_connection_lost(None)
1160 elif self._eof:
1161 self._sock.shutdown(socket.SHUT_WR)
1162
def write_eof(self):
    """Close the write side once all buffered data has been sent.

    Does nothing if the transport is closing or EOF was already
    requested.  When data is still buffered only the flag is set; the
    writer callback performs the shutdown after the buffer drains.
    """
    already_finishing = self._closing or self._eof
    if already_finishing:
        return

    self._eof = True
    if self._buffer:
        # Shutdown is deferred until the pending data has been flushed.
        return
    self._sock.shutdown(socket.SHUT_WR)
1169
def writelines(self, list_of_data):
    """Queue every chunk of *list_of_data* and try to flush immediately.

    Raises RuntimeError after write_eof() or while an empty-waiter is set
    (sendfile in progress).  An empty *list_of_data* is ignored.

    After the connection is lost the data is dropped, exactly as write()
    does: the call is counted and, past the threshold, a warning is
    logged.  Without this guard the chunks would be queued on a dead
    transport and ``self._write_ready`` -- which _call_connection_lost()
    resets to None -- would be called.
    """
    if self._eof:
        raise RuntimeError('Cannot call writelines() after write_eof()')
    if self._empty_waiter is not None:
        raise RuntimeError('unable to writelines; sendfile is in progress')
    if not list_of_data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    self._buffer.extend([memoryview(data) for data in list_of_data])
    self._write_ready()
    # If the entire buffer couldn't be written, register a write handler
    if self._buffer:
        self._loop._add_writer(self._sock_fd, self._write_ready)
        self._maybe_pause_protocol()
1183
def can_write_eof(self):
    """Return True: this transport implements write_eof()."""
    supported = True
    return supported
1186
def _call_connection_lost(self, exc):
    """Finish connection teardown and release writer-side state.

    Delegates to the parent implementation, then -- even if that raised --
    clears the writer callback and fails a still-pending empty-waiter with
    ConnectionError so that whoever awaits it is woken up.
    """
    try:
        super()._call_connection_lost(exc)
    finally:
        self._write_ready = None
        waiter = self._empty_waiter
        # The waiter may already be resolved: the writer callbacks call
        # set_result()/set_exception() on it right before the connection
        # is torn down, and _make_empty_waiter() resolves it immediately
        # when the buffer is empty.  Completing a finished future again
        # would raise InvalidStateError, so only fail a pending one.
        if waiter is not None and not waiter.done():
            waiter.set_exception(
                ConnectionError("Connection is closed by peer"))
1195
1196 def _make_empty_waiter(self):
1197 if self._empty_waiter is not None:
1198 raise RuntimeError("Empty waiter is already set")
1199 self._empty_waiter = self._loop.create_future()
1200 if not self._buffer:
1201 self._empty_waiter.set_result(None)
1202 return self._empty_waiter
1203
1204 def _reset_empty_waiter(self):
1205 self._empty_waiter = None
1206
def close(self):
    """Close the transport, dropping the read handler first."""
    # Clear the handler reference before delegating to the parent close().
    self._read_ready_cb = None
    super().close()
1210
1211
class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
    """Datagram transport on top of a selector-monitored socket.

    Datagrams that cannot be sent immediately are queued as
    ``(data, addr)`` pairs.  ``_buffer_size`` tracks the queued payload
    plus a fixed per-datagram allowance and is the value reported by
    get_write_buffer_size().
    """

    _buffer_factory = collections.deque
    # Bytes charged per queued datagram on top of its payload (the "header
    # bytes" of the write-buffer accounting).  The same amount must be used
    # when a datagram is queued, dequeued and re-queued, otherwise the
    # counter drifts and flow control never sees the buffer as drained.
    _header_size = 8

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule its start-up callbacks.

        *address* is the fixed destination, if any.  *waiter*, when
        given, is a future resolved after connection_made() was scheduled.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._buffer_size = 0
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the accounted size of the queued outgoing datagrams."""
        return self._buffer_size

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it to the protocol.

        OSError is reported through protocol.error_received(); other
        unexpected exceptions go to _fatal_error().
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError when *data* is not bytes, bytearray or
        memoryview, and ValueError when the transport has a fixed address
        and *addr* is neither None nor that address.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._buffer_size += len(data) + self._header_size
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send queued datagrams until the socket blocks.

        Each datagram leaves the accounting with the same payload + header
        amount it was charged when queued; one that must be retried is
        put back and charged again.
        """
        while self._buffer:
            data, addr = self._buffer.popleft()
            charged = len(data) + self._header_size
            self._buffer_size -= charged
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                self._buffer_size += charged
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)