ignore_trailing=ignore_trailing,
raise_on_truncation=raise_on_truncation,
)
+ except dns.message.Truncated as e:
+ # See the comment in query.py for details.
+ if (
+ ignore_errors
+ and query is not None
+ and not query.is_response(e.message())
+ ):
+ continue
+ else:
+ raise
except Exception:
if ignore_errors:
continue
ignore_trailing=ignore_trailing,
raise_on_truncation=raise_on_truncation,
)
+ except dns.message.Truncated as e:
+ # If we got Truncated and not FORMERR, we at least got the header with TC
+ # set, and very likely the question section, so we'll re-raise if the
+ # message seems to be a response as we need to know when truncation happens.
+ # We need to check that it seems to be a response as we don't want a random
+ # injected message with TC set to cause us to bail out.
+ if (
+ ignore_errors
+ and query is not None
+ and not query.is_response(e.message())
+ ):
+ continue
+ else:
+ raise
except Exception:
if ignore_errors:
continue
from2,
ignore_unexpected=True,
ignore_errors=True,
+ raise_on_truncation=False,
+ good_r=None,
):
+ if good_r is None:
+ good_r = self.good_r
s = MockSock(wire1, from1, wire2, from2)
(r, when, _) = await dns.asyncquery.receive_udp(
s,
time.time() + 2,
ignore_unexpected=ignore_unexpected,
ignore_errors=ignore_errors,
+ raise_on_truncation=raise_on_truncation,
query=self.q,
)
- self.assertEqual(r, self.good_r)
+ self.assertEqual(r, good_r)
def test_good_mock(self):
async def run():
self.async_run(run)
+ def test_good_wire_with_truncation_flag_and_no_truncation_raise(self):
+ async def run():
+ tc_r = dns.message.make_response(self.q)
+ tc_r.flags |= dns.flags.TC
+ tc_r_wire = tc_r.to_wire()
+ await self.mock_receive(
+ tc_r_wire, ("127.0.0.1", 53), None, None, good_r=tc_r
+ )
+
+ self.async_run(run)
+
+ def test_good_wire_with_truncation_flag_and_truncation_raise(self):
+ async def agood():
+ tc_r = dns.message.make_response(self.q)
+ tc_r.flags |= dns.flags.TC
+ tc_r_wire = tc_r.to_wire()
+ await self.mock_receive(
+ tc_r_wire, ("127.0.0.1", 53), None, None, raise_on_truncation=True
+ )
+
+ def good():
+ self.async_run(agood)
+
+ self.assertRaises(dns.message.Truncated, good)
+
+ def test_wrong_id_wire_with_truncation_flag_and_no_truncation_raise(self):
+ async def run():
+ bad_r = dns.message.make_response(self.q)
+ bad_r.id += 1
+ bad_r.flags |= dns.flags.TC
+ bad_r_wire = bad_r.to_wire()
+ await self.mock_receive(
+ bad_r_wire, ("127.0.0.1", 53), self.good_r_wire, ("127.0.0.1", 53)
+ )
+
+ self.async_run(run)
+
+ def test_wrong_id_wire_with_truncation_flag_and_truncation_raise(self):
+ async def run():
+ bad_r = dns.message.make_response(self.q)
+ bad_r.id += 1
+ bad_r.flags |= dns.flags.TC
+ bad_r_wire = bad_r.to_wire()
+ await self.mock_receive(
+ bad_r_wire,
+ ("127.0.0.1", 53),
+ self.good_r_wire,
+ ("127.0.0.1", 53),
+ raise_on_truncation=True,
+ )
+
+ self.async_run(run)
+
def test_bad_wire_not_ignored(self):
bad_r = dns.message.make_response(self.q)
bad_r.id += 1
have_ssl = False
import dns.exception
+import dns.flags
import dns.inet
import dns.message
import dns.name
from2,
ignore_unexpected=True,
ignore_errors=True,
+ raise_on_truncation=False,
+ good_r=None,
):
+ if good_r is None:
+ good_r = self.good_r
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
with mock_udp_recv(wire1, from1, wire2, from2):
time.time() + 2,
ignore_unexpected=ignore_unexpected,
ignore_errors=ignore_errors,
+ raise_on_truncation=raise_on_truncation,
query=self.q,
)
- self.assertEqual(r, self.good_r)
+ self.assertEqual(r, good_r)
finally:
s.close()
bad_r_wire[:10], ("127.0.0.1", 53), self.good_r_wire, ("127.0.0.1", 53)
)
+ def test_good_wire_with_truncation_flag_and_no_truncation_raise(self):
+ tc_r = dns.message.make_response(self.q)
+ tc_r.flags |= dns.flags.TC
+ tc_r_wire = tc_r.to_wire()
+ self.mock_receive(tc_r_wire, ("127.0.0.1", 53), None, None, good_r=tc_r)
+
+ def test_good_wire_with_truncation_flag_and_truncation_raise(self):
+ def good():
+ tc_r = dns.message.make_response(self.q)
+ tc_r.flags |= dns.flags.TC
+ tc_r_wire = tc_r.to_wire()
+ self.mock_receive(
+ tc_r_wire, ("127.0.0.1", 53), None, None, raise_on_truncation=True
+ )
+
+ self.assertRaises(dns.message.Truncated, good)
+
+ def test_wrong_id_wire_with_truncation_flag_and_no_truncation_raise(self):
+ bad_r = dns.message.make_response(self.q)
+ bad_r.id += 1
+ bad_r.flags |= dns.flags.TC
+ bad_r_wire = bad_r.to_wire()
+ self.mock_receive(
+ bad_r_wire, ("127.0.0.1", 53), self.good_r_wire, ("127.0.0.1", 53)
+ )
+
+ def test_wrong_id_wire_with_truncation_flag_and_truncation_raise(self):
+ bad_r = dns.message.make_response(self.q)
+ bad_r.id += 1
+ bad_r.flags |= dns.flags.TC
+ bad_r_wire = bad_r.to_wire()
+ self.mock_receive(
+ bad_r_wire, ("127.0.0.1", 53), self.good_r_wire, ("127.0.0.1", 53),
+ raise_on_truncation=True
+ )
+
def test_bad_wire_not_ignored(self):
bad_r = dns.message.make_response(self.q)
bad_r.id += 1