if rdtype == dns.rdatatype.OPT:
self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
elif rdtype == dns.rdatatype.TSIG:
- if self.keyring is None:
+ if self.keyring is None or self.keyring is True:
raise UnknownTSIGKey("got signed message without keyring")
- if isinstance(self.keyring, dict):
+ elif isinstance(self.keyring, dict):
key = self.keyring.get(absolute_name)
if isinstance(key, bytes):
key = dns.tsig.Key(absolute_name, key, rd.algorithm)
key = self.keyring
if key is None:
raise UnknownTSIGKey(f"key '{name}' unknown")
- self.message.keyring = key
- self.message.tsig_ctx = dns.tsig.validate(
- self.parser.wire,
- key,
- absolute_name,
- rd,
- int(time.time()),
- self.message.request_mac,
- rr_start,
- self.message.tsig_ctx,
- self.multi,
- )
+ if key:
+ self.message.keyring = key
+ self.message.tsig_ctx = dns.tsig.validate(
+ self.parser.wire,
+ key,
+ absolute_name,
+ rd,
+ int(time.time()),
+ self.message.request_mac,
+ rr_start,
+ self.message.tsig_ctx,
+ self.multi,
+ )
self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
else:
rrset = self.message.find_rrset(
) -> Message:
"""Convert a DNS wire format message into a message object.
- *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message
- is signed.
+ *keyring*, a ``dns.tsig.Key``, ``dict``, ``bool``, or ``None``, the key or keyring
+ to use if the message is signed. If ``None`` or ``True``, then trying to decode
+ a message with a TSIG will fail as it cannot be validated. If ``False``, then
+ TSIG validation is disabled.
*request_mac*, a ``bytes`` or ``None``. If the message is a response to a
TSIG-signed request, *request_mac* should be set to the MAC of that request.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+import base64
+import time
import unittest
from unittest.mock import Mock
-import time
-import base64
+import dns.message
import dns.rcode
+import dns.rdtypes.ANY.TKEY
import dns.tsig
import dns.tsigkeyring
-import dns.message
-import dns.rdtypes.ANY.TKEY
keyring = dns.tsigkeyring.from_text({"keyname.": "NjHwPsMKjdN++dOfE5iAiQ=="})
# not raising is passing
dns.message.from_wire(w, keyring)
+ def test_signature_is_invalid(self):
+ m = dns.message.make_query("example", "a")
+ m.use_tsig(keyring, keyname)
+ w = m.to_wire()
+ b = bytearray(w)
+ # corrupt hash
+ b[-7] = (b[-7] + 1) & 0xFF
+ w = bytes(b)
+ with self.assertRaises(dns.tsig.BadSignature):
+ dns.message.from_wire(w, keyring)
+
+ def test_signature_is_invalid_and_ignored(self):
+ m = dns.message.make_query("example", "a")
+ m.use_tsig(keyring, keyname)
+ w = m.to_wire()
+ b = bytearray(w)
+ # corrupt hash
+ b[-7] = (b[-7] + 1) & 0xFF
+ w = bytes(b)
+ m2 = dns.message.from_wire(w, False)
+ self.assertIsNotNone(m2.tsig)
+
def test_validate_with_bad_keyring(self):
m = dns.message.make_query("example", "a")
m.use_tsig(keyring, keyname)