from requests_toolbelt.adapters.source import SourceAddressAdapter
from requests_toolbelt.adapters.host_header_ssl import HostHeaderSSLAdapter
have_doh = True
-except ImportError:
- have_doh = False # pragma: no cover
+except ImportError: # pragma: no cover
+ have_doh = False
try:
import ssl
return True
if not _polling_backend(fd, readable, writable, error, timeout):
raise dns.exception.Timeout
- except OSError as e:
+ except OSError as e: # pragma: no cover
if e.args[0] != errno.EINTR:
raise e
done = True
# be more efficient for high values).
_polling_backend = _poll_for
else:
- _polling_backend = _select_for
+ _polling_backend = _select_for # pragma: no cover
def _wait_for_readable(s, expiration):
"""
if not have_doh:
- raise NoDOH
+ raise NoDOH # pragma: no cover
wire = q.to_wire()
(af, destination, source) = _destination_and_source(where, port,
_wait_for_readable(sock, expiration)
try:
n = sock.recv(count)
- except ssl.SSLWantReadError:
+ except ssl.SSLWantReadError: # pragma: no cover
continue
- except ssl.SSLWantWriteError:
+ except ssl.SSLWantWriteError: # pragma: no cover
_wait_for_writable(sock, expiration)
continue
if n == b'':
_wait_for_writable(sock, expiration)
try:
current += sock.send(data[current:])
- except ssl.SSLWantReadError:
+ except ssl.SSLWantReadError: # pragma: no cover
_wait_for_readable(sock, expiration)
continue
- except ssl.SSLWantWriteError:
+ except ssl.SSLWantWriteError: # pragma: no cover
continue
return
except ssl.SSLWantReadError:
_wait_for_readable(s, expiration)
- except ssl.SSLWantWriteError:
+ except ssl.SSLWantWriteError: # pragma: no cover
_wait_for_writable(s, expiration)