async def inbound_xfr(where, txn_manager, query=None,
port=53, timeout=None, lifetime=None, source=None,
- source_port=0, udp_mode=UDPMode.NEVER, serial=0,
- backend=None):
+ source_port=0, udp_mode=UDPMode.NEVER, backend=None):
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
"""
if query is None:
(query, serial) = dns.xfr.make_query(txn_manager)
+ else:
+ serial = dns.xfr.extract_serial_from_query(query)
rdtype = query.question[0].rdtype
is_ixfr = rdtype == dns.rdatatype.IXFR
origin = txn_manager.from_wire_origin()
def inbound_xfr(where, txn_manager, query=None,
port=53, timeout=None, lifetime=None, source=None,
- source_port=0, udp_mode=UDPMode.NEVER, serial=0):
+ source_port=0, udp_mode=UDPMode.NEVER):
"""Conduct an inbound transfer and apply it via a transaction from the
txn_manager.
"""
if query is None:
(query, serial) = dns.xfr.make_query(txn_manager)
+ else:
+ serial = dns.xfr.extract_serial_from_query(query)
rdtype = query.question[0].rdtype
is_ixfr = rdtype == dns.rdatatype.IXFR
origin = txn_manager.from_wire_origin()
if keyring is not None:
q.use_tsig(keyring, keyname, algorithm=keyalgorithm)
return (q, serial)
+
+def extract_serial_from_query(query):
+ """Extract the SOA serial number from query if it is an IXFR and return
+ it, otherwise return None.
+
+ *query* is a dns.message.QueryMessage that is an IXFR or AXFR request.
+
+ Raises if the query is not an IXFR or AXFR, or if an IXFR doesn't have
+ an appropriate SOA RRset in the authority section."""
+
+ question = query.question[0]
+ if question.rdtype == dns.rdatatype.AXFR:
+ return None
+ elif question.rdtype != dns.rdatatype.IXFR:
+ raise ValueError("query is not an AXFR or IXFR")
+ print(question.name, question.rdclass)
+ soa = query.find_rrset(query.authority, question.name, question.rdclass,
+ dns.rdatatype.SOA)
+ return soa[0].serial
with pytest.raises(ValueError):
dns.xfr.make_query(z, serial=4294967296)
+def test_extract_serial_from_query():
+ z = dns.versioned.Zone('example.')
+ (q, s) = dns.xfr.make_query(z)
+ xs = dns.xfr.extract_serial_from_query(q)
+ assert s is None
+ assert s == xs
+ (q, s) = dns.xfr.make_query(z, serial=10)
+ print(q)
+ xs = dns.xfr.extract_serial_from_query(q)
+ assert s == 10
+ assert s == xs
+ q = dns.message.make_query('example', 'a')
+ with pytest.raises(ValueError):
+ dns.xfr.extract_serial_from_query(q)
+
class XFRNanoNameserver(Server):