]> git.ipfire.org Git - thirdparty/Python/cpython.git/blame - Lib/asyncio/selector_events.py
gh-102555: Fix comment parsing in HTMLParser according to the HTML5 standard (GH...
[thirdparty/Python/cpython.git] / Lib / asyncio / selector_events.py
CommitLineData
27b7c7eb
GR
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
6370f345 7__all__ = 'BaseSelectorEventLoop',
8dffc456 8
27b7c7eb 9import collections
3317a132 10import errno
d5aeccf9 11import functools
c122390a
KA
12import itertools
13import os
4271dfd7 14import selectors
27b7c7eb 15import socket
978a9afc 16import warnings
5b8d4f97 17import weakref
27b7c7eb
GR
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
22
23from . import base_events
24from . import constants
25from . import events
26from . import futures
631fd38d 27from . import protocols
231b404c 28from . import sslproto
631fd38d 29from . import transports
8cd5165b 30from . import trsock
fc29e0f3 31from .log import logger
27b7c7eb 32
c122390a
KA
33_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
34
35if _HAS_SENDMSG:
36 try:
37 SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
38 except OSError:
39 # Fallback to send
40 _HAS_SENDMSG = False
27b7c7eb 41
e912e652
VS
42def _test_selector_event(selector, fd, event):
43 # Test if the selector is monitoring 'event' events
44 # for the file descriptor 'fd'.
45 try:
46 key = selector.get_key(fd)
47 except KeyError:
48 return False
49 else:
50 return bool(key.events & event)
51
52
27b7c7eb
GR
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
55
56 See events.EventLoop for API specification.
57 """
58
59 def __init__(self, selector=None):
60 super().__init__()
61
62 if selector is None:
63 selector = selectors.DefaultSelector()
fc29e0f3 64 logger.debug('Using selector: %s', selector.__class__.__name__)
27b7c7eb
GR
65 self._selector = selector
66 self._make_self_pipe()
5b8d4f97 67 self._transports = weakref.WeakValueDictionary()
27b7c7eb
GR
68
69 def _make_socket_transport(self, sock, protocol, waiter=None, *,
70 extra=None, server=None):
c0627640 71 self._ensure_fd_no_transport(sock)
27b7c7eb
GR
72 return _SelectorSocketTransport(self, sock, protocol, waiter,
73 extra, server)
74
f7686c1f
NA
75 def _make_ssl_transport(
76 self, rawsock, protocol, sslcontext, waiter=None,
77 *, server_side=False, server_hostname=None,
78 extra=None, server=None,
13c10bfb
KA
79 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
80 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
81 ):
c0627640 82 self._ensure_fd_no_transport(rawsock)
f7686c1f 83 ssl_protocol = sslproto.SSLProtocol(
13c10bfb
KA
84 self, protocol, sslcontext, waiter,
85 server_side, server_hostname,
86 ssl_handshake_timeout=ssl_handshake_timeout,
87 ssl_shutdown_timeout=ssl_shutdown_timeout
88 )
231b404c
VS
89 _SelectorSocketTransport(self, rawsock, ssl_protocol,
90 extra=extra, server=server)
91 return ssl_protocol._app_transport
92
27b7c7eb 93 def _make_datagram_transport(self, sock, protocol,
bfff45d6 94 address=None, waiter=None, extra=None):
c0627640 95 self._ensure_fd_no_transport(sock)
bfff45d6
VS
96 return _SelectorDatagramTransport(self, sock, protocol,
97 address, waiter, extra)
27b7c7eb
GR
98
99 def close(self):
956de691 100 if self.is_running():
5e63120f 101 raise RuntimeError("Cannot close a running event loop")
bb2fc5b2
VS
102 if self.is_closed():
103 return
104 self._close_self_pipe()
5e63120f 105 super().close()
27b7c7eb 106 if self._selector is not None:
27b7c7eb
GR
107 self._selector.close()
108 self._selector = None
109
27b7c7eb 110 def _close_self_pipe(self):
5b8d4f97 111 self._remove_reader(self._ssock.fileno())
27b7c7eb
GR
112 self._ssock.close()
113 self._ssock = None
114 self._csock.close()
115 self._csock = None
116 self._internal_fds -= 1
117
118 def _make_self_pipe(self):
119 # A self-socket, really. :-)
a10dc3ef 120 self._ssock, self._csock = socket.socketpair()
27b7c7eb
GR
121 self._ssock.setblocking(False)
122 self._csock.setblocking(False)
123 self._internal_fds += 1
5b8d4f97 124 self._add_reader(self._ssock.fileno(), self._read_from_self)
27b7c7eb 125
fe5649c7
VS
126 def _process_self_data(self, data):
127 pass
128
27b7c7eb 129 def _read_from_self(self):
54c4b8e5
VS
130 while True:
131 try:
132 data = self._ssock.recv(4096)
133 if not data:
134 break
fe5649c7 135 self._process_self_data(data)
54c4b8e5
VS
136 except InterruptedError:
137 continue
138 except BlockingIOError:
139 break
27b7c7eb
GR
140
141 def _write_to_self(self):
3d139d8e
GR
142 # This may be called from a different thread, possibly after
143 # _close_self_pipe() has been called or even while it is
144 # running. Guard for self._csock being None or closed. When
145 # a socket is closed, send() raises OSError (with errno set to
146 # EBADF, but let's not rely on the exact error code).
147 csock = self._csock
1b0f0e3d
VS
148 if csock is None:
149 return
150
151 try:
152 csock.send(b'\0')
153 except OSError:
154 if self._debug:
155 logger.debug("Fail to write a null byte into the "
156 "self-pipe socket",
157 exc_info=True)
27b7c7eb 158
28dff0d8 159 def _start_serving(self, protocol_factory, sock,
f7686c1f 160 sslcontext=None, server=None, backlog=100,
13c10bfb
KA
161 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
162 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
5b8d4f97 163 self._add_reader(sock.fileno(), self._accept_connection,
f7686c1f 164 protocol_factory, sock, sslcontext, server, backlog,
13c10bfb 165 ssl_handshake_timeout, ssl_shutdown_timeout)
27b7c7eb 166
f7686c1f
NA
167 def _accept_connection(
168 self, protocol_factory, sock,
169 sslcontext=None, server=None, backlog=100,
13c10bfb
KA
170 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
171 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
a1b0e7db
YS
172 # This method is only called once for each event loop tick where the
173 # listening socket has triggered an EVENT_READ. There may be multiple
174 # connections waiting for an .accept() so it is called in a loop.
175 # See https://bugs.python.org/issue27906 for more details.
109f7597 176 for _ in range(backlog + 1):
a1b0e7db
YS
177 try:
178 conn, addr = sock.accept()
179 if self._debug:
180 logger.debug("%r got a new connection from %r: %r",
181 server, addr, conn)
182 conn.setblocking(False)
830e1065 183 except ConnectionAbortedError:
184 # Discard connections that were aborted before accept().
185 continue
186 except (BlockingIOError, InterruptedError):
187 # Early exit because of a signal or
188 # the socket accept buffer is empty.
189 return
a1b0e7db
YS
190 except OSError as exc:
191 # There's nowhere to send the error, so just log it.
192 if exc.errno in (errno.EMFILE, errno.ENFILE,
193 errno.ENOBUFS, errno.ENOMEM):
194 # Some platforms (e.g. Linux keep reporting the FD as
195 # ready, so we remove the read handler temporarily.
196 # We'll try again in a while.
197 self.call_exception_handler({
198 'message': 'socket.accept() out of system resource',
199 'exception': exc,
8cd5165b 200 'socket': trsock.TransportSocket(sock),
a1b0e7db 201 })
5b8d4f97 202 self._remove_reader(sock.fileno())
a1b0e7db
YS
203 self.call_later(constants.ACCEPT_RETRY_DELAY,
204 self._start_serving,
205 protocol_factory, sock, sslcontext, server,
13c10bfb
KA
206 backlog, ssl_handshake_timeout,
207 ssl_shutdown_timeout)
a1b0e7db
YS
208 else:
209 raise # The event loop will catch, log and ignore it.
3317a132 210 else:
a1b0e7db 211 extra = {'peername': addr}
6370f345 212 accept = self._accept_connection2(
f7686c1f 213 protocol_factory, conn, extra, sslcontext, server,
13c10bfb 214 ssl_handshake_timeout, ssl_shutdown_timeout)
a1b0e7db 215 self.create_task(accept)
2934262f 216
f7686c1f
NA
217 async def _accept_connection2(
218 self, protocol_factory, conn, extra,
219 sslcontext=None, server=None,
13c10bfb
KA
220 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
221 ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
2934262f
VS
222 protocol = None
223 transport = None
224 try:
29ad0111 225 protocol = protocol_factory()
7661db62 226 waiter = self.create_future()
28dff0d8 227 if sslcontext:
2934262f
VS
228 transport = self._make_ssl_transport(
229 conn, protocol, sslcontext, waiter=waiter,
f7686c1f 230 server_side=True, extra=extra, server=server,
13c10bfb
KA
231 ssl_handshake_timeout=ssl_handshake_timeout,
232 ssl_shutdown_timeout=ssl_shutdown_timeout)
27b7c7eb 233 else:
2934262f
VS
234 transport = self._make_socket_transport(
235 conn, protocol, waiter=waiter, extra=extra,
27b7c7eb 236 server=server)
2934262f
VS
237
238 try:
5f841b55 239 await waiter
431b540b 240 except BaseException:
2934262f 241 transport.close()
80aa7b36
JP
242 # gh-109534: When an exception is raised by the SSLProtocol object the
243 # exception set in this future can keep the protocol object alive and
244 # cause a reference cycle.
245 waiter = None
2934262f 246 raise
431b540b 247 # It's now up to the protocol to handle the connection.
2934262f 248
431b540b
YS
249 except (SystemExit, KeyboardInterrupt):
250 raise
251 except BaseException as exc:
aa41b9b2 252 if self._debug:
2934262f 253 context = {
6370f345
YS
254 'message':
255 'Error on transport creation for incoming connection',
2934262f
VS
256 'exception': exc,
257 }
258 if protocol is not None:
259 context['protocol'] = protocol
260 if transport is not None:
261 context['transport'] = transport
262 self.call_exception_handler(context)
27b7c7eb 263
5b8d4f97 264 def _ensure_fd_no_transport(self, fd):
ce12629c
YS
265 fileno = fd
266 if not isinstance(fileno, int):
267 try:
268 fileno = int(fileno.fileno())
269 except (AttributeError, TypeError, ValueError):
270 # This code matches selectors._fileobj_to_fd function.
6370f345 271 raise ValueError(f"Invalid file object: {fd!r}") from None
7e2d93f3
NK
272 transport = self._transports.get(fileno)
273 if transport and not transport.is_closing():
274 raise RuntimeError(
275 f'File descriptor {fd!r} is used by transport '
276 f'{transport!r}')
5b8d4f97
YS
277
278 def _add_reader(self, fd, callback, *args):
bb2fc5b2 279 self._check_closed()
f23746a9 280 handle = events.Handle(callback, args, self, None)
b7dc795d
NK
281 key = self._selector.get_map().get(fd)
282 if key is None:
27b7c7eb
GR
283 self._selector.register(fd, selectors.EVENT_READ,
284 (handle, None))
285 else:
286 mask, (reader, writer) = key.events, key.data
287 self._selector.modify(fd, mask | selectors.EVENT_READ,
288 (handle, writer))
289 if reader is not None:
290 reader.cancel()
210a1373 291 return handle
27b7c7eb 292
5b8d4f97 293 def _remove_reader(self, fd):
bb2fc5b2 294 if self.is_closed():
eeeebcd8 295 return False
b7dc795d
NK
296 key = self._selector.get_map().get(fd)
297 if key is None:
27b7c7eb 298 return False
b7dc795d
NK
299 mask, (reader, writer) = key.events, key.data
300 mask &= ~selectors.EVENT_READ
301 if not mask:
302 self._selector.unregister(fd)
27b7c7eb 303 else:
b7dc795d 304 self._selector.modify(fd, mask, (None, writer))
27b7c7eb 305
b7dc795d
NK
306 if reader is not None:
307 reader.cancel()
308 return True
309 else:
310 return False
27b7c7eb 311
5b8d4f97 312 def _add_writer(self, fd, callback, *args):
bb2fc5b2 313 self._check_closed()
f23746a9 314 handle = events.Handle(callback, args, self, None)
b7dc795d
NK
315 key = self._selector.get_map().get(fd)
316 if key is None:
27b7c7eb
GR
317 self._selector.register(fd, selectors.EVENT_WRITE,
318 (None, handle))
319 else:
320 mask, (reader, writer) = key.events, key.data
321 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
322 (reader, handle))
323 if writer is not None:
324 writer.cancel()
210a1373 325 return handle
27b7c7eb 326
5b8d4f97 327 def _remove_writer(self, fd):
27b7c7eb 328 """Remove a writer callback."""
bb2fc5b2 329 if self.is_closed():
eeeebcd8 330 return False
b7dc795d
NK
331 key = self._selector.get_map().get(fd)
332 if key is None:
27b7c7eb 333 return False
b7dc795d
NK
334 mask, (reader, writer) = key.events, key.data
335 # Remove both writer and connector.
336 mask &= ~selectors.EVENT_WRITE
337 if not mask:
338 self._selector.unregister(fd)
27b7c7eb 339 else:
b7dc795d 340 self._selector.modify(fd, mask, (reader, None))
27b7c7eb 341
b7dc795d
NK
342 if writer is not None:
343 writer.cancel()
344 return True
345 else:
346 return False
27b7c7eb 347
5b8d4f97
YS
348 def add_reader(self, fd, callback, *args):
349 """Add a reader callback."""
350 self._ensure_fd_no_transport(fd)
210a1373 351 self._add_reader(fd, callback, *args)
5b8d4f97
YS
352
353 def remove_reader(self, fd):
354 """Remove a reader callback."""
355 self._ensure_fd_no_transport(fd)
356 return self._remove_reader(fd)
357
358 def add_writer(self, fd, callback, *args):
359 """Add a writer callback.."""
360 self._ensure_fd_no_transport(fd)
210a1373 361 self._add_writer(fd, callback, *args)
5b8d4f97
YS
362
363 def remove_writer(self, fd):
364 """Remove a writer callback."""
365 self._ensure_fd_no_transport(fd)
366 return self._remove_writer(fd)
367
19a44f63 368 async def sock_recv(self, sock, n):
d143209d
VS
369 """Receive data from the socket.
370
371 The return value is a bytes object representing the data received.
372 The maximum amount of data to be received at once is specified by
373 nbytes.
d143209d 374 """
1f9d4c93 375 base_events._check_ssl_socket(sock)
aa41b9b2 376 if self._debug and sock.gettimeout() != 0:
9c9f1f10 377 raise ValueError("the socket must be non-blocking")
74387926
AS
378 try:
379 return sock.recv(n)
380 except (BlockingIOError, InterruptedError):
381 pass
7661db62 382 fut = self.create_future()
74387926 383 fd = sock.fileno()
210a1373
FK
384 self._ensure_fd_no_transport(fd)
385 handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
74387926 386 fut.add_done_callback(
210a1373 387 functools.partial(self._sock_read_done, fd, handle=handle))
19a44f63 388 return await fut
27b7c7eb 389
210a1373
FK
390 def _sock_read_done(self, fd, fut, handle=None):
391 if handle is None or not handle.cancelled():
392 self.remove_reader(fd)
74387926
AS
393
394 def _sock_recv(self, fut, sock, n):
28773465 395 # _sock_recv() can add itself as an I/O callback if the operation can't
b0b0e628 396 # be done immediately. Don't use it directly, call sock_recv().
74387926 397 if fut.done():
27b7c7eb
GR
398 return
399 try:
400 data = sock.recv(n)
401 except (BlockingIOError, InterruptedError):
74387926 402 return # try again next time
431b540b
YS
403 except (SystemExit, KeyboardInterrupt):
404 raise
405 except BaseException as exc:
27b7c7eb
GR
406 fut.set_exception(exc)
407 else:
408 fut.set_result(data)
409
19a44f63 410 async def sock_recv_into(self, sock, buf):
525f40d2
AP
411 """Receive data from the socket.
412
413 The received data is written into *buf* (a writable buffer).
414 The return value is the number of bytes written.
525f40d2 415 """
1f9d4c93 416 base_events._check_ssl_socket(sock)
525f40d2
AP
417 if self._debug and sock.gettimeout() != 0:
418 raise ValueError("the socket must be non-blocking")
74387926
AS
419 try:
420 return sock.recv_into(buf)
421 except (BlockingIOError, InterruptedError):
422 pass
525f40d2 423 fut = self.create_future()
74387926 424 fd = sock.fileno()
210a1373
FK
425 self._ensure_fd_no_transport(fd)
426 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
74387926 427 fut.add_done_callback(
210a1373 428 functools.partial(self._sock_read_done, fd, handle=handle))
19a44f63 429 return await fut
525f40d2 430
74387926 431 def _sock_recv_into(self, fut, sock, buf):
525f40d2 432 # _sock_recv_into() can add itself as an I/O callback if the operation
6370f345
YS
433 # can't be done immediately. Don't use it directly, call
434 # sock_recv_into().
74387926 435 if fut.done():
525f40d2
AP
436 return
437 try:
438 nbytes = sock.recv_into(buf)
439 except (BlockingIOError, InterruptedError):
74387926 440 return # try again next time
431b540b
YS
441 except (SystemExit, KeyboardInterrupt):
442 raise
443 except BaseException as exc:
525f40d2
AP
444 fut.set_exception(exc)
445 else:
446 fut.set_result(nbytes)
447
9f04ee56
AG
448 async def sock_recvfrom(self, sock, bufsize):
449 """Receive a datagram from a datagram socket.
450
451 The return value is a tuple of (bytes, address) representing the
452 datagram received and the address it came from.
453 The maximum amount of data to be received at once is specified by
454 nbytes.
455 """
456 base_events._check_ssl_socket(sock)
457 if self._debug and sock.gettimeout() != 0:
458 raise ValueError("the socket must be non-blocking")
459 try:
460 return sock.recvfrom(bufsize)
461 except (BlockingIOError, InterruptedError):
462 pass
463 fut = self.create_future()
464 fd = sock.fileno()
465 self._ensure_fd_no_transport(fd)
466 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
467 fut.add_done_callback(
468 functools.partial(self._sock_read_done, fd, handle=handle))
469 return await fut
470
471 def _sock_recvfrom(self, fut, sock, bufsize):
472 # _sock_recvfrom() can add itself as an I/O callback if the operation
473 # can't be done immediately. Don't use it directly, call
474 # sock_recvfrom().
475 if fut.done():
476 return
477 try:
478 result = sock.recvfrom(bufsize)
479 except (BlockingIOError, InterruptedError):
480 return # try again next time
481 except (SystemExit, KeyboardInterrupt):
482 raise
483 except BaseException as exc:
484 fut.set_exception(exc)
485 else:
486 fut.set_result(result)
487
488 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
489 """Receive data from the socket.
490
491 The received data is written into *buf* (a writable buffer).
492 The return value is a tuple of (number of bytes written, address).
493 """
494 base_events._check_ssl_socket(sock)
495 if self._debug and sock.gettimeout() != 0:
496 raise ValueError("the socket must be non-blocking")
497 if not nbytes:
498 nbytes = len(buf)
22403d3a 499
9f04ee56
AG
500 try:
501 return sock.recvfrom_into(buf, nbytes)
502 except (BlockingIOError, InterruptedError):
503 pass
504 fut = self.create_future()
505 fd = sock.fileno()
506 self._ensure_fd_no_transport(fd)
507 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
508 nbytes)
509 fut.add_done_callback(
510 functools.partial(self._sock_read_done, fd, handle=handle))
511 return await fut
512
513 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
514 # _sock_recv_into() can add itself as an I/O callback if the operation
515 # can't be done immediately. Don't use it directly, call
516 # sock_recv_into().
517 if fut.done():
518 return
519 try:
520 result = sock.recvfrom_into(buf, bufsize)
521 except (BlockingIOError, InterruptedError):
522 return # try again next time
523 except (SystemExit, KeyboardInterrupt):
524 raise
525 except BaseException as exc:
526 fut.set_exception(exc)
527 else:
528 fut.set_result(result)
529
19a44f63 530 async def sock_sendall(self, sock, data):
d143209d
VS
531 """Send data to the socket.
532
533 The socket must be connected to a remote socket. This method continues
534 to send data from data until either all data has been sent or an
535 error occurs. None is returned on success. On error, an exception is
536 raised, and there is no way to determine how much data, if any, was
537 successfully processed by the receiving end of the connection.
d143209d 538 """
1f9d4c93 539 base_events._check_ssl_socket(sock)
aa41b9b2 540 if self._debug and sock.gettimeout() != 0:
9c9f1f10 541 raise ValueError("the socket must be non-blocking")
74387926
AS
542 try:
543 n = sock.send(data)
544 except (BlockingIOError, InterruptedError):
545 n = 0
546
547 if n == len(data):
548 # all data sent
549 return
74387926
AS
550
551 fut = self.create_future()
552 fd = sock.fileno()
210a1373 553 self._ensure_fd_no_transport(fd)
6e789002 554 # use a trick with a list in closure to store a mutable state
210a1373
FK
555 handle = self._add_writer(fd, self._sock_sendall, fut, sock,
556 memoryview(data), [n])
557 fut.add_done_callback(
558 functools.partial(self._sock_write_done, fd, handle=handle))
19a44f63 559 return await fut
27b7c7eb 560
6e789002 561 def _sock_sendall(self, fut, sock, view, pos):
74387926
AS
562 if fut.done():
563 # Future cancellation can be scheduled on previous loop iteration
27b7c7eb 564 return
6e789002 565 start = pos[0]
27b7c7eb 566 try:
6e789002 567 n = sock.send(view[start:])
27b7c7eb 568 except (BlockingIOError, InterruptedError):
74387926 569 return
431b540b
YS
570 except (SystemExit, KeyboardInterrupt):
571 raise
572 except BaseException as exc:
27b7c7eb
GR
573 fut.set_exception(exc)
574 return
575
6e789002
AS
576 start += n
577
578 if start == len(view):
27b7c7eb
GR
579 fut.set_result(None)
580 else:
6e789002 581 pos[0] = start
27b7c7eb 582
9f04ee56
AG
583 async def sock_sendto(self, sock, data, address):
584 """Send data to the socket.
585
586 The socket must be connected to a remote socket. This method continues
587 to send data from data until either all data has been sent or an
588 error occurs. None is returned on success. On error, an exception is
589 raised, and there is no way to determine how much data, if any, was
590 successfully processed by the receiving end of the connection.
591 """
592 base_events._check_ssl_socket(sock)
593 if self._debug and sock.gettimeout() != 0:
594 raise ValueError("the socket must be non-blocking")
595 try:
596 return sock.sendto(data, address)
597 except (BlockingIOError, InterruptedError):
598 pass
599
600 fut = self.create_future()
601 fd = sock.fileno()
602 self._ensure_fd_no_transport(fd)
603 # use a trick with a list in closure to store a mutable state
604 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
605 address)
606 fut.add_done_callback(
607 functools.partial(self._sock_write_done, fd, handle=handle))
608 return await fut
609
610 def _sock_sendto(self, fut, sock, data, address):
611 if fut.done():
612 # Future cancellation can be scheduled on previous loop iteration
613 return
614 try:
615 n = sock.sendto(data, 0, address)
616 except (BlockingIOError, InterruptedError):
617 return
618 except (SystemExit, KeyboardInterrupt):
619 raise
620 except BaseException as exc:
621 fut.set_exception(exc)
622 else:
623 fut.set_result(n)
624
5f841b55 625 async def sock_connect(self, sock, address):
d143209d
VS
626 """Connect to a remote socket at address.
627
d143209d
VS
628 This method is a coroutine.
629 """
1f9d4c93 630 base_events._check_ssl_socket(sock)
aa41b9b2 631 if self._debug and sock.gettimeout() != 0:
9c9f1f10 632 raise ValueError("the socket must be non-blocking")
f1c6fa98 633
5c30388f
VB
634 if sock.family == socket.AF_INET or (
635 base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
19a44f63 636 resolved = await self._ensure_resolved(
8fb94893
TG
637 address, family=sock.family, type=sock.type, proto=sock.proto,
638 loop=self,
639 )
19a44f63 640 _, _, _, _, address = resolved[0]
d6c6771f
YS
641
642 fut = self.create_future()
643 self._sock_connect(fut, sock, address)
995f6170
DUK
644 try:
645 return await fut
646 finally:
647 # Needed to break cycles when an exception occurs.
648 fut = None
27b7c7eb 649
d5aeccf9 650 def _sock_connect(self, fut, sock, address):
27b7c7eb 651 fd = sock.fileno()
d5aeccf9 652 try:
c9d11c34
VS
653 sock.connect(address)
654 except (BlockingIOError, InterruptedError):
655 # Issue #23618: When the C function connect() fails with EINTR, the
656 # connection runs in background. We have to wait until the socket
657 # becomes writable to be notified when the connection succeed or
658 # fails.
210a1373
FK
659 self._ensure_fd_no_transport(fd)
660 handle = self._add_writer(
661 fd, self._sock_connect_cb, fut, sock, address)
d6c6771f 662 fut.add_done_callback(
210a1373 663 functools.partial(self._sock_write_done, fd, handle=handle))
431b540b
YS
664 except (SystemExit, KeyboardInterrupt):
665 raise
666 except BaseException as exc:
d5aeccf9
VS
667 fut.set_exception(exc)
668 else:
669 fut.set_result(None)
995f6170
DUK
670 finally:
671 fut = None
d5aeccf9 672
210a1373
FK
673 def _sock_write_done(self, fd, fut, handle=None):
674 if handle is None or not handle.cancelled():
675 self.remove_writer(fd)
d5aeccf9
VS
676
677 def _sock_connect_cb(self, fut, sock, address):
74387926 678 if fut.done():
27b7c7eb 679 return
d5aeccf9 680
27b7c7eb 681 try:
d5aeccf9
VS
682 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
683 if err != 0:
684 # Jump to any except clause below.
6370f345 685 raise OSError(err, f'Connect call failed {address}')
27b7c7eb 686 except (BlockingIOError, InterruptedError):
d5aeccf9
VS
687 # socket is still registered, the callback will be retried later
688 pass
431b540b
YS
689 except (SystemExit, KeyboardInterrupt):
690 raise
691 except BaseException as exc:
27b7c7eb
GR
692 fut.set_exception(exc)
693 else:
694 fut.set_result(None)
995f6170
DUK
695 finally:
696 fut = None
27b7c7eb 697
19a44f63 698 async def sock_accept(self, sock):
d143209d
VS
699 """Accept a connection.
700
701 The socket must be bound to an address and listening for connections.
702 The return value is a pair (conn, address) where conn is a new socket
703 object usable to send and receive data on the connection, and address
704 is the address bound to the socket on the other end of the connection.
d143209d 705 """
1f9d4c93 706 base_events._check_ssl_socket(sock)
aa41b9b2 707 if self._debug and sock.gettimeout() != 0:
9c9f1f10 708 raise ValueError("the socket must be non-blocking")
7661db62 709 fut = self.create_future()
0dd98c2d 710 self._sock_accept(fut, sock)
19a44f63 711 return await fut
27b7c7eb 712
0dd98c2d 713 def _sock_accept(self, fut, sock):
27b7c7eb 714 fd = sock.fileno()
27b7c7eb
GR
715 try:
716 conn, address = sock.accept()
717 conn.setblocking(False)
718 except (BlockingIOError, InterruptedError):
0dd98c2d
AG
719 self._ensure_fd_no_transport(fd)
720 handle = self._add_reader(fd, self._sock_accept, fut, sock)
721 fut.add_done_callback(
722 functools.partial(self._sock_read_done, fd, handle=handle))
431b540b
YS
723 except (SystemExit, KeyboardInterrupt):
724 raise
725 except BaseException as exc:
27b7c7eb
GR
726 fut.set_exception(exc)
727 else:
728 fut.set_result((conn, address))
729
7c684073
AS
730 async def _sendfile_native(self, transp, file, offset, count):
731 del self._transports[transp._sock_fd]
732 resume_reading = transp.is_reading()
733 transp.pause_reading()
734 await transp._make_empty_waiter()
735 try:
736 return await self.sock_sendfile(transp._sock, file, offset, count,
737 fallback=False)
738 finally:
739 transp._reset_empty_waiter()
740 if resume_reading:
741 transp.resume_reading()
742 self._transports[transp._sock_fd] = transp
743
27b7c7eb
GR
744 def _process_events(self, event_list):
745 for key, mask in event_list:
746 fileobj, (reader, writer) = key.fileobj, key.data
747 if mask & selectors.EVENT_READ and reader is not None:
748 if reader._cancelled:
5b8d4f97 749 self._remove_reader(fileobj)
27b7c7eb
GR
750 else:
751 self._add_callback(reader)
752 if mask & selectors.EVENT_WRITE and writer is not None:
753 if writer._cancelled:
5b8d4f97 754 self._remove_writer(fileobj)
27b7c7eb
GR
755 else:
756 self._add_callback(writer)
757
758 def _stop_serving(self, sock):
5b8d4f97 759 self._remove_reader(sock.fileno())
27b7c7eb
GR
760 sock.close()
761
762
c0982413
YS
763class _SelectorTransport(transports._FlowControlMixin,
764 transports.Transport):
27b7c7eb
GR
765
766 max_size = 256 * 1024 # Buffer size passed to recv().
767
978a9afc
VS
768 # Attribute used in the destructor: it must be set even if the constructor
769 # is not called (see _SelectorSslTransport which may start by raising an
770 # exception)
771 _sock = None
772
47bbea71 773 def __init__(self, loop, sock, protocol, extra=None, server=None):
004adb91 774 super().__init__(extra, loop)
8cd5165b 775 self._extra['socket'] = trsock.TransportSocket(sock)
63deaa5b
VM
776 try:
777 self._extra['sockname'] = sock.getsockname()
778 except OSError:
779 self._extra['sockname'] = None
27b7c7eb
GR
780 if 'peername' not in self._extra:
781 try:
782 self._extra['peername'] = sock.getpeername()
783 except socket.error:
784 self._extra['peername'] = None
27b7c7eb
GR
785 self._sock = sock
786 self._sock_fd = sock.fileno()
dbf10227
YS
787
788 self._protocol_connected = False
789 self.set_protocol(protocol)
790
27b7c7eb 791 self._server = server
c122390a 792 self._buffer = collections.deque()
2546a177 793 self._conn_lost = 0 # Set when call to connection_lost scheduled.
27b7c7eb 794 self._closing = False # Set when close() called.
78942ecd
I
795 self._paused = False # Set when pause_reading() called
796
355491dc 797 if self._server is not None:
41596441 798 self._server._attach(self)
5b8d4f97 799 loop._transports[self._sock_fd] = self
27b7c7eb 800
e912e652 801 def __repr__(self):
0e34dc37
VS
802 info = [self.__class__.__name__]
803 if self._sock is None:
804 info.append('closed')
805 elif self._closing:
806 info.append('closing')
6370f345 807 info.append(f'fd={self._sock_fd}')
b261475a 808 # test if the transport was closed
79fd9626 809 if self._loop is not None and not self._loop.is_closed():
b261475a
VS
810 polling = _test_selector_event(self._loop._selector,
811 self._sock_fd, selectors.EVENT_READ)
812 if polling:
813 info.append('read=polling')
814 else:
815 info.append('read=idle')
e912e652 816
b261475a 817 polling = _test_selector_event(self._loop._selector,
15cc678d
VS
818 self._sock_fd,
819 selectors.EVENT_WRITE)
b261475a
VS
820 if polling:
821 state = 'polling'
822 else:
823 state = 'idle'
e912e652 824
b261475a 825 bufsize = self.get_write_buffer_size()
6370f345
YS
826 info.append(f'write=<{state}, bufsize={bufsize}>')
827 return '<{}>'.format(' '.join(info))
e912e652 828
27b7c7eb
GR
829 def abort(self):
830 self._force_close(None)
831
a05a6ef1
YS
832 def set_protocol(self, protocol):
833 self._protocol = protocol
dbf10227 834 self._protocol_connected = True
a05a6ef1
YS
835
836 def get_protocol(self):
837 return self._protocol
838
5bb1afb3
YS
839 def is_closing(self):
840 return self._closing
841
78942ecd
I
842 def is_reading(self):
843 return not self.is_closing() and not self._paused
844
845 def pause_reading(self):
846 if not self.is_reading():
847 return
848 self._paused = True
849 self._loop._remove_reader(self._sock_fd)
850 if self._loop.get_debug():
851 logger.debug("%r pauses reading", self)
852
853 def resume_reading(self):
854 if self._closing or not self._paused:
855 return
856 self._paused = False
857 self._add_reader(self._sock_fd, self._read_ready)
858 if self._loop.get_debug():
859 logger.debug("%r resumes reading", self)
860
27b7c7eb
GR
861 def close(self):
862 if self._closing:
863 return
864 self._closing = True
5b8d4f97 865 self._loop._remove_reader(self._sock_fd)
27b7c7eb 866 if not self._buffer:
2546a177 867 self._conn_lost += 1
5b8d4f97 868 self._loop._remove_writer(self._sock_fd)
27b7c7eb
GR
869 self._loop.call_soon(self._call_connection_lost, None)
870
fb2c3465 871 def __del__(self, _warn=warnings.warn):
3e2ad8ec 872 if self._sock is not None:
fb2c3465 873 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
3e2ad8ec 874 self._sock.close()
41596441
POT
875 if self._server is not None:
876 self._server._detach(self)
978a9afc 877
065ca25a 878 def _fatal_error(self, exc, message='Fatal error on transport'):
2546a177 879 # Should be called from exception handler only.
1f39c28e 880 if isinstance(exc, OSError):
e912e652
VS
881 if self._loop.get_debug():
882 logger.debug("%r: %s", self, message, exc_info=True)
883 else:
ff827f08 884 self._loop.call_exception_handler({
065ca25a 885 'message': message,
ff827f08
YS
886 'exception': exc,
887 'transport': self,
888 'protocol': self._protocol,
889 })
27b7c7eb
GR
890 self._force_close(exc)
891
892 def _force_close(self, exc):
2546a177
GR
893 if self._conn_lost:
894 return
27b7c7eb
GR
895 if self._buffer:
896 self._buffer.clear()
5b8d4f97 897 self._loop._remove_writer(self._sock_fd)
2546a177
GR
898 if not self._closing:
899 self._closing = True
5b8d4f97 900 self._loop._remove_reader(self._sock_fd)
27b7c7eb 901 self._conn_lost += 1
27b7c7eb
GR
902 self._loop.call_soon(self._call_connection_lost, exc)
903
904 def _call_connection_lost(self, exc):
905 try:
47bbea71
VS
906 if self._protocol_connected:
907 self._protocol.connection_lost(exc)
27b7c7eb
GR
908 finally:
909 self._sock.close()
910 self._sock = None
911 self._protocol = None
912 self._loop = None
913 server = self._server
914 if server is not None:
41596441 915 server._detach(self)
27b7c7eb
GR
916 self._server = None
917
355491dc 918 def get_write_buffer_size(self):
c122390a 919 return sum(map(len, self._buffer))
355491dc 920
a84d0b36 921 def _add_reader(self, fd, callback, *args):
78942ecd 922 if not self.is_reading():
a84d0b36 923 return
a84d0b36
VS
924 self._loop._add_reader(fd, callback, *args)
925
27b7c7eb
GR
926
927class _SelectorSocketTransport(_SelectorTransport):
928
f111b3dc 929 _start_tls_compatible = True
7c684073 930 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
f111b3dc 931
27b7c7eb
GR
932 def __init__(self, loop, sock, protocol, waiter=None,
933 extra=None, server=None):
631fd38d 934
dbf10227 935 self._read_ready_cb = None
27b7c7eb
GR
936 super().__init__(loop, sock, protocol, extra, server)
937 self._eof = False
7c684073 938 self._empty_waiter = None
c122390a
KA
939 if _HAS_SENDMSG:
940 self._write_ready = self._write_sendmsg
941 else:
942 self._write_ready = self._write_send
44c19ecc
YS
943 # Disable the Nagle algorithm -- small writes will be
944 # sent without waiting for the TCP ACK. This generally
945 # decreases the latency (in some cases significantly.)
3bc0ebab 946 base_events._set_nodelay(self._sock)
44c19ecc 947
27b7c7eb 948 self._loop.call_soon(self._protocol.connection_made, self)
fa73779b 949 # only start reading when connection_made() has been called
a84d0b36 950 self._loop.call_soon(self._add_reader,
fa73779b 951 self._sock_fd, self._read_ready)
27b7c7eb 952 if waiter is not None:
f07801bb 953 # only wake up the waiter when connection_made() has been called
5d7e3b6c
YS
954 self._loop.call_soon(futures._set_result_unless_cancelled,
955 waiter, None)
27b7c7eb 956
dbf10227
YS
957 def set_protocol(self, protocol):
958 if isinstance(protocol, protocols.BufferedProtocol):
959 self._read_ready_cb = self._read_ready__get_buffer
960 else:
961 self._read_ready_cb = self._read_ready__data_received
962
963 super().set_protocol(protocol)
964
dbf10227
YS
965 def _read_ready(self):
966 self._read_ready_cb()
967
631fd38d
YS
968 def _read_ready__get_buffer(self):
969 if self._conn_lost:
970 return
971
972 try:
dbf10227
YS
973 buf = self._protocol.get_buffer(-1)
974 if not len(buf):
975 raise RuntimeError('get_buffer() returned an empty buffer')
431b540b
YS
976 except (SystemExit, KeyboardInterrupt):
977 raise
978 except BaseException as exc:
631fd38d
YS
979 self._fatal_error(
980 exc, 'Fatal error: protocol.get_buffer() call failed.')
981 return
982
983 try:
984 nbytes = self._sock.recv_into(buf)
985 except (BlockingIOError, InterruptedError):
986 return
431b540b
YS
987 except (SystemExit, KeyboardInterrupt):
988 raise
989 except BaseException as exc:
631fd38d
YS
990 self._fatal_error(exc, 'Fatal read error on socket transport')
991 return
992
993 if not nbytes:
994 self._read_ready__on_eof()
995 return
996
997 try:
998 self._protocol.buffer_updated(nbytes)
431b540b
YS
999 except (SystemExit, KeyboardInterrupt):
1000 raise
1001 except BaseException as exc:
631fd38d
YS
1002 self._fatal_error(
1003 exc, 'Fatal error: protocol.buffer_updated() call failed.')
1004
1005 def _read_ready__data_received(self):
ca2e0a48
YS
1006 if self._conn_lost:
1007 return
27b7c7eb
GR
1008 try:
1009 data = self._sock.recv(self.max_size)
1010 except (BlockingIOError, InterruptedError):
631fd38d 1011 return
431b540b
YS
1012 except (SystemExit, KeyboardInterrupt):
1013 raise
1014 except BaseException as exc:
065ca25a 1015 self._fatal_error(exc, 'Fatal read error on socket transport')
631fd38d
YS
1016 return
1017
1018 if not data:
1019 self._read_ready__on_eof()
1020 return
1021
1022 try:
1023 self._protocol.data_received(data)
431b540b
YS
1024 except (SystemExit, KeyboardInterrupt):
1025 raise
1026 except BaseException as exc:
631fd38d
YS
1027 self._fatal_error(
1028 exc, 'Fatal error: protocol.data_received() call failed.')
1029
1030 def _read_ready__on_eof(self):
1031 if self._loop.get_debug():
1032 logger.debug("%r received EOF", self)
1033
1034 try:
1035 keep_open = self._protocol.eof_received()
431b540b
YS
1036 except (SystemExit, KeyboardInterrupt):
1037 raise
1038 except BaseException as exc:
631fd38d
YS
1039 self._fatal_error(
1040 exc, 'Fatal error: protocol.eof_received() call failed.')
1041 return
1042
1043 if keep_open:
1044 # We're keeping the connection open so the
1045 # protocol can write more, but we still can't
1046 # receive more, so remove the reader callback.
1047 self._loop._remove_reader(self._sock_fd)
27b7c7eb 1048 else:
631fd38d 1049 self.close()
27b7c7eb
GR
1050
1051 def write(self, data):
a5062c5d 1052 if not isinstance(data, (bytes, bytearray, memoryview)):
6370f345
YS
1053 raise TypeError(f'data argument must be a bytes-like object, '
1054 f'not {type(data).__name__!r}')
a5062c5d
GR
1055 if self._eof:
1056 raise RuntimeError('Cannot call write() after write_eof()')
7c684073
AS
1057 if self._empty_waiter is not None:
1058 raise RuntimeError('unable to write; sendfile is in progress')
27b7c7eb
GR
1059 if not data:
1060 return
1061
1062 if self._conn_lost:
1063 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
fc29e0f3 1064 logger.warning('socket.send() raised exception.')
27b7c7eb
GR
1065 self._conn_lost += 1
1066 return
1067
1068 if not self._buffer:
355491dc 1069 # Optimization: try to send now.
27b7c7eb
GR
1070 try:
1071 n = self._sock.send(data)
1072 except (BlockingIOError, InterruptedError):
2546a177 1073 pass
431b540b
YS
1074 except (SystemExit, KeyboardInterrupt):
1075 raise
1076 except BaseException as exc:
065ca25a 1077 self._fatal_error(exc, 'Fatal write error on socket transport')
27b7c7eb
GR
1078 return
1079 else:
c122390a 1080 data = memoryview(data)[n:]
27b7c7eb
GR
1081 if not data:
1082 return
355491dc 1083 # Not all was written; register write handler.
5b8d4f97 1084 self._loop._add_writer(self._sock_fd, self._write_ready)
27b7c7eb 1085
355491dc 1086 # Add it to the buffer.
c122390a 1087 self._buffer.append(data)
355491dc 1088 self._maybe_pause_protocol()
27b7c7eb 1089
c122390a
KA
1090 def _get_sendmsg_buffer(self):
1091 return itertools.islice(self._buffer, SC_IOV_MAX)
1092
1093 def _write_sendmsg(self):
a5062c5d 1094 assert self._buffer, 'Data should not be empty'
c122390a
KA
1095 if self._conn_lost:
1096 return
1097 try:
1098 nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
1099 self._adjust_leftover_buffer(nbytes)
1100 except (BlockingIOError, InterruptedError):
1101 pass
1102 except (SystemExit, KeyboardInterrupt):
1103 raise
1104 except BaseException as exc:
1105 self._loop._remove_writer(self._sock_fd)
1106 self._buffer.clear()
1107 self._fatal_error(exc, 'Fatal write error on socket transport')
1108 if self._empty_waiter is not None:
1109 self._empty_waiter.set_exception(exc)
1110 else:
1111 self._maybe_resume_protocol() # May append to buffer.
1112 if not self._buffer:
1113 self._loop._remove_writer(self._sock_fd)
1114 if self._empty_waiter is not None:
1115 self._empty_waiter.set_result(None)
1116 if self._closing:
1117 self._call_connection_lost(None)
1118 elif self._eof:
1119 self._sock.shutdown(socket.SHUT_WR)
27b7c7eb 1120
c122390a
KA
1121 def _adjust_leftover_buffer(self, nbytes: int) -> None:
1122 buffer = self._buffer
1123 while nbytes:
1124 b = buffer.popleft()
1125 b_len = len(b)
1126 if b_len <= nbytes:
1127 nbytes -= b_len
1128 else:
1129 buffer.appendleft(b[nbytes:])
1130 break
1131
1132 def _write_send(self):
1133 assert self._buffer, 'Data should not be empty'
ca2e0a48
YS
1134 if self._conn_lost:
1135 return
27b7c7eb 1136 try:
c122390a
KA
1137 buffer = self._buffer.popleft()
1138 n = self._sock.send(buffer)
1139 if n != len(buffer):
1140 # Not all data was written
1141 self._buffer.appendleft(buffer[n:])
27b7c7eb 1142 except (BlockingIOError, InterruptedError):
a5062c5d 1143 pass
431b540b
YS
1144 except (SystemExit, KeyboardInterrupt):
1145 raise
1146 except BaseException as exc:
5b8d4f97 1147 self._loop._remove_writer(self._sock_fd)
a5062c5d 1148 self._buffer.clear()
065ca25a 1149 self._fatal_error(exc, 'Fatal write error on socket transport')
7c684073
AS
1150 if self._empty_waiter is not None:
1151 self._empty_waiter.set_exception(exc)
27b7c7eb 1152 else:
355491dc
GR
1153 self._maybe_resume_protocol() # May append to buffer.
1154 if not self._buffer:
5b8d4f97 1155 self._loop._remove_writer(self._sock_fd)
7c684073
AS
1156 if self._empty_waiter is not None:
1157 self._empty_waiter.set_result(None)
27b7c7eb
GR
1158 if self._closing:
1159 self._call_connection_lost(None)
1160 elif self._eof:
1161 self._sock.shutdown(socket.SHUT_WR)
27b7c7eb
GR
1162
1163 def write_eof(self):
23f587e3 1164 if self._closing or self._eof:
27b7c7eb
GR
1165 return
1166 self._eof = True
1167 if not self._buffer:
1168 self._sock.shutdown(socket.SHUT_WR)
1169
c122390a
KA
1170 def writelines(self, list_of_data):
1171 if self._eof:
1172 raise RuntimeError('Cannot call writelines() after write_eof()')
1173 if self._empty_waiter is not None:
1174 raise RuntimeError('unable to writelines; sendfile is in progress')
1175 if not list_of_data:
1176 return
1177 self._buffer.extend([memoryview(data) for data in list_of_data])
1178 self._write_ready()
19d2639d
AAS
1179 # If the entire buffer couldn't be written, register a write handler
1180 if self._buffer:
1181 self._loop._add_writer(self._sock_fd, self._write_ready)
e991ac8f 1182 self._maybe_pause_protocol()
c122390a 1183
27b7c7eb
GR
1184 def can_write_eof(self):
1185 return True
1186
7c684073 1187 def _call_connection_lost(self, exc):
4e38eeaf
VB
1188 try:
1189 super()._call_connection_lost(exc)
1190 finally:
1191 self._write_ready = None
1192 if self._empty_waiter is not None:
1193 self._empty_waiter.set_exception(
1194 ConnectionError("Connection is closed by peer"))
7c684073
AS
1195
1196 def _make_empty_waiter(self):
1197 if self._empty_waiter is not None:
1198 raise RuntimeError("Empty waiter is already set")
1199 self._empty_waiter = self._loop.create_future()
1200 if not self._buffer:
1201 self._empty_waiter.set_result(None)
1202 return self._empty_waiter
1203
1204 def _reset_empty_waiter(self):
1205 self._empty_waiter = None
1206
a6331b60
RK
1207 def close(self):
1208 self._read_ready_cb = None
1209 super().close()
1210
27b7c7eb 1211
9bdec0aa 1212class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
27b7c7eb 1213
a5062c5d
GR
1214 _buffer_factory = collections.deque
1215
bfff45d6
VS
1216 def __init__(self, loop, sock, protocol, address=None,
1217 waiter=None, extra=None):
27b7c7eb 1218 super().__init__(loop, sock, protocol, extra)
27b7c7eb 1219 self._address = address
42fabc3e 1220 self._buffer_size = 0
27b7c7eb 1221 self._loop.call_soon(self._protocol.connection_made, self)
47bbea71 1222 # only start reading when connection_made() has been called
a84d0b36 1223 self._loop.call_soon(self._add_reader,
47bbea71 1224 self._sock_fd, self._read_ready)
bfff45d6 1225 if waiter is not None:
f07801bb 1226 # only wake up the waiter when connection_made() has been called
5d7e3b6c
YS
1227 self._loop.call_soon(futures._set_result_unless_cancelled,
1228 waiter, None)
27b7c7eb 1229
355491dc 1230 def get_write_buffer_size(self):
42fabc3e 1231 return self._buffer_size
355491dc 1232
27b7c7eb 1233 def _read_ready(self):
ca2e0a48
YS
1234 if self._conn_lost:
1235 return
27b7c7eb
GR
1236 try:
1237 data, addr = self._sock.recvfrom(self.max_size)
1238 except (BlockingIOError, InterruptedError):
1239 pass
2335de7a
GR
1240 except OSError as exc:
1241 self._protocol.error_received(exc)
431b540b
YS
1242 except (SystemExit, KeyboardInterrupt):
1243 raise
1244 except BaseException as exc:
065ca25a 1245 self._fatal_error(exc, 'Fatal read error on datagram transport')
27b7c7eb
GR
1246 else:
1247 self._protocol.datagram_received(data, addr)
1248
1249 def sendto(self, data, addr=None):
a5062c5d 1250 if not isinstance(data, (bytes, bytearray, memoryview)):
6370f345
YS
1251 raise TypeError(f'data argument must be a bytes-like object, '
1252 f'not {type(data).__name__!r}')
27b7c7eb 1253
63deaa5b
VM
1254 if self._address:
1255 if addr not in (None, self._address):
1256 raise ValueError(
1257 f'Invalid address: must be None or {self._address}')
1258 addr = self._address
27b7c7eb
GR
1259
1260 if self._conn_lost and self._address:
1261 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
fc29e0f3 1262 logger.warning('socket.send() raised exception.')
27b7c7eb
GR
1263 self._conn_lost += 1
1264 return
1265
1266 if not self._buffer:
1267 # Attempt to send it right away first.
1268 try:
63deaa5b 1269 if self._extra['peername']:
27b7c7eb
GR
1270 self._sock.send(data)
1271 else:
1272 self._sock.sendto(data, addr)
1273 return
2546a177 1274 except (BlockingIOError, InterruptedError):
5b8d4f97 1275 self._loop._add_writer(self._sock_fd, self._sendto_ready)
2335de7a
GR
1276 except OSError as exc:
1277 self._protocol.error_received(exc)
27b7c7eb 1278 return
431b540b
YS
1279 except (SystemExit, KeyboardInterrupt):
1280 raise
1281 except BaseException as exc:
6370f345
YS
1282 self._fatal_error(
1283 exc, 'Fatal write error on datagram transport')
27b7c7eb
GR
1284 return
1285
a5062c5d
GR
1286 # Ensure that what we buffer is immutable.
1287 self._buffer.append((bytes(data), addr))
73e86370 1288 self._buffer_size += len(data) + 8 # include header bytes
355491dc 1289 self._maybe_pause_protocol()
27b7c7eb
GR
1290
1291 def _sendto_ready(self):
1292 while self._buffer:
1293 data, addr = self._buffer.popleft()
42fabc3e 1294 self._buffer_size -= len(data)
27b7c7eb 1295 try:
63deaa5b 1296 if self._extra['peername']:
27b7c7eb
GR
1297 self._sock.send(data)
1298 else:
1299 self._sock.sendto(data, addr)
2546a177
GR
1300 except (BlockingIOError, InterruptedError):
1301 self._buffer.appendleft((data, addr)) # Try again later.
42fabc3e 1302 self._buffer_size += len(data)
2546a177 1303 break
2335de7a
GR
1304 except OSError as exc:
1305 self._protocol.error_received(exc)
27b7c7eb 1306 return
431b540b
YS
1307 except (SystemExit, KeyboardInterrupt):
1308 raise
1309 except BaseException as exc:
6370f345
YS
1310 self._fatal_error(
1311 exc, 'Fatal write error on datagram transport')
27b7c7eb
GR
1312 return
1313
355491dc 1314 self._maybe_resume_protocol() # May append to buffer.
27b7c7eb 1315 if not self._buffer:
5b8d4f97 1316 self._loop._remove_writer(self._sock_fd)
27b7c7eb
GR
1317 if self._closing:
1318 self._call_connection_lost(None)