from __future__ import generators
+import urllib.request
import errno
import select
import socket
return (af, destination, source)
+def doh(query, nameserver):
+ """Return the response obtained after sending a query via DoH.
+
+ *query*, a ``dns.message.Message`` containing the query to send
+
+ *nameserver*, a ``str`` containing the nameserver URL.
+
+ Returns a ``dns.message.Message``.
+ """
+
+ wirequery = query.to_wire()
+ headers = {
+ 'Accept': 'application/dns-message',
+ 'Content-Type': 'application/dns-message',
+ }
+
+ request = urllib.request.Request(nameserver, data=wirequery, headers=headers)
+ response = urllib.request.urlopen(request).read()
+
+ return dns.message.from_wire(response)
+
def send_udp(sock, what, destination, expiration=None):
"""Send a DNS message to the specified UDP socket.
from typing import Optional, Union, Dict, Generator, Any
from . import message, tsig, rdatatype, rdataclass, name, message
+def doh(query : message.Message, nameserver : str) -> message.Message:
+ pass
+
def tcp(q : message.Message, where : str, timeout : float = None, port=53, af : Optional[int] = None, source : Optional[str] = None, source_port : int = 0,
one_rr_per_rrset=False) -> message.Message:
pass
"""DNS stub resolver."""
+from urllib.parse import urlparse
import socket
import sys
import time
for nameserver in nameservers[:]:
timeout = self._compute_timeout(start, lifetime)
port = self.nameserver_ports.get(nameserver, self.port)
+ protocol = urlparse(nameserver).scheme
try:
- tcp_attempt = tcp
- if tcp:
- response = dns.query.tcp(request, nameserver,
- timeout, port,
- source=source,
- source_port=source_port)
+ if protocol == 'https':
+ tcp_attempt = True
+ response = dns.query.doh(request, nameserver)
else:
- try:
- response = dns.query.udp(request, nameserver,
+ tcp_attempt = tcp
+ if tcp:
+ response = dns.query.tcp(request, nameserver,
timeout, port,
source=source,
- source_port=\
- source_port)
- except dns.message.Truncated:
- # Response truncated; retry with TCP.
- tcp_attempt = True
- timeout = self._compute_timeout(start, lifetime)
- response = \
- dns.query.tcp(request, nameserver,
- timeout, port,
- source=source,
- source_port=source_port)
+ source_port=source_port)
+ else:
+ try:
+ response = dns.query.udp(request, nameserver,
+ timeout, port,
+ source=source,
+ source_port=\
+ source_port)
+ except dns.message.Truncated:
+ # Response truncated; retry with TCP.
+ tcp_attempt = True
+ timeout = self._compute_timeout(start, lifetime)
+ response = \
+ dns.query.tcp(request, nameserver,
+ timeout, port,
+ source=source,
+ source_port=source_port)
except (socket.error, dns.exception.Timeout) as ex:
#
# Communication failure or timeout. Go to the