import httpx
_CoreAsyncNetworkBackend = httpcore.AsyncNetworkBackend
- _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream # type: ignore
+ _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream # pyright: ignore
from dns.query import _compute_times, _expiration_for_this_attempt, _remaining
)
else:
- _HTTPTransport = dns._asyncbackend.NullTransport # type: ignore
+ _HTTPTransport = dns._asyncbackend.NullTransport # pyright: ignore
class Backend(dns._asyncbackend.Backend):
# proper fix for [#637].
source = (dns.inet.any_for_af(af), 0)
transport, protocol = await loop.create_datagram_endpoint(
- _DatagramProtocol, # type: ignore
+ _DatagramProtocol, # pyright: ignore
local_addr=source,
family=af,
proto=proto,
self.check_hostname: bool = False
self.verify_mode: int = CERT_NONE
- def wrap_socket(self, *args, **kwargs) -> "SSLSocket": # type: ignore
+ def wrap_socket(self, *args, **kwargs) -> "SSLSocket": # pyright: ignore
raise Exception("no ssl support") # pylint: disable=broad-exception-raised
- def set_alpn_protocols(self, *args, **kwargs): # type: ignore
+ def set_alpn_protocols(self, *args, **kwargs): # pyright: ignore
raise Exception("no ssl support") # pylint: disable=broad-exception-raised
return False
-def create_default_context(*args, **kwargs) -> SSLContext: # type: ignore
+def create_default_context(*args, **kwargs) -> SSLContext: # pyright: ignore
raise Exception("no ssl support") # pylint: disable=broad-exception-raised
import socket
import trio
-import trio.socket # type: ignore
+import trio.socket # pyright: ignore
import dns._asyncbackend
import dns._features
)
else:
- _HTTPTransport = dns._asyncbackend.NullTransport # type: ignore
+ _HTTPTransport = dns._asyncbackend.NullTransport # pyright: ignore
class Backend(dns._asyncbackend.Backend):
try:
import ssl
except ImportError:
- import dns._no_ssl as ssl # type: ignore
+ import dns._no_ssl as ssl # pyright: ignore
if have_doh:
import httpx
dtuple = None
cm = await backend.make_socket(af, socket.SOCK_DGRAM, 0, stuple, dtuple)
async with cm as s:
- await send_udp(s, wire, destination, expiration) # type: ignore
+ await send_udp(s, wire, destination, expiration) # pyright: ignore
(r, received_time, _) = await receive_udp(
- s, # type: ignore
+ s, # pyright: ignore
destination,
expiration,
ignore_unexpected,
af, socket.SOCK_STREAM, 0, stuple, dtuple, timeout
)
async with cm as s:
- await send_tcp(s, wire, expiration) # type: ignore
+ await send_tcp(s, wire, expiration) # pyright: ignore
(r, received_time) = await receive_tcp(
- s, # type: ignore
+ s, # pyright: ignore
expiration,
one_rr_per_rrset,
q.keyring,
def _maybe_get_resolver(
- resolver: "dns.asyncresolver.Resolver | None", # type: ignore
-) -> "dns.asyncresolver.Resolver": # type: ignore
+ resolver: "dns.asyncresolver.Resolver | None", # pyright: ignore
+) -> "dns.asyncresolver.Resolver": # pyright: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
# in https().
post: bool = True,
verify: bool | str | ssl.SSLContext = True,
bootstrap_address: str | None = None,
- resolver: "dns.asyncresolver.Resolver | None" = None, # type: ignore
+ resolver: "dns.asyncresolver.Resolver | None" = None, # pyright: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
):
if bootstrap_address is None:
resolver = _maybe_get_resolver(resolver)
- assert parsed.hostname is not None # type: ignore
- answers = await resolver.resolve_name( # type: ignore
- parsed.hostname, family # type: ignore
+ assert parsed.hostname is not None # pyright: ignore
+ answers = await resolver.resolve_name( # pyright: ignore
+ parsed.hostname, family # pyright: ignore
)
bootstrap_address = random.choice(list(answers.addresses()))
if client and not isinstance(
client, dns.quic.AsyncQuicConnection
- ): # type: ignore
+ ): # pyright: ignore
raise ValueError("client parameter must be a dns.quic.AsyncQuicConnection.")
assert client is None or isinstance(client, dns.quic.AsyncQuicConnection)
return await _http3(
if not have_doh:
raise NoDOH # pragma: no cover
# pylint: disable=possibly-used-before-assignment
- if client and not isinstance(client, httpx.AsyncClient): # type: ignore
+ if client and not isinstance(client, httpx.AsyncClient): # pyright: ignore
raise ValueError("client parameter must be an httpx.AsyncClient")
# pylint: enable=possibly-used-before-assignment
family=family,
)
- cm = httpx.AsyncClient( # type: ignore
+ cm = httpx.AsyncClient( # pyright: ignore
http1=h1, http2=h2, verify=verify, transport=transport # type: ignore
)
}
)
response = await backend.wait_for(
- the_client.post( # type: ignore
+ the_client.post( # pyright: ignore
url,
headers=headers,
content=wire,
wire = base64.urlsafe_b64encode(wire).rstrip(b"=")
twire = wire.decode() # httpx does a repr() if we give it bytes
response = await backend.wait_for(
- the_client.get( # type: ignore
+ the_client.get( # pyright: ignore
url,
headers=headers,
params={"dns": twire},
if connection:
the_connection = connection
else:
- the_connection = the_manager.connect( # type: ignore
+ the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
server_name=server_hostname,
) as the_manager:
if not connection:
- the_connection = the_manager.connect( # type: ignore
+ the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
mexpiration = expiration
if is_udp:
timeout = _timeout(mexpiration)
- (rwire, _) = await udp_sock.recvfrom(65535, timeout) # type: ignore
+ (rwire, _) = await udp_sock.recvfrom(65535, timeout) # pyright: ignore
else:
- ldata = await _read_exactly(tcp_sock, 2, mexpiration) # type: ignore
+ ldata = await _read_exactly(tcp_sock, 2, mexpiration) # pyright: ignore
(l,) = struct.unpack("!H", ldata)
- rwire = await _read_exactly(tcp_sock, l, mexpiration) # type: ignore
+ rwire = await _read_exactly(tcp_sock, l, mexpiration) # pyright: ignore
r = dns.message.from_wire(
rwire,
keyring=query.keyring,
)
async with s:
try:
- async for _ in _inbound_xfr( # type: ignore
+ async for _ in _inbound_xfr( # pyright: ignore
txn_manager,
s,
query,
serial,
timeout,
- expiration, # type: ignore
+ expiration, # pyright: ignore
):
pass
return
af, socket.SOCK_STREAM, 0, stuple, dtuple, _timeout(expiration)
)
async with s:
- async for _ in _inbound_xfr( # type: ignore
- txn_manager, s, query, serial, timeout, expiration # type: ignore
+ async for _ in _inbound_xfr( # pyright: ignore
+ txn_manager, s, query, serial, timeout, expiration # pyright: ignore
):
pass
def __init__(self, node: Node):
super().__init__()
self.id = node.id
- self.rdatasets = tuple( # type: ignore
+ self.rdatasets = tuple( # pyright: ignore
[dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
)
self.flags = node.flags
return (node, name)
def update_glue_flag(self, name: dns.name.Name, is_glue: bool) -> None:
- cursor = self.nodes.cursor() # type: ignore
+ cursor = self.nodes.cursor() # pyright: ignore
cursor.seek(name, False)
updates = []
while True:
name = self._validate_name(name)
node = self.nodes.get(name)
if node is not None:
- if node.is_delegation(): # type: ignore
+ if node.is_delegation(): # pyright: ignore
self.delegations.discard(name)
self.update_glue_flag(name, False)
del self.nodes[name]
) -> None:
(node, name) = self._maybe_cow_with_name(name)
if (
- rdataset.rdtype == dns.rdatatype.NS and not node.is_origin_or_glue() # type: ignore
+ rdataset.rdtype == dns.rdatatype.NS
+ and not node.is_origin_or_glue() # type: ignore
):
node.flags |= NodeFlags.DELEGATION # type: ignore
if name not in self.delegations:
covers: dns.rdatatype.RdataType,
) -> None:
(node, name) = self._maybe_cow_with_name(name)
- if rdtype == dns.rdatatype.NS and name in self.delegations: # type: ignore
+ if rdtype == dns.rdatatype.NS and name in self.delegations: # pyright: ignore
node.flags &= ~NodeFlags.DELEGATION # type: ignore
- self.delegations.discard(name) # type: ignore
+ self.delegations.discard(name) # pyright: ignore
self.update_glue_flag(name, False)
node.delete_rdataset(self.zone.rdclass, rdtype, covers)
if len(node) == 0:
node = version.nodes.get(name)
if node:
version.nodes[name] = ImmutableNode(node)
- # the cast below is for mypy
self.nodes = cast(MutableMapping[dns.name.Name, dns.node.Node], version.nodes)
self.nodes.make_immutable() # type: ignore
self.delegations = version.delegations
)
from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey
- validate = _validate # type: ignore
- validate_rrsig = _validate_rrsig # type: ignore
+ validate = _validate # pyright: ignore
+ validate_rrsig = _validate_rrsig # pyright: ignore
sign = _sign
make_dnskey = _make_dnskey
make_cdnskey = _make_cdnskey
class CryptographyPrivateKey(GenericPrivateKey):
key: Any = None
key_cls: Any = None
- public_cls: type[CryptographyPublicKey] # type: ignore
+ public_cls: type[CryptographyPublicKey] # pyright: ignore
def __init__(self, key: Any) -> None: # pylint: disable=super-init-not-called
if self.key_cls is None:
keyptr = keyptr[octets:]
dsa_y = keyptr[0:octets]
return cls(
- key=dsa.DSAPublicNumbers( # type: ignore
+ key=dsa.DSAPublicNumbers( # pyright: ignore
int.from_bytes(dsa_y, "big"),
dsa.DSAParameterNumbers(
int.from_bytes(dsa_p, "big"),
def generate(cls) -> "PrivateECDSA":
return cls(
key=ec.generate_private_key(
- curve=cls.public_cls.curve, backend=default_backend() # type: ignore
+ curve=cls.public_cls.curve, backend=default_backend() # pyright: ignore
),
)
class PrivateEDDSA(CryptographyPrivateKey):
- public_cls: type[PublicEDDSA] # type: ignore
+ public_cls: type[PublicEDDSA] # pyright: ignore
def sign(
self,
@classmethod
def _missing_(cls, value):
cls._check_value(value)
- val = int.__new__(cls, value) # type: ignore
+ val = int.__new__(cls, value) # pyright: ignore
val._name_ = cls._extra_to_text(value, None) or f"{cls._prefix()}{value}"
- val._value_ = value # type: ignore
+ val._value_ = value # pyright: ignore
return val
@classmethod
return QueryMessage
elif opcode == dns.opcode.UPDATE:
_maybe_import_update()
- return dns.update.UpdateMessage # type: ignore
+ return dns.update.UpdateMessage # pyright: ignore
else:
return Message
else:
with self.parser.restrict_to(rdlen):
rd = dns.rdata.from_wire_parser(
- rdclass, # type: ignore
+ rdclass, # pyright: ignore
rdtype,
self.parser,
self.message.origin,
rrset = self.message.find_rrset(
section,
name,
- rdclass, # type: ignore
+ rdclass, # pyright: ignore
rdtype,
covers,
deleting,
"""DNS Names."""
import copy
-import encodings.idna # type: ignore
+import encodings.idna # pyright: ignore
import functools
import struct
from collections.abc import Callable, Iterable
# pyright: reportAttributeAccessIssue = false, reportPossiblyUnboundVariable = false
if dns._features.have("idna"):
- import idna # type: ignore
+ import idna # pyright: ignore
have_idna_2008 = True
else: # pragma: no cover
s = io.StringIO()
for rds in self.rdatasets:
if len(rds) > 0:
- s.write(rds.to_text(name, **kw)) # type: ignore[arg-type]
+ s.write(rds.to_text(name, **kw)) # pyright: ignore[arg-type]
s.write("\n")
return s.getvalue()[:-1]
try:
import ssl
except ImportError:
- import dns._no_ssl as ssl # type: ignore
+ import dns._no_ssl as ssl # pyright: ignore
def _remaining(expiration):
): # pylint: disable=signature-differs
raise NotImplementedError
- class _HTTPTransport(httpx.HTTPTransport): # type: ignore
+ class _HTTPTransport(httpx.HTTPTransport): # pyright: ignore
def __init__(
self,
*args,
else:
- class _HTTPTransport: # type: ignore
+ class _HTTPTransport: # pyright: ignore
def __init__(
self,
*args,
if writable:
events |= selectors.EVENT_WRITE
if events:
- sel.register(fd, events) # type: ignore
+ sel.register(fd, events) # pyright: ignore
if expiration is None:
timeout = None
else:
def _maybe_get_resolver(
- resolver: "dns.resolver.Resolver | None", # type: ignore
-) -> "dns.resolver.Resolver": # type: ignore
+ resolver: "dns.resolver.Resolver | None", # pyright: ignore
+) -> "dns.resolver.Resolver": # pyright: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
# in https().
post: bool = True,
bootstrap_address: str | None = None,
verify: bool | str | ssl.SSLContext = True,
- resolver: "dns.resolver.Resolver | None" = None, # type: ignore
+ resolver: "dns.resolver.Resolver | None" = None, # pyright: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
):
if bootstrap_address is None:
resolver = _maybe_get_resolver(resolver)
- assert parsed.hostname is not None # type: ignore
- answers = resolver.resolve_name(parsed.hostname, family) # type: ignore
+ assert parsed.hostname is not None # pyright: ignore
+ answers = resolver.resolve_name(parsed.hostname, family) # pyright: ignore
bootstrap_address = random.choice(list(answers.addresses()))
- if session and not isinstance(
- session, dns.quic.SyncQuicConnection
- ): # type: ignore
+ if session and not isinstance(session, dns.quic.SyncQuicConnection):
raise ValueError("session parameter must be a dns.quic.SyncQuicConnection.")
return _http3(
q,
bootstrap_address,
- url, # type: ignore
+ url, # pyright: ignore
timeout,
port,
source,
if not have_doh:
raise NoDOH # pragma: no cover
- if session and not isinstance(session, httpx.Client): # type: ignore
+ if session and not isinstance(session, httpx.Client): # pyright: ignore
raise ValueError("session parameter must be an httpx.Client")
wire = q.to_wire()
local_port=local_port,
bootstrap_address=bootstrap_address,
resolver=resolver,
- family=family, # type: ignore
+ family=family, # pyright: ignore
)
- cm = httpx.Client( # type: ignore
+ cm = httpx.Client( # pyright: ignore
http1=h1, http2=h2, verify=verify, transport=transport # type: ignore
)
with cm as session:
manager: contextlib.AbstractContextManager = contextlib.nullcontext(None)
else:
manager = dns.quic.SyncQuicManager( # type: ignore
- verify_mode=verify, server_name=hostname, h3=True # type: ignore
+ verify_mode=verify, server_name=hostname, h3=True # pyright: ignore
)
the_manager = manager # for type checking happiness
if connection:
the_connection = connection
else:
- the_connection = the_manager.connect( # type: ignore
+ the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
with cm as s:
if not sock:
# pylint: disable=possibly-used-before-assignment
- _connect(s, destination, expiration) # type: ignore
+ _connect(s, destination, expiration) # pyright: ignore
send_tcp(s, wire, expiration)
(r, received_time) = receive_tcp(
s, expiration, one_rr_per_rrset, q.keyring, q.mac, ignore_trailing
ssl_context.verify_mode = ssl.CERT_NONE # type: ignore
if alpns is not None:
ssl_context.set_alpn_protocols(alpns)
- return ssl_context # type: ignore
+ return ssl_context # pyright: ignore
# for backwards compatibility
the_connection = connection
else:
manager = dns.quic.SyncQuicManager( # type: ignore
- verify_mode=verify, server_name=hostname # type: ignore
+ verify_mode=verify, server_name=hostname # pyright: ignore
)
the_manager = manager # for type checking happiness
with manager:
if not connection:
- the_connection = the_manager.connect( # type: ignore
+ the_connection = the_manager.connect( # pyright: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
from dns.quic._asyncio import AsyncioQuicConnection as AsyncioQuicConnection
from dns.quic._asyncio import AsyncioQuicManager
from dns.quic._asyncio import AsyncioQuicStream as AsyncioQuicStream
- from dns.quic._common import AsyncQuicConnection # type: ignore
+ from dns.quic._common import AsyncQuicConnection # pyright: ignore
from dns.quic._common import AsyncQuicManager as AsyncQuicManager
- from dns.quic._sync import SyncQuicConnection # type: ignore
- from dns.quic._sync import SyncQuicStream # type: ignore
+ from dns.quic._sync import SyncQuicConnection # pyright: ignore
+ from dns.quic._sync import SyncQuicStream # pyright: ignore
from dns.quic._sync import SyncQuicManager as SyncQuicManager
have_quic = True
else: # pragma: no cover
have_quic = False
- class AsyncQuicStream: # type: ignore
+ class AsyncQuicStream: # pyright: ignore
pass
- class AsyncQuicConnection: # type: ignore
+ class AsyncQuicConnection: # pyright: ignore
async def make_stream(self) -> Any:
raise NotImplementedError
- class SyncQuicStream: # type: ignore
+ class SyncQuicStream: # pyright: ignore
pass
- class SyncQuicConnection: # type: ignore
+ class SyncQuicConnection: # pyright: ignore
def make_stream(self) -> Any:
raise NotImplementedError
import struct
import time
-import aioquic.h3.events # type: ignore
-import aioquic.quic.events # type: ignore
+import aioquic.h3.events # pyright: ignore
+import aioquic.quic.events # pyright: ignore
import dns.asyncbackend
import dns.exception
import urllib.parse
from typing import Any
-import aioquic.h3.connection # type: ignore
-import aioquic.quic.configuration # type: ignore
-import aioquic.quic.connection # type: ignore
+import aioquic.h3.connection # pyright: ignore
+import aioquic.quic.configuration # pyright: ignore
+import aioquic.quic.connection # pyright: ignore
import dns._tls_util
import dns.inet
import threading
import time
-import aioquic.h3.events # type: ignore
-import aioquic.quic.events # type: ignore
+import aioquic.h3.events # pyright: ignore
+import aioquic.quic.events # pyright: ignore
import dns.exception
import dns.inet
import struct
import time
-import aioquic.h3.events # type: ignore
-import aioquic.quic.events # type: ignore
+import aioquic.h3.events # pyright: ignore
+import aioquic.quic.events # pyright: ignore
import trio
import dns.exception
# race.
interval = 0.0
with trio.CancelScope(
- deadline=trio.current_time() + interval # type: ignore
+ deadline=trio.current_time() + interval # pyright: ignore
) as self._worker_scope:
datagram = await self._socket.recv(QUIC_MAX_DATAGRAM)
self._connection.receive_datagram(datagram, self._peer, time.time())
"""
# Get the constructor parameters.
- parameters = inspect.signature(self.__init__).parameters # type: ignore
+ parameters = inspect.signature(self.__init__).parameters # pyright: ignore
# Ensure that all of the arguments correspond to valid fields.
# Don't allow rdclass or rdtype to be changed, though.
relativize: bool = True,
**kw: dict[str, Any],
) -> str:
- return rf"\# {len(self.data)} " + _hexify(self.data, **kw) # type: ignore
+ return rf"\# {len(self.data)} " + _hexify(self.data, **kw) # pyright: ignore
@classmethod
def from_text(
if len(self) == 0:
return []
else:
- return self[0]._processing_order(iter(self)) # type: ignore
+ return self[0]._processing_order(iter(self)) # pyright: ignore
@dns.immutable.immutable
raise TypeError("immutable")
def __copy__(self):
- return ImmutableRdataset(super().copy()) # type: ignore
+ return ImmutableRdataset(super().copy()) # pyright: ignore
def copy(self):
- return ImmutableRdataset(super().copy()) # type: ignore
+ return ImmutableRdataset(super().copy()) # pyright: ignore
def union(self, other):
- return ImmutableRdataset(super().union(other)) # type: ignore
+ return ImmutableRdataset(super().union(other)) # pyright: ignore
def intersection(self, other):
- return ImmutableRdataset(super().intersection(other)) # type: ignore
+ return ImmutableRdataset(super().intersection(other)) # pyright: ignore
def difference(self, other):
- return ImmutableRdataset(super().difference(other)) # type: ignore
+ return ImmutableRdataset(super().difference(other)) # pyright: ignore
def symmetric_difference(self, other):
- return ImmutableRdataset(super().symmetric_difference(other)) # type: ignore
+ return ImmutableRdataset(super().symmetric_difference(other)) # pyright: ignore
def from_text_list(
def to_text(self, origin=None, relativize=True, **kw):
certificate_type = _ctype_to_text(self.certificate_type)
algorithm = dns.dnssectypes.Algorithm.to_text(self.algorithm)
- certificate = dns.rdata._base64ify(self.certificate, **kw) # type: ignore
+ certificate = dns.rdata._base64ify(self.certificate, **kw) # pyright: ignore
return f"{certificate_type} {self.key_tag} {algorithm} {certificate}"
@classmethod
if isinstance(latitude, float):
latitude = _float_to_tuple(latitude)
_check_coordinate_list(latitude, -90, 90)
- self.latitude = tuple(latitude) # type: ignore
+ self.latitude = tuple(latitude) # pyright: ignore
if isinstance(longitude, int):
longitude = float(longitude)
if isinstance(longitude, float):
longitude = _float_to_tuple(longitude)
_check_coordinate_list(longitude, -180, 180)
- self.longitude = tuple(longitude) # type: ignore
+ self.longitude = tuple(longitude) # pyright: ignore
self.altitude = float(altitude)
self.size = float(size)
self.horizontal_precision = float(hprec)
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
- return dns.rdata._base64ify(self.key, chunksize=None, **kw) # type: ignore
+ return dns.rdata._base64ify(self.key, chunksize=None, **kw) # pyright: ignore
@classmethod
def from_text(
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
fingerprint = dns.rdata._hexify(
- self.fingerprint, chunksize=chunksize, **kw # type: ignore
+ self.fingerprint, chunksize=chunksize, **kw # pyright: ignore
)
return f"{self.algorithm} {self.fp_type} {fingerprint}"
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
digest = dns.rdata._hexify(
- self.digest, chunksize=chunksize, **kw # type: ignore
+ self.digest, chunksize=chunksize, **kw # pyright: ignore
)
return f"{self.serial} {self.scheme} {self.hash_algorithm} {digest}"
self.data = self._as_bytes(data)
def to_text(self, origin=None, relativize=True, **kw):
- return dns.rdata._base64ify(self.data, **kw) # type: ignore
+ return dns.rdata._base64ify(self.data, **kw) # pyright: ignore
@classmethod
def from_text(
def to_text(self, origin=None, relativize=True, **kw):
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin, relativize)
- key = dns.rdata._base64ify(self.key, **kw) # type: ignore
+ key = dns.rdata._base64ify(self.key, **kw) # pyright: ignore
return f"{self.precedence} {self.gateway_type} {self.algorithm} {gateway} {key}"
@classmethod
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
- key = dns.rdata._base64ify(self.key, **kw) # type: ignore
+ key = dns.rdata._base64ify(self.key, **kw) # pyright: ignore
return f"{self.flags} {self.protocol} {self.algorithm} {key}"
@classmethod
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
digest = dns.rdata._hexify(
- self.digest, chunksize=chunksize, **kw # type: ignore
+ self.digest, chunksize=chunksize, **kw # pyright: ignore
)
return f"{self.key_tag} {self.algorithm} {self.digest_type} {digest}"
expiration = posixtime_to_sigtime(self.expiration)
inception = posixtime_to_sigtime(self.inception)
signer = self.signer.choose_relativity(origin, relativize)
- sig = dns.rdata._base64ify(self.signature, **kw) # type: ignore
+ sig = dns.rdata._base64ify(self.signature, **kw) # pyright: ignore
return (
f"{ctext} {self.algorithm} {self.labels} {self.original_ttl} "
+ f"{expiration} {inception} {self.key_tag} {signer} {sig}"
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
- cert = dns.rdata._hexify(self.cert, chunksize=chunksize, **kw) # type: ignore
+ cert = dns.rdata._hexify(
+ self.cert, chunksize=chunksize, **kw # pyright: ignore
+ )
return f"{self.usage} {self.selector} {self.mtype} {cert}"
@classmethod
if weight > r:
break
r -= weight
- total -= weight # type: ignore
+ total -= weight # pyright: ignore
# pylint: disable=undefined-loop-variable
- ordered.append(rdata) # type: ignore
- del rdatas[n] # type: ignore
+ ordered.append(rdata) # pyright: ignore
+ del rdatas[n] # pyright: ignore
ordered.append(rdatas[0])
return ordered
import dns.rdtypes.ANY.TSIG
import dns.rrset
import dns.tsig
-from dns._render_util import prefixed_length as prefixed_length # type: ignore
+from dns._render_util import prefixed_length as prefixed_length # pyright: ignore
DEFAULT_EDNS_PAYLOAD = 1232
pad = b""
options = list(opt_rdata.options)
options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
- opt = _make_opt(ttl, opt_rdata.rdclass, options) # type: ignore
+ opt = _make_opt(ttl, opt_rdata.rdclass, options) # pyright: ignore
self.was_padded = True
self.add_rrset(ADDITIONAL, opt)
# make sure the EDNS version in ednsflags agrees with edns
ednsflags &= 0xFF00FFFF
ednsflags |= edns << 16
- opt = _make_opt(ednsflags, payload, options) # type: ignore
+ opt = _make_opt(ednsflags, payload, options) # pyright: ignore
self.add_opt(opt)
def add_tsig(
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = _make_tsig( # type: ignore
+ tsig = _make_tsig( # pyright: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = _make_tsig( # type: ignore
+ tsig = _make_tsig( # pyright: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, ctx) = dns.tsig.sign(
tcp: bool = False,
resolver: Resolver | None = None,
lifetime: float | None = None,
-) -> dns.name.Name: # type: ignore[reportReturnType]
+) -> dns.name.Name: # pyright: ignore
"""Find the name of the zone which contains the specified name.
*name*, an absolute ``dns.name.Name`` or ``str``, the query name.
global _resolver
_resolver = None
- socket.getaddrinfo = _original_getaddrinfo # type: ignore
- socket.getnameinfo = _original_getnameinfo # type: ignore
- socket.getfqdn = _original_getfqdn # type: ignore
- socket.gethostbyname = _original_gethostbyname # type: ignore
- socket.gethostbyname_ex = _original_gethostbyname_ex # type: ignore
- socket.gethostbyaddr = _original_gethostbyaddr # type: ignore
+ socket.getaddrinfo = _original_getaddrinfo # pyright: ignore
+ socket.getnameinfo = _original_getnameinfo # pyright: ignore
+ socket.getfqdn = _original_getfqdn # pyright: ignore
+ socket.gethostbyname = _original_gethostbyname # pyright: ignore
+ socket.gethostbyname_ex = _original_gethostbyname_ex # pyright: ignore
+ socket.gethostbyaddr = _original_gethostbyaddr # pyright: ignore
return False
return super().__eq__(other)
- def match(self, *args: Any, **kwargs: Any) -> bool: # type: ignore[override]
+ def match(self, *args: Any, **kwargs: Any) -> bool:
"""Does this rrset match the specified attributes?
Behaves as :py:func:`full_match()` if the first argument is a
compatibility.)
"""
if isinstance(args[0], dns.name.Name):
- return self.full_match(*args, **kwargs) # type: ignore[arg-type]
+ return self.full_match(*args, **kwargs)
else:
- return super().match(*args, **kwargs) # type: ignore[arg-type]
+ return super().match(*args, **kwargs)
def full_match(
self,
# pylint: disable=arguments-differ
- def to_text( # type: ignore[override]
+ def to_text( # type: ignore
self,
origin: dns.name.Name | None = None,
relativize: bool = True,
self.name, origin, relativize, self.deleting, **kw # type: ignore
)
- def to_wire( # type: ignore[override]
+ def to_wire( # type: ignore
self,
file: Any,
- compress: dns.name.CompressType | None = None, # type: ignore
+ compress: dns.name.CompressType | None = None,
origin: dns.name.Name | None = None,
**kw: dict[str, Any],
) -> int:
"""
if hasattr(self, "_clone_class"):
- cls = self._clone_class # type: ignore
+ cls = self._clone_class # pyright: ignore
else:
cls = self.__class__
obj = cls.__new__(cls)
class UpdateMessage(dns.message.Message): # lgtm[py/missing-equals]
# ignore the mypy error here as we mean to use a different enum
- _section_enum = UpdateSection # type: ignore
+ _section_enum = UpdateSection # pyright: ignore
def __init__(
self,
rd = dns.rdata.from_text(
self.zone_rdclass,
rdtype,
- s, # type: ignore[arg-type]
+ s, # pyright: ignore[arg-type]
self.origin,
)
self._add_rr(name, 0, rd, dns.rdataclass.NONE)
if not isinstance(args[0], dns.rdataset.Rdataset):
# Add a 0 TTL
largs = list(args)
- largs.insert(0, 0) # type: ignore[arg-type]
+ largs.insert(0, 0) # pyright: ignore[arg-type]
self._add(False, self.prerequisite, name, *largs)
else:
self._add(False, self.prerequisite, name, *args)
# Updates are always one_rr_per_rrset
return True
- def _parse_rr_header(self, section, name, rdclass, rdtype): # type: ignore
+ def _parse_rr_header(self, section, name, rdclass, rdtype): # pyright: ignore
deleting = None
empty = False
if section == UpdateSection.ZONE:
# Note our definition of least_kept also ensures we do not try to
# delete the greatest version.
if len(self._readers) > 0:
- least_kept = min(txn.version.id for txn in self._readers) # type: ignore
+ least_kept = min(
+ cast(ImmutableVersion, txn.version).id for txn in self._readers
+ )
else:
least_kept = self._versions[-1].id
while self._versions[0].id < least_kept and self._pruning_policy(
raise ValueError("max versions must be at least 1")
if max_versions is None:
# pylint: disable=unused-argument
- def policy(zone, _): # type: ignore
+ def policy(zone, _): # pyright: ignore
return False
else:
else:
- class _WMIGetter: # type: ignore
+ class _WMIGetter: # pyright: ignore
pass
def _config_domain(domain):
if rdataset.rdtype != dns.rdatatype.SOA:
raise dns.exception.FormError("first RRset is not an SOA")
answer_index = 1
- self.soa_rdataset = rdataset.copy() # type: ignore
+ self.soa_rdataset = rdataset.copy() # pyright: ignore
if self.incremental:
assert self.soa_rdataset is not None
soa = cast(dns.rdtypes.ANY.SOA.SOA, self.soa_rdataset[0])
import os
import struct
from collections.abc import Callable, Iterable, Iterator, MutableMapping
-from typing import Any, cast
+from typing import Any, BinaryIO, TextIO, cast
import dns.exception
import dns.immutable
cm: contextlib.AbstractContextManager = open(f, "wb")
else:
cm = contextlib.nullcontext(f)
- with cm as f:
- assert f is not None
+ with cm as output:
# must be in this way, f.encoding may contain None, or even
# attribute may not be there
file_enc = getattr(f, "encoding", None)
else:
nl_b = nl
nl = nl.decode()
+ assert nl is not None
+ assert nl_b is not None
if want_origin:
assert self.origin is not None
l = "$ORIGIN " + self.origin.to_text()
l_b = l.encode(file_enc)
try:
- f.write(l_b)
- f.write(nl_b)
+ f.write(l_b) # pyright: ignore
+ f.write(nl_b) # pyright: ignore
except TypeError: # textual mode
- f.write(l) # type: ignore
- f.write(nl) # type: ignore
+ f.write(l) # pyright: ignore
+ f.write(nl) # pyright: ignore
if sorted:
names = list(self.keys())
for n in names:
l = self[n].to_text(
n,
- origin=self.origin, # type: ignore
- relativize=relativize, # type: ignore
- want_comments=want_comments, # type: ignore
+ origin=self.origin, # pyright: ignore
+ relativize=relativize, # pyright: ignore
+ want_comments=want_comments, # pyright: ignore
)
l_b = l.encode(file_enc)
try:
- f.write(l_b)
- f.write(nl_b)
+ f.write(l_b) # pyright: ignore
+ f.write(nl_b) # pyright: ignore
except TypeError: # textual mode
- f.write(l) # type: ignore
- f.write(nl) # type: ignore
+ f.write(l) # pyright: ignore
+ f.write(nl) # pyright: ignore
def to_text(
self,
# test. Now we use the changed set as this works with both
# regular zones and versioned zones.
#
- # We ignore the mypy error as this is safe but it doesn't see it.
+ # We ignore the type error as this is safe but checkers have trouble.
new_node.id = self.id # type: ignore
if node is not None:
# moo! copy on write!
# we ignore the mypy error.
self.nodes = dns.immutable.Dict(
version.nodes, True, self.zone.map_factory
- ) # type: ignore
+ ) # pyright: ignore
class Transaction(dns.transaction.Transaction):
factory = self.manager.writable_version_factory # type: ignore
if factory is None:
factory = WritableVersion
- self.version = factory(self.zone, self.replacement) # type: ignore
+ self.version = factory(self.zone, self.replacement) # pyright: ignore
def _get_rdataset(self, name, rdtype, covers):
assert self.version is not None
assert self.zone is not None
assert self.version is not None
if self.read_only:
- self.zone._end_read(self) # type: ignore
+ self.zone._end_read(self) # pyright: ignore
elif commit and len(cast(WritableVersion, self.version).changed) > 0:
if self.make_immutable:
factory = self.manager.immutable_version_factory # type: ignore
version = factory(self.version)
else:
version = self.version
- self.zone._commit_version( # type: ignore
+ self.zone._commit_version( # pyright: ignore
self, version, self.version.origin
)
else:
# rollback
- self.zone._end_write(self) # type: ignore
+ self.zone._end_write(self) # pyright: ignore
def _set_origin(self, origin):
assert self.version is not None
)
rrset.update(rdataset)
rrsets.append(rrset)
- self.manager.set_rrsets(rrsets) # type: ignore
+ manager = cast(RRSetsReaderManager, self.manager)
+ manager.set_rrsets(rrsets)
def _set_origin(self, origin):
pass