"the asyncio transport for HTTPX cannot set the local port"
)
- async def connect_tcp(self, host, port, timeout, local_address):
+ async def connect_tcp(
+ self, host, port, timeout, local_address
+ ): # pylint: disable=signature-differs
addresses = []
- now, expiration = _compute_times(timeout)
+ _, expiration = _compute_times(timeout)
if dns.inet.is_address(host):
addresses.append(host)
elif self._bootstrap_address is not None:
timeout = _remaining(attempt_expiration)
with anyio.fail_after(timeout):
stream = await anyio.connect_tcp(
- remote_host=host,
+ remote_host=address,
remote_port=port,
local_host=local_address,
)
pass
raise httpcore.ConnectError
+ async def connect_unix_socket(
+ self, path, timeout
+ ): # pylint: disable=signature-differs
+ raise NotImplementedError
+
+ async def sleep(self, seconds): # pylint: disable=signature-differs
+ await anyio.sleep(seconds)
+
class _HTTPTransport(httpx.AsyncHTTPTransport):
def __init__(
self,
**kwargs,
):
if resolver is None:
+ # pylint: disable=import-outside-toplevel,redefined-outer-name
import dns.asyncresolver
resolver = dns.asyncresolver.Resolver()
self._bootstrap_address = bootstrap_address
self._family = family
- async def connect_tcp(self, host, port, timeout, local_address):
+ async def connect_tcp(
+ self, host, port, timeout, local_address
+ ): # pylint: disable=signature-differs
addresses = []
- now, expiration = _compute_times(timeout)
+ _, expiration = _compute_times(timeout)
if dns.inet.is_address(host):
addresses.append(host)
elif self._bootstrap_address is not None:
continue
raise httpcore.ConnectError
+ async def connect_unix_socket(
+ self, path, timeout
+ ): # pylint: disable=signature-differs
+ raise NotImplementedError
+
+ async def sleep(self, seconds): # pylint: disable=signature-differs
+ await trio.sleep(seconds)
+
class _HTTPTransport(httpx.AsyncHTTPTransport):
def __init__(
self,
**kwargs,
):
if resolver is None:
+ # pylint: disable=import-outside-toplevel,redefined-outer-name
import dns.asyncresolver
resolver = dns.asyncresolver.Resolver()
import socket
import struct
import time
-import urllib.parse
import dns.exception
import dns.inet
self._bootstrap_address = bootstrap_address
self._family = family
- def connect_tcp(self, host, port, timeout, local_address):
+ def connect_tcp(
+ self, host, port, timeout, local_address
+ ): # pylint: disable=signature-differs
addresses = []
- now, expiration = _compute_times(timeout)
+ _, expiration = _compute_times(timeout)
if dns.inet.is_address(host):
addresses.append(host)
elif self._bootstrap_address is not None:
pass
raise httpcore.ConnectError
+ def connect_unix_socket(
+ self, path, timeout
+ ): # pylint: disable=signature-differs
+ raise NotImplementedError
+
class _HTTPTransport(httpx.HTTPTransport):
def __init__(
self,
**kwargs,
):
if resolver is None:
+ # pylint: disable=import-outside-toplevel,redefined-outer-name
import dns.resolver
resolver = dns.resolver.Resolver()