method = self.redirect_method(request, response)
url = self.redirect_url(request, response)
headers = self.redirect_headers(request, url)
- body = self.redirect_body(request, method)
- return Request(method=method, url=url, headers=headers, body=body)
+ content = self.redirect_content(request, method)
+ return Request(method=method, url=url, headers=headers, content=content)
def redirect_method(self, request: Request, response: Response) -> str:
"""
del headers["Authorization"]
return headers
- def redirect_body(self, request: Request, method: str) -> bytes:
+ def redirect_content(self, request: Request, method: str) -> bytes:
"""
Return the body that should be used for the redirect request.
"""
return b""
if request.is_streaming:
raise RedirectBodyUnavailable()
- return request.body
+ return request.content
TimeoutConfig,
)
from .dispatch.connection_pool import ConnectionPool
-from .models import URL, BodyTypes, HeaderTypes, Request, Response, URLTypes
+from .models import URL, ByteOrByteStream, HeaderTypes, Request, Response, URLTypes
class Client:
method: str,
url: URLTypes,
*,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
ssl: SSLConfig = None,
timeout: TimeoutConfig = None,
) -> Response:
- request = Request(method, url, headers=headers, body=body)
+ request = Request(method, url, headers=headers, content=content)
self.prepare_request(request)
response = await self.send(
request,
self,
url: URLTypes,
*,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return await self.request(
"POST",
url,
- body=body,
+ content=content,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return await self.request(
"PUT",
url,
- body=body,
+ content=content,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return await self.request(
"PATCH",
url,
- body=body,
+ content=content,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return await self.request(
"DELETE",
url,
- body=body,
+ content=content,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
reason_phrase = event.reason.decode("latin1")
status_code = event.status_code
headers = event.headers
- body = self._body_iter(timeout)
+ content = self._body_iter(timeout)
response = Response(
status_code=status_code,
reason_phrase=reason_phrase,
protocol="HTTP/1.1",
headers=headers,
- content=body,
+ content=content,
on_close=self.response_closed,
request=request,
)
elif not k.startswith(b":"):
headers.append((k, v))
- body = self.body_iter(stream_id, timeout)
+ content = self.body_iter(stream_id, timeout)
on_close = functools.partial(self.response_closed, stream_id=stream_id)
response = Response(
status_code=status_code,
protocol="HTTP/2",
headers=headers,
- content=body,
+ content=content,
on_close=on_close,
request=request,
)
from types import TracebackType
from .config import TimeoutConfig
-from .models import URL, Request, Response
+from .models import URL, ByteOrByteStream, HeaderTypes, Request, Response, URLTypes
OptionalTimeout = typing.Optional[TimeoutConfig]
async def request(
self,
method: str,
- url: typing.Union[str, URL],
+ url: URLTypes,
*,
- headers: typing.List[typing.Tuple[bytes, bytes]] = [],
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
+ headers: HeaderTypes = None,
+ content: ByteOrByteStream = b"",
**options: typing.Any,
) -> Response:
- request = Request(method, url, headers=headers, body=body)
+ request = Request(method, url, headers=headers, content=content)
self.prepare_request(request)
response = await self.send(request, **options)
return response
typing.List[typing.Tuple[typing.AnyStr, typing.AnyStr]],
]
-BodyTypes = typing.Union[bytes, typing.AsyncIterator[bytes]]
+ByteOrByteStream = typing.Union[bytes, typing.AsyncIterator[bytes]]
class URL:
except KeyError:
return default
- def getlist(
- self, key: str, split_commas: bool=False
- ) -> typing.List[str]:
+ def getlist(self, key: str, split_commas: bool = False) -> typing.List[str]:
"""
Return multiple header values.
url: typing.Union[str, URL],
*,
headers: HeaderTypes = None,
- body: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
):
self.method = method.upper()
self.url = URL(url) if isinstance(url, str) else url
- if isinstance(body, bytes):
+ if isinstance(content, bytes):
self.is_streaming = False
- self.body = body
+ self.content = content
else:
self.is_streaming = True
- self.body_aiter = body
+ self.content_aiter = content
self.headers = Headers(headers)
async def read(self) -> bytes:
"""
Read and return the response content.
"""
- if not hasattr(self, "body"):
- body = b""
+ if not hasattr(self, "content"):
+ content = b""
async for part in self.stream():
- body += part
- self.body = body
- return self.body
+ content += part
+ self.content = content
+ return self.content
async def stream(self) -> typing.AsyncIterator[bytes]:
if self.is_streaming:
- async for part in self.body_aiter:
+ async for part in self.content_aiter:
yield part
- elif self.body:
- yield self.body
+ elif self.content:
+ yield self.content
def prepare(self) -> None:
"""
if not has_content_length:
if self.is_streaming:
auto_headers.append((b"transfer-encoding", b"chunked"))
- elif self.body:
- content_length = str(len(self.body)).encode()
+ elif self.content:
+ content_length = str(len(self.content)).encode()
auto_headers.append((b"content-length", content_length))
if not has_accept_encoding:
auto_headers.append((b"accept-encoding", ACCEPT_ENCODING.encode()))
reason_phrase: str = None,
protocol: str = None,
headers: HeaderTypes = None,
- content: BodyTypes = b"",
+ content: ByteOrByteStream = b"",
on_close: typing.Callable = None,
request: Request = None,
history: typing.List["Response"] = None,
elif request.url.path == "/cross_domain_target":
headers = dict(request.headers.items())
- body = json.dumps({"headers": headers}).encode()
- return Response(codes.ok, content=body, request=request)
+ content = json.dumps({"headers": headers}).encode()
+ return Response(codes.ok, content=content, request=request)
elif request.url.path == "/redirect_body":
- body = await request.read()
+ await request.read()
headers = {"location": "/redirect_body_target"}
return Response(codes.permanent_redirect, headers=headers, request=request)
elif request.url.path == "/redirect_body_target":
- body = await request.read()
- body = json.dumps({"body": body.decode()}).encode()
+ content = await request.read()
+ body = json.dumps({"body": content.decode()}).encode()
return Response(codes.ok, content=body, request=request)
return Response(codes.ok, content=b"Hello, world!", request=request)
async def test_body_redirect():
client = RedirectAdapter(MockDispatch())
url = "https://example.org/redirect_body"
- body = b"Example request body"
- response = await client.request("POST", url, body=body)
+ content = b"Example request body"
+ response = await client.request("POST", url, content=content)
data = json.loads(response.content.decode())
assert response.url == URL("https://example.org/redirect_body_target")
assert data == {"body": "Example request body"}
client = RedirectAdapter(MockDispatch())
url = "https://example.org/redirect_body"
- async def body():
+ async def streaming_body():
yield b"Example request body"
with pytest.raises(RedirectBodyUnavailable):
- await client.request("POST", url, body=body())
+ await client.request("POST", url, content=streaming_body())
async def test_http2_post_request():
server = MockServer()
async with httpcore.HTTP2Connection(reader=server, writer=server) as conn:
- response = await conn.request("POST", "http://example.org", body=b"<data>")
+ response = await conn.request("POST", "http://example.org", content=b"<data>")
assert response.status_code == 200
assert json.loads(response.content) == {
"method": "POST",
def test_content_length_header():
- request = httpcore.Request("POST", "http://example.org", body=b"test 123")
+ request = httpcore.Request("POST", "http://example.org", content=b"test 123")
request.prepare()
assert request.headers == httpcore.Headers(
[
async def streaming_body(data):
yield data # pragma: nocover
- body = streaming_body(b"test 123")
+ content = streaming_body(b"test 123")
- request = httpcore.Request("POST", "http://example.org", body=body)
+ request = httpcore.Request("POST", "http://example.org", content=content)
request.prepare()
assert request.headers == httpcore.Headers(
[
async def streaming_body(data):
yield data # pragma: nocover
- body = streaming_body(b"test 123")
+ content = streaming_body(b"test 123")
headers = [(b"content-length", b"8")]
- request = httpcore.Request("POST", "http://example.org", body=body, headers=headers)
+ request = httpcore.Request(
+ "POST", "http://example.org", content=content, headers=headers
+ )
request.prepare()
assert request.headers == httpcore.Headers(
[
async def test_post(server):
url = "http://127.0.0.1:8000/"
async with httpcore.Client() as client:
- response = await client.post(url, body=b"Hello, world!")
+ response = await client.post(url, content=b"Hello, world!")
assert response.status_code == 200
async with httpcore.Client() as client:
response = await client.request(
- "POST", "http://127.0.0.1:8000/", body=hello_world()
+ "POST", "http://127.0.0.1:8000/", content=hello_world()
)
assert response.status_code == 200