af = None
if af is not None and dns.inet.is_address(where):
if af == socket.AF_INET:
- url = "https://{}:{}{}".format(where, port, path)
+ url = f"https://{where}:{port}{path}"
elif af == socket.AF_INET6:
- url = "https://[{}]:{}{}".format(where, port, path)
+ url = f"https://[{where}]:{port}{path}"
else:
url = where
- if http_version == HTTPVersion.H3 or (http_version == HTTPVersion.DEFAULT and not have_doh):
+ if http_version == HTTPVersion.H3 or (
+ http_version == HTTPVersion.DEFAULT and not have_doh
+ ):
if bootstrap_address is None:
parsed = urllib.parse.urlparse(url)
resolver = _maybe_get_resolver(resolver)
if client:
cm: contextlib.AbstractAsyncContextManager = NullContext(client)
else:
- cm = httpx.AsyncClient(
- http1=h1, http2=h2, verify=verify, transport=transport
- )
+ cm = httpx.AsyncClient(http1=h1, http2=h2, verify=verify, transport=transport)
async with cm as the_client:
# see https://tools.ietf.org/html/rfc8484#section-4.1.1 for DoH
# status codes
if response.status_code < 200 or response.status_code > 299:
raise ValueError(
- "{} responded with status code {}"
- "\nResponse body: {!r}".format(
- where, response.status_code, response.content
- )
+ f"{where} responded with status code {response.status_code}"
+ f"\nResponse body: {response.content!r}"
)
r = dns.message.from_wire(
response.content,
self.addrdata = self.addrdata[:-1] + last
def to_text(self) -> str:
- return "ECS {}/{} scope/{}".format(self.address, self.srclen, self.scopelen)
+ return f"ECS {self.address}/{self.srclen} scope/{self.scopelen}"
@staticmethod
def from_text(text: str) -> Option:
ecs_text = tokens[0]
elif len(tokens) == 2:
if tokens[0] != optional_prefix:
- raise ValueError('could not parse ECS from "{}"'.format(text))
+ raise ValueError(f'could not parse ECS from "{text}"')
ecs_text = tokens[1]
else:
- raise ValueError('could not parse ECS from "{}"'.format(text))
+ raise ValueError(f'could not parse ECS from "{text}"')
n_slashes = ecs_text.count("/")
if n_slashes == 1:
address, tsrclen = ecs_text.split("/")
elif n_slashes == 2:
address, tsrclen, tscope = ecs_text.split("/")
else:
- raise ValueError('could not parse ECS from "{}"'.format(text))
+ raise ValueError(f'could not parse ECS from "{text}"')
try:
scope = int(tscope)
except ValueError:
- raise ValueError(
- "invalid scope " + '"{}": scope must be an integer'.format(tscope)
- )
+ raise ValueError("invalid scope " + f'"{tscope}": scope must be an integer')
try:
srclen = int(tsrclen)
except ValueError:
raise ValueError(
- "invalid srclen " + '"{}": srclen must be an integer'.format(tsrclen)
+ "invalid srclen " + f'"{tsrclen}": srclen must be an integer'
)
return ECSOption(address, srclen, scope)
if m is not None:
b = dns.ipv4.inet_aton(m.group(2))
btext = (
- "{}:{:02x}{:02x}:{:02x}{:02x}".format(
- m.group(1).decode(), b[0], b[1], b[2], b[3]
- )
+ f"{m.group(1).decode()}:{b[0]:02x}{b[1]:02x}:{b[2]:02x}{b[3]:02x}"
).encode()
#
# Try to turn '::<whatever>' into ':<whatever>'; if no match try to
)
if af is not None and dns.inet.is_address(where):
if af == socket.AF_INET:
- url = "https://{}:{}{}".format(where, port, path)
+ url = f"https://{where}:{port}{path}"
elif af == socket.AF_INET6:
- url = "https://[{}]:{}{}".format(where, port, path)
+ url = f"https://[{where}]:{port}{path}"
else:
url = where
- if http_version == HTTPVersion.H3 or (http_version == HTTPVersion.DEFAULT and not have_doh):
+ if http_version == HTTPVersion.H3 or (
+ http_version == HTTPVersion.DEFAULT and not have_doh
+ ):
if bootstrap_address is None:
parsed = urllib.parse.urlparse(url)
resolver = _maybe_get_resolver(resolver)
# status codes
if response.status_code < 200 or response.status_code > 299:
raise ValueError(
- "{} responded with status code {}"
- "\nResponse body: {}".format(where, response.status_code, response.content)
+ f"{where} responded with status code {response.status_code}"
+ f"\nResponse body: {response.content}"
)
r = dns.message.from_wire(
response.content,
continue
if key not in parameters:
raise AttributeError(
- "'{}' object has no attribute '{}'".format(
- self.__class__.__name__, key
- )
+ f"'{self.__class__.__name__}' object has no attribute '{key}'"
)
if key in ("rdclass", "rdtype"):
raise AttributeError(
- "Cannot overwrite '{}' attribute '{}'".format(
- self.__class__.__name__, key
- )
+ f"Cannot overwrite '{self.__class__.__name__}' attribute '{key}'"
)
# Construct the parameter list. For each field, use the value in
# (which is meaningless anyway).
#
s.write(
- "{}{}{} {}\n".format(
- ntext,
- pad,
- dns.rdataclass.to_text(rdclass),
- dns.rdatatype.to_text(self.rdtype),
- )
+ f"{ntext}{pad}{dns.rdataclass.to_text(rdclass)} {dns.rdatatype.to_text(self.rdtype)}\n"
)
else:
for rd in self:
raise dns.exception.FormError("bad longitude")
def to_text(self, origin=None, relativize=True, **kw):
- return "{} {} {}".format(
- self.latitude.decode(), self.longitude.decode(), self.altitude.decode()
- )
+ return f"{self.latitude.decode()} {self.longitude.decode()} {self.altitude.decode()}"
@classmethod
def from_text(
self.os = self._as_bytes(os, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
- return '"{}" "{}"'.format(
- dns.rdata._escapify(self.cpu), dns.rdata._escapify(self.os)
- )
+ return f'"{dns.rdata._escapify(self.cpu)}" "{dns.rdata._escapify(self.os)}"'
@classmethod
def from_text(
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
- return '"{}" "{}"'.format(
- dns.rdata._escapify(self.address), dns.rdata._escapify(self.subaddress)
- )
+ return f'"{dns.rdata._escapify(self.address)}" "{dns.rdata._escapify(self.subaddress)}"'
else:
return '"%s"' % dns.rdata._escapify(self.address)
or self.horizontal_precision != _default_hprec
or self.vertical_precision != _default_vprec
):
- text += " {:0.2f}m {:0.2f}m {:0.2f}m".format(
- self.size / 100.0,
- self.horizontal_precision / 100.0,
- self.vertical_precision / 100.0,
- )
+ text += f" {self.size / 100.0:0.2f}m {self.horizontal_precision / 100.0:0.2f}m {self.vertical_precision / 100.0:0.2f}m"
return text
@classmethod
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
text = Bitmap(self.windows).to_text()
- return "{}{}".format(next, text)
+ return f"{next}{text}"
@classmethod
def from_text(
def to_text(self, origin=None, relativize=True, **kw):
mbox = self.mbox.choose_relativity(origin, relativize)
txt = self.txt.choose_relativity(origin, relativize)
- return "{} {}".format(str(mbox), str(txt))
+ return f"{str(mbox)} {str(txt)}"
@classmethod
def from_text(
import dns.immutable
import dns.rdtypes.txtbase
+
@dns.immutable.immutable
class WALLET(dns.rdtypes.txtbase.TXTBase):
"""WALLET record"""
txt = ""
prefix = ""
for s in self.strings:
- txt += '{}"{}"'.format(prefix, dns.rdata._escapify(s))
+ txt += f'{prefix}"{dns.rdata._escapify(s)}"'
prefix = " "
return txt
else:
msg = "The DNS query name does not exist"
qnames = ", ".join(map(str, qnames))
- return "{}: {}".format(msg, qnames)
+ return f"{msg}: {qnames}"
@property
def canonical_name(self):
"""Turn a resolution errors trace into a list of text."""
texts = []
for err in errors:
- texts.append("Server {} answered {}".format(err[0], err[3]))
+ texts.append(f"Server {err[0]} answered {err[3]}")
return texts
enriched_nameservers.append(enriched_nameserver)
else:
raise ValueError(
- "nameservers must be a list or tuple (not a {})".format(
- type(nameservers)
- )
+ f"nameservers must be a list or tuple (not a {type(nameservers)})"
)
return enriched_nameservers
# We convert them to syntax errors so that we can emit
# helpful filename:line info.
(ty, va) = sys.exc_info()[:2]
- raise dns.exception.SyntaxError(
- "caught exception {}: {}".format(str(ty), str(va))
- )
+ raise dns.exception.SyntaxError(f"caught exception {str(ty)}: {str(va)}")
if not self.default_ttl_known and rdtype == dns.rdatatype.SOA:
# The pre-RFC2308 and pre-BIND9 behavior inherits the zone default
"B904",
"B011",
"UP031",
- "UP032",
]
lint.exclude = ["tests/*"]
@unittest.skipIf(not dns.quic.have_quic, "aioquic not available")
def TestDoH3QueryIP(self):
async def run():
- nameserver_ip = '8.8.8.8'
+ nameserver_ip = "8.8.8.8"
q = dns.message.make_query("example.com.", dns.rdatatype.A)
r = dns.asyncquery.https(
q,
)
self.assertTrue(q.is_response(r))
+
if __name__ == "__main__":
unittest.main()