self.origin = None
self.tsig_ctx = None
self.multi = False
- self.first = True
self.index = {}
@property
int(time.time()),
self.request_mac,
tsig_ctx,
- multi,
- tsig_ctx is None)
+ multi)
self.tsig.clear()
self.tsig.add(new_tsig)
r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
self.message.request_mac,
rr_start,
self.message.tsig_ctx,
- self.message.multi,
- self.message.first)
+ self.message.multi)
self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
else:
rrset = self.message.find_rrset(section, name,
def from_wire(wire, keyring=None, request_mac=b'', xfr=False, origin=None,
- tsig_ctx=None, multi=False, first=True,
+ tsig_ctx=None, multi=False,
question_only=False, one_rr_per_rrset=False,
ignore_trailing=False, raise_on_truncation=False):
"""Convert a DNS wire format message into a message
*multi*, a ``bool``, should be set to ``True`` if this message is
part of a multiple message sequence.
- *first*, a ``bool``, should be set to ``True`` if this message is
- stand-alone, or the first message in a multi-message sequence.
-
*question_only*, a ``bool``. If ``True``, read only up to
the end of the question section.
message.origin = origin
message.tsig_ctx = tsig_ctx
message.multi = multi
- message.first = first
reader = _WireReader(wire, initialize_message, question_only,
one_rr_per_rrset, ignore_trailing)
self.tsig_ctx = None
self.had_tsig = False
self.multi = False
- self.first = True
self.index : Dict[Tuple[rrset.RRset, name.Name, int, int, Union[int,str], int], rrset.RRset] = {}
def is_response(self, other : Message) -> bool:
...
def from_wire(wire, keyring : Optional[Dict[name.Name,bytes]] = None, request_mac = b'', xfr=False, origin=None,
- tsig_ctx : Optional[hmac.HMAC] = None, multi=False, first=True,
+ tsig_ctx : Optional[hmac.HMAC] = None, multi=False,
question_only=False, one_rr_per_rrset=False,
ignore_trailing=False) -> Message:
...
origin = None
oname = zone
tsig_ctx = None
- first = True
while not done:
(_, mexpiration) = _compute_times(timeout)
if mexpiration is None or \
r = dns.message.from_wire(wire, keyring=q.keyring,
request_mac=q.mac, xfr=True,
origin=origin, tsig_ctx=tsig_ctx,
- multi=True, first=first,
- one_rr_per_rrset=is_ixfr)
+ multi=True, one_rr_per_rrset=is_ixfr)
rcode = r.rcode()
if rcode != dns.rcode.NOERROR:
raise TransferError(rcode)
tsig_ctx = r.tsig_ctx
- first = False
answer_index = 0
if soa_rrset is None:
if not r.answer or r.answer[0].name != oname:
def sign(wire, keyname, rdata, secret, time=None, request_mac=None,
- ctx=None, multi=False, first=True):
+ ctx=None, multi=False):
"""Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata
for the input parameters, the HMAC MAC calculated by applying the
TSIG signature algorithm, and the TSIG digest context.
@raises NotImplementedError: I{algorithm} is not supported
"""
+ first = not (ctx and multi)
(algorithm_name, digestmod) = get_algorithm(rdata.algorithm)
if first:
ctx = hmac.new(secret, digestmod=digestmod)
def validate(wire, keyname, rdata, secret, now, request_mac, tsig_start,
- ctx=None, multi=False, first=True):
+ ctx=None, multi=False):
"""Validate the specified TSIG rdata against the other input parameters.
@raises FormError: The TSIG is badly formed.
if abs(rdata.time_signed - now) > rdata.fudge:
raise BadTime
(our_rdata, ctx) = sign(new_wire, keyname, rdata, secret, None, request_mac,
- ctx, multi, first)
+ ctx, multi)
if our_rdata.mac != rdata.mac:
raise BadSignature
return ctx