* refactor the code to avoid suboptimal asserts.
* Handle ParseError with assertRaises
Signed-off-by: Han Wang <freddie.wanah@gmail.com>
def test_openid_redirect(self):
response = self.fetch("/openid/client/login", follow_redirects=False)
self.assertEqual(response.code, 302)
- self.assertTrue("/openid/server/authenticate?" in response.headers["Location"])
+ self.assertIn("/openid/server/authenticate?", response.headers["Location"])
def test_openid_get_user(self):
response = self.fetch(
)
)
# the cookie is base64('zxcv')|base64('1234')
- self.assertTrue(
- '_oauth_request_token="enhjdg==|MTIzNA=="'
- in response.headers["Set-Cookie"],
+ self.assertIn(
+ '_oauth_request_token="enhjdg==|MTIzNA=="',
+ response.headers["Set-Cookie"],
response.headers["Set-Cookie"],
)
parsed = json_decode(response.body)
self.assertEqual(parsed["oauth_consumer_key"], "asdf")
self.assertEqual(parsed["oauth_token"], "uiop")
- self.assertTrue("oauth_nonce" in parsed)
- self.assertTrue("oauth_signature" in parsed)
+ self.assertIn("oauth_nonce", parsed)
+ self.assertIn("oauth_signature", parsed)
def test_oauth10a_redirect(self):
response = self.fetch("/oauth10a/client/login", follow_redirects=False)
parsed = json_decode(response.body)
self.assertEqual(parsed["oauth_consumer_key"], "asdf")
self.assertEqual(parsed["oauth_token"], "uiop")
- self.assertTrue("oauth_nonce" in parsed)
- self.assertTrue("oauth_signature" in parsed)
+ self.assertIn("oauth_nonce", parsed)
+ self.assertIn("oauth_signature", parsed)
def test_oauth10a_get_user_coroutine_exception(self):
response = self.fetch(
def test_oauth2_redirect(self):
response = self.fetch("/oauth2/client/login", follow_redirects=False)
self.assertEqual(response.code, 302)
- self.assertTrue("/oauth2/server/authorize?" in response.headers["Location"])
+ self.assertIn("/oauth2/server/authorize?", response.headers["Location"])
def test_facebook_login(self):
response = self.fetch("/facebook/client/login", follow_redirects=False)
)
)
# the cookie is base64('zxcv')|base64('1234')
- self.assertTrue(
- '_oauth_request_token="enhjdg==|MTIzNA=="'
- in response.headers["Set-Cookie"],
+ self.assertIn(
+ '_oauth_request_token="enhjdg==|MTIzNA=="',
+ response.headers["Set-Cookie"],
response.headers["Set-Cookie"],
)
response.headers["Location"],
)
# the cookie is base64('zxcv')|base64('1234')
- self.assertTrue(
- '_oauth_request_token="enhjdg==|MTIzNA=="'
- in response.headers["Set-Cookie"],
+ self.assertIn(
+ '_oauth_request_token="enhjdg==|MTIzNA=="',
+ response.headers["Set-Cookie"],
response.headers["Set-Cookie"],
)
return
result = yield f()
- self.assertEqual(result, None)
+ self.assertIsNone(result)
self.finished = True
@gen_test
return
result = yield f()
- self.assertEqual(result, None)
+ self.assertIsNone(result)
self.finished = True
@gen_test
self.io_loop.run_sync(inner2, timeout=3)
- self.assertIs(self.local_ref(), None)
+ self.assertIsNone(self.local_ref())
self.finished = True
def test_asyncio_future_debug_info(self):
with self.assertRaises(ValueError):
g = gen.WaitIterator(Future(), bar=Future())
- self.assertEqual(g.current_index, None, "bad nil current index")
- self.assertEqual(g.current_future, None, "bad nil current future")
+ self.assertIsNone(g.current_index, "bad nil current index")
+ self.assertIsNone(g.current_future, "bad nil current future")
@gen_test
def test_already_done(self):
self.assertEqual(r, 84)
i += 1
- self.assertEqual(g.current_index, None, "bad nil current index")
- self.assertEqual(g.current_future, None, "bad nil current future")
+ self.assertIsNone(g.current_index, "bad nil current index")
+ self.assertIsNone(g.current_future, "bad nil current future")
dg = gen.WaitIterator(f1=f1, f2=f2)
self.fail("got bad WaitIterator index {}".format(dg.current_index))
i += 1
-
- self.assertEqual(dg.current_index, None, "bad nil current index")
- self.assertEqual(dg.current_future, None, "bad nil current future")
+ self.assertIsNone(g.current_index, "bad nil current index")
+ self.assertIsNone(g.current_future, "bad nil current future")
def finish_coroutines(self, iteration, futures):
if iteration == 3:
loop.close()
gc.collect()
# Future was collected
- self.assertIs(wfut[0](), None)
+ self.assertIsNone(wfut[0]())
# At least one wakeup
self.assertGreaterEqual(len(result), 2)
if not self.is_pypy3():
# coroutine finalizer was called (not on PyPy3 apparently)
- self.assertIs(result[-1], None)
+ self.assertIsNone(result[-1])
def test_gc_infinite_async_await(self):
# Same as test_gc_infinite_coro, but with a `async def` function
loop.close()
gc.collect()
# Future was collected
- self.assertIs(wfut[0](), None)
+ self.assertIsNone(wfut[0]())
# At least one wakeup and one finally
self.assertGreaterEqual(len(result), 2)
if not self.is_pypy3():
# coroutine finalizer was called (not on PyPy3 apparently)
- self.assertIs(result[-1], None)
+ self.assertIsNone(result[-1])
def test_multi_moment(self):
# Test gen.multi with moment
start_time = time.time()
response = self.fetch("/hello")
response.rethrow()
- assert response.request_time is not None
+ self.assertIsNotNone(response.request_time)
self.assertGreaterEqual(response.request_time, 0)
self.assertLess(response.request_time, 1.0)
# A very crude check to make sure that start_time is based on
# wall time and not the monotonic clock.
- assert response.start_time is not None
+ self.assertIsNotNone(response.start_time)
self.assertLess(abs(response.start_time - start_time), 1.0)
for k, v in response.time_info.items():
def test_neither_set(self):
proxy = _RequestProxy(HTTPRequest("http://example.com/"), dict())
- self.assertIs(proxy.auth_username, None)
+ self.assertIsNone(proxy.auth_username)
def test_bad_attribute(self):
proxy = _RequestProxy(HTTPRequest("http://example.com/"), dict())
def test_defaults_none(self):
proxy = _RequestProxy(HTTPRequest("http://example.com/"), None)
- self.assertIs(proxy.auth_username, None)
+ self.assertIsNone(proxy.auth_username)
class HTTPResponseTestCase(unittest.TestCase):
self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
yield self.read_response()
data = yield self.stream.read_until_close()
- self.assertTrue(not data)
- self.assertTrue("Connection" not in self.headers)
+ self.assertFalse(data)
+ self.assertNotIn("Connection", self.headers)
self.close()
@gen_test
request = HTTPServerRequest(
uri="/", headers=HTTPHeaders({"Canary": ["Coal Mine"]})
)
- self.assertTrue("Canary" not in repr(request))
+ self.assertNotIn("Canary", repr(request))
class ParseRequestStartLineTest(unittest.TestCase):
# All the timeout methods return non-None handles that can be
# passed to remove_timeout.
handle = self.io_loop.add_timeout(self.io_loop.time(), lambda: None)
- self.assertFalse(handle is None)
+ self.assertIsNotNone(handle)
self.io_loop.remove_timeout(handle)
def test_call_at_return(self):
handle = self.io_loop.call_at(self.io_loop.time(), lambda: None)
- self.assertFalse(handle is None)
+ self.assertIsNotNone(handle)
self.io_loop.remove_timeout(handle)
def test_call_later_return(self):
handle = self.io_loop.call_later(0, lambda: None)
- self.assertFalse(handle is None)
+ self.assertIsNotNone(handle)
self.io_loop.remove_timeout(handle)
def test_close_file_object(self):
)
future = self.wait()
self.assertTrue(future.done())
- self.assertTrue(future.result() is None)
+ self.assertIsNone(future.result())
@gen_test
def test_run_in_executor_gen(self):
def test_context_manager(self):
sem = locks.Semaphore()
with (yield sem.acquire()) as yielded:
- self.assertTrue(yielded is None)
+ self.assertIsNone(yielded)
# Semaphore was released and can be acquired again.
self.assertTrue(asyncio.ensure_future(sem.acquire()).done())
async def f():
async with sem as yielded:
- self.assertTrue(yielded is None)
+ self.assertIsNone(yielded)
yield f()
self.assertTrue(is_valid_ip("4.4.4.4"))
self.assertTrue(is_valid_ip("::1"))
self.assertTrue(is_valid_ip("2620:0:1cfe:face:b00c::3"))
- self.assertTrue(not is_valid_ip("www.google.com"))
- self.assertTrue(not is_valid_ip("localhost"))
- self.assertTrue(not is_valid_ip("4.4.4.4<"))
- self.assertTrue(not is_valid_ip(" 127.0.0.1"))
- self.assertTrue(not is_valid_ip(""))
- self.assertTrue(not is_valid_ip(" "))
- self.assertTrue(not is_valid_ip("\n"))
- self.assertTrue(not is_valid_ip("\x00"))
- self.assertTrue(not is_valid_ip("a" * 100))
+ self.assertFalse(is_valid_ip("www.google.com"))
+ self.assertFalse(is_valid_ip("localhost"))
+ self.assertFalse(is_valid_ip("4.4.4.4<"))
+ self.assertFalse(is_valid_ip(" 127.0.0.1"))
+ self.assertFalse(is_valid_ip(""))
+ self.assertFalse(is_valid_ip(" "))
+ self.assertFalse(is_valid_ip("\n"))
+ self.assertFalse(is_valid_ip("\x00"))
+ self.assertFalse(is_valid_ip("a" * 100))
class TestPortAllocation(unittest.TestCase):
signal.alarm(5) # master process
try:
id = fork_processes(3, max_restarts=3)
- self.assertTrue(id is not None)
+ self.assertIsNotNone(id)
signal.alarm(5) # child processes
except SystemExit as e:
# if we exit cleanly from fork_processes, all the child processes
# finished with status 0
self.assertEqual(e.code, 0)
- self.assertTrue(task_id() is None)
+ self.assertIsNone(task_id())
sock.close()
return
try:
def test_singleton(self: typing.Any):
# Class "constructor" reuses objects on the same IOLoop
- self.assertTrue(SimpleAsyncHTTPClient() is SimpleAsyncHTTPClient())
+ self.assertIs(SimpleAsyncHTTPClient(), SimpleAsyncHTTPClient())
# unless force_instance is used
- self.assertTrue(
- SimpleAsyncHTTPClient() is not SimpleAsyncHTTPClient(force_instance=True)
+ self.assertIsNot(
+ SimpleAsyncHTTPClient(), SimpleAsyncHTTPClient(force_instance=True)
)
# different IOLoops use different objects
with closing(IOLoop()) as io_loop2:
client1 = self.io_loop.run_sync(make_client)
client2 = io_loop2.run_sync(make_client)
- self.assertTrue(client1 is not client2)
+ self.assertIsNot(client1, client2)
def test_connection_limit(self: typing.Any):
with closing(self.create_client(max_clients=2)) as client:
self.resolve_connect(AF1, "a", True)
conn.on_connect_timeout()
self.assert_pending()
- self.assertEqual(self.streams["a"].closed, False)
+ self.assertFalse(self.streams["a"].closed)
# success stream will be pop
self.assertEqual(len(conn.streams), 0)
# streams in connector should be closed after connect timeout
self.assertEqual(result, b"013456")
def test_break_outside_loop(self):
- try:
+ with self.assertRaises(ParseError, msg="Did not get expected exception"):
Template(utf8("{% break %}"))
- raise Exception("Did not get expected exception")
- except ParseError:
- pass
def test_break_in_apply(self):
# This test verifies current behavior, although of course it would
# be nice if apply didn't cause seemingly unrelated breakage
- try:
+ with self.assertRaises(ParseError, msg="Did not get expected exception"):
Template(
utf8("{% for i in [] %}{% apply foo %}{% break %}{% end %}{% end %}")
)
- raise Exception("Did not get expected exception")
- except ParseError:
- pass
@unittest.skip("no testable future imports")
def test_no_inherit_future(self):
self.fail("did not get expected exception")
except ZeroDivisionError:
exc_stack = traceback.format_exc()
- self.assertTrue("# base.html:1" in exc_stack)
+ self.assertIn("# base.html:1", exc_stack)
def test_error_line_number_extends_sub_error(self):
loader = DictLoader(
loader.load("sub.html").generate()
self.fail("did not get expected exception")
except ZeroDivisionError:
- self.assertTrue("# sub.html:4 (via base.html:1)" in traceback.format_exc())
+ self.assertIn("# sub.html:4 (via base.html:1)", traceback.format_exc())
def test_multi_includes(self):
loader = DictLoader(
loader.load("a.html").generate()
self.fail("did not get expected exception")
except ZeroDivisionError:
- self.assertTrue(
- "# c.html:1 (via b.html:1, a.html:1)" in traceback.format_exc()
- )
+ self.assertIn("# c.html:1 (via b.html:1, a.html:1)", traceback.format_exc())
class ParseErrorDetailTest(unittest.TestCase):
# let us access attributes of the base class.
obj = cast(TestConfig1, TestConfigurable())
self.assertIsInstance(obj, TestConfig1)
- self.assertIs(obj.a, None)
+ self.assertIsNone(obj.a)
obj = cast(TestConfig1, TestConfigurable(a=1))
self.assertIsInstance(obj, TestConfig1)
TestConfigurable.configure(TestConfig2)
obj = cast(TestConfig2, TestConfigurable())
self.assertIsInstance(obj, TestConfig2)
- self.assertIs(obj.b, None)
+ self.assertIsNone(obj.b)
obj = cast(TestConfig2, TestConfigurable(b=2))
self.assertIsInstance(obj, TestConfig2)
TestConfigurable.configure("tornado.test.util_test.TestConfig2")
obj = cast(TestConfig2, TestConfigurable())
self.assertIsInstance(obj, TestConfig2)
- self.assertIs(obj.b, None)
+ self.assertIsNone(obj.b)
obj = cast(TestConfig2, TestConfigurable(b=2))
self.assertIsInstance(obj, TestConfig2)
self.checkSubclasses()
# args bound in configure don't apply when using the subclass directly
obj = TestConfig1()
- self.assertIs(obj.a, None)
+ self.assertIsNone(obj.a)
def test_config_class_args(self):
TestConfigurable.configure(TestConfig2, b=5)
self.checkSubclasses()
# args bound in configure don't apply when using the subclass directly
obj = TestConfig2()
- self.assertIs(obj.b, None)
+ self.assertIsNone(obj.b)
def test_config_multi_level(self):
TestConfigurable.configure(TestConfig3, a=1)
def test_omitted(self):
args = (1, 2)
kwargs = dict() # type: Dict[str, Any]
- self.assertIs(self.replacer.get_old_value(args, kwargs), None)
+ self.assertIsNone(self.replacer.get_old_value(args, kwargs))
self.assertEqual(
self.replacer.replace("new", args, kwargs),
(None, (1, 2), dict(callback="new")),
handler.set_signed_cookie("foo", binascii.a2b_hex(b"d76df8e7aefc"), version=1)
cookie = handler._cookies["foo"]
match = re.match(rb"12345678\|([0-9]+)\|([0-9a-f]+)", cookie)
- assert match is not None
+ self.assertIsNotNone(match)
timestamp = match.group(1)
sig = match.group(2)
self.assertEqual(
)
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
- self.assertTrue(handler.get_signed_cookie("foo", min_version=1) is None)
+ self.assertIsNone(handler.get_signed_cookie("foo", min_version=1))
def test_arbitrary_bytes(self):
# Secure cookies accept arbitrary data (which is base64 encoded).
self.assertEqual(headers[0], 'equals="a=b"; Path=/')
self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
# python 2.7 octal-escapes the semicolon; older versions leave it alone
- self.assertTrue(
- headers[2] in ('semicolon="a;b"; Path=/', 'semicolon="a\\073b"; Path=/'),
+ self.assertIn(
+ headers[2],
+ ('semicolon="a;b"; Path=/', 'semicolon="a\\073b"; Path=/'),
headers[2],
)
def test_set_cookie_expires_days(self):
response = self.fetch("/set_expires_days")
header = response.headers.get("Set-Cookie")
- assert header is not None
+ self.assertIsNotNone(header)
match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
- assert match is not None
+ self.assertIsNotNone(match)
expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
days=10
)
header_expires = email.utils.parsedate_to_datetime(match.groupdict()["expires"])
- self.assertTrue(abs((expires - header_expires).total_seconds()) < 10)
+ self.assertLess(abs((expires - header_expires).total_seconds()), 10)
def test_set_cookie_false_flags(self):
response = self.fetch("/set_falsy_flags")
with ExpectLog(app_log, "Uncaught exception"):
response = self.fetch("/default")
self.assertEqual(response.code, 500)
- self.assertTrue(b"500: Internal Server Error" in response.body)
+ self.assertIn(b"500: Internal Server Error", response.body)
response = self.fetch("/default?status=503")
self.assertEqual(response.code, 503)
- self.assertTrue(b"503: Service Unavailable" in response.body)
+ self.assertIn(b"503: Service Unavailable", response.body)
response = self.fetch("/default?status=435")
self.assertEqual(response.code, 435)
- self.assertTrue(b"435: Unknown" in response.body)
+ self.assertIn(b"435: Unknown", response.body)
def test_write_error(self):
with ExpectLog(app_log, "Uncaught exception"):
def test_static_files(self):
response = self.fetch("/robots.txt")
- self.assertTrue(b"Disallow: /" in response.body)
+ self.assertIn(b"Disallow: /", response.body)
response = self.fetch("/static/robots.txt")
- self.assertTrue(b"Disallow: /" in response.body)
+ self.assertIn(b"Disallow: /", response.body)
self.assertEqual(response.headers.get("Content-Type"), "text/plain")
def test_static_files_cacheable(self):
# test is pretty weak but it gives us coverage of the code path which
# was important for detecting the deprecation of datetime.utcnow.
response = self.fetch("/robots.txt?v=12345")
- self.assertTrue(b"Disallow: /" in response.body)
+ self.assertIn(b"Disallow: /", response.body)
self.assertIn("Cache-Control", response.headers)
self.assertIn("Expires", response.headers)
)
# make sure the uncompressed file still has the correct type
response = self.fetch("/static/sample.xml")
- self.assertTrue(
- response.headers.get("Content-Type") in set(("text/xml", "application/xml"))
+ self.assertIn(
+ response.headers.get("Content-Type"), set(("text/xml", "application/xml"))
)
def test_static_url(self):
headers={"If-Modified-Since": response1.headers["Last-Modified"]},
)
self.assertEqual(response2.code, 304)
- self.assertTrue("Content-Length" not in response2.headers)
+ self.assertNotIn("Content-Length", response2.headers)
def test_static_304_if_none_match(self):
response1 = self.get_and_head("/static/robots.txt")
with open(robots_file_path, encoding="utf-8") as f:
self.assertEqual(response.body, utf8(f.read()))
self.assertEqual(response.headers.get("Content-Length"), "26")
- self.assertEqual(response.headers.get("Content-Range"), None)
+ self.assertIsNone(response.headers.get("Content-Range"))
def test_static_with_range_full_past_end(self):
response = self.get_and_head(
with open(robots_file_path, encoding="utf-8") as f:
self.assertEqual(response.body, utf8(f.read()))
self.assertEqual(response.headers.get("Content-Length"), "26")
- self.assertEqual(response.headers.get("Content-Range"), None)
+ self.assertIsNone(response.headers.get("Content-Range"))
def test_static_with_range_partial_past_end(self):
response = self.get_and_head(
with open(robots_file_path, encoding="utf-8") as f:
self.assertEqual(response.body, utf8(f.read()))
self.assertEqual(response.headers.get("Content-Length"), "26")
- self.assertEqual(response.headers.get("Content-Range"), None)
+ self.assertIsNone(response.headers.get("Content-Range"))
def test_static_invalid_range(self):
response = self.get_and_head("/static/robots.txt", headers={"Range": "asdf"})
)
self.assertEqual(response.code, 304)
self.assertEqual(response.body, b"")
- self.assertTrue("Content-Length" not in response.headers)
+ self.assertNotIn("Content-Length", response.headers)
self.assertEqual(
utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
)
def test_clear_header(self):
response = self.fetch("/")
- self.assertTrue("h1" not in response.headers)
+ self.assertNotIn("h1", response.headers)
self.assertEqual(response.headers["h2"], "bar")
"/", headers={"If-None-Match": response1.headers["Etag"]}
)
self.assertEqual(response2.code, 304)
- self.assertTrue("Content-Length" not in response2.headers)
- self.assertTrue("Content-Language" not in response2.headers)
+ self.assertNotIn("Content-Length", response2.headers)
+ self.assertNotIn("Content-Language", response2.headers)
# Not an entity header, but should not be added to 304s by chunking
- self.assertTrue("Transfer-Encoding" not in response2.headers)
+ self.assertNotIn("Transfer-Encoding", response2.headers)
class StatusReasonTest(SimpleHandlerTestCase):
def test_date_header(self):
response = self.fetch("/")
header_date = email.utils.parsedate_to_datetime(response.headers["Date"])
- self.assertTrue(
- header_date - datetime.datetime.now(datetime.timezone.utc)
- < datetime.timedelta(seconds=2)
+ self.assertLess(
+ header_date - datetime.datetime.now(datetime.timezone.utc),
+ datetime.timedelta(seconds=2),
)
decoded1 = decode_signed_value(
SignedValueTest.SECRET, "key2", signed1, clock=self.present
)
- self.assertIs(decoded1, None)
+ self.assertIsNone(decoded1)
decoded2 = decode_signed_value(
SignedValueTest.SECRET, "key1", signed2, clock=self.present
)
- self.assertIs(decoded2, None)
+ self.assertIsNone(decoded2)
def test_expired(self):
signed = create_signed_value(
decoded_present = decode_signed_value(
SignedValueTest.SECRET, "key1", signed, clock=self.present
)
- self.assertIs(decoded_present, None)
+ self.assertIsNone(decoded_present)
def test_payload_tampering(self):
# These cookies are variants of the one in test_known_values.
newkeys = SignedValueTest.SECRET_DICT.copy()
newkeys.pop(0)
decoded = decode_signed_value(newkeys, "key", signed, clock=self.present)
- self.assertEqual(None, decoded)
+ self.assertIsNone(decoded)
def test_key_version_retrieval(self):
value = b"\xe9"
ws.write_message("hello")
with ExpectLog(app_log, "Uncaught exception"):
response = yield ws.read_message()
- self.assertIs(response, None)
+ self.assertIsNone(response)
@gen_test
def test_websocket_http_fail(self):