from typing import AsyncGenerator
import dns.name
+import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
import logging
import re
+import dns.name
import dns.rcode
-import dns.rdata
import dns.rdataclass
import dns.rdatatype
import dns.rrset
from typing import AsyncGenerator
-import dns
+import dns.edns
+import dns.message
+import dns.name
+import dns.rdatatype
+import dns.rrset
import dns.tsigkeyring
from isctest.asyncserver import (
def _soa(qctx: QueryContext) -> dns.rrset.RRset:
return dns.rrset.from_text(
- _tld(qctx), 2, dns.rdataclass.IN, dns.rdatatype.SOA, ". . 0 0 0 0 2"
+ _tld(qctx), 2, qctx.qclass, dns.rdatatype.SOA, ". . 0 0 0 0 2"
)
return dns.rrset.from_text(
qctx.qname,
1,
- dns.rdataclass.IN,
+ qctx.qclass,
dns.rdatatype.NS,
_ns_name(qctx).to_text(),
)
def _legit_a(qctx: QueryContext) -> dns.rrset.RRset:
- return dns.rrset.from_text(
- qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.9"
- )
+ return dns.rrset.from_text(qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.9")
def _spoofed_a(qctx: QueryContext) -> dns.rrset.RRset:
return dns.rrset.from_text(
- qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.10"
+ qctx.qname, 1, qctx.qclass, dns.rdatatype.A, "10.53.0.10"
)
from typing import AsyncGenerator
-import dns
+import dns.flags
+import dns.rcode
from isctest.asyncserver import (
AsyncDnsServer,
from typing import AsyncGenerator
-import dns
+import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
"gB+eISXAhSPZU2i/II0W9ZUhC2SCIrb94mlNvP5092WAeXxqN/vG43/1nmDly2Qs7y5VCjSMOGn85bnaMoAc7w=="
)
rrsig_rrset = dns.rrset.from_text(
- qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.RRSIG, rrsig
+ qctx.qname, 1, qctx.qclass, dns.rdatatype.RRSIG, rrsig
)
qctx.response.answer.append(rrsig_rrset)
yield DnsResponseSend(qctx.response)
) -> AsyncGenerator[DnsResponseSend, None]:
nsec = f"{qctx.qname.to_text()} A NS SOA RRSIG NSEC"
nsec_rrset = dns.rrset.from_text(
- qctx.qname, 1, dns.rdataclass.IN, dns.rdatatype.NSEC, nsec
+ qctx.qname, 1, qctx.qclass, dns.rdatatype.NSEC, nsec
)
qctx.response.authority.append(nsec_rrset)
yield DnsResponseSend(qctx.response)
from typing import AsyncGenerator
-import dns
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
ControllableAsyncDnsServer,
from typing import AsyncGenerator
import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
ControllableAsyncDnsServer,
from typing import AsyncGenerator
-import dns
+import dns.name
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
ControllableAsyncDnsServer,
import struct
import sys
+import dns.exception
import dns.flags
import dns.message
import dns.name
import dns.node
import dns.rcode
+import dns.rdata
import dns.rdataclass
+import dns.rdataset
import dns.rdatatype
import dns.rrset
import dns.tsig
import dns.message
import dns.name
import dns.rcode
-import dns.rdataclass
import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
ADDITIONAL section.
"""
ns_name = "ns." + zone_cut.to_text()
- ns_rrset = dns.rrset.from_text(
- zone_cut, 2, dns.rdataclass.IN, dns.rdatatype.NS, ns_name
- )
- a_rrset = dns.rrset.from_text(
- ns_name, 2, dns.rdataclass.IN, dns.rdatatype.A, target_addr
- )
+ ns_rrset = dns.rrset.from_text(zone_cut, 2, qctx.qclass, dns.rdatatype.NS, ns_name)
+ a_rrset = dns.rrset.from_text(ns_name, 2, qctx.qclass, dns.rdatatype.A, target_addr)
response = dns.message.make_response(qctx.query)
response.set_rcode(dns.rcode.NOERROR)
from typing import AsyncGenerator
import dns.rcode
+import dns.rdatatype
from isctest.asyncserver import (
AsyncDnsServer,
import abc
import dns.rcode
-import dns.rdataclass
import dns.rdatatype
from isctest.asyncserver import (
from typing import AsyncGenerator
-import dns
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
a_rrset = dns.rrset.from_text(
- qctx.qname, 300, dns.rdataclass.IN, dns.rdatatype.A, "10.53.0.5"
+ qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "10.53.0.5"
)
qctx.response.answer.append(a_rrset)
yield DnsResponseSend(qctx.response)
from typing import AsyncGenerator
-import dns
+import dns.rcode
+import dns.rdatatype
+import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
from typing import AsyncGenerator
import dns.message
-import dns.rdataclass
import dns.rdatatype
import dns.rrset
soa_rrset = dns.rrset.from_text(
qctx.qname,
300,
- dns.rdataclass.IN,
+ qctx.qclass,
dns.rdatatype.SOA,
f". . {self.soa_version} 0 0 0 0",
)
# will already have been done with the mandatory stuff by then.
ns_message = dns.message.make_response(qctx.query)
ns_rrset = dns.rrset.from_text(
- qctx.qname, 300, dns.rdataclass.IN, dns.rdatatype.NS, "."
+ qctx.qname, 300, qctx.qclass, dns.rdatatype.NS, "."
)
ns_message.answer.append(ns_rrset)
txt_rrset = dns.rrset.from_text(
qctx.qname,
300,
- dns.rdataclass.IN,
+ qctx.qclass,
dns.rdatatype.TXT,
"foo bar",
)
import ipaddress
from typing import AsyncGenerator
-import dns.flags
-import dns.message
-import dns.rdata
-import dns.rdataclass
+import dns.rcode
import dns.rdatatype
import dns.rrset