import httpx
_CoreAsyncNetworkBackend = httpcore.AsyncNetworkBackend
- _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream # pyright: ignore
+ _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream # type: ignore
from dns.query import _compute_times, _expiration_for_this_attempt, _remaining
# proper fix for [#637].
source = (dns.inet.any_for_af(af), 0)
transport, protocol = await loop.create_datagram_endpoint(
- _DatagramProtocol, # pyright: ignore
+ _DatagramProtocol, # type: ignore
source,
family=af,
proto=proto,
finally:
_in__init__.reset(previous)
- nf.__signature__ = inspect.signature(f) # pyright: ignore
+ nf.__signature__ = inspect.signature(f) # type: ignore
return nf
dtuple = None
cm = await backend.make_socket(af, socket.SOCK_DGRAM, 0, stuple, dtuple)
async with cm as s:
- await send_udp(s, wire, destination, expiration) # pyright: ignore
+ await send_udp(s, wire, destination, expiration) # type: ignore
(r, received_time, _) = await receive_udp(
- s, # pyright: ignore
+ s, # type: ignore
destination,
expiration,
ignore_unexpected,
af, socket.SOCK_STREAM, 0, stuple, dtuple, timeout
)
async with cm as s:
- await send_tcp(s, wire, expiration) # pyright: ignore
+ await send_tcp(s, wire, expiration) # type: ignore
(r, received_time) = await receive_tcp(
- s, # pyright: ignore
+ s, # type: ignore
expiration,
one_rr_per_rrset,
q.keyring,
def _maybe_get_resolver(
- resolver: Optional["dns.asyncresolver.Resolver"], # pyright: ignore
-) -> "dns.asyncresolver.Resolver": # pyright: ignore
+ resolver: Optional["dns.asyncresolver.Resolver"], # type: ignore
+) -> "dns.asyncresolver.Resolver": # type: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
# in https().
post: bool = True,
verify: bool | str | ssl.SSLContext = True,
bootstrap_address: str | None = None,
- resolver: Optional["dns.asyncresolver.Resolver"] = None, # pyright: ignore
+ resolver: Optional["dns.asyncresolver.Resolver"] = None, # type: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
):
if bootstrap_address is None:
resolver = _maybe_get_resolver(resolver)
- assert parsed.hostname is not None # pyright: ignore
- answers = await resolver.resolve_name( # pyright: ignore
- parsed.hostname, family # pyright: ignore
+ assert parsed.hostname is not None # type: ignore
+ answers = await resolver.resolve_name( # type: ignore
+ parsed.hostname, family # type: ignore
)
bootstrap_address = random.choice(list(answers.addresses()))
if client and not isinstance(
client, dns.quic.AsyncQuicConnection
- ): # pyright: ignore
+ ): # type: ignore
raise ValueError("client parameter must be a dns.quic.AsyncQuicConnection.")
assert client is None or isinstance(client, dns.quic.AsyncQuicConnection)
return await _http3(
if not have_doh:
raise NoDOH # pragma: no cover
# pylint: disable=possibly-used-before-assignment
- if client and not isinstance(client, httpx.AsyncClient): # pyright: ignore
+ if client and not isinstance(client, httpx.AsyncClient): # type: ignore
raise ValueError("client parameter must be an httpx.AsyncClient")
# pylint: enable=possibly-used-before-assignment
family=family,
)
- cm = httpx.AsyncClient( # pyright: ignore
+ cm = httpx.AsyncClient( # type: ignore
http1=h1, http2=h2, verify=verify, transport=transport # type: ignore
)
}
)
response = await backend.wait_for(
- the_client.post( # pyright: ignore
+ the_client.post( # type: ignore
url,
headers=headers,
content=wire,
wire = base64.urlsafe_b64encode(wire).rstrip(b"=")
twire = wire.decode() # httpx does a repr() if we give it bytes
response = await backend.wait_for(
- the_client.get( # pyright: ignore
+ the_client.get( # type: ignore
url,
headers=headers,
params={"dns": twire},
if connection:
the_connection = connection
else:
- the_connection = the_manager.connect( # pyright: ignore
+ the_connection = the_manager.connect( # type: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
- stream = await the_connection.make_stream(timeout) # pyright: ignore
+ stream = await the_connection.make_stream(timeout) # type: ignore
async with stream:
# note that send_h3() does not need await
stream.send_h3(url, wire, post)
server_name=server_hostname,
) as the_manager:
if not connection:
- the_connection = the_manager.connect( # pyright: ignore
+ the_connection = the_manager.connect( # type: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
- stream = await the_connection.make_stream(timeout) # pyright: ignore
+ stream = await the_connection.make_stream(timeout) # type: ignore
async with stream:
await stream.send(wire, True)
wire = await stream.receive(_remaining(expiration))
mexpiration = expiration
if is_udp:
timeout = _timeout(mexpiration)
- (rwire, _) = await udp_sock.recvfrom(65535, timeout) # pyright: ignore
+ (rwire, _) = await udp_sock.recvfrom(65535, timeout) # type: ignore
else:
- ldata = await _read_exactly(tcp_sock, 2, mexpiration) # pyright: ignore
+ ldata = await _read_exactly(tcp_sock, 2, mexpiration) # type: ignore
(l,) = struct.unpack("!H", ldata)
- rwire = await _read_exactly(tcp_sock, l, mexpiration) # pyright: ignore
+ rwire = await _read_exactly(tcp_sock, l, mexpiration) # type: ignore
r = dns.message.from_wire(
rwire,
keyring=query.keyring,
)
async with s:
try:
- async for _ in _inbound_xfr( # pyright: ignore
+ async for _ in _inbound_xfr( # type: ignore
txn_manager,
s,
query,
serial,
timeout,
- expiration, # pyright: ignore
+ expiration, # type: ignore
):
pass
return
af, socket.SOCK_STREAM, 0, stuple, dtuple, _timeout(expiration)
)
async with s:
- async for _ in _inbound_xfr( # pyright: ignore
- txn_manager, s, query, serial, timeout, expiration # pyright: ignore
+ async for _ in _inbound_xfr( # type: ignore
+ txn_manager, s, query, serial, timeout, expiration # type: ignore
):
pass
class CryptographyPrivateKey(GenericPrivateKey):
key: Any = None
key_cls: Any = None
- public_cls: Type[CryptographyPublicKey] # pyright: ignore
+ public_cls: Type[CryptographyPublicKey] # type: ignore
def __init__(self, key: Any) -> None: # pylint: disable=super-init-not-called
if self.key_cls is None:
public_dsa_key = self.key.public_key()
if public_dsa_key.key_size > 1024:
raise ValueError("DSA key size overflow")
- der_signature = self.key.sign(
- data, self.public_cls.chosen_hash # pyright: ignore
- )
+ der_signature = self.key.sign(data, self.public_cls.chosen_hash) # type: ignore
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
dsa_t = (public_dsa_key.key_size // 8 - 64) // 8
octets = 20
) -> bytes:
"""Sign using a private key per RFC 6605, section 4."""
algorithm = ec.ECDSA(
- self.public_cls.chosen_hash, # pyright: ignore
+ self.public_cls.chosen_hash, # type: ignore
deterministic_signing=deterministic,
)
der_signature = self.key.sign(data, algorithm)
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
signature = int.to_bytes(
- dsa_r, length=self.public_cls.octets, byteorder="big" # pyright: ignore
+ dsa_r, length=self.public_cls.octets, byteorder="big" # type: ignore
) + int.to_bytes(
- dsa_s, length=self.public_cls.octets, byteorder="big" # pyright: ignore
+ dsa_s, length=self.public_cls.octets, byteorder="big" # type: ignore
)
if verify:
self.public_key().verify(signature, data)
def generate(cls) -> "PrivateECDSA":
return cls(
key=ec.generate_private_key(
- curve=cls.public_cls.curve, backend=default_backend() # pyright: ignore
+ curve=cls.public_cls.curve, backend=default_backend() # type: ignore
),
)
class PrivateEDDSA(CryptographyPrivateKey):
- public_cls: Type[PublicEDDSA] # pyright: ignore
+ public_cls: Type[PublicEDDSA] # type: ignore
def sign(
self,
) -> bytes:
"""Sign using a private key per RFC 3110, section 3."""
signature = self.key.sign(
- data, padding.PKCS1v15(), self.public_cls.chosen_hash # pyright: ignore
+ data, padding.PKCS1v15(), self.public_cls.chosen_hash # type: ignore
)
if verify:
self.public_key().verify(signature, data)
@classmethod
def _missing_(cls, value):
cls._check_value(value)
- val = int.__new__(cls, value) # pyright: ignore
+ val = int.__new__(cls, value) # type: ignore
val._name_ = cls._extra_to_text(value, None) or f"{cls._prefix()}{value}"
- val._value_ = value # pyright: ignore
+ val._value_ = value # type: ignore
return val
@classmethod
return QueryMessage
elif opcode == dns.opcode.UPDATE:
_maybe_import_update()
- return dns.update.UpdateMessage # pyright: ignore
+ return dns.update.UpdateMessage # type: ignore
else:
return Message
else:
with self.parser.restrict_to(rdlen):
rd = dns.rdata.from_wire_parser(
- rdclass, # pyright: ignore
+ rdclass, # type: ignore
rdtype,
self.parser,
self.message.origin,
rrset = self.message.find_rrset(
section,
name,
- rdclass, # pyright: ignore
+ rdclass, # type: ignore
rdtype,
covers,
deleting,
): # pylint: disable=signature-differs
raise NotImplementedError
- class _HTTPTransport(httpx.HTTPTransport): # pyright: ignore
+ class _HTTPTransport(httpx.HTTPTransport): # type: ignore
def __init__(
self,
*args,
if writable:
events |= selectors.EVENT_WRITE
if events:
- sel.register(fd, events) # pyright: ignore
+ sel.register(fd, events) # type: ignore
if expiration is None:
timeout = None
else:
def _maybe_get_resolver(
- resolver: Optional["dns.resolver.Resolver"], # pyright: ignore
-) -> "dns.resolver.Resolver": # pyright: ignore
+ resolver: Optional["dns.resolver.Resolver"], # type: ignore
+) -> "dns.resolver.Resolver": # type: ignore
# We need a separate method for this to avoid overriding the global
# variable "dns" with the as-yet undefined local variable "dns"
# in https().
post: bool = True,
bootstrap_address: str | None = None,
verify: bool | str | ssl.SSLContext = True,
- resolver: Optional["dns.resolver.Resolver"] = None, # pyright: ignore
+ resolver: Optional["dns.resolver.Resolver"] = None, # type: ignore
family: int = socket.AF_UNSPEC,
http_version: HTTPVersion = HTTPVersion.DEFAULT,
) -> dns.message.Message:
):
if bootstrap_address is None:
resolver = _maybe_get_resolver(resolver)
- assert parsed.hostname is not None # pyright: ignore
- answers = resolver.resolve_name(parsed.hostname, family) # pyright: ignore
+ assert parsed.hostname is not None # type: ignore
+ answers = resolver.resolve_name(parsed.hostname, family) # type: ignore
bootstrap_address = random.choice(list(answers.addresses()))
if session and not isinstance(
session, dns.quic.SyncQuicConnection
- ): # pyright: ignore
+ ): # type: ignore
raise ValueError("session parameter must be a dns.quic.SyncQuicConnection.")
return _http3(
q,
bootstrap_address,
- url, # pyright: ignore
+ url, # type: ignore
timeout,
port,
source,
if not have_doh:
raise NoDOH # pragma: no cover
- if session and not isinstance(session, httpx.Client): # pyright: ignore
+ if session and not isinstance(session, httpx.Client): # type: ignore
raise ValueError("session parameter must be an httpx.Client")
wire = q.to_wire()
local_port=local_port,
bootstrap_address=bootstrap_address,
resolver=resolver,
- family=family, # pyright: ignore
+ family=family, # type: ignore
)
cm = httpx.Client( # type: ignore
manager: contextlib.AbstractContextManager = contextlib.nullcontext(None)
else:
manager = dns.quic.SyncQuicManager(
- verify_mode=verify, server_name=hostname, h3=True # pyright: ignore
+ verify_mode=verify, server_name=hostname, h3=True # type: ignore
)
the_manager = manager # for type checking happiness
if connection:
the_connection = connection
else:
- the_connection = the_manager.connect( # pyright: ignore
+ the_connection = the_manager.connect( # type: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
- with the_connection.make_stream(timeout) as stream: # pyright: ignore
+ with the_connection.make_stream(timeout) as stream: # type: ignore
stream.send_h3(url, wire, post)
wire = stream.receive(_remaining(expiration))
_check_status(stream.headers(), where, wire)
with cm as s:
if not sock:
# pylint: disable=possibly-used-before-assignment
- _connect(s, destination, expiration) # pyright: ignore
+ _connect(s, destination, expiration) # type: ignore
send_tcp(s, wire, expiration)
(r, received_time) = receive_tcp(
s, expiration, one_rr_per_rrset, q.keyring, q.mac, ignore_trailing
the_connection = connection
else:
manager = dns.quic.SyncQuicManager(
- verify_mode=verify, server_name=hostname # pyright: ignore
+ verify_mode=verify, server_name=hostname # type: ignore
)
the_manager = manager # for type checking happiness
with manager:
if not connection:
- the_connection = the_manager.connect( # pyright: ignore
+ the_connection = the_manager.connect( # type: ignore
where, port, source, source_port
)
(start, expiration) = _compute_times(timeout)
- with the_connection.make_stream(timeout) as stream: # pyright: ignore
+ with the_connection.make_stream(timeout) as stream: # type: ignore
stream.send(wire, True)
wire = stream.receive(_remaining(expiration))
finish = time.time()
from dns.quic._asyncio import AsyncioQuicConnection as AsyncioQuicConnection
from dns.quic._asyncio import AsyncioQuicManager
from dns.quic._asyncio import AsyncioQuicStream as AsyncioQuicStream
- from dns.quic._common import AsyncQuicConnection # pyright: ignore
+ from dns.quic._common import AsyncQuicConnection # type: ignore
from dns.quic._common import AsyncQuicManager as AsyncQuicManager
- from dns.quic._sync import SyncQuicConnection # pyright: ignore
- from dns.quic._sync import SyncQuicStream # pyright: ignore
+ from dns.quic._sync import SyncQuicConnection # type: ignore
+ from dns.quic._sync import SyncQuicStream # type: ignore
from dns.quic._sync import SyncQuicManager as SyncQuicManager
have_quic = True
# race.
interval = 0.0
with trio.CancelScope(
- deadline=trio.current_time() + interval # pyright: ignore
+ deadline=trio.current_time() + interval # type: ignore
) as self._worker_scope:
datagram = await self._socket.recv(QUIC_MAX_DATAGRAM)
self._connection.receive_datagram(datagram, self._peer, time.time())
relativize: bool = True,
**kw: Dict[str, Any],
) -> str:
- return rf"\# {len(self.data)} " + _hexify(self.data, **kw) # pyright: ignore
+ return rf"\# {len(self.data)} " + _hexify(self.data, **kw) # type: ignore
@classmethod
def from_text(
self.ttl = ttl
# pylint: disable=arguments-differ,arguments-renamed
- def add( # pyright: ignore
- self, rd: dns.rdata.Rdata, ttl: int | None = None
- ) -> None:
+ def add(self, rd: dns.rdata.Rdata, ttl: int | None = None) -> None: # type: ignore
"""Add the specified rdata to the rdataset.
If the optional *ttl* parameter is supplied, then
if len(self) == 0:
return []
else:
- return self[0]._processing_order(iter(self)) # pyright: ignore
+ return self[0]._processing_order(iter(self)) # type: ignore
@dns.immutable.immutable
raise TypeError("immutable")
def __copy__(self):
- return ImmutableRdataset(super().copy()) # pyright: ignore
+ return ImmutableRdataset(super().copy()) # type: ignore
def copy(self):
- return ImmutableRdataset(super().copy()) # pyright: ignore
+ return ImmutableRdataset(super().copy()) # type: ignore
def union(self, other):
- return ImmutableRdataset(super().union(other)) # pyright: ignore
+ return ImmutableRdataset(super().union(other)) # type: ignore
def intersection(self, other):
- return ImmutableRdataset(super().intersection(other)) # pyright: ignore
+ return ImmutableRdataset(super().intersection(other)) # type: ignore
def difference(self, other):
- return ImmutableRdataset(super().difference(other)) # pyright: ignore
+ return ImmutableRdataset(super().difference(other)) # type: ignore
def symmetric_difference(self, other):
- return ImmutableRdataset(super().symmetric_difference(other)) # pyright: ignore
+ return ImmutableRdataset(super().symmetric_difference(other)) # type: ignore
def from_text_list(
def to_text(self, origin=None, relativize=True, **kw):
certificate_type = _ctype_to_text(self.certificate_type)
algorithm = dns.dnssectypes.Algorithm.to_text(self.algorithm)
- certificate = dns.rdata._base64ify(self.certificate, **kw) # pyright: ignore
+ certificate = dns.rdata._base64ify(self.certificate, **kw) # type: ignore
return f"{certificate_type} {self.key_tag} {algorithm} {certificate}"
@classmethod
if isinstance(latitude, float):
latitude = _float_to_tuple(latitude)
_check_coordinate_list(latitude, -90, 90)
- self.latitude = tuple(latitude) # pyright: ignore
+ self.latitude = tuple(latitude) # type: ignore
if isinstance(longitude, int):
longitude = float(longitude)
if isinstance(longitude, float):
longitude = _float_to_tuple(longitude)
_check_coordinate_list(longitude, -180, 180)
- self.longitude = tuple(longitude) # pyright: ignore
+ self.longitude = tuple(longitude) # type: ignore
self.altitude = float(altitude)
self.size = float(size)
self.horizontal_precision = float(hprec)
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
- return dns.rdata._base64ify(self.key, chunksize=None, **kw) # pyright: ignore
+ return dns.rdata._base64ify(self.key, chunksize=None, **kw) # type: ignore
@classmethod
def from_text(
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
fingerprint = dns.rdata._hexify(
- self.fingerprint, chunksize=chunksize, **kw # pyright: ignore
+ self.fingerprint, chunksize=chunksize, **kw # type: ignore
)
return f"{self.algorithm} {self.fp_type} {fingerprint}"
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
digest = dns.rdata._hexify(
- self.digest, chunksize=chunksize, **kw # pyright: ignore
+ self.digest, chunksize=chunksize, **kw # type: ignore
)
return f"{self.serial} {self.scheme} {self.hash_algorithm} {digest}"
self.data = self._as_bytes(data)
def to_text(self, origin=None, relativize=True, **kw):
- return dns.rdata._base64ify(self.data, **kw) # pyright: ignore
+ return dns.rdata._base64ify(self.data, **kw) # type: ignore
@classmethod
def from_text(
def to_text(self, origin=None, relativize=True, **kw):
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin, relativize)
- key = dns.rdata._base64ify(self.key, **kw) # pyright: ignore
+ key = dns.rdata._base64ify(self.key, **kw) # type: ignore
return f"{self.precedence} {self.gateway_type} {self.algorithm} {gateway} {key}"
@classmethod
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
- key = dns.rdata._base64ify(self.key, **kw) # pyright: ignore
+ key = dns.rdata._base64ify(self.key, **kw) # type: ignore
return f"{self.flags} {self.protocol} {self.algorithm} {key}"
@classmethod
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
digest = dns.rdata._hexify(
- self.digest, chunksize=chunksize, **kw # pyright: ignore
+ self.digest, chunksize=chunksize, **kw # type: ignore
)
return f"{self.key_tag} {self.algorithm} {self.digest_type} {digest}"
expiration = posixtime_to_sigtime(self.expiration)
inception = posixtime_to_sigtime(self.inception)
signer = self.signer.choose_relativity(origin, relativize)
- sig = dns.rdata._base64ify(self.signature, **kw) # pyright: ignore
+ sig = dns.rdata._base64ify(self.signature, **kw) # type: ignore
return (
f"{ctext} {self.algorithm} {self.labels} {self.original_ttl} "
+ f"{expiration} {inception} {self.key_tag} {signer} {sig}"
header = parser.get_struct("!HBBIIIH")
signer = parser.get_name(origin)
signature = parser.get_remaining()
- return cls(rdclass, rdtype, *header, signer, signature) # pyright: ignore
+ return cls(rdclass, rdtype, *header, signer, signature) # type: ignore
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop("chunksize", 128)
- cert = dns.rdata._hexify(
- self.cert, chunksize=chunksize, **kw # pyright: ignore
- )
+ cert = dns.rdata._hexify(self.cert, chunksize=chunksize, **kw) # type: ignore
return f"{self.usage} {self.selector} {self.mtype} {cert}"
@classmethod
if weight > r:
break
r -= weight
- total -= weight # pyright: ignore[reportPossiblyUnboundVariable]
+ total -= weight # type: ignore
# pylint: disable=undefined-loop-variable
- ordered.append(rdata) # pyright: ignore[reportPossiblyUnboundVariable]
- del rdatas[n] # pyright: ignore[reportPossiblyUnboundVariable]
+ ordered.append(rdata) # type: ignore
+ del rdatas[n] # type: ignore
ordered.append(rdatas[0])
return ordered
pad = b""
options = list(opt_rdata.options)
options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
- opt = dns.message.Message._make_opt( # pyright: ignore
+ opt = dns.message.Message._make_opt( # type: ignore
ttl, opt_rdata.rdclass, options
)
self.was_padded = True
# make sure the EDNS version in ednsflags agrees with edns
ednsflags &= 0xFF00FFFF
ednsflags |= edns << 16
- opt = dns.message.Message._make_opt( # pyright: ignore
- ednsflags, payload, options
- )
+ opt = dns.message.Message._make_opt(ednsflags, payload, options) # type: ignore
self.add_opt(opt)
def add_tsig(
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig( # pyright: ignore
+ tsig = dns.message.Message._make_tsig( # type: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
key = secret
else:
key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig( # pyright: ignore
+ tsig = dns.message.Message._make_tsig( # type: ignore
keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
)
(tsig, ctx) = dns.tsig.sign(
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- def _check_kwargs(self, qnames, responses=None): # pyright: ignore
+ def _check_kwargs(self, qnames, responses=None): # type: ignore
if not isinstance(qnames, list | tuple | set):
raise AttributeError("qnames must be a list, tuple or set")
if len(qnames) == 0:
tcp: bool = False,
resolver: Resolver | None = None,
lifetime: float | None = None,
-) -> dns.name.Name: # pyright: ignore[reportReturnType]
+) -> dns.name.Name: # type: ignore[reportReturnType]
"""Find the name of the zone which contains the specified name.
*name*, an absolute ``dns.name.Name`` or ``str``, the query name.
except Exception:
if flags & socket.AI_NUMERICSERV == 0:
try:
- port = socket.getservbyname(service) # pyright: ignore
+ port = socket.getservbyname(service) # type: ignore
except Exception:
pass
if port is None:
# Updates are always one_rr_per_rrset
return True
- def _parse_rr_header(self, section, name, rdclass, rdtype): # pyright: ignore
+ def _parse_rr_header(self, section, name, rdclass, rdtype): # type: ignore
deleting = None
empty = False
if section == UpdateSection.ZONE:
# Note our definition of least_kept also ensures we do not try to
# delete the greatest version.
if len(self._readers) > 0:
- least_kept = min(txn.version.id for txn in self._readers) # pyright: ignore
+ least_kept = min(txn.version.id for txn in self._readers) # type: ignore
else:
least_kept = self._versions[-1].id
while self._versions[0].id < least_kept and self._pruning_policy(
raise ValueError("max versions must be at least 1")
if max_versions is None:
# pylint: disable=unused-argument
- def policy(zone, _): # pyright: ignore
+ def policy(zone, _): # type: ignore
return False
else:
if rdataset.rdtype != dns.rdatatype.SOA:
raise dns.exception.FormError("first RRset is not an SOA")
answer_index = 1
- self.soa_rdataset = rdataset.copy() # pyright: ignore
+ self.soa_rdataset = rdataset.copy() # type: ignore
if self.incremental:
assert self.soa_rdataset is not None
soa = cast(dns.rdtypes.ANY.SOA.SOA, self.soa_rdataset[0])
for n in names:
l = self[n].to_text(
n,
- origin=self.origin, # pyright: ignore
- relativize=relativize, # pyright: ignore
- want_comments=want_comments, # pyright: ignore
+ origin=self.origin, # type: ignore
+ relativize=relativize, # type: ignore
+ want_comments=want_comments, # type: ignore
)
l_b = l.encode(file_enc)
def _setup_version(self):
assert self.version is None
- factory = self.manager.writable_version_factory # pyright: ignore
+ factory = self.manager.writable_version_factory # type: ignore
if factory is None:
factory = WritableVersion
- self.version = factory(self.zone, self.replacement) # pyright: ignore
+ self.version = factory(self.zone, self.replacement) # type: ignore
def _get_rdataset(self, name, rdtype, covers):
assert self.version is not None
assert self.zone is not None
assert self.version is not None
if self.read_only:
- self.zone._end_read(self) # pyright: ignore
+ self.zone._end_read(self) # type: ignore
elif commit and len(self.version.changed) > 0:
if self.make_immutable:
- factory = self.manager.immutable_version_factory # pyright: ignore
+ factory = self.manager.immutable_version_factory # type: ignore
if factory is None:
factory = ImmutableVersion
version = factory(self.version)
else:
version = self.version
- self.zone._commit_version( # pyright: ignore
+ self.zone._commit_version( # type: ignore
self, version, self.version.origin
)
else:
# rollback
- self.zone._end_write(self) # pyright: ignore
+ self.zone._end_write(self) # type: ignore
def _set_origin(self, origin):
assert self.version is not None
)
rrset.update(rdataset)
rrsets.append(rrset)
- self.manager.set_rrsets(rrsets) # pyright: ignore
+ self.manager.set_rrsets(rrsets) # type: ignore
def _set_origin(self, origin):
pass