From: Brian Wellington Date: Mon, 29 Jun 2020 20:15:57 +0000 (-0700) Subject: Create TSIGRecord; use it in _WireReader. X-Git-Tag: v2.0.0rc2~35^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ea8e8a71fd855c61b3d6095ce536105fbedf8d7;p=thirdparty%2Fdnspython.git Create TSIGRecord; use it in _WireReader. --- diff --git a/dns/message.py b/dns/message.py index 597b329f..1e67c99f 100644 --- a/dns/message.py +++ b/dns/message.py @@ -649,11 +649,17 @@ class Message: raise dns.exception.FormError return (rdclass, rdtype, None, False) - def _parse_special_rr_header(self, section, name, rdclass, rdtype): + def _parse_special_rr_header(self, section, count, position, + name, rdclass, rdtype): if rdtype == dns.rdatatype.OPT: if section != MessageSection.ADDITIONAL or self.opt or \ name != dns.name.root: raise BadEDNS + elif rdtype == dns.rdatatype.TSIG: + if section != MessageSection.ADDITIONAL or \ + rdclass != dns.rdatatype.ANY or \ + position != count - 1: + raise dns.error.FormError return (rdclass, rdtype, None, False) @@ -746,19 +752,36 @@ class _WireReader: struct.unpack('!HHIH', self.wire[self.current:self.current + 10]) self.current += 10 - if rdtype == dns.rdatatype.TSIG: - if not (section is self.message.additional and - i == (count - 1)): - raise BadTSIG + if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG): + (rdclass, rdtype, deleting, empty) = \ + self.message._parse_special_rr_header(section_number, + count, i, name, + rdclass, rdtype) + else: + (rdclass, rdtype, deleting, empty) = \ + self.message._parse_rr_header(section_number, + name, rdclass, rdtype) + if empty: + if rdlen > 0: + raise dns.exception.FormError + rd = None + covers = dns.rdatatype.NONE + else: + rd = dns.rdata.from_wire(rdclass, rdtype, + self.wire, self.current, rdlen, + self.message.origin) + covers = rd.covers() + if self.message.xfr and rdtype == dns.rdatatype.SOA: + force_unique = True + if rdtype == dns.rdatatype.OPT: + self.message.opt = dns.rrset.from_rdata(name, ttl, rd) + elif rdtype == dns.rdatatype.TSIG: if self.message.keyring is None: raise UnknownTSIGKey('got signed message without keyring') secret = self.message.keyring.get(absolute_name) if secret is None: raise UnknownTSIGKey("key '%s' unknown" % name) self.message.keyname = absolute_name - (self.message.keyalgorithm, self.message.mac) = \ - dns.tsig.get_algorithm_and_mac(self.wire, self.current, - rdlen) self.message.tsig_ctx = \ dns.tsig.validate(self.wire, absolute_name, @@ -771,40 +794,18 @@ class _WireReader: self.message.tsig_ctx, self.message.multi, self.message.first) + self.message.keyalgorithm = rd.algorithm + self.message.mac = rd.mac self.message.had_tsig = True else: - if rdtype == dns.rdatatype.OPT: - (rdclass, rdtype, deleting, empty) = \ - self.message._parse_special_rr_header(section_number, - name, - rdclass, rdtype) - else: - (rdclass, rdtype, deleting, empty) = \ - self.message._parse_rr_header(section_number, - name, rdclass, rdtype) - if empty: - if rdlen > 0: - raise dns.exception.FormError - rd = None - covers = dns.rdatatype.NONE - else: - rd = dns.rdata.from_wire(rdclass, rdtype, - self.wire, self.current, rdlen, - self.message.origin) - covers = rd.covers() - if self.message.xfr and rdtype == dns.rdatatype.SOA: - force_unique = True - if rdtype == dns.rdatatype.OPT: - self.message.opt = dns.rrset.from_rdata(name, ttl, rd) - else: - rrset = self.message.find_rrset(section, name, - rdclass, rdtype, covers, - deleting, True, - force_unique) - if rd is not None: - if ttl > 0x7fffffff: - ttl = 0 - rrset.add(rd, ttl) + rrset = self.message.find_rrset(section, name, + rdclass, rdtype, covers, + deleting, True, + force_unique) + if rd is not None: + if ttl > 0x7fffffff: + ttl = 0 + rrset.add(rd, ttl) self.current += rdlen def read(self): diff --git a/dns/rdtypes/ANY/TSIG.py b/dns/rdtypes/ANY/TSIG.py new file mode 100644 index 00000000..e3937c6f --- /dev/null +++ b/dns/rdtypes/ANY/TSIG.py @@ -0,0 +1,112 @@ +# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license + +# Copyright (C) 2001-2017 Nominum, Inc. +# +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose with or without fee is hereby granted, +# provided that the above copyright notice and this permission notice +# appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import struct + +import dns.exception +import dns.rdata + + +class TSIG(dns.rdata.Rdata): + + """TSIG record""" + + __slots__ = ['algorithm', 'time_signed', 'fudge', 'mac', + 'original_id', 'error', 'other'] + + def __init__(self, rdclass, rdtype, algorithm, time_signed, fudge, mac, + original_id, error, other): + """Initialize a TSIG rdata. + + *rdclass*, an ``int`` is the rdataclass of the Rdata. + + *rdtype*, an ``int`` is the rdatatype of the Rdata. + + *algorithm*, a ``dns.name.Name``. + + *time_signed*, an ``int``. + + *fudge*, an ``int`. + + *mac*, a ``bytes`` + + *original_id*, an ``int`` + + *error*, an ``int`` + + *other*, a ``bytes`` + """ + + super().__init__(rdclass, rdtype) + object.__setattr__(self, 'algorithm', algorithm) + object.__setattr__(self, 'time_signed', time_signed) + object.__setattr__(self, 'fudge', fudge) + object.__setattr__(self, 'mac', dns.rdata._constify(mac)) + object.__setattr__(self, 'original_id', original_id) + object.__setattr__(self, 'error', error) + object.__setattr__(self, 'other', dns.rdata._constify(other)) + + def to_text(self, origin=None, relativize=True, **kw): + algorithm = self.algorithm.choose_relativity(origin, relativize) + return f"{algorithm} ... {self.fudge} {self.time_signed} " + \ + f"{len(self.mac)} {dns.rdata._base64ify(self.mac)} " + \ + f"{self.original_id} {self.error} " + \ + f"{len(self.other)} {dns.rdata._base64ify(self.other)}" + + def _to_wire(self, file, compress=None, origin=None, canonicalize=False): + self.algorithm.to_wire(file, None, origin, False) + file.write(struct.pack('!HIHH', + (self.time_signed >> 32) & 0xffff, + self.time_signed & 0xffffffff, + self.fudge, + len(self.mac))) + file.write(self.mac) + file.write(struct.pack('HHH', self.original_id, self.error, + len(self.other))) + file.write(self.other) + + @classmethod + def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None): + (algorithm, cused) = dns.name.from_wire(wire[: current + rdlen], + current) + current += cused + rdlen -= cused + if rdlen < 10: + raise dns.exception.FormError + (time_hi, time_lo, fudge, mac_len) = \ + struct.unpack('!HIHH', wire[current: current + 10]) + current += 10 + rdlen -= 10 + time_signed = (time_hi << 32) + time_lo + if rdlen < mac_len: + raise dns.exception.FormError + mac = wire[current: current + mac_len].unwrap() + current += mac_len + rdlen -= mac_len + if rdlen < 6: + raise dns.exception.FormError + (original_id, error, other_len) = \ + struct.unpack('!HHH', wire[current: current + 6]) + current += 6 + rdlen -= 6 + if rdlen < other_len: + raise dns.exception.FormError + other = wire[current: current + other_len].unwrap() + current += other_len + rdlen -= other_len + return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac, + original_id, error, other) diff --git a/dns/rdtypes/ANY/__init__.py b/dns/rdtypes/ANY/__init__.py index ca41ef80..ea704c86 100644 --- a/dns/rdtypes/ANY/__init__.py +++ b/dns/rdtypes/ANY/__init__.py @@ -43,6 +43,7 @@ __all__ = [ 'NSEC3', 'NSEC3PARAM', 'OPENPGPKEY', + 'OPT', 'PTR', 'RP', 'RRSIG', @@ -51,6 +52,7 @@ __all__ = [ 'SPF', 'SSHFP', 'TLSA', + 'TSIG', 'TXT', 'URI', 'X25',