with self.assertRaises(OSError) as cm:
self.loop.run_until_complete(coro)
- self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
+ self.assertStartsWith(str(cm.exception), 'Multiple exceptions: ')
self.assertTrue(m_socket.socket.return_value.close.called)
coro = self.loop.create_connection(
transp.close()
self.assertEqual(b'OUT:test', proto.data[1])
- self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
+ self.assertStartsWith(proto.data[2], b'ERR:test')
self.assertEqual(0, proto.returncode)
@support.requires_subprocess()
stdin.write(b'test')
self.loop.run_until_complete(proto.completed)
- self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
- proto.data[1])
+ self.assertStartsWith(proto.data[1], b'OUT:testERR:test')
self.assertEqual(b'', proto.data[2])
transp.close()
def test_future_cancel_message_getter(self):
f = self._new_future(loop=self.loop)
- self.assertTrue(hasattr(f, '_cancel_message'))
+ self.assertHasAttr(f, '_cancel_message')
self.assertEqual(f._cancel_message, None)
f.cancel('my message')
async def test_repr(self):
lock = asyncio.Lock()
- self.assertTrue(repr(lock).endswith('[unlocked]>'))
+ self.assertEndsWith(repr(lock), '[unlocked]>')
self.assertTrue(RGX_REPR.match(repr(lock)))
await lock.acquire()
- self.assertTrue(repr(lock).endswith('[locked]>'))
+ self.assertEndsWith(repr(lock), '[locked]>')
self.assertTrue(RGX_REPR.match(repr(lock)))
async def test_lock(self):
def test_repr(self):
ev = asyncio.Event()
- self.assertTrue(repr(ev).endswith('[unset]>'))
+ self.assertEndsWith(repr(ev), '[unset]>')
match = RGX_REPR.match(repr(ev))
self.assertEqual(match.group('extras'), 'unset')
ev.set()
- self.assertTrue(repr(ev).endswith('[set]>'))
+ self.assertEndsWith(repr(ev), '[set]>')
self.assertTrue(RGX_REPR.match(repr(ev)))
ev._waiters.append(mock.Mock())
async def test_repr(self):
sem = asyncio.Semaphore()
- self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
+ self.assertEndsWith(repr(sem), '[unlocked, value:1]>')
self.assertTrue(RGX_REPR.match(repr(sem)))
await sem.acquire()
- self.assertTrue(repr(sem).endswith('[locked]>'))
+ self.assertEndsWith(repr(sem), '[locked]>')
self.assertTrue('waiters' not in repr(sem))
self.assertTrue(RGX_REPR.match(repr(sem)))
self.assertIsNone(p.connection_lost(f))
self.assertIsNone(p.pause_writing())
self.assertIsNone(p.resume_writing())
- self.assertFalse(hasattr(p, '__dict__'))
+ self.assertNotHasAttr(p, '__dict__')
def test_protocol(self):
f = mock.Mock()
self.assertIsNone(p.eof_received())
self.assertIsNone(p.pause_writing())
self.assertIsNone(p.resume_writing())
- self.assertFalse(hasattr(p, '__dict__'))
+ self.assertNotHasAttr(p, '__dict__')
def test_buffered_protocol(self):
f = mock.Mock()
self.assertIsNone(p.buffer_updated(150))
self.assertIsNone(p.pause_writing())
self.assertIsNone(p.resume_writing())
- self.assertFalse(hasattr(p, '__dict__'))
+ self.assertNotHasAttr(p, '__dict__')
def test_datagram_protocol(self):
f = mock.Mock()
self.assertIsNone(dp.connection_lost(f))
self.assertIsNone(dp.error_received(f))
self.assertIsNone(dp.datagram_received(f, f))
- self.assertFalse(hasattr(dp, '__dict__'))
+ self.assertNotHasAttr(dp, '__dict__')
def test_subprocess_protocol(self):
f = mock.Mock()
self.assertIsNone(sp.pipe_data_received(1, f))
self.assertIsNone(sp.pipe_connection_lost(1, f))
self.assertIsNone(sp.process_exited())
- self.assertFalse(hasattr(sp, '__dict__'))
+ self.assertNotHasAttr(sp, '__dict__')
if __name__ == '__main__':
appear in fn(Queue()).
"""
q = asyncio.Queue()
- self.assertTrue(fn(q).startswith('<Queue'), fn(q))
+ self.assertStartsWith(fn(q), '<Queue')
id_is_present = hex(id(q)) in fn(q)
self.assertEqual(expect_id, id_is_present)
self.loop.run_until_complete(
self.loop.sock_recv(sock, 1024))
sock.close()
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
def _basetest_sock_recv_into(self, httpd, sock):
# same as _basetest_sock_client_ops, but using sock_recv_into
self.loop.run_until_complete(
self.loop.sock_recv_into(sock, buf[nbytes:]))
sock.close()
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
def test_sock_client_ops(self):
with test_utils.run_test_server() as httpd:
# consume data
await self.loop.sock_recv(sock, 1024)
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
async def _basetest_sock_recv_into_racing(self, httpd, sock):
sock.setblocking(False)
nbytes = await self.loop.sock_recv_into(sock, buf[:1024])
# consume data
await self.loop.sock_recv_into(sock, buf[nbytes:])
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
await task
sock.shutdown(socket.SHUT_WR)
data = await task
# ProactorEventLoop could deliver hello, so endswith is necessary
- self.assertTrue(data.endswith(b'world'))
+ self.assertEndsWith(data, b'world')
# After the first connect attempt before the listener is ready,
# the socket needs time to "recover" to make the next connect call.
data = await self.loop.sock_recv(sock, DATA_SIZE)
# HTTP headers size is less than MTU,
# they are sent by the first packet always
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
while data.find(b'\r\n\r\n') == -1:
data += await self.loop.sock_recv(sock, DATA_SIZE)
# Strip headers
data = bytes(buf[:nbytes])
# HTTP headers size is less than MTU,
# they are sent by the first packet always
- self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ self.assertStartsWith(data, b'HTTP/1.0 200 OK')
while data.find(b'\r\n\r\n') == -1:
nbytes = await self.loop.sock_recv_into(sock, buf)
data = bytes(buf[:nbytes])
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = reader.read()
data = self.loop.run_until_complete(f)
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
writer.close()
self.assertEqual(messages, [])
writer.write(b'GET / HTTP/1.0\r\n\r\n')
f = reader.read()
data = self.loop.run_until_complete(f)
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
writer.close()
self.assertEqual(messages, [])
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = rd.read()
data = self.loop.run_until_complete(f)
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
self.assertFalse(wr.is_closing())
wr.close()
self.assertTrue(wr.is_closing())
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
wr.close()
await wr.wait_closed()
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
wr.close()
with self.assertRaises(ConnectionResetError):
wr.write(b'data')
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
with self.assertWarns(ResourceWarning) as cm:
del wr
gc.collect()
self.assertEqual(len(cm.warnings), 1)
- self.assertTrue(str(cm.warnings[0].message).startswith("unclosed <StreamWriter"))
+ self.assertStartsWith(str(cm.warnings[0].message), "unclosed <StreamWriter")
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
- self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+ self.assertEndsWith(data, b'\r\n\r\nTest message')
# Make "loop is closed" occur first before "del wr" for this test.
self.loop.stop()
del wr
gc.collect()
self.assertEqual(len(cm.warnings), 1)
- self.assertTrue(str(cm.warnings[0].message).startswith("unclosed <StreamWriter"))
+ self.assertStartsWith(str(cm.warnings[0].message), "unclosed <StreamWriter")
async def outer():
srv = await asyncio.start_server(inner, socket_helper.HOSTv4, 0)
async def coro():
pass
t = self.new_task(self.loop, coro())
- self.assertTrue(hasattr(t, '_cancel_message'))
+ self.assertHasAttr(t, '_cancel_message')
self.assertEqual(t._cancel_message, None)
t.cancel('my message')
class GenericTaskTests(test_utils.TestCase):
def test_future_subclass(self):
- self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
+ self.assertIsSubclass(asyncio.Task, asyncio.Future)
@support.cpython_only
def test_asyncio_module_compiled(self):
self.assertGreater(len(out), 0)
self.assertGreater(len(err), 0)
# allow for partial reads...
- self.assertTrue(msg.upper().rstrip().startswith(out))
- self.assertTrue(b"stderr".startswith(err))
+ self.assertStartsWith(msg.upper().rstrip(), out)
+ self.assertStartsWith(b"stderr", err)
# The context manager calls wait() and closes resources
with p: