def recv_into(self, buffer, nbytes=None, flags=0):
self._checkClosed()
- if buffer and (nbytes is None):
- nbytes = len(buffer)
- elif nbytes is None:
- nbytes = 1024
+ if nbytes is None:
+ if buffer is not None:
+ with memoryview(buffer) as view:
+ nbytes = view.nbytes
+ if not nbytes:
+ nbytes = 1024
+ else:
+ nbytes = 1024
if self._sslobj is not None:
if flags != 0:
raise ValueError(
from test.support import threading_helper
from test.support import warnings_helper
from test.support import asyncore
+import array
import re
import socket
import select
self.assertEqual(s.recv(0), b"")
self.assertEqual(s.recv_into(bytearray()), 0)
+ def test_recv_into_buffer_protocol_len(self):
+ server = ThreadedEchoServer(CERTFILE)
+ self.enterContext(server)
+ s = socket.create_connection((HOST, server.port))
+ self.addCleanup(s.close)
+ s = test_wrap_socket(s, suppress_ragged_eofs=False)
+ self.addCleanup(s.close)
+
+ s.send(b"data")
+ buf = array.array('I', [0, 0])
+ self.assertEqual(s.recv_into(buf), 4)
+ self.assertEqual(bytes(buf)[:4], b"data")
+
+ class B(bytearray):
+ def __len__(self):
+ 1/0
+ s.send(b"data")
+ buf = B(6)
+ self.assertEqual(s.recv_into(buf), 4)
+ self.assertEqual(bytes(buf), b"data\0\0")
+
def test_nonblocking_send(self):
server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,