]> git.ipfire.org Git - thirdparty/Python/cpython.git/blame - Lib/asyncio/selector_events.py
gh-109534: fix reference leak when SSL handshake fails (#114074)
[thirdparty/Python/cpython.git] / Lib / asyncio / selector_events.py
CommitLineData
27b7c7eb
GR
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
6370f345 7__all__ = 'BaseSelectorEventLoop',
8dffc456 8
27b7c7eb 9import collections
3317a132 10import errno
d5aeccf9 11import functools
c122390a
KA
12import itertools
13import os
4271dfd7 14import selectors
27b7c7eb 15import socket
978a9afc 16import warnings
5b8d4f97 17import weakref
27b7c7eb
GR
18try:
19 import ssl
20except ImportError: # pragma: no cover
21 ssl = None
22
23from . import base_events
24from . import constants
25from . import events
26from . import futures
631fd38d 27from . import protocols
231b404c 28from . import sslproto
631fd38d 29from . import transports
8cd5165b 30from . import trsock
fc29e0f3 31from .log import logger
27b7c7eb 32
c122390a
KA
# True when socket objects expose sendmsg(); _SelectorSocketTransport uses
# this flag to choose between its sendmsg-based and send-based write paths.
_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')

if _HAS_SENDMSG:
    try:
        # Value of the 'SC_IOV_MAX' system configuration variable.
        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
    except OSError:
        # Fallback to send
        _HAS_SENDMSG = False
27b7c7eb 41
e912e652
VS
42def _test_selector_event(selector, fd, event):
43 # Test if the selector is monitoring 'event' events
44 # for the file descriptor 'fd'.
45 try:
46 key = selector.get_key(fd)
47 except KeyError:
48 return False
49 else:
50 return bool(key.events & event)
51
52
27b7c7eb
GR
53class BaseSelectorEventLoop(base_events.BaseEventLoop):
54 """Selector event loop.
55
56 See events.EventLoop for API specification.
57 """
58
def __init__(self, selector=None):
    """Set up the loop around *selector*.

    When *selector* is None a ``selectors.DefaultSelector()`` is created.
    The self-pipe is created and an empty weak-valued mapping is
    installed as ``self._transports``.
    """
    super().__init__()

    chosen = selectors.DefaultSelector() if selector is None else selector
    logger.debug('Using selector: %s', chosen.__class__.__name__)
    self._selector = chosen
    self._make_self_pipe()
    self._transports = weakref.WeakValueDictionary()
27b7c7eb
GR
68
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Return a new _SelectorSocketTransport for *sock* and *protocol*.

    Raises whatever _ensure_fd_no_transport() raises when *sock* is
    already owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(sock)
    transport = _SelectorSocketTransport(
        self, sock, protocol, waiter, extra, server)
    return transport
74
f7686c1f
NA
def _make_ssl_transport(
        self, rawsock, protocol, sslcontext, waiter=None,
        *, server_side=False, server_hostname=None,
        extra=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
    """Return the application-level transport of a new SSL connection.

    An sslproto.SSLProtocol is created around *protocol* and used as the
    protocol of a _SelectorSocketTransport built on *rawsock*.  The value
    returned is the SSL protocol's ``_app_transport``.
    """
    self._ensure_fd_no_transport(rawsock)
    timeouts = {
        'ssl_handshake_timeout': ssl_handshake_timeout,
        'ssl_shutdown_timeout': ssl_shutdown_timeout,
    }
    ssl_protocol = sslproto.SSLProtocol(
        self, protocol, sslcontext, waiter,
        server_side, server_hostname,
        **timeouts
    )
    # The socket transport is created for its side effects only; this
    # method does not keep a reference to it.
    _SelectorSocketTransport(self, rawsock, ssl_protocol,
                             extra=extra, server=server)
    return ssl_protocol._app_transport
92
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Return a new _SelectorDatagramTransport for *sock* and *protocol*."""
    self._ensure_fd_no_transport(sock)
    transport = _SelectorDatagramTransport(
        self, sock, protocol, address, waiter, extra)
    return transport
27b7c7eb
GR
98
def close(self):
    """Close the loop, its self-pipe and its selector.

    Raises RuntimeError when the loop is running.  Calling it on a loop
    that is already closed does nothing.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self.is_closed():
        return
    self._close_self_pipe()
    super().close()
    selector = self._selector
    if selector is not None:
        selector.close()
        self._selector = None
109
27b7c7eb 110 def _close_self_pipe(self):
5b8d4f97 111 self._remove_reader(self._ssock.fileno())
27b7c7eb
GR
112 self._ssock.close()
113 self._ssock = None
114 self._csock.close()
115 self._csock = None
116 self._internal_fds -= 1
117
118 def _make_self_pipe(self):
119 # A self-socket, really. :-)
a10dc3ef 120 self._ssock, self._csock = socket.socketpair()
27b7c7eb
GR
121 self._ssock.setblocking(False)
122 self._csock.setblocking(False)
123 self._internal_fds += 1
5b8d4f97 124 self._add_reader(self._ssock.fileno(), self._read_from_self)
27b7c7eb 125
fe5649c7
VS
126 def _process_self_data(self, data):
127 pass
128
27b7c7eb 129 def _read_from_self(self):
54c4b8e5
VS
130 while True:
131 try:
132 data = self._ssock.recv(4096)
133 if not data:
134 break
fe5649c7 135 self._process_self_data(data)
54c4b8e5
VS
136 except InterruptedError:
137 continue
138 except BlockingIOError:
139 break
27b7c7eb
GR
140
141 def _write_to_self(self):
3d139d8e
GR
142 # This may be called from a different thread, possibly after
143 # _close_self_pipe() has been called or even while it is
144 # running. Guard for self._csock being None or closed. When
145 # a socket is closed, send() raises OSError (with errno set to
146 # EBADF, but let's not rely on the exact error code).
147 csock = self._csock
1b0f0e3d
VS
148 if csock is None:
149 return
150
151 try:
152 csock.send(b'\0')
153 except OSError:
154 if self._debug:
155 logger.debug("Fail to write a null byte into the "
156 "self-pipe socket",
157 exc_info=True)
27b7c7eb 158
def _start_serving(self, protocol_factory, sock,
                   sslcontext=None, server=None, backlog=100,
                   ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                   ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Register _accept_connection() as the reader of listening *sock*.

    All remaining arguments are forwarded unchanged to
    _accept_connection() each time the callback runs.
    """
    accept_args = (protocol_factory, sock, sslcontext, server, backlog,
                   ssl_handshake_timeout, ssl_shutdown_timeout)
    self._add_reader(sock.fileno(), self._accept_connection, *accept_args)
27b7c7eb 166
f7686c1f
NA
def _accept_connection(
        self, protocol_factory, sock,
        sslcontext=None, server=None, backlog=100,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Accept up to *backlog* pending connections on listening *sock*.

    Each accepted connection is made non-blocking and handed to a task
    running _accept_connection2().  The loop ends early when accept()
    raises BlockingIOError, InterruptedError or ConnectionAbortedError.

    When accept() fails with EMFILE, ENFILE, ENOBUFS or ENOMEM the error
    is reported through call_exception_handler(), the reader for *sock*
    is removed, _start_serving() is scheduled to run again after
    constants.ACCEPT_RETRY_DELAY, and the method returns.  Any other
    OSError is re-raised.
    """
    # This method is only called once for each event loop tick where the
    # listening socket has triggered an EVENT_READ. There may be multiple
    # connections waiting for an .accept() so it is called in a loop.
    # See https://bugs.python.org/issue27906 for more details.
    for _ in range(backlog):
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            # Early exit because the socket accept buffer is empty.
            return None
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            if exc.errno in (errno.EMFILE, errno.ENFILE,
                             errno.ENOBUFS, errno.ENOMEM):
                # Some platforms (e.g. Linux) keep reporting the FD as
                # ready, so we remove the read handler temporarily.
                # We'll try again in a while.
                self.call_exception_handler({
                    'message': 'socket.accept() out of system resource',
                    'exception': exc,
                    'socket': trsock.TransportSocket(sock),
                })
                self._remove_reader(sock.fileno())
                self.call_later(constants.ACCEPT_RETRY_DELAY,
                                self._start_serving,
                                protocol_factory, sock, sslcontext, server,
                                backlog, ssl_handshake_timeout,
                                ssl_shutdown_timeout)
                # The retry is scheduled and the reader is gone: stop
                # here instead of calling accept() again in this tick,
                # which would report the same error and schedule a
                # duplicate retry on every remaining iteration.
                return None
            else:
                raise  # The event loop will catch, log and ignore it.
        else:
            extra = {'peername': addr}
            accept = self._accept_connection2(
                protocol_factory, conn, extra, sslcontext, server,
                ssl_handshake_timeout, ssl_shutdown_timeout)
            self.create_task(accept)
2934262f 212
f7686c1f
NA
async def _accept_connection2(
        self, protocol_factory, conn, extra,
        sslcontext=None, server=None,
        ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
        ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    """Create the protocol and transport for accepted connection *conn*.

    A protocol is built with *protocol_factory*; *conn* is wrapped in an
    SSL transport when *sslcontext* is truthy and in a plain socket
    transport otherwise.  The coroutine then waits for the transport's
    waiter future.

    SystemExit and KeyboardInterrupt propagate.  Any other exception is
    not propagated; when the loop is in debug mode it is reported through
    call_exception_handler() together with the protocol and transport
    created so far.
    """
    protocol = None
    transport = None
    try:
        protocol = protocol_factory()
        waiter = self.create_future()
        if sslcontext:
            transport = self._make_ssl_transport(
                conn, protocol, sslcontext, waiter=waiter,
                server_side=True, extra=extra, server=server,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout)
        else:
            transport = self._make_socket_transport(
                conn, protocol, waiter=waiter, extra=extra,
                server=server)

        try:
            await waiter
        except BaseException:
            # Setting up the connection failed: close the transport
            # before re-raising to the outer handler.
            transport.close()
            # gh-109534: When an exception is raised by the SSLProtocol object the
            # exception set in this future can keep the protocol object alive and
            # cause a reference cycle.
            waiter = None
            raise
            # It's now up to the protocol to handle the connection.

    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        # Outside debug mode the failure is dropped silently.
        if self._debug:
            context = {
                'message':
                    'Error on transport creation for incoming connection',
                'exception': exc,
            }
            if protocol is not None:
                context['protocol'] = protocol
            if transport is not None:
                context['transport'] = transport
            self.call_exception_handler(context)
27b7c7eb 259
5b8d4f97 260 def _ensure_fd_no_transport(self, fd):
ce12629c
YS
261 fileno = fd
262 if not isinstance(fileno, int):
263 try:
264 fileno = int(fileno.fileno())
265 except (AttributeError, TypeError, ValueError):
266 # This code matches selectors._fileobj_to_fd function.
6370f345 267 raise ValueError(f"Invalid file object: {fd!r}") from None
7e2d93f3
NK
268 transport = self._transports.get(fileno)
269 if transport and not transport.is_closing():
270 raise RuntimeError(
271 f'File descriptor {fd!r} is used by transport '
272 f'{transport!r}')
5b8d4f97
YS
273
def _add_reader(self, fd, callback, *args):
    """Install *callback* as the reader of *fd* and return its Handle.

    A reader handle already registered for *fd* is replaced and
    cancelled; a registered writer handle is preserved.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    existing = self._selector.get_map().get(fd)
    if existing is None:
        self._selector.register(fd, selectors.EVENT_READ,
                                (handle, None))
        return handle

    old_reader, writer = existing.data
    self._selector.modify(fd, existing.events | selectors.EVENT_READ,
                          (handle, writer))
    if old_reader is not None:
        old_reader.cancel()
    return handle
27b7c7eb 288
5b8d4f97 289 def _remove_reader(self, fd):
bb2fc5b2 290 if self.is_closed():
eeeebcd8 291 return False
b7dc795d
NK
292 key = self._selector.get_map().get(fd)
293 if key is None:
27b7c7eb 294 return False
b7dc795d
NK
295 mask, (reader, writer) = key.events, key.data
296 mask &= ~selectors.EVENT_READ
297 if not mask:
298 self._selector.unregister(fd)
27b7c7eb 299 else:
b7dc795d 300 self._selector.modify(fd, mask, (None, writer))
27b7c7eb 301
b7dc795d
NK
302 if reader is not None:
303 reader.cancel()
304 return True
305 else:
306 return False
27b7c7eb 307
def _add_writer(self, fd, callback, *args):
    """Install *callback* as the writer of *fd* and return its Handle.

    A writer handle already registered for *fd* is replaced and
    cancelled; a registered reader handle is preserved.
    """
    self._check_closed()
    handle = events.Handle(callback, args, self, None)
    existing = self._selector.get_map().get(fd)
    if existing is None:
        self._selector.register(fd, selectors.EVENT_WRITE,
                                (None, handle))
        return handle

    reader, old_writer = existing.data
    self._selector.modify(fd, existing.events | selectors.EVENT_WRITE,
                          (reader, handle))
    if old_writer is not None:
        old_writer.cancel()
    return handle
27b7c7eb 322
5b8d4f97 323 def _remove_writer(self, fd):
27b7c7eb 324 """Remove a writer callback."""
bb2fc5b2 325 if self.is_closed():
eeeebcd8 326 return False
b7dc795d
NK
327 key = self._selector.get_map().get(fd)
328 if key is None:
27b7c7eb 329 return False
b7dc795d
NK
330 mask, (reader, writer) = key.events, key.data
331 # Remove both writer and connector.
332 mask &= ~selectors.EVENT_WRITE
333 if not mask:
334 self._selector.unregister(fd)
27b7c7eb 335 else:
b7dc795d 336 self._selector.modify(fd, mask, (reader, None))
27b7c7eb 337
b7dc795d
NK
338 if writer is not None:
339 writer.cancel()
340 return True
341 else:
342 return False
27b7c7eb 343
5b8d4f97
YS
def add_reader(self, fd, callback, *args):
    """Register *callback* to be called with *args* when *fd* is readable.

    Raises RuntimeError when *fd* is owned by a transport that is not
    closing.
    """
    self._ensure_fd_no_transport(fd)
    # The handle returned by _add_reader() is deliberately not returned.
    self._add_reader(fd, callback, *args)
5b8d4f97
YS
348
def remove_reader(self, fd):
    """Unregister the reader callback of *fd*.

    Returns the result of _remove_reader().  Raises RuntimeError when
    *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_reader(fd)
    return removed
353
def add_writer(self, fd, callback, *args):
    """Register *callback* to be called with *args* when *fd* is writable.

    Raises RuntimeError when *fd* is owned by a transport that is not
    closing.
    """
    self._ensure_fd_no_transport(fd)
    # The handle returned by _add_writer() is deliberately not returned.
    self._add_writer(fd, callback, *args)
5b8d4f97
YS
358
def remove_writer(self, fd):
    """Unregister the writer callback of *fd*.

    Returns the result of _remove_writer().  Raises RuntimeError when
    *fd* is owned by a transport that is not closing.
    """
    self._ensure_fd_no_transport(fd)
    removed = self._remove_writer(fd)
    return removed
363
async def sock_recv(self, sock, n):
    """Receive data from the socket.

    The return value is a bytes object representing the data received.
    The maximum amount of data to be received at once is specified by
    *n*.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    # Fast path: the data may already be available.
    try:
        return sock.recv(n)
    except (BlockingIOError, InterruptedError):
        pass
    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    handle = self._add_reader(fileno, self._sock_recv, waiter, sock, n)
    on_done = functools.partial(self._sock_read_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
27b7c7eb 385
210a1373
FK
386 def _sock_read_done(self, fd, fut, handle=None):
387 if handle is None or not handle.cancelled():
388 self.remove_reader(fd)
74387926
AS
389
390 def _sock_recv(self, fut, sock, n):
28773465 391 # _sock_recv() can add itself as an I/O callback if the operation can't
b0b0e628 392 # be done immediately. Don't use it directly, call sock_recv().
74387926 393 if fut.done():
27b7c7eb
GR
394 return
395 try:
396 data = sock.recv(n)
397 except (BlockingIOError, InterruptedError):
74387926 398 return # try again next time
431b540b
YS
399 except (SystemExit, KeyboardInterrupt):
400 raise
401 except BaseException as exc:
27b7c7eb
GR
402 fut.set_exception(exc)
403 else:
404 fut.set_result(data)
405
async def sock_recv_into(self, sock, buf):
    """Receive data from the socket into *buf*.

    *buf* is a writable buffer; the return value is the number of bytes
    written into it.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    # Fast path: the data may already be available.
    try:
        return sock.recv_into(buf)
    except (BlockingIOError, InterruptedError):
        pass
    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    handle = self._add_reader(fileno, self._sock_recv_into, waiter, sock, buf)
    on_done = functools.partial(self._sock_read_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
525f40d2 426
74387926 427 def _sock_recv_into(self, fut, sock, buf):
525f40d2 428 # _sock_recv_into() can add itself as an I/O callback if the operation
6370f345
YS
429 # can't be done immediately. Don't use it directly, call
430 # sock_recv_into().
74387926 431 if fut.done():
525f40d2
AP
432 return
433 try:
434 nbytes = sock.recv_into(buf)
435 except (BlockingIOError, InterruptedError):
74387926 436 return # try again next time
431b540b
YS
437 except (SystemExit, KeyboardInterrupt):
438 raise
439 except BaseException as exc:
525f40d2
AP
440 fut.set_exception(exc)
441 else:
442 fut.set_result(nbytes)
443
9f04ee56
AG
async def sock_recvfrom(self, sock, bufsize):
    """Receive a datagram from a datagram socket.

    The return value is a tuple of (bytes, address) representing the
    datagram received and the address it came from.
    The maximum amount of data to be received at once is specified by
    *bufsize*.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    # Fast path: a datagram may already be available.
    try:
        return sock.recvfrom(bufsize)
    except (BlockingIOError, InterruptedError):
        pass
    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    handle = self._add_reader(
        fileno, self._sock_recvfrom, waiter, sock, bufsize)
    on_done = functools.partial(self._sock_read_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
466
467 def _sock_recvfrom(self, fut, sock, bufsize):
468 # _sock_recvfrom() can add itself as an I/O callback if the operation
469 # can't be done immediately. Don't use it directly, call
470 # sock_recvfrom().
471 if fut.done():
472 return
473 try:
474 result = sock.recvfrom(bufsize)
475 except (BlockingIOError, InterruptedError):
476 return # try again next time
477 except (SystemExit, KeyboardInterrupt):
478 raise
479 except BaseException as exc:
480 fut.set_exception(exc)
481 else:
482 fut.set_result(result)
483
async def sock_recvfrom_into(self, sock, buf, nbytes=0):
    """Receive a datagram from the socket into *buf*.

    *buf* is a writable buffer.  A falsy *nbytes* is replaced by
    ``len(buf)``.  The return value is a tuple of
    (number of bytes written, address).

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    nbytes = nbytes or len(buf)

    # Fast path: a datagram may already be available.
    try:
        return sock.recvfrom_into(buf, nbytes)
    except (BlockingIOError, InterruptedError):
        pass
    # Slow path: wait until the selector reports the socket readable.
    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    handle = self._add_reader(
        fileno, self._sock_recvfrom_into, waiter, sock, buf, nbytes)
    on_done = functools.partial(self._sock_read_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
508
509 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
510 # _sock_recv_into() can add itself as an I/O callback if the operation
511 # can't be done immediately. Don't use it directly, call
512 # sock_recv_into().
513 if fut.done():
514 return
515 try:
516 result = sock.recvfrom_into(buf, bufsize)
517 except (BlockingIOError, InterruptedError):
518 return # try again next time
519 except (SystemExit, KeyboardInterrupt):
520 raise
521 except BaseException as exc:
522 fut.set_exception(exc)
523 else:
524 fut.set_result(result)
525
async def sock_sendall(self, sock, data):
    """Send data to the socket.

    The socket must be connected to a remote socket.  This method
    continues to send data from *data* until either all data has been
    sent or an error occurs.  None is returned on success.  On error, an
    exception is raised, and there is no way to determine how much data,
    if any, was successfully processed by the receiving end of the
    connection.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    # Fast path: try to push everything out immediately.
    try:
        sent = sock.send(data)
    except (BlockingIOError, InterruptedError):
        sent = 0

    if sent == len(data):
        # all data sent
        return

    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    # The one-element list is mutable state shared between successive
    # invocations of _sock_sendall(): the offset of the first unsent byte.
    position = [sent]
    handle = self._add_writer(fileno, self._sock_sendall, waiter, sock,
                              memoryview(data), position)
    on_done = functools.partial(self._sock_write_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
27b7c7eb 556
6e789002 557 def _sock_sendall(self, fut, sock, view, pos):
74387926
AS
558 if fut.done():
559 # Future cancellation can be scheduled on previous loop iteration
27b7c7eb 560 return
6e789002 561 start = pos[0]
27b7c7eb 562 try:
6e789002 563 n = sock.send(view[start:])
27b7c7eb 564 except (BlockingIOError, InterruptedError):
74387926 565 return
431b540b
YS
566 except (SystemExit, KeyboardInterrupt):
567 raise
568 except BaseException as exc:
27b7c7eb
GR
569 fut.set_exception(exc)
570 return
571
6e789002
AS
572 start += n
573
574 if start == len(view):
27b7c7eb
GR
575 fut.set_result(None)
576 else:
6e789002 577 pos[0] = start
27b7c7eb 578
9f04ee56
AG
async def sock_sendto(self, sock, data, address):
    """Send *data* through *sock* to *address*.

    The return value is the value returned by ``sock.sendto()``.  When
    the call would block, it is retried once the socket is reported
    writable.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    # Fast path: try to send immediately.
    try:
        return sock.sendto(data, address)
    except (BlockingIOError, InterruptedError):
        pass

    # Slow path: wait until the selector reports the socket writable.
    waiter = self.create_future()
    fileno = sock.fileno()
    self._ensure_fd_no_transport(fileno)
    handle = self._add_writer(
        fileno, self._sock_sendto, waiter, sock, data, address)
    on_done = functools.partial(self._sock_write_done, fileno, handle=handle)
    waiter.add_done_callback(on_done)
    return await waiter
605
606 def _sock_sendto(self, fut, sock, data, address):
607 if fut.done():
608 # Future cancellation can be scheduled on previous loop iteration
609 return
610 try:
611 n = sock.sendto(data, 0, address)
612 except (BlockingIOError, InterruptedError):
613 return
614 except (SystemExit, KeyboardInterrupt):
615 raise
616 except BaseException as exc:
617 fut.set_exception(exc)
618 else:
619 fut.set_result(n)
620
async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.

    For AF_INET sockets (and AF_INET6 sockets when base_events._HAS_IPv6
    is true) *address* is first resolved with _ensure_resolved() and the
    first result is used.  In debug mode ValueError is raised when *sock*
    is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    if sock.family == socket.AF_INET or (
            base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
        resolved = await self._ensure_resolved(
            address, family=sock.family, type=sock.type, proto=sock.proto,
            loop=self,
        )
        # Only the address of the first resolution result is used.
        _, _, _, _, address = resolved[0]

    fut = self.create_future()
    # _sock_connect() either completes fut right away or registers a
    # writer callback that completes it later.
    self._sock_connect(fut, sock, address)
    try:
        return await fut
    finally:
        # Needed to break cycles when an exception occurs.
        fut = None
27b7c7eb 645
def _sock_connect(self, fut, sock, address):
    """Start connecting *sock* to *address* and report the result in *fut*.

    When connect() finishes immediately *fut* is completed with None or
    with the exception raised.  When it raises BlockingIOError or
    InterruptedError, _sock_connect_cb() is registered as the writer
    callback of the socket and completes *fut* later.
    """
    fd = sock.fileno()
    try:
        sock.connect(address)
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        self._ensure_fd_no_transport(fd)
        handle = self._add_writer(
            fd, self._sock_connect_cb, fut, sock, address)
        # Remove the writer again once the future is done.
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference to the future (see the matching
        # cycle-breaking note in sock_connect()).
        fut = None
d5aeccf9 668
210a1373
FK
669 def _sock_write_done(self, fd, fut, handle=None):
670 if handle is None or not handle.cancelled():
671 self.remove_writer(fd)
d5aeccf9
VS
672
def _sock_connect_cb(self, fut, sock, address):
    """Writer callback that completes a pending connect.

    Reads SO_ERROR from *sock*: a zero value completes *fut* with None,
    a non-zero value completes it with an OSError carrying that error
    number.  Does nothing when *fut* is already done.
    """
    if fut.done():
        return

    try:
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # Jump to any except clause below.
            raise OSError(err, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        pass
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(None)
    finally:
        # Drop the local reference to the future (see the matching
        # cycle-breaking note in sock_connect()).
        fut = None
27b7c7eb 693
async def sock_accept(self, sock):
    """Accept a connection.

    The socket must be bound to an address and listening for connections.
    The return value is a pair (conn, address) where conn is a new socket
    object usable to send and receive data on the connection, and address
    is the address bound to the socket on the other end of the connection.

    In debug mode ValueError is raised when *sock* is not non-blocking.
    """
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")
    waiter = self.create_future()
    # _sock_accept() completes the future now or registers itself as a
    # reader callback to complete it later.
    self._sock_accept(waiter, sock)
    return await waiter
27b7c7eb 708
0dd98c2d 709 def _sock_accept(self, fut, sock):
27b7c7eb 710 fd = sock.fileno()
27b7c7eb
GR
711 try:
712 conn, address = sock.accept()
713 conn.setblocking(False)
714 except (BlockingIOError, InterruptedError):
0dd98c2d
AG
715 self._ensure_fd_no_transport(fd)
716 handle = self._add_reader(fd, self._sock_accept, fut, sock)
717 fut.add_done_callback(
718 functools.partial(self._sock_read_done, fd, handle=handle))
431b540b
YS
719 except (SystemExit, KeyboardInterrupt):
720 raise
721 except BaseException as exc:
27b7c7eb
GR
722 fut.set_exception(exc)
723 else:
724 fut.set_result((conn, address))
725
7c684073
AS
726 async def _sendfile_native(self, transp, file, offset, count):
727 del self._transports[transp._sock_fd]
728 resume_reading = transp.is_reading()
729 transp.pause_reading()
730 await transp._make_empty_waiter()
731 try:
732 return await self.sock_sendfile(transp._sock, file, offset, count,
733 fallback=False)
734 finally:
735 transp._reset_empty_waiter()
736 if resume_reading:
737 transp.resume_reading()
738 self._transports[transp._sock_fd] = transp
739
27b7c7eb
GR
740 def _process_events(self, event_list):
741 for key, mask in event_list:
742 fileobj, (reader, writer) = key.fileobj, key.data
743 if mask & selectors.EVENT_READ and reader is not None:
744 if reader._cancelled:
5b8d4f97 745 self._remove_reader(fileobj)
27b7c7eb
GR
746 else:
747 self._add_callback(reader)
748 if mask & selectors.EVENT_WRITE and writer is not None:
749 if writer._cancelled:
5b8d4f97 750 self._remove_writer(fileobj)
27b7c7eb
GR
751 else:
752 self._add_callback(writer)
753
754 def _stop_serving(self, sock):
5b8d4f97 755 self._remove_reader(sock.fileno())
27b7c7eb
GR
756 sock.close()
757
758
c0982413
YS
759class _SelectorTransport(transports._FlowControlMixin,
760 transports.Transport):
27b7c7eb
GR
761
762 max_size = 256 * 1024 # Buffer size passed to recv().
763
978a9afc
VS
764 # Attribute used in the destructor: it must be set even if the constructor
765 # is not called (see _SelectorSslTransport which may start by raising an
766 # exception)
767 _sock = None
768
def __init__(self, loop, sock, protocol, extra=None, server=None):
    """Initialize the state shared by selector-based transports.

    Records the 'socket', 'sockname' and 'peername' extra info (a name
    that cannot be obtained is stored as None), installs *protocol*,
    attaches to *server* when one is given and registers the transport
    in ``loop._transports`` under the socket's file descriptor.
    """
    super().__init__(extra, loop)
    info = self._extra
    info['socket'] = trsock.TransportSocket(sock)
    try:
        sockname = sock.getsockname()
    except OSError:
        sockname = None
    info['sockname'] = sockname
    # A peername supplied through *extra* is kept as is.
    if 'peername' not in info:
        try:
            peername = sock.getpeername()
        except OSError:
            peername = None
        info['peername'] = peername
    self._sock = sock
    self._sock_fd = sock.fileno()

    self._protocol_connected = False
    self.set_protocol(protocol)

    self._server = server
    self._buffer = collections.deque()
    self._conn_lost = 0  # Set when call to connection_lost scheduled.
    self._closing = False  # Set when close() called.
    self._paused = False  # Set when pause_reading() called

    if server is not None:
        server._attach()
    loop._transports[self._sock_fd] = self
27b7c7eb 796
e912e652 797 def __repr__(self):
0e34dc37
VS
798 info = [self.__class__.__name__]
799 if self._sock is None:
800 info.append('closed')
801 elif self._closing:
802 info.append('closing')
6370f345 803 info.append(f'fd={self._sock_fd}')
b261475a 804 # test if the transport was closed
79fd9626 805 if self._loop is not None and not self._loop.is_closed():
b261475a
VS
806 polling = _test_selector_event(self._loop._selector,
807 self._sock_fd, selectors.EVENT_READ)
808 if polling:
809 info.append('read=polling')
810 else:
811 info.append('read=idle')
e912e652 812
b261475a 813 polling = _test_selector_event(self._loop._selector,
15cc678d
VS
814 self._sock_fd,
815 selectors.EVENT_WRITE)
b261475a
VS
816 if polling:
817 state = 'polling'
818 else:
819 state = 'idle'
e912e652 820
b261475a 821 bufsize = self.get_write_buffer_size()
6370f345
YS
822 info.append(f'write=<{state}, bufsize={bufsize}>')
823 return '<{}>'.format(' '.join(info))
e912e652 824
27b7c7eb
GR
def abort(self):
    """Close the transport immediately via _force_close(None)."""
    self._force_close(None)
827
a05a6ef1
YS
def set_protocol(self, protocol):
    """Install *protocol* and mark the transport as having a protocol."""
    self._protocol, self._protocol_connected = protocol, True
a05a6ef1
YS
831
def get_protocol(self):
    """Return the protocol currently installed on the transport."""
    return self._protocol
834
5bb1afb3
YS
def is_closing(self):
    """Return the value of the transport's closing flag."""
    return self._closing
837
78942ecd
I
def is_reading(self):
    """Return True when the transport is neither closing nor paused."""
    if self.is_closing():
        return False
    return not self._paused
840
def pause_reading(self):
    """Stop watching the socket for readability.

    Does nothing when the transport is not currently reading.
    """
    if self.is_reading():
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)
848
def resume_reading(self):
    """Start watching the socket for readability again.

    Does nothing when the transport is closing or is not paused.
    """
    if not self._closing and self._paused:
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
856
27b7c7eb
GR
def close(self):
    """Begin closing the transport.

    The reader is removed at once.  When the write buffer is empty the
    writer is removed too and _call_connection_lost(None) is scheduled
    on the loop.  Repeated calls do nothing.
    """
    if self._closing:
        return
    self._closing = True
    loop = self._loop
    fd = self._sock_fd
    loop._remove_reader(fd)
    if self._buffer:
        # Data is still pending in the write buffer; nothing more is
        # done here.
        return
    self._conn_lost += 1
    loop._remove_writer(fd)
    loop.call_soon(self._call_connection_lost, None)
866
fb2c3465 867 def __del__(self, _warn=warnings.warn):
3e2ad8ec 868 if self._sock is not None:
fb2c3465 869 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
3e2ad8ec 870 self._sock.close()
978a9afc 871
065ca25a 872 def _fatal_error(self, exc, message='Fatal error on transport'):
2546a177 873 # Should be called from exception handler only.
1f39c28e 874 if isinstance(exc, OSError):
e912e652
VS
875 if self._loop.get_debug():
876 logger.debug("%r: %s", self, message, exc_info=True)
877 else:
ff827f08 878 self._loop.call_exception_handler({
065ca25a 879 'message': message,
ff827f08
YS
880 'exception': exc,
881 'transport': self,
882 'protocol': self._protocol,
883 })
27b7c7eb
GR
884 self._force_close(exc)
885
886 def _force_close(self, exc):
2546a177
GR
887 if self._conn_lost:
888 return
27b7c7eb
GR
889 if self._buffer:
890 self._buffer.clear()
5b8d4f97 891 self._loop._remove_writer(self._sock_fd)
2546a177
GR
892 if not self._closing:
893 self._closing = True
5b8d4f97 894 self._loop._remove_reader(self._sock_fd)
27b7c7eb 895 self._conn_lost += 1
27b7c7eb
GR
896 self._loop.call_soon(self._call_connection_lost, exc)
897
898 def _call_connection_lost(self, exc):
899 try:
47bbea71
VS
900 if self._protocol_connected:
901 self._protocol.connection_lost(exc)
27b7c7eb
GR
902 finally:
903 self._sock.close()
904 self._sock = None
905 self._protocol = None
906 self._loop = None
907 server = self._server
908 if server is not None:
b28dbac8 909 server._detach()
27b7c7eb
GR
910 self._server = None
911
355491dc 912 def get_write_buffer_size(self):
c122390a 913 return sum(map(len, self._buffer))
355491dc 914
a84d0b36 915 def _add_reader(self, fd, callback, *args):
78942ecd 916 if not self.is_reading():
a84d0b36 917 return
a84d0b36
VS
918 self._loop._add_reader(fd, callback, *args)
919
27b7c7eb
GR
920
921class _SelectorSocketTransport(_SelectorTransport):
922
f111b3dc 923 _start_tls_compatible = True
7c684073 924 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
f111b3dc 925
27b7c7eb
GR
926 def __init__(self, loop, sock, protocol, waiter=None,
927 extra=None, server=None):
631fd38d 928
dbf10227 929 self._read_ready_cb = None
27b7c7eb
GR
930 super().__init__(loop, sock, protocol, extra, server)
931 self._eof = False
7c684073 932 self._empty_waiter = None
c122390a
KA
933 if _HAS_SENDMSG:
934 self._write_ready = self._write_sendmsg
935 else:
936 self._write_ready = self._write_send
44c19ecc
YS
937 # Disable the Nagle algorithm -- small writes will be
938 # sent without waiting for the TCP ACK. This generally
939 # decreases the latency (in some cases significantly.)
3bc0ebab 940 base_events._set_nodelay(self._sock)
44c19ecc 941
27b7c7eb 942 self._loop.call_soon(self._protocol.connection_made, self)
fa73779b 943 # only start reading when connection_made() has been called
a84d0b36 944 self._loop.call_soon(self._add_reader,
fa73779b 945 self._sock_fd, self._read_ready)
27b7c7eb 946 if waiter is not None:
f07801bb 947 # only wake up the waiter when connection_made() has been called
5d7e3b6c
YS
948 self._loop.call_soon(futures._set_result_unless_cancelled,
949 waiter, None)
27b7c7eb 950
dbf10227
YS
951 def set_protocol(self, protocol):
952 if isinstance(protocol, protocols.BufferedProtocol):
953 self._read_ready_cb = self._read_ready__get_buffer
954 else:
955 self._read_ready_cb = self._read_ready__data_received
956
957 super().set_protocol(protocol)
958
dbf10227
YS
959 def _read_ready(self):
960 self._read_ready_cb()
961
631fd38d
YS
    def _read_ready__get_buffer(self):
        """Read handler that receives directly into a protocol buffer.

        Asks the protocol for a buffer, fills it with recv_into() and
        reports the number of bytes through buffer_updated().  Zero bytes
        are treated as end-of-file.
        """
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Covers both a failing get_buffer() and the empty-buffer
            # RuntimeError raised just above.
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            # Nothing to read after all; wait for the next event.
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')
998
999 def _read_ready__data_received(self):
ca2e0a48
YS
1000 if self._conn_lost:
1001 return
27b7c7eb
GR
1002 try:
1003 data = self._sock.recv(self.max_size)
1004 except (BlockingIOError, InterruptedError):
631fd38d 1005 return
431b540b
YS
1006 except (SystemExit, KeyboardInterrupt):
1007 raise
1008 except BaseException as exc:
065ca25a 1009 self._fatal_error(exc, 'Fatal read error on socket transport')
631fd38d
YS
1010 return
1011
1012 if not data:
1013 self._read_ready__on_eof()
1014 return
1015
1016 try:
1017 self._protocol.data_received(data)
431b540b
YS
1018 except (SystemExit, KeyboardInterrupt):
1019 raise
1020 except BaseException as exc:
631fd38d
YS
1021 self._fatal_error(
1022 exc, 'Fatal error: protocol.data_received() call failed.')
1023
1024 def _read_ready__on_eof(self):
1025 if self._loop.get_debug():
1026 logger.debug("%r received EOF", self)
1027
1028 try:
1029 keep_open = self._protocol.eof_received()
431b540b
YS
1030 except (SystemExit, KeyboardInterrupt):
1031 raise
1032 except BaseException as exc:
631fd38d
YS
1033 self._fatal_error(
1034 exc, 'Fatal error: protocol.eof_received() call failed.')
1035 return
1036
1037 if keep_open:
1038 # We're keeping the connection open so the
1039 # protocol can write more, but we still can't
1040 # receive more, so remove the reader callback.
1041 self._loop._remove_reader(self._sock_fd)
27b7c7eb 1042 else:
631fd38d 1043 self.close()
27b7c7eb
GR
1044
    def write(self, data):
        """Send *data* to the peer, buffering whatever cannot go out now.

        Raises TypeError unless *data* is bytes, bytearray or memoryview,
        and RuntimeError after write_eof() or while a sendfile operation
        is in progress.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            # The connection is gone: drop the data, count the attempt and
            # start logging once the threshold has been reached.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                # Nothing was sent; fall through and buffer all of it.
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                # Keep only the unsent tail, without copying it.
                data = memoryview(data)[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.append(data)
        self._maybe_pause_protocol()
27b7c7eb 1083
c122390a
KA
1084 def _get_sendmsg_buffer(self):
1085 return itertools.islice(self._buffer, SC_IOV_MAX)
1086
    def _write_sendmsg(self):
        """Writer callback that flushes buffered chunks with sendmsg().

        Once the buffer is empty the writer is removed, a pending empty
        waiter is resolved, and a deferred close or write_eof() is carried
        out.
        """
        assert self._buffer, 'Data should not be empty'
        if self._conn_lost:
            return
        try:
            nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
            # Drop the bytes that went out from the front of the buffer.
            self._adjust_leftover_buffer(nbytes)
        except (BlockingIOError, InterruptedError):
            # The buffer is untouched; try again on the next event.
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            if self._empty_waiter is not None:
                self._empty_waiter.set_exception(exc)
        else:
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if self._empty_waiter is not None:
                    self._empty_waiter.set_result(None)
                if self._closing:
                    # close() was waiting for the buffer to drain.
                    self._call_connection_lost(None)
                elif self._eof:
                    # write_eof() was waiting for the buffer to drain.
                    self._sock.shutdown(socket.SHUT_WR)
27b7c7eb 1114
c122390a
KA
1115 def _adjust_leftover_buffer(self, nbytes: int) -> None:
1116 buffer = self._buffer
1117 while nbytes:
1118 b = buffer.popleft()
1119 b_len = len(b)
1120 if b_len <= nbytes:
1121 nbytes -= b_len
1122 else:
1123 buffer.appendleft(b[nbytes:])
1124 break
1125
1126 def _write_send(self):
1127 assert self._buffer, 'Data should not be empty'
ca2e0a48
YS
1128 if self._conn_lost:
1129 return
27b7c7eb 1130 try:
c122390a
KA
1131 buffer = self._buffer.popleft()
1132 n = self._sock.send(buffer)
1133 if n != len(buffer):
1134 # Not all data was written
1135 self._buffer.appendleft(buffer[n:])
27b7c7eb 1136 except (BlockingIOError, InterruptedError):
a5062c5d 1137 pass
431b540b
YS
1138 except (SystemExit, KeyboardInterrupt):
1139 raise
1140 except BaseException as exc:
5b8d4f97 1141 self._loop._remove_writer(self._sock_fd)
a5062c5d 1142 self._buffer.clear()
065ca25a 1143 self._fatal_error(exc, 'Fatal write error on socket transport')
7c684073
AS
1144 if self._empty_waiter is not None:
1145 self._empty_waiter.set_exception(exc)
27b7c7eb 1146 else:
355491dc
GR
1147 self._maybe_resume_protocol() # May append to buffer.
1148 if not self._buffer:
5b8d4f97 1149 self._loop._remove_writer(self._sock_fd)
7c684073
AS
1150 if self._empty_waiter is not None:
1151 self._empty_waiter.set_result(None)
27b7c7eb
GR
1152 if self._closing:
1153 self._call_connection_lost(None)
1154 elif self._eof:
1155 self._sock.shutdown(socket.SHUT_WR)
27b7c7eb
GR
1156
1157 def write_eof(self):
23f587e3 1158 if self._closing or self._eof:
27b7c7eb
GR
1159 return
1160 self._eof = True
1161 if not self._buffer:
1162 self._sock.shutdown(socket.SHUT_WR)
1163
c122390a
KA
1164 def writelines(self, list_of_data):
1165 if self._eof:
1166 raise RuntimeError('Cannot call writelines() after write_eof()')
1167 if self._empty_waiter is not None:
1168 raise RuntimeError('unable to writelines; sendfile is in progress')
1169 if not list_of_data:
1170 return
1171 self._buffer.extend([memoryview(data) for data in list_of_data])
1172 self._write_ready()
19d2639d
AAS
1173 # If the entire buffer couldn't be written, register a write handler
1174 if self._buffer:
1175 self._loop._add_writer(self._sock_fd, self._write_ready)
c122390a 1176
27b7c7eb
GR
1177 def can_write_eof(self):
1178 return True
1179
7c684073
AS
1180 def _call_connection_lost(self, exc):
1181 super()._call_connection_lost(exc)
1182 if self._empty_waiter is not None:
1183 self._empty_waiter.set_exception(
1184 ConnectionError("Connection is closed by peer"))
1185
1186 def _make_empty_waiter(self):
1187 if self._empty_waiter is not None:
1188 raise RuntimeError("Empty waiter is already set")
1189 self._empty_waiter = self._loop.create_future()
1190 if not self._buffer:
1191 self._empty_waiter.set_result(None)
1192 return self._empty_waiter
1193
1194 def _reset_empty_waiter(self):
1195 self._empty_waiter = None
1196
a6331b60
RK
1197 def close(self):
1198 self._read_ready_cb = None
3e5ce796 1199 self._write_ready = None
a6331b60
RK
1200 super().close()
1201
27b7c7eb 1202
9bdec0aa 1203class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
27b7c7eb 1204
a5062c5d
GR
1205 _buffer_factory = collections.deque
1206
bfff45d6
VS
1207 def __init__(self, loop, sock, protocol, address=None,
1208 waiter=None, extra=None):
27b7c7eb 1209 super().__init__(loop, sock, protocol, extra)
27b7c7eb 1210 self._address = address
42fabc3e 1211 self._buffer_size = 0
27b7c7eb 1212 self._loop.call_soon(self._protocol.connection_made, self)
47bbea71 1213 # only start reading when connection_made() has been called
a84d0b36 1214 self._loop.call_soon(self._add_reader,
47bbea71 1215 self._sock_fd, self._read_ready)
bfff45d6 1216 if waiter is not None:
f07801bb 1217 # only wake up the waiter when connection_made() has been called
5d7e3b6c
YS
1218 self._loop.call_soon(futures._set_result_unless_cancelled,
1219 waiter, None)
27b7c7eb 1220
355491dc 1221 def get_write_buffer_size(self):
42fabc3e 1222 return self._buffer_size
355491dc 1223
27b7c7eb 1224 def _read_ready(self):
ca2e0a48
YS
1225 if self._conn_lost:
1226 return
27b7c7eb
GR
1227 try:
1228 data, addr = self._sock.recvfrom(self.max_size)
1229 except (BlockingIOError, InterruptedError):
1230 pass
2335de7a
GR
1231 except OSError as exc:
1232 self._protocol.error_received(exc)
431b540b
YS
1233 except (SystemExit, KeyboardInterrupt):
1234 raise
1235 except BaseException as exc:
065ca25a 1236 self._fatal_error(exc, 'Fatal read error on datagram transport')
27b7c7eb
GR
1237 else:
1238 self._protocol.datagram_received(data, addr)
1239
1240 def sendto(self, data, addr=None):
a5062c5d 1241 if not isinstance(data, (bytes, bytearray, memoryview)):
6370f345
YS
1242 raise TypeError(f'data argument must be a bytes-like object, '
1243 f'not {type(data).__name__!r}')
27b7c7eb
GR
1244 if not data:
1245 return
1246
63deaa5b
VM
1247 if self._address:
1248 if addr not in (None, self._address):
1249 raise ValueError(
1250 f'Invalid address: must be None or {self._address}')
1251 addr = self._address
27b7c7eb
GR
1252
1253 if self._conn_lost and self._address:
1254 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
fc29e0f3 1255 logger.warning('socket.send() raised exception.')
27b7c7eb
GR
1256 self._conn_lost += 1
1257 return
1258
1259 if not self._buffer:
1260 # Attempt to send it right away first.
1261 try:
63deaa5b 1262 if self._extra['peername']:
27b7c7eb
GR
1263 self._sock.send(data)
1264 else:
1265 self._sock.sendto(data, addr)
1266 return
2546a177 1267 except (BlockingIOError, InterruptedError):
5b8d4f97 1268 self._loop._add_writer(self._sock_fd, self._sendto_ready)
2335de7a
GR
1269 except OSError as exc:
1270 self._protocol.error_received(exc)
27b7c7eb 1271 return
431b540b
YS
1272 except (SystemExit, KeyboardInterrupt):
1273 raise
1274 except BaseException as exc:
6370f345
YS
1275 self._fatal_error(
1276 exc, 'Fatal write error on datagram transport')
27b7c7eb
GR
1277 return
1278
a5062c5d
GR
1279 # Ensure that what we buffer is immutable.
1280 self._buffer.append((bytes(data), addr))
42fabc3e 1281 self._buffer_size += len(data)
355491dc 1282 self._maybe_pause_protocol()
27b7c7eb
GR
1283
    def _sendto_ready(self):
        """Writer callback that flushes buffered datagrams in order.

        Stops at the first datagram the socket cannot take yet and puts
        it back at the front of the buffer.
        """
        while self._buffer:
            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                self._buffer_size += len(data)
                break
            except OSError as exc:
                # The failed datagram is not put back in the buffer.
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                # close() was waiting for the buffer to drain.
                self._call_connection_lost(None)