check: test
-type:
+mypy:
python -m mypy --disallow-incomplete-defs dns
pyright:
pyright dns
+ty:
+ ty check dns
+
+type:
+ pyright dns
+ ty check dns
+
ruff:
ruff check dns
resolver = dns.asyncresolver.Resolver()
super().__init__(*args, **kwargs)
- self._pool._network_backend = _NetworkBackend(
+ self._pool._network_backend = _NetworkBackend( # type: ignore
resolver, local_port, bootstrap_address, family
)
def make_tls_context(self):
ssl = dns.query.ssl
ctx = ssl.create_default_context()
- ctx.minimum_version = ssl.TLSVersion.TLSv1_2
+ ctx.minimum_version = ssl.TLSVersion.TLSv1_2 # type: ignore
return ctx
def ddr_tls_check_sync(self, lifetime):
resolver = dns.asyncresolver.Resolver()
super().__init__(*args, **kwargs)
- self._pool._network_backend = _NetworkBackend(
+ self._pool._network_backend = _NetworkBackend( # type: ignore
resolver, local_port, bootstrap_address, family
)
def __init__(
self,
- text: str,
+ text: Any,
idna_codec: dns.name.IDNACodec | None,
one_rr_per_rrset: bool = False,
origin: dns.name.Name | None = None,
else:
cm = contextlib.nullcontext(f)
with cm as f:
- return from_text(f, idna_codec, one_rr_per_rrset)
+ reader = _TextReader(f, idna_codec, one_rr_per_rrset)
+ return reader.read()
assert False # for mypy lgtm[py/unreachable-statement]
resolver = dns.resolver.Resolver()
super().__init__(*args, **kwargs)
- self._pool._network_backend = _NetworkBackend(
+ self._pool._network_backend = _NetworkBackend( # type: ignore
resolver, local_port, bootstrap_address, family
)
def update_ttl(self, ttl):
raise TypeError("immutable")
- def add(self, rd, ttl=None):
+ def add(self, rd, ttl=None): # type: ignore
raise TypeError("immutable")
def union_update(self, other):
else:
cm = contextlib.nullcontext(f)
with cm as f:
+ assert f is not None
for l in f:
if len(l) == 0 or l[0] == "#" or l[0] == ";":
continue
resolver = get_default_resolver()
global _resolver
_resolver = resolver
- socket.getaddrinfo = _getaddrinfo
- socket.getnameinfo = _getnameinfo
- socket.getfqdn = _getfqdn
- socket.gethostbyname = _gethostbyname
- socket.gethostbyname_ex = _gethostbyname_ex
- socket.gethostbyaddr = _gethostbyaddr
+ socket.getaddrinfo = _getaddrinfo # type: ignore
+ socket.getnameinfo = _getnameinfo # type: ignore
+ socket.getfqdn = _getfqdn # type: ignore
+ socket.gethostbyname = _gethostbyname # type: ignore
+ socket.gethostbyname_ex = _gethostbyname_ex # type: ignore
+ socket.gethostbyaddr = _gethostbyaddr # type: ignore
def restore_system_resolver() -> None:
global _resolver
_resolver = None
- socket.getaddrinfo = _original_getaddrinfo
- socket.getnameinfo = _original_getnameinfo
- socket.getfqdn = _original_getfqdn
- socket.gethostbyname = _original_gethostbyname
- socket.gethostbyname_ex = _original_gethostbyname_ex
- socket.gethostbyaddr = _original_gethostbyaddr
+ socket.getaddrinfo = _original_getaddrinfo # type: ignore
+ socket.getnameinfo = _original_getnameinfo # type: ignore
+ socket.getfqdn = _original_getfqdn # type: ignore
+ socket.gethostbyname = _original_gethostbyname # type: ignore
+ socket.gethostbyname_ex = _original_gethostbyname_ex # type: ignore
+ socket.gethostbyaddr = _original_gethostbyaddr # type: ignore
raise ValueError("origin parameter must be convertible to a DNS name")
if not origin.is_absolute():
raise ValueError("origin parameter must be an absolute name")
- self.origin = origin
+ self.origin: dns.name.Name | None = origin
self.rdclass = rdclass
self.nodes: MutableMapping[dns.name.Name, dns.node.Node] = self.map_factory()
self.relativize = relativize
f.write(l_b)
f.write(nl_b)
except TypeError: # textual mode
- f.write(l)
- f.write(nl)
+ f.write(l) # type: ignore
+ f.write(nl) # type: ignore
if sorted:
names = list(self.keys())
f.write(l_b)
f.write(nl_b)
except TypeError: # textual mode
- f.write(l)
- f.write(nl)
+ f.write(l) # type: ignore
+ f.write(nl) # type: ignore
def to_text(
self,
class Transaction(dns.transaction.Transaction):
- def __init__(self, zone, replacement, version=None, make_immutable=False):
+ def __init__(
+ self,
+ zone: Zone,
+ replacement: bool,
+ version: Version | None = None,
+ make_immutable: bool = False,
+ ):
read_only = version is not None
super().__init__(zone, replacement, read_only)
self.version = version
self.make_immutable = make_immutable
@property
- def zone(self):
- return self.manager
+ def zone(self) -> Zone:
+ return cast(Zone, self.manager)
def _setup_version(self):
assert self.version is None
def _put_rdataset(self, name, rdataset):
assert not self.read_only
assert self.version is not None
- self.version.put_rdataset(name, rdataset)
+ version = cast(WritableVersion, self.version)
+ version.put_rdataset(name, rdataset)
def _delete_name(self, name):
assert not self.read_only
assert self.version is not None
- self.version.delete_node(name)
+ version = cast(WritableVersion, self.version)
+ version.delete_node(name)
def _delete_rdataset(self, name, rdtype, covers):
assert not self.read_only
assert self.version is not None
- self.version.delete_rdataset(name, rdtype, covers)
+ version = cast(WritableVersion, self.version)
+ version.delete_rdataset(name, rdtype, covers)
def _name_exists(self, name):
assert self.version is not None
return False
else:
assert self.version is not None
- return len(self.version.changed) > 0
+ version = cast(WritableVersion, self.version)
+ return len(version.changed) > 0
def _end_transaction(self, commit):
assert self.zone is not None
assert self.version is not None
if self.read_only:
self.zone._end_read(self) # type: ignore
- elif commit and len(self.version.changed) > 0:
+ elif commit and len(cast(WritableVersion, self.version).changed) > 0:
if self.make_immutable:
factory = self.manager.immutable_version_factory # type: ignore
if factory is None:
assert self.version is not None
return self.version.get_node(name)
- def _origin_information(self):
+ def _origin_information(
+ self,
+ ) -> Tuple[dns.name.Name | None, bool, dns.name.Name | None]:
assert self.version is not None
(absolute, relativize, effective) = self.manager.origin_information()
if absolute is None and self.version.origin is not None:
raise dns.exception.SyntaxError
return token
- def _rr_line(self):
+ def _rr_line(self) -> None:
"""Process one line from a DNS zone file."""
token: dns.tokenizer.Token
+ name: dns.name.Name | None
# Name
if self.force_name is not None:
name = self.force_name
"sphinx>=8.2.0 ; python_version >= '3.11'",
"sphinx-rtd-theme>=3.0.0 ; python_full_version >= '3.11'",
"twine>=6.1.0",
+ "ty>=0.0.1a32",
"wheel>=0.45.0",
]
dnssec = ["cryptography>=45"]