raise dns.exception.FormError
return (rdclass, rdtype, None, False)
- def _parse_special_rr_header(self, section, name, rdclass, rdtype):
+ def _parse_special_rr_header(self, section, count, position,
+ name, rdclass, rdtype):
if rdtype == dns.rdatatype.OPT:
if section != MessageSection.ADDITIONAL or self.opt or \
name != dns.name.root:
raise BadEDNS
+ elif rdtype == dns.rdatatype.TSIG:
+ if section != MessageSection.ADDITIONAL or \
+ rdclass != dns.rdatatype.ANY or \
+ position != count - 1:
+ raise dns.error.FormError
return (rdclass, rdtype, None, False)
struct.unpack('!HHIH',
self.wire[self.current:self.current + 10])
self.current += 10
- if rdtype == dns.rdatatype.TSIG:
- if not (section is self.message.additional and
- i == (count - 1)):
- raise BadTSIG
+ if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
+ (rdclass, rdtype, deleting, empty) = \
+ self.message._parse_special_rr_header(section_number,
+ count, i, name,
+ rdclass, rdtype)
+ else:
+ (rdclass, rdtype, deleting, empty) = \
+ self.message._parse_rr_header(section_number,
+ name, rdclass, rdtype)
+ if empty:
+ if rdlen > 0:
+ raise dns.exception.FormError
+ rd = None
+ covers = dns.rdatatype.NONE
+ else:
+ rd = dns.rdata.from_wire(rdclass, rdtype,
+ self.wire, self.current, rdlen,
+ self.message.origin)
+ covers = rd.covers()
+ if self.message.xfr and rdtype == dns.rdatatype.SOA:
+ force_unique = True
+ if rdtype == dns.rdatatype.OPT:
+ self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
+ elif rdtype == dns.rdatatype.TSIG:
if self.message.keyring is None:
raise UnknownTSIGKey('got signed message without keyring')
secret = self.message.keyring.get(absolute_name)
if secret is None:
raise UnknownTSIGKey("key '%s' unknown" % name)
self.message.keyname = absolute_name
- (self.message.keyalgorithm, self.message.mac) = \
- dns.tsig.get_algorithm_and_mac(self.wire, self.current,
- rdlen)
self.message.tsig_ctx = \
dns.tsig.validate(self.wire,
absolute_name,
self.message.tsig_ctx,
self.message.multi,
self.message.first)
+ self.message.keyalgorithm = rd.algorithm
+ self.message.mac = rd.mac
self.message.had_tsig = True
else:
- if rdtype == dns.rdatatype.OPT:
- (rdclass, rdtype, deleting, empty) = \
- self.message._parse_special_rr_header(section_number,
- name,
- rdclass, rdtype)
- else:
- (rdclass, rdtype, deleting, empty) = \
- self.message._parse_rr_header(section_number,
- name, rdclass, rdtype)
- if empty:
- if rdlen > 0:
- raise dns.exception.FormError
- rd = None
- covers = dns.rdatatype.NONE
- else:
- rd = dns.rdata.from_wire(rdclass, rdtype,
- self.wire, self.current, rdlen,
- self.message.origin)
- covers = rd.covers()
- if self.message.xfr and rdtype == dns.rdatatype.SOA:
- force_unique = True
- if rdtype == dns.rdatatype.OPT:
- self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
- else:
- rrset = self.message.find_rrset(section, name,
- rdclass, rdtype, covers,
- deleting, True,
- force_unique)
- if rd is not None:
- if ttl > 0x7fffffff:
- ttl = 0
- rrset.add(rd, ttl)
+ rrset = self.message.find_rrset(section, name,
+ rdclass, rdtype, covers,
+ deleting, True,
+ force_unique)
+ if rd is not None:
+ if ttl > 0x7fffffff:
+ ttl = 0
+ rrset.add(rd, ttl)
self.current += rdlen
def read(self):
--- /dev/null
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2001-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import struct
+
+import dns.exception
+import dns.rdata
+
+
+class TSIG(dns.rdata.Rdata):
+
+ """TSIG record"""
+
+ __slots__ = ['algorithm', 'time_signed', 'fudge', 'mac',
+ 'original_id', 'error', 'other']
+
+ def __init__(self, rdclass, rdtype, algorithm, time_signed, fudge, mac,
+ original_id, error, other):
+ """Initialize a TSIG rdata.
+
+ *rdclass*, an ``int`` is the rdataclass of the Rdata.
+
+ *rdtype*, an ``int`` is the rdatatype of the Rdata.
+
+ *algorithm*, a ``dns.name.Name``.
+
+ *time_signed*, an ``int``.
+
+ *fudge*, an ``int`.
+
+ *mac*, a ``bytes``
+
+ *original_id*, an ``int``
+
+ *error*, an ``int``
+
+ *other*, a ``bytes``
+ """
+
+ super().__init__(rdclass, rdtype)
+ object.__setattr__(self, 'algorithm', algorithm)
+ object.__setattr__(self, 'time_signed', time_signed)
+ object.__setattr__(self, 'fudge', fudge)
+ object.__setattr__(self, 'mac', dns.rdata._constify(mac))
+ object.__setattr__(self, 'original_id', original_id)
+ object.__setattr__(self, 'error', error)
+ object.__setattr__(self, 'other', dns.rdata._constify(other))
+
+ def to_text(self, origin=None, relativize=True, **kw):
+ algorithm = self.algorithm.choose_relativity(origin, relativize)
+ return f"{algorithm} ... {self.fudge} {self.time_signed} " + \
+ f"{len(self.mac)} {dns.rdata._base64ify(self.mac)} " + \
+ f"{self.original_id} {self.error} " + \
+ f"{len(self.other)} {dns.rdata._base64ify(self.other)}"
+
+ def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
+ self.algorithm.to_wire(file, None, origin, False)
+ file.write(struct.pack('!HIHH',
+ (self.time_signed >> 32) & 0xffff,
+ self.time_signed & 0xffffffff,
+ self.fudge,
+ len(self.mac)))
+ file.write(self.mac)
+ file.write(struct.pack('HHH', self.original_id, self.error,
+ len(self.other)))
+ file.write(self.other)
+
+ @classmethod
+ def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
+ (algorithm, cused) = dns.name.from_wire(wire[: current + rdlen],
+ current)
+ current += cused
+ rdlen -= cused
+ if rdlen < 10:
+ raise dns.exception.FormError
+ (time_hi, time_lo, fudge, mac_len) = \
+ struct.unpack('!HIHH', wire[current: current + 10])
+ current += 10
+ rdlen -= 10
+ time_signed = (time_hi << 32) + time_lo
+ if rdlen < mac_len:
+ raise dns.exception.FormError
+ mac = wire[current: current + mac_len].unwrap()
+ current += mac_len
+ rdlen -= mac_len
+ if rdlen < 6:
+ raise dns.exception.FormError
+ (original_id, error, other_len) = \
+ struct.unpack('!HHH', wire[current: current + 6])
+ current += 6
+ rdlen -= 6
+ if rdlen < other_len:
+ raise dns.exception.FormError
+ other = wire[current: current + other_len].unwrap()
+ current += other_len
+ rdlen -= other_len
+ return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
+ original_id, error, other)