-from .adapters.redirects import RedirectAdapter
from .backends.sync import SyncClient
from .client import Client
from .config import PoolLimits, SSLConfig, TimeoutConfig
Timeout,
TooManyRedirects,
)
-from .interfaces import Adapter, BaseReader, BaseWriter
+from .interfaces import BaseReader, BaseWriter, Dispatcher
from .models import URL, Headers, Origin, QueryParams, Request, Response
__version__ = "0.2.1"
+++ /dev/null
-"""
-Adapter classes layer additional behavior over the raw dispatching of the
-HTTP request/response.
-"""
+++ /dev/null
-import typing
-
-from ..interfaces import Adapter
-from ..models import Request, Response
-
-
-class AuthenticationAdapter(Adapter):
- def __init__(self, dispatch: Adapter):
- self.dispatch = dispatch
-
- def prepare_request(self, request: Request) -> None:
- self.dispatch.prepare_request(request)
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- return await self.dispatch.send(request, **options)
-
- async def close(self) -> None:
- await self.dispatch.close()
+++ /dev/null
-import typing
-
-from ..interfaces import Adapter
-from ..models import Request, Response
-
-
-class CookieAdapter(Adapter):
- def __init__(self, dispatch: Adapter):
- self.dispatch = dispatch
-
- def prepare_request(self, request: Request) -> None:
- self.dispatch.prepare_request(request)
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- return await self.dispatch.send(request, **options)
-
- async def close(self) -> None:
- await self.dispatch.close()
+++ /dev/null
-import typing
-
-from ..interfaces import Adapter
-from ..models import Request, Response
-
-
-class EnvironmentAdapter(Adapter):
- def __init__(self, dispatch: Adapter, trust_env: bool = True):
- self.dispatch = dispatch
- self.trust_env = trust_env
-
- def prepare_request(self, request: Request) -> None:
- self.dispatch.prepare_request(request)
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- if self.trust_env:
- self.merge_environment_options(options)
- return await self.dispatch.send(request, **options)
-
- async def close(self) -> None:
- await self.dispatch.close()
-
- def merge_environment_options(self, options: dict) -> None:
- """
- Add environment options.
- """
- # TODO
+++ /dev/null
-import typing
-
-from ..config import DEFAULT_MAX_REDIRECTS
-from ..constants import codes
-from ..exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects
-from ..interfaces import Adapter
-from ..models import URL, Headers, Request, Response
-
-
-class RedirectAdapter(Adapter):
- def __init__(self, dispatch: Adapter, max_redirects: int = DEFAULT_MAX_REDIRECTS):
- self.dispatch = dispatch
- self.max_redirects = max_redirects
-
- def prepare_request(self, request: Request) -> None:
- self.dispatch.prepare_request(request)
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- allow_redirects = options.pop("allow_redirects", True) # type: bool
-
- # The following will not typically be specified by the end-user developer,
- # but are included in `response.next()` calls.
- history = options.pop("history", []) # type: typing.List[Response]
- seen_urls = options.pop("seen_urls", set()) # type: typing.Set[URL]
-
- while True:
- # We perform these checks here, so that calls to `response.next()`
- # will raise redirect errors if appropriate.
- if len(history) > self.max_redirects:
- raise TooManyRedirects()
- if request.url in seen_urls:
- raise RedirectLoop()
-
- response = await self.dispatch.send(request, **options)
- response.history = list(history)
- if not response.is_redirect:
- break
-
- history.insert(0, response)
- seen_urls.add(request.url)
-
- if allow_redirects:
- request = self.build_redirect_request(request, response)
- else:
- next_options = dict(options)
- next_options["seen_urls"] = seen_urls
- next_options["history"] = history
-
- async def send_next() -> Response:
- nonlocal request, response, next_options
- request = self.build_redirect_request(request, response)
- response = await self.send(request, **next_options)
- return response
-
- response.next = send_next # type: ignore
- break
-
- return response
-
- async def close(self) -> None:
- await self.dispatch.close()
-
- def build_redirect_request(self, request: Request, response: Response) -> Request:
- method = self.redirect_method(request, response)
- url = self.redirect_url(request, response)
- headers = self.redirect_headers(request, url)
- content = self.redirect_content(request, method)
- return Request(method=method, url=url, headers=headers, data=content)
-
- def redirect_method(self, request: Request, response: Response) -> str:
- """
- When being redirected we may want to change the method of the request
- based on certain specs or browser behavior.
- """
- method = request.method
-
- # https://tools.ietf.org/html/rfc7231#section-6.4.4
- if response.status_code == codes.see_other and method != "HEAD":
- method = "GET"
-
- # Do what the browsers do, despite standards...
- # Turn 302s into GETs.
- if response.status_code == codes.found and method != "HEAD":
- method = "GET"
-
- # If a POST is responded to with a 301, turn it into a GET.
- # This bizarre behaviour is explained in 'requests' issue 1704.
- if response.status_code == codes.moved_permanently and method == "POST":
- method = "GET"
-
- return method
-
- def redirect_url(self, request: Request, response: Response) -> URL:
- """
- Return the URL for the redirect to follow.
- """
- location = response.headers["Location"]
-
- url = URL(location, allow_relative=True)
-
- # Facilitate relative 'Location' headers, as allowed by RFC 7231.
- # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
- if url.is_relative_url:
- url = url.resolve_with(request.url)
-
- # Attach previous fragment if needed (RFC 7231 7.1.2)
- if request.url.fragment and not url.fragment:
- url = url.copy_with(fragment=request.url.fragment)
-
- return url
-
- def redirect_headers(self, request: Request, url: URL) -> Headers:
- """
- Strip Authorization headers when responses are redirected away from
- the origin.
- """
- headers = Headers(request.headers)
- if url.origin != request.url.origin:
- del headers["Authorization"]
- return headers
-
- def redirect_content(self, request: Request, method: str) -> bytes:
- """
- Return the body that should be used for the redirect request.
- """
- if method != request.method and method == "GET":
- return b""
- if request.is_streaming:
- raise RedirectBodyUnavailable()
- return request.content
import typing
from types import TracebackType
-from .adapters.authentication import AuthenticationAdapter
-from .adapters.cookies import CookieAdapter
-from .adapters.environment import EnvironmentAdapter
-from .adapters.redirects import RedirectAdapter
from .config import (
DEFAULT_MAX_REDIRECTS,
DEFAULT_POOL_LIMITS,
SSLConfig,
TimeoutConfig,
)
+from .constants import codes
from .dispatch.connection_pool import ConnectionPool
+from .exceptions import RedirectBodyUnavailable, RedirectLoop, TooManyRedirects
+from .interfaces import Dispatcher
from .models import (
URL,
+ Headers,
HeaderTypes,
QueryParamTypes,
Request,
timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
pool_limits: PoolLimits = DEFAULT_POOL_LIMITS,
max_redirects: int = DEFAULT_MAX_REDIRECTS,
+ dispatch: Dispatcher = None,
):
- connection_pool = ConnectionPool(
- ssl=ssl, timeout=timeout, pool_limits=pool_limits
- )
- cookie_adapter = CookieAdapter(dispatch=connection_pool)
- auth_adapter = AuthenticationAdapter(dispatch=cookie_adapter)
- redirect_adapter = RedirectAdapter(
- dispatch=auth_adapter, max_redirects=max_redirects
- )
- self.adapter = EnvironmentAdapter(dispatch=redirect_adapter)
+ if dispatch is None:
+ dispatch = ConnectionPool(ssl=ssl, timeout=timeout, pool_limits=pool_limits)
- async def request(
- self,
- method: str,
- url: URLTypes,
- *,
- data: RequestData = b"",
- query_params: QueryParamTypes = None,
- headers: HeaderTypes = None,
- stream: bool = False,
- allow_redirects: bool = True,
- ssl: SSLConfig = None,
- timeout: TimeoutConfig = None,
- ) -> Response:
- request = Request(
- method, url, data=data, query_params=query_params, headers=headers
- )
- self.prepare_request(request)
- response = await self.send(
- request,
- stream=stream,
- allow_redirects=allow_redirects,
- ssl=ssl,
- timeout=timeout,
- )
- return response
+ self.max_redirects = max_redirects
+ self.dispatch = dispatch
async def get(
self,
timeout=timeout,
)
+ async def request(
+ self,
+ method: str,
+ url: URLTypes,
+ *,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
+ headers: HeaderTypes = None,
+ stream: bool = False,
+ allow_redirects: bool = True,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
+ request = Request(
+ method, url, data=data, query_params=query_params, headers=headers
+ )
+ self.prepare_request(request)
+ response = await self.send(
+ request,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ )
+ return response
+
def prepare_request(self, request: Request) -> None:
- self.adapter.prepare_request(request)
+ request.prepare()
async def send(
self,
request: Request,
*,
stream: bool = False,
- allow_redirects: bool = True,
ssl: SSLConfig = None,
timeout: TimeoutConfig = None,
+ allow_redirects: bool = True,
+ history: typing.List[Response] = None,
) -> Response:
- options = {
- "stream": stream,
- "allow_redirects": allow_redirects,
- } # type: typing.Dict[str, typing.Any]
+ if history is None:
+ history = []
+
+ while True:
+ # We perform these checks here, so that calls to `response.next()`
+ # will raise redirect errors if appropriate.
+ if len(history) > self.max_redirects:
+ raise TooManyRedirects()
+ if request.url in [response.url for response in history]:
+ raise RedirectLoop()
+
+ response = await self.dispatch.send(
+ request, stream=stream, ssl=ssl, timeout=timeout
+ )
+ response.history = list(history)
+ history = [response] + history
+ if not response.is_redirect:
+ break
+
+ if allow_redirects:
+ request = self.build_redirect_request(request, response)
+ else:
+
+ async def send_next() -> Response:
+ nonlocal request, response, ssl, allow_redirects, timeout, history
+ request = self.build_redirect_request(request, response)
+ response = await self.send(
+ request,
+ stream=stream,
+ allow_redirects=allow_redirects,
+ ssl=ssl,
+ timeout=timeout,
+ history=history,
+ )
+ return response
+
+ response.next = send_next # type: ignore
+ break
+
+ return response
+
+ def build_redirect_request(self, request: Request, response: Response) -> Request:
+ method = self.redirect_method(request, response)
+ url = self.redirect_url(request, response)
+ headers = self.redirect_headers(request, url)
+ content = self.redirect_content(request, method)
+ return Request(method=method, url=url, headers=headers, data=content)
+
+ def redirect_method(self, request: Request, response: Response) -> str:
+ """
+ When being redirected we may want to change the method of the request
+ based on certain specs or browser behavior.
+ """
+ method = request.method
+
+ # https://tools.ietf.org/html/rfc7231#section-6.4.4
+ if response.status_code == codes.see_other and method != "HEAD":
+ method = "GET"
+
+ # Do what the browsers do, despite standards...
+ # Turn 302s into GETs.
+ if response.status_code == codes.found and method != "HEAD":
+ method = "GET"
+
+ # If a POST is responded to with a 301, turn it into a GET.
+ # This bizarre behaviour is explained in 'requests' issue 1704.
+ if response.status_code == codes.moved_permanently and method == "POST":
+ method = "GET"
+
+ return method
+
+ def redirect_url(self, request: Request, response: Response) -> URL:
+ """
+ Return the URL for the redirect to follow.
+ """
+ location = response.headers["Location"]
+
+ url = URL(location, allow_relative=True)
+
+ # Facilitate relative 'Location' headers, as allowed by RFC 7231.
+ # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
+ if url.is_relative_url:
+ url = url.resolve_with(request.url)
+
+ # Attach previous fragment if needed (RFC 7231 7.1.2)
+ if request.url.fragment and not url.fragment:
+ url = url.copy_with(fragment=request.url.fragment)
+
+ return url
- if ssl is not None:
- options["ssl"] = ssl
- if timeout is not None:
- options["timeout"] = timeout
+ def redirect_headers(self, request: Request, url: URL) -> Headers:
+ """
+ Strip Authorization headers when responses are redirected away from
+ the origin.
+ """
+ headers = Headers(request.headers)
+ if url.origin != request.url.origin:
+ del headers["Authorization"]
+ return headers
- return await self.adapter.send(request, **options)
+ def redirect_content(self, request: Request, method: str) -> bytes:
+ """
+ Return the body that should be used for the redirect request.
+ """
+ if method != request.method and method == "GET":
+ return b""
+ if request.is_streaming:
+ raise RedirectBodyUnavailable()
+ return request.content
async def close(self) -> None:
- await self.adapter.close()
+ await self.dispatch.close()
async def __aenter__(self) -> "Client":
return self
)
from ..constants import Protocol
from ..exceptions import ConnectTimeout
-from ..interfaces import Adapter
+from ..interfaces import Dispatcher
from ..models import Origin, Request, Response
from .http2 import HTTP2Connection
from .http11 import HTTP11Connection
ReleaseCallback = typing.Callable[["HTTPConnection"], typing.Awaitable[None]]
-class HTTPConnection(Adapter):
+class HTTPConnection(Dispatcher):
def __init__(
self,
origin: typing.Union[str, Origin],
self.h11_connection = None # type: typing.Optional[HTTP11Connection]
self.h2_connection = None # type: typing.Optional[HTTP2Connection]
- def prepare_request(self, request: Request) -> None:
- request.prepare()
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
+ async def send(
+ self,
+ request: Request,
+ stream: bool = False,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
if self.h11_connection is None and self.h2_connection is None:
- await self.connect(**options)
+ await self.connect(ssl=ssl, timeout=timeout)
if self.h2_connection is not None:
- response = await self.h2_connection.send(request, **options)
+ response = await self.h2_connection.send(
+ request, stream=stream, timeout=timeout
+ )
else:
assert self.h11_connection is not None
- response = await self.h11_connection.send(request, **options)
+ response = await self.h11_connection.send(
+ request, stream=stream, timeout=timeout
+ )
return response
- async def connect(self, **options: typing.Any) -> None:
- ssl = options.get("ssl", self.ssl)
- timeout = options.get("timeout", self.timeout)
- assert isinstance(ssl, SSLConfig)
- assert isinstance(timeout, TimeoutConfig)
+ async def connect(
+ self, ssl: SSLConfig = None, timeout: TimeoutConfig = None
+ ) -> None:
+ if ssl is None:
+ ssl = self.ssl
+ if timeout is None:
+ timeout = self.timeout
host = self.origin.host
port = self.origin.port
)
from ..decoders import ACCEPT_ENCODING
from ..exceptions import PoolTimeout
-from ..interfaces import Adapter
+from ..interfaces import Dispatcher
from ..models import Origin, Request, Response
from .connection import HTTPConnection
return len(self.all)
-class ConnectionPool(Adapter):
+class ConnectionPool(Dispatcher):
def __init__(
self,
*,
def num_connections(self) -> int:
return len(self.keepalive_connections) + len(self.active_connections)
- def prepare_request(self, request: Request) -> None:
- request.prepare()
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
+ async def send(
+ self,
+ request: Request,
+ stream: bool = False,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
connection = await self.acquire_connection(request.url.origin)
try:
- response = await connection.send(request, **options)
+ response = await connection.send(
+ request, stream=stream, ssl=ssl, timeout=timeout
+ )
except BaseException as exc:
self.active_connections.remove(connection)
self.max_connections.release()
TimeoutConfig,
)
from ..exceptions import ConnectTimeout, ReadTimeout
-from ..interfaces import Adapter, BaseReader, BaseWriter
+from ..interfaces import BaseReader, BaseWriter, Dispatcher
from ..models import Request, Response
H11Event = typing.Union[
OnReleaseCallback = typing.Callable[[], typing.Awaitable[None]]
-class HTTP11Connection(Adapter):
+class HTTP11Connection:
READ_NUM_BYTES = 4096
def __init__(
self.on_release = on_release
self.h11_state = h11.Connection(our_role=h11.CLIENT)
- def prepare_request(self, request: Request) -> None:
- request.prepare()
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- timeout = options.get("timeout")
- stream = options.get("stream", False)
- assert timeout is None or isinstance(timeout, TimeoutConfig)
-
+ async def send(
+ self, request: Request, stream: bool = False, timeout: TimeoutConfig = None
+ ) -> Response:
# Start sending the request.
method = request.method.encode("ascii")
target = request.url.full_path.encode("ascii")
TimeoutConfig,
)
from ..exceptions import ConnectTimeout, ReadTimeout
-from ..interfaces import Adapter, BaseReader, BaseWriter
+from ..interfaces import BaseReader, BaseWriter, Dispatcher
from ..models import Request, Response
OptionalTimeout = typing.Optional[TimeoutConfig]
-class HTTP2Connection(Adapter):
+class HTTP2Connection:
READ_NUM_BYTES = 4096
def __init__(
self.events = {} # type: typing.Dict[int, typing.List[h2.events.Event]]
self.initialized = False
- def prepare_request(self, request: Request) -> None:
- request.prepare()
-
- async def send(self, request: Request, **options: typing.Any) -> Response:
- timeout = options.get("timeout")
- stream = options.get("stream", False)
- assert timeout is None or isinstance(timeout, TimeoutConfig)
-
+ async def send(
+ self, request: Request, stream: bool = False, timeout: TimeoutConfig = None
+ ) -> Response:
# Start sending the request.
if not self.initialized:
self.initiate_connection()
import typing
from types import TracebackType
-from .config import TimeoutConfig
+from .config import SSLConfig, TimeoutConfig
from .models import (
URL,
+ Headers,
HeaderTypes,
QueryParamTypes,
Request,
OptionalTimeout = typing.Optional[TimeoutConfig]
-class Adapter:
+class Dispatcher:
"""
The base class for all adapter or dispatcher classes.
data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
- **options: typing.Any,
+ stream: bool = False,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None
) -> Response:
request = Request(
method, url, data=data, query_params=query_params, headers=headers
)
self.prepare_request(request)
- response = await self.send(request, **options)
+ response = await self.send(request, stream=stream, ssl=ssl, timeout=timeout)
return response
def prepare_request(self, request: Request) -> None:
- raise NotImplementedError() # pragma: nocover
+ request.prepare()
- async def send(self, request: Request, **options: typing.Any) -> Response:
+ async def send(
+ self,
+ request: Request,
+ stream: bool = False,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
raise NotImplementedError() # pragma: nocover
async def close(self) -> None:
- raise NotImplementedError() # pragma: nocover
+ pass # pragma: nocover
- async def __aenter__(self) -> "Adapter":
+ async def __aenter__(self) -> "Dispatcher":
return self
async def __aexit__(
from httpcore import (
URL,
- Adapter,
- RedirectAdapter,
+ Client,
+ Dispatcher,
RedirectBodyUnavailable,
RedirectLoop,
Request,
Response,
+ SSLConfig,
+ TimeoutConfig,
TooManyRedirects,
codes,
)
-class MockDispatch(Adapter):
- def prepare_request(self, request: Request) -> None:
- pass
-
- async def send(self, request: Request, **options) -> Response:
+class MockDispatch(Dispatcher):
+ async def send(
+ self,
+ request: Request,
+ stream: bool = False,
+ ssl: SSLConfig = None,
+ timeout: TimeoutConfig = None,
+ ) -> Response:
if request.url.path == "/redirect_301":
status_code = codes.moved_permanently
headers = {"location": "https://example.org/"}
@pytest.mark.asyncio
async def test_redirect_301():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request("POST", "https://example.org/redirect_301")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_redirect_302():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request("POST", "https://example.org/redirect_302")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_redirect_303():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/redirect_303")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_disallow_redirects():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request(
"POST", "https://example.org/redirect_303", allow_redirects=False
)
@pytest.mark.asyncio
async def test_relative_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/relative_redirect")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_no_scheme_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
response = await client.request("GET", "https://example.org/no_scheme_redirect")
assert response.status_code == codes.ok
assert response.url == URL("https://example.org/")
@pytest.mark.asyncio
async def test_fragment_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/relative_redirect#fragment"
response = await client.request("GET", url)
assert response.status_code == codes.ok
@pytest.mark.asyncio
async def test_multiple_redirects():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/multiple_redirects?count=20"
response = await client.request("GET", url)
assert response.status_code == codes.ok
@pytest.mark.asyncio
async def test_too_many_redirects():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
with pytest.raises(TooManyRedirects):
await client.request("GET", "https://example.org/multiple_redirects?count=21")
@pytest.mark.asyncio
async def test_too_many_redirects_calling_next():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/multiple_redirects?count=21"
response = await client.request("GET", url, allow_redirects=False)
with pytest.raises(TooManyRedirects):
@pytest.mark.asyncio
async def test_redirect_loop():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
with pytest.raises(RedirectLoop):
await client.request("GET", "https://example.org/redirect_loop")
@pytest.mark.asyncio
async def test_redirect_loop_calling_next():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_loop"
response = await client.request("GET", url, allow_redirects=False)
with pytest.raises(RedirectLoop):
@pytest.mark.asyncio
async def test_cross_domain_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.com/cross_domain"
headers = {"Authorization": "abc"}
response = await client.request("GET", url, headers=headers)
data = json.loads(response.content.decode())
assert response.url == URL("https://example.org/cross_domain_target")
- assert data == {"headers": {}}
+ assert "authorization" not in data["headers"]
@pytest.mark.asyncio
async def test_same_domain_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/cross_domain"
headers = {"Authorization": "abc"}
response = await client.request("GET", url, headers=headers)
data = json.loads(response.content.decode())
assert response.url == URL("https://example.org/cross_domain_target")
- assert data == {"headers": {"authorization": "abc"}}
+ assert data["headers"]["authorization"] == "abc"
@pytest.mark.asyncio
async def test_body_redirect():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
data = b"Example request body"
response = await client.request("POST", url, data=data)
@pytest.mark.asyncio
async def test_cannot_redirect_streaming_body():
- client = RedirectAdapter(MockDispatch())
+ client = Client(dispatch=MockDispatch())
url = "https://example.org/redirect_body"
async def streaming_body():
import pytest
-import httpcore
+from httpcore import HTTPConnection, Request
@pytest.mark.asyncio
async def test_get(server):
- http = httpcore.HTTPConnection(origin="http://127.0.0.1:8000/")
- response = await http.request("GET", "http://127.0.0.1:8000/")
+ conn = HTTPConnection(origin="http://127.0.0.1:8000/")
+ request = Request("GET", "http://127.0.0.1:8000/")
+ request.prepare()
+ response = await conn.send(request)
assert response.status_code == 200
assert response.content == b"Hello, world!"
@pytest.mark.asyncio
async def test_post(server):
- http = httpcore.HTTPConnection(origin="http://127.0.0.1:8000/")
- response = await http.request(
- "POST", "http://127.0.0.1:8000/", body=b"Hello, world!"
- )
+ conn = HTTPConnection(origin="http://127.0.0.1:8000/")
+ request = Request("GET", "http://127.0.0.1:8000/", data=b"Hello, world!")
+ request.prepare()
+ response = await conn.send(request)
assert response.status_code == 200
import h2.events
import pytest
-import httpcore
+from httpcore import BaseReader, BaseWriter, HTTP2Connection, Request
-class MockServer(httpcore.BaseReader, httpcore.BaseWriter):
+class MockServer(BaseReader, BaseWriter):
"""
This class exposes Reader and Writer style interfaces
"""
@pytest.mark.asyncio
async def test_http2_get_request():
server = MockServer()
- async with httpcore.HTTP2Connection(reader=server, writer=server) as conn:
- response = await conn.request("GET", "http://example.org")
+ conn = HTTP2Connection(reader=server, writer=server)
+ request = Request("GET", "http://example.org")
+ request.prepare()
+
+ response = await conn.send(request)
+
assert response.status_code == 200
assert json.loads(response.content) == {"method": "GET", "path": "/", "body": ""}
@pytest.mark.asyncio
async def test_http2_post_request():
server = MockServer()
- async with httpcore.HTTP2Connection(reader=server, writer=server) as conn:
- response = await conn.request("POST", "http://example.org", data=b"<data>")
+ conn = HTTP2Connection(reader=server, writer=server)
+ request = Request("POST", "http://example.org", data=b"<data>")
+ request.prepare()
+
+ response = await conn.send(request)
+
assert response.status_code == 200
assert json.loads(response.content) == {
"method": "POST",
@pytest.mark.asyncio
async def test_http2_multiple_requests():
server = MockServer()
- async with httpcore.HTTP2Connection(reader=server, writer=server) as conn:
- response_1 = await conn.request("GET", "http://example.org/1")
- response_2 = await conn.request("GET", "http://example.org/2")
- response_3 = await conn.request("GET", "http://example.org/3")
+ conn = HTTP2Connection(reader=server, writer=server)
+ request_1 = Request("GET", "http://example.org/1")
+ request_2 = Request("GET", "http://example.org/2")
+ request_3 = Request("GET", "http://example.org/3")
+
+ request_1.prepare()
+ request_2.prepare()
+ request_3.prepare()
+
+ response_1 = await conn.send(request_1)
+ response_2 = await conn.send(request_2)
+ response_3 = await conn.send(request_3)
assert response_1.status_code == 200
assert json.loads(response_1.content) == {"method": "GET", "path": "/1", "body": ""}
assert response_3.status_code == 200
assert json.loads(response_3.content) == {"method": "GET", "path": "/3", "body": ""}
+
+ await conn.close()