return (af, destination, source)
-def https(q, where, timeout=None, af=None, source_port=0,
+def https(q, where, timeout=None, af=None, source=None, source_port=0,
one_rr_per_rrset=False, ignore_trailing=False,
post=True, path='/dns-query', verify=True):
"""Return the response obtained after sending a query via DNS-over-HTTPS.
"""
wire = q.to_wire()
+ port = 443
+ (af, destination, source) = _destination_and_source(af, where, port,
+ source, source_port)
+
+ # see https://github.com/requests/toolbelt/blob/master/requests_toolbelt/adapters/source.py
+ from requests_toolbelt.adapters.source import SourceAddressAdapter
+ session = requests.Session()
+ session.mount('http://', SourceAddressAdapter(source))
+ session.mount('https://', SourceAddressAdapter(source))
+
+ # see https://stackoverflow.com/a/46972341/9638991
+ import requests.packages.urllib3.util.connection as urllib3_cn
+
+ def allowed_gai_family():
+ return af
+
+ urllib3_cn.allowed_gai_family = allowed_gai_family
+
try:
_ = ipaddress.ip_address(where)
url = 'https://{}{}'.format(where, path)
"content-type": "application/dns-message",
"content-length": str(len(wire))
}
- response = requests.post(url, headers=headers, data=wire, stream=True, timeout=timeout, verify=verify)
+ response = session.post(url, headers=headers, data=wire, stream=True, timeout=timeout, verify=verify)
else:
wire = base64.urlsafe_b64encode(wire).decode('utf-8').strip("=")
headers = {
"accept": "application/dns-message"
}
url += "?dns={}".format(wire)
- response = requests.get(url, headers=headers, stream=True, timeout=timeout, verify=verify)
+ response = session.get(url, headers=headers, stream=True, timeout=timeout, verify=verify)
# see https://tools.ietf.org/html/rfc8484#section-4.2.1 for more info about DoH status codes
if 200 > response.status_code > 299: