from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
-from ._transports.base import (
- AsyncBaseTransport,
- AsyncByteStream,
- BaseTransport,
- SyncByteStream,
-)
+from ._transports.base import AsyncBaseTransport, BaseTransport
from ._transports.default import AsyncHTTPTransport, HTTPTransport
from ._transports.mock import MockTransport
from ._transports.wsgi import WSGITransport
+from ._types import AsyncByteStream, SyncByteStream
__all__ = [
"__description__",
from ._models import URL, Cookies, Headers, QueryParams, Request, Response
from ._status_codes import codes
from ._transports.asgi import ASGITransport
-from ._transports.base import (
- AsyncBaseTransport,
- AsyncByteStream,
- BaseTransport,
- SyncByteStream,
-)
+from ._transports.base import AsyncBaseTransport, BaseTransport
from ._transports.default import AsyncHTTPTransport, HTTPTransport
from ._transports.wsgi import WSGITransport
from ._types import (
+ AsyncByteStream,
AuthTypes,
CertTypes,
CookieTypes,
RequestContent,
RequestData,
RequestFiles,
+ SyncByteStream,
TimeoutTypes,
URLTypes,
VerifyTypes,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
+ timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
) -> Request:
"""
Build and return a request instance.
headers = self._merge_headers(headers)
cookies = self._merge_cookies(cookies)
params = self._merge_queryparams(params)
+ timeout = (
+ self.timeout if isinstance(timeout, UseClientDefault) else Timeout(timeout)
+ )
return Request(
method,
url,
params=params,
headers=headers,
cookies=cookies,
+ extensions={"timeout": timeout.as_dict()},
)
def _merge_url(self, url: URLTypes) -> URL:
params=params,
headers=headers,
cookies=cookies,
+ timeout=timeout,
)
- return self.send(
- request, auth=auth, follow_redirects=follow_redirects, timeout=timeout
- )
+ return self.send(request, auth=auth, follow_redirects=follow_redirects)
@contextmanager
def stream(
params=params,
headers=headers,
cookies=cookies,
+ timeout=timeout,
)
response = self.send(
request=request,
auth=auth,
follow_redirects=follow_redirects,
- timeout=timeout,
stream=True,
)
try:
stream: bool = False,
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
raise RuntimeError("Cannot send a request, as the client has been closed.")
self._state = ClientState.OPENED
- timeout = (
- self.timeout if isinstance(timeout, UseClientDefault) else Timeout(timeout)
- )
follow_redirects = (
self.follow_redirects
if isinstance(follow_redirects, UseClientDefault)
response = self._send_handling_auth(
request,
auth=auth,
- timeout=timeout,
follow_redirects=follow_redirects,
history=[],
)
self,
request: Request,
auth: Auth,
- timeout: Timeout,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
while True:
response = self._send_handling_redirects(
request,
- timeout=timeout,
follow_redirects=follow_redirects,
history=history,
)
def _send_handling_redirects(
self,
request: Request,
- timeout: Timeout,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
for hook in self._event_hooks["request"]:
hook(request)
- response = self._send_single_request(request, timeout)
+ response = self._send_single_request(request)
try:
for hook in self._event_hooks["response"]:
hook(response)
response.close()
raise exc
- def _send_single_request(self, request: Request, timeout: Timeout) -> Response:
+ def _send_single_request(self, request: Request) -> Response:
"""
Sends a single request, without handling any redirections.
"""
)
with request_context(request=request):
- (status_code, headers, stream, extensions) = transport.handle_request(
- request.method.encode(),
- request.url.raw,
- headers=request.headers.raw,
- stream=request.stream,
- extensions={"timeout": timeout.as_dict()},
- )
+ response = transport.handle_request(request)
- response = Response(
- status_code,
- headers=headers,
- stream=stream,
- extensions=extensions,
- request=request,
- )
+ assert isinstance(response.stream, SyncByteStream)
- response.stream = BoundSyncStream(stream, response=response, timer=timer)
+ response.request = request
+ response.stream = BoundSyncStream(
+ response.stream, response=response, timer=timer
+ )
self.cookies.extract_cookies(response)
status = f"{response.status_code} {response.reason_phrase}"
params=params,
headers=headers,
cookies=cookies,
+ timeout=timeout,
)
response = await self.send(
- request, auth=auth, follow_redirects=follow_redirects, timeout=timeout
+ request, auth=auth, follow_redirects=follow_redirects
)
return response
params=params,
headers=headers,
cookies=cookies,
+ timeout=timeout,
)
response = await self.send(
request=request,
auth=auth,
follow_redirects=follow_redirects,
- timeout=timeout,
stream=True,
)
try:
stream: bool = False,
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
- timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
raise RuntimeError("Cannot send a request, as the client has been closed.")
self._state = ClientState.OPENED
- timeout = (
- self.timeout if isinstance(timeout, UseClientDefault) else Timeout(timeout)
- )
follow_redirects = (
self.follow_redirects
if isinstance(follow_redirects, UseClientDefault)
response = await self._send_handling_auth(
request,
auth=auth,
- timeout=timeout,
follow_redirects=follow_redirects,
history=[],
)
self,
request: Request,
auth: Auth,
- timeout: Timeout,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
while True:
response = await self._send_handling_redirects(
request,
- timeout=timeout,
follow_redirects=follow_redirects,
history=history,
)
async def _send_handling_redirects(
self,
request: Request,
- timeout: Timeout,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
for hook in self._event_hooks["request"]:
await hook(request)
- response = await self._send_single_request(request, timeout)
+ response = await self._send_single_request(request)
try:
for hook in self._event_hooks["response"]:
await hook(response)
await response.aclose()
raise exc
- async def _send_single_request(
- self, request: Request, timeout: Timeout
- ) -> Response:
+ async def _send_single_request(self, request: Request) -> Response:
"""
Sends a single request, without handling any redirections.
"""
)
with request_context(request=request):
- (
- status_code,
- headers,
- stream,
- extensions,
- ) = await transport.handle_async_request(
- request.method.encode(),
- request.url.raw,
- headers=request.headers.raw,
- stream=request.stream,
- extensions={"timeout": timeout.as_dict()},
- )
+ response = await transport.handle_async_request(request)
- response = Response(
- status_code,
- headers=headers,
- stream=stream,
- extensions=extensions,
- request=request,
+ assert isinstance(response.stream, AsyncByteStream)
+ response.request = request
+ response.stream = BoundAsyncStream(
+ response.stream, response=response, timer=timer
)
-
- response.stream = BoundAsyncStream(stream, response=response, timer=timer)
self.cookies.extract_cookies(response)
status = f"{response.status_code} {response.reason_phrase}"
from ._exceptions import StreamClosed, StreamConsumed
from ._multipart import MultipartStream
-from ._transports.base import AsyncByteStream, SyncByteStream
-from ._types import RequestContent, RequestData, RequestFiles, ResponseContent
+from ._types import (
+ AsyncByteStream,
+ RequestContent,
+ RequestData,
+ RequestFiles,
+ ResponseContent,
+ SyncByteStream,
+)
from ._utils import peek_filelike_length, primitive_value_to_str
request_context,
)
from ._status_codes import codes
-from ._transports.base import AsyncByteStream, SyncByteStream
from ._types import (
+ AsyncByteStream,
CookieTypes,
HeaderTypes,
PrimitiveData,
RequestData,
RequestFiles,
ResponseContent,
+ SyncByteStream,
URLTypes,
)
from ._utils import (
files: RequestFiles = None,
json: typing.Any = None,
stream: typing.Union[SyncByteStream, AsyncByteStream] = None,
+ extensions: dict = None,
):
- if isinstance(method, bytes):
- self.method = method.decode("ascii").upper()
- else:
- self.method = method.upper()
+ self.method = (
+ method.decode("ascii").upper()
+ if isinstance(method, bytes)
+ else method.upper()
+ )
self.url = URL(url)
if params is not None:
self.url = self.url.copy_merge_params(params=params)
self.headers = Headers(headers)
+ self.extensions = {} if extensions is None else extensions
+
if cookies:
Cookies(cookies).set_cookie_header(self)
import typing
from pathlib import Path
-from ._transports.base import AsyncByteStream, SyncByteStream
-from ._types import FileContent, FileTypes, RequestFiles
+from ._types import (
+ AsyncByteStream,
+ FileContent,
+ FileTypes,
+ RequestFiles,
+ SyncByteStream,
+)
from ._utils import (
format_form_param,
guess_content_type,
import typing
-from urllib.parse import unquote
import sniffio
-from .base import AsyncBaseTransport, AsyncByteStream
+from .._models import Request, Response
+from .._types import AsyncByteStream
+from .base import AsyncBaseTransport
if typing.TYPE_CHECKING: # pragma: no cover
import asyncio
async def handle_async_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: AsyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
- ]:
+ request: Request,
+ ) -> Response:
+ assert isinstance(request.stream, AsyncByteStream)
+
# ASGI scope.
- scheme, host, port, full_path = url
- path, _, query = full_path.partition(b"?")
scope = {
"type": "http",
"asgi": {"version": "3.0"},
"http_version": "1.1",
- "method": method.decode(),
- "headers": [(k.lower(), v) for (k, v) in headers],
- "scheme": scheme.decode("ascii"),
- "path": unquote(path.decode("ascii")),
- "raw_path": path,
- "query_string": query,
- "server": (host.decode("ascii"), port),
+ "method": request.method,
+ "headers": [(k.lower(), v) for (k, v) in request.headers.raw],
+ "scheme": request.url.scheme,
+ "path": request.url.path,
+ "raw_path": request.url.raw_path,
+ "query_string": request.url.query,
+ "server": (request.url.host, request.url.port),
"client": self.client,
"root_path": self.root_path,
}
# Request.
- request_body_chunks = stream.__aiter__()
+ request_body_chunks = request.stream.__aiter__()
request_complete = False
# Response.
body = message.get("body", b"")
more_body = message.get("more_body", False)
- if body and method != b"HEAD":
+ if body and request.method != "HEAD":
body_parts.append(body)
if not more_body:
assert response_headers is not None
stream = ASGIResponseStream(body_parts)
- extensions = {}
- return (status_code, response_headers, stream, extensions)
+ return Response(status_code, headers=response_headers, stream=stream)
import typing
from types import TracebackType
+from .._models import Request, Response
+
T = typing.TypeVar("T", bound="BaseTransport")
A = typing.TypeVar("A", bound="AsyncBaseTransport")
-class SyncByteStream:
- def __iter__(self) -> typing.Iterator[bytes]:
- raise NotImplementedError(
- "The '__iter__' method must be implemented."
- ) # pragma: nocover
- yield b"" # pragma: nocover
-
- def close(self) -> None:
- """
- Subclasses can override this method to release any network resources
- after a request/response cycle is complete.
-
- Streaming cases should use a `try...finally` block to ensure that
- the stream `close()` method is always called.
-
- Example:
-
- status_code, headers, stream, extensions = transport.handle_request(...)
- try:
- ...
- finally:
- stream.close()
- """
-
- def read(self) -> bytes:
- """
- Simple cases can use `.read()` as a convience method for consuming
- the entire stream and then closing it.
-
- Example:
-
- status_code, headers, stream, extensions = transport.handle_request(...)
- body = stream.read()
- """
- try:
- return b"".join([part for part in self])
- finally:
- self.close()
-
-
-class AsyncByteStream:
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- raise NotImplementedError(
- "The '__aiter__' method must be implemented."
- ) # pragma: nocover
- yield b"" # pragma: nocover
-
- async def aclose(self) -> None:
- pass
-
- async def aread(self) -> bytes:
- try:
- return b"".join([part async for part in self])
- finally:
- await self.aclose()
-
-
class BaseTransport:
def __enter__(self: T) -> T:
return self
) -> None:
self.close()
- def handle_request(
- self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: SyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
- ]:
+ def handle_request(self, request: Request) -> Response:
"""
Send a single HTTP request and return a response.
async def handle_async_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: AsyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
- ]:
+ request: Request,
+ ) -> Response:
raise NotImplementedError(
"The 'handle_async_request' method must be implemented."
) # pragma: nocover
WriteError,
WriteTimeout,
)
-from .._types import CertTypes, VerifyTypes
-from .base import AsyncBaseTransport, AsyncByteStream, BaseTransport, SyncByteStream
+from .._models import Request, Response
+from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes
+from .base import AsyncBaseTransport, BaseTransport
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")
def handle_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: SyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
- ]:
+ request: Request,
+ ) -> Response:
+ assert isinstance(request.stream, SyncByteStream)
+
with map_httpcore_exceptions():
status_code, headers, byte_stream, extensions = self._pool.handle_request(
- method=method,
- url=url,
- headers=headers,
- stream=httpcore.IteratorByteStream(iter(stream)),
- extensions=extensions,
+ method=request.method.encode("ascii"),
+ url=request.url.raw,
+ headers=request.headers.raw,
+ stream=httpcore.IteratorByteStream(iter(request.stream)),
+ extensions=request.extensions,
)
stream = ResponseStream(byte_stream)
- return status_code, headers, stream, extensions
+ return Response(
+ status_code, headers=headers, stream=stream, extensions=extensions
+ )
def close(self) -> None:
self._pool.close()
async def handle_async_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: AsyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
- ]:
+ request: Request,
+ ) -> Response:
+ assert isinstance(request.stream, AsyncByteStream)
+
with map_httpcore_exceptions():
(
status_code,
byte_stream,
extensions,
) = await self._pool.handle_async_request(
- method=method,
- url=url,
- headers=headers,
- stream=httpcore.AsyncIteratorByteStream(stream.__aiter__()),
- extensions=extensions,
+ method=request.method.encode("ascii"),
+ url=request.url.raw,
+ headers=request.headers.raw,
+ stream=httpcore.AsyncIteratorByteStream(request.stream.__aiter__()),
+ extensions=request.extensions,
)
stream = AsyncResponseStream(byte_stream)
- return status_code, headers, stream, extensions
+ return Response(
+ status_code, headers=headers, stream=stream, extensions=extensions
+ )
async def aclose(self) -> None:
await self._pool.aclose()
import asyncio
import typing
-from .._models import Request
-from .base import AsyncBaseTransport, AsyncByteStream, BaseTransport, SyncByteStream
+from .._models import Request, Response
+from .base import AsyncBaseTransport, BaseTransport
class MockTransport(AsyncBaseTransport, BaseTransport):
def handle_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: SyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
- ]:
- request = Request(
- method=method,
- url=url,
- headers=headers,
- stream=stream,
- )
+ request: Request,
+ ) -> Response:
request.read()
- response = self.handler(request)
- return (
- response.status_code,
- response.headers.raw,
- response.stream,
- response.extensions,
- )
+ return self.handler(request)
async def handle_async_request(
self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: AsyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], AsyncByteStream, dict
- ]:
- request = Request(
- method=method,
- url=url,
- headers=headers,
- stream=stream,
- )
+ request: Request,
+ ) -> Response:
await request.aread()
-
response = self.handler(request)
# Allow handler to *optionally* be an `async` function.
if asyncio.iscoroutine(response):
response = await response
- return (
- response.status_code,
- response.headers.raw,
- response.stream,
- response.extensions,
- )
+ return response
import itertools
import sys
import typing
-from urllib.parse import unquote
-from .base import BaseTransport, SyncByteStream
+from .._models import Request, Response
+from .._types import SyncByteStream
+from .base import BaseTransport
def _skip_leading_empty_chunks(body: typing.Iterable) -> typing.Iterable:
self.remote_addr = remote_addr
self.wsgi_errors = wsgi_errors
- def handle_request(
- self,
- method: bytes,
- url: typing.Tuple[bytes, bytes, typing.Optional[int], bytes],
- headers: typing.List[typing.Tuple[bytes, bytes]],
- stream: SyncByteStream,
- extensions: dict,
- ) -> typing.Tuple[
- int, typing.List[typing.Tuple[bytes, bytes]], SyncByteStream, dict
- ]:
- wsgi_input = io.BytesIO(b"".join(stream))
-
- scheme, host, port, full_path = url
- path, _, query = full_path.partition(b"?")
- if port is None:
- port = {b"http": 80, b"https": 443}[scheme]
+ def handle_request(self, request: Request) -> Response:
+ request.read()
+ wsgi_input = io.BytesIO(request.content)
+ port = request.url.port or {"http": 80, "https": 443}[request.url.scheme]
environ = {
"wsgi.version": (1, 0),
- "wsgi.url_scheme": scheme.decode("ascii"),
+ "wsgi.url_scheme": request.url.scheme,
"wsgi.input": wsgi_input,
"wsgi.errors": self.wsgi_errors or sys.stderr,
"wsgi.multithread": True,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
- "REQUEST_METHOD": method.decode(),
+ "REQUEST_METHOD": request.method,
"SCRIPT_NAME": self.script_name,
- "PATH_INFO": unquote(path.decode("ascii")),
- "QUERY_STRING": query.decode("ascii"),
- "SERVER_NAME": host.decode("ascii"),
+ "PATH_INFO": request.url.path,
+ "QUERY_STRING": request.url.query.decode("ascii"),
+ "SERVER_NAME": request.url.host,
"SERVER_PORT": str(port),
"REMOTE_ADDR": self.remote_addr,
}
- for header_key, header_value in headers:
+ for header_key, header_value in request.headers.raw:
key = header_key.decode("ascii").upper().replace("-", "_")
if key not in ("CONTENT_TYPE", "CONTENT_LENGTH"):
key = "HTTP_" + key
(key.encode("ascii"), value.encode("ascii"))
for key, value in seen_response_headers
]
- extensions = {}
- return (status_code, headers, stream, extensions)
+ return Response(status_code, headers=headers, stream=stream)
IO,
TYPE_CHECKING,
AsyncIterable,
+ AsyncIterator,
Callable,
Dict,
Iterable,
+ Iterator,
List,
Mapping,
Optional,
Tuple[Optional[str], FileContent, Optional[str]],
]
RequestFiles = Union[Mapping[str, FileTypes], Sequence[Tuple[str, FileTypes]]]
+
+
+class SyncByteStream:
+ def __iter__(self) -> Iterator[bytes]:
+ raise NotImplementedError(
+ "The '__iter__' method must be implemented."
+ ) # pragma: nocover
+ yield b"" # pragma: nocover
+
+ def close(self) -> None:
+ """
+ Subclasses can override this method to release any network resources
+ after a request/response cycle is complete.
+
+ Streaming cases should use a `try...finally` block to ensure that
+ the stream `close()` method is always called.
+
+ Example:
+
+ status_code, headers, stream, extensions = transport.handle_request(...)
+ try:
+ ...
+ finally:
+ stream.close()
+ """
+
+ def read(self) -> bytes:
+ """
+ Simple cases can use `.read()` as a convenience method for consuming
+ the entire stream and then closing it.
+
+ Example:
+
+ status_code, headers, stream, extensions = transport.handle_request(...)
+ body = stream.read()
+ """
+ try:
+ return b"".join([part for part in self])
+ finally:
+ self.close()
+
+
+class AsyncByteStream:
+ async def __aiter__(self) -> AsyncIterator[bytes]:
+ raise NotImplementedError(
+ "The '__aiter__' method must be implemented."
+ ) # pragma: nocover
+ yield b"" # pragma: nocover
+
+ async def aclose(self) -> None:
+ pass
+
+ async def aread(self) -> bytes:
+ try:
+ return b"".join([part async for part in self])
+ finally:
+ await self.aclose()
assert len(resp1.history) == 0
+class ConsumeBodyTransport(httpx.MockTransport):
+ async def handle_async_request(self, request: Request) -> Response:
+ assert isinstance(request.stream, httpx.AsyncByteStream)
+ [_ async for _ in request.stream]
+ return self.handler(request)
+
+
@pytest.mark.asyncio
async def test_digest_auth_unavailable_streaming_body():
url = "https://example.org/"
async def streaming_body():
yield b"Example request body" # pragma: nocover
- async with httpx.AsyncClient(transport=httpx.MockTransport(app)) as client:
+ async with httpx.AsyncClient(transport=ConsumeBodyTransport(app)) as client:
with pytest.raises(httpx.StreamConsumed):
await client.post(url, content=streaming_body(), auth=auth)
client = httpx.Client(transport=httpx.MockTransport(redirects))
url = "https://example.org/redirect_301"
with client.stream("GET", url, follow_redirects=False) as response:
- assert not response.is_closed
+ pass
assert response.status_code == httpx.codes.MOVED_PERMANENTLY
assert response.headers["location"] == "https://example.org/"
+class ConsumeBodyTransport(httpx.MockTransport):
+ def handle_request(self, request: httpx.Request) -> httpx.Response:
+ assert isinstance(request.stream, httpx.SyncByteStream)
+ [_ for _ in request.stream]
+ return self.handler(request)
+
+
def test_cannot_redirect_streaming_body():
- client = httpx.Client(transport=httpx.MockTransport(redirects))
+ client = httpx.Client(transport=ConsumeBodyTransport(redirects))
url = "https://example.org/redirect_body"
def streaming_body():
raise RuntimeError()
-async def empty_stream():
- yield b""
-
-
@pytest.mark.usefixtures("async_environment")
async def test_asgi_transport():
async with httpx.ASGITransport(app=hello_world) as transport:
- status_code, headers, stream, ext = await transport.handle_async_request(
- method=b"GET",
- url=(b"http", b"www.example.org", 80, b"/"),
- headers=[(b"Host", b"www.example.org")],
- stream=empty_stream(),
- extensions={},
- )
- body = b"".join([part async for part in stream])
-
- assert status_code == 200
- assert body == b"Hello, World!"
+ request = httpx.Request("GET", "http://www.example.com/")
+ response = await transport.handle_async_request(request)
+ await response.aread()
+ assert response.status_code == 200
+ assert response.content == b"Hello, World!"
@pytest.mark.usefixtures("async_environment")
async def test_asgi_transport_no_body():
async with httpx.ASGITransport(app=echo_body) as transport:
- status_code, headers, stream, ext = await transport.handle_async_request(
- method=b"GET",
- url=(b"http", b"www.example.org", 80, b"/"),
- headers=[(b"Host", b"www.example.org")],
- stream=empty_stream(),
- extensions={},
- )
- body = b"".join([part async for part in stream])
-
- assert status_code == 200
- assert body == b""
+ request = httpx.Request("GET", "http://www.example.com/")
+ response = await transport.handle_async_request(request)
+ await response.aread()
+ assert response.status_code == 200
+ assert response.content == b""
@pytest.mark.usefixtures("async_environment")