url = self.redirect_url(request, response)
headers = self.redirect_headers(request, url)
content = self.redirect_content(request, method)
- return Request(method=method, url=url, headers=headers, content=content)
+ return Request(method=method, url=url, headers=headers, data=content)
def redirect_method(self, request: Request, response: Response) -> str:
"""
)
from ..models import (
URL,
- ByteOrByteStream,
Headers,
HeaderTypes,
+ QueryParamTypes,
Request,
+ RequestData,
Response,
URLTypes,
)
method: str,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
ssl: SSLConfig = None,
timeout: TimeoutConfig = None,
) -> SyncResponse:
- request = Request(method, url, headers=headers, content=content)
+ request = Request(
+ method, url, data=data, query_params=query_params, headers=headers
+ )
self.prepare_request(request)
response = self.send(
request,
self,
url: URLTypes,
*,
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
self,
url: URLTypes,
*,
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
self,
url: URLTypes,
*,
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = False, # Note: Differs to usual default.
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return self.request(
"POST",
url,
- content=content,
+ data=data,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return self.request(
"PUT",
url,
- content=content,
+ data=data,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return self.request(
"PATCH",
url,
- content=content,
+ data=data,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
allow_redirects: bool = True,
return self.request(
"DELETE",
url,
- content=content,
+ data=data,
headers=headers,
stream=stream,
allow_redirects=allow_redirects,
from .dispatch.connection_pool import ConnectionPool
from .models import (
URL,
- ByteOrByteStream,
HeaderTypes,
QueryParamTypes,
Request,
+ RequestData,
Response,
URLTypes,
)
method: str,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
timeout: TimeoutConfig = None,
) -> Response:
request = Request(
- method, url, query_params=query_params, headers=headers, content=content
+ method, url, data=data, query_params=query_params, headers=headers
)
self.prepare_request(request)
response = await self.send(
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
return await self.request(
"POST",
url,
- content=content,
+ data=data,
query_params=query_params,
headers=headers,
stream=stream,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
return await self.request(
"PUT",
url,
- content=content,
+ data=data,
query_params=query_params,
headers=headers,
stream=stream,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
return await self.request(
"PATCH",
url,
- content=content,
+ data=data,
query_params=query_params,
headers=headers,
stream=stream,
self,
url: URLTypes,
*,
- content: ByteOrByteStream = b"",
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
stream: bool = False,
return await self.request(
"DELETE",
url,
- content=content,
+ data=data,
query_params=query_params,
headers=headers,
stream=stream,
from types import TracebackType
from .config import TimeoutConfig
-from .models import URL, ByteOrByteStream, HeaderTypes, Request, Response, URLTypes
+from .models import (
+ URL,
+ HeaderTypes,
+ QueryParamTypes,
+ Request,
+ RequestData,
+ Response,
+ URLTypes,
+)
OptionalTimeout = typing.Optional[TimeoutConfig]
method: str,
url: URLTypes,
*,
+ data: RequestData = b"",
+ query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
- content: ByteOrByteStream = b"",
**options: typing.Any,
) -> Response:
- request = Request(method, url, headers=headers, content=content)
+ request = Request(
+ method, url, data=data, query_params=query_params, headers=headers
+ )
self.prepare_request(request)
response = await self.send(request, **options)
return response
typing.List[typing.Tuple[typing.AnyStr, typing.AnyStr]],
]
-ByteOrByteStream = typing.Union[bytes, typing.AsyncIterator[bytes]]
+RequestData = typing.Union[bytes, typing.AsyncIterator[bytes]]
+
+ResponseContent = typing.Union[bytes, typing.AsyncIterator[bytes]]
class URL:
method: str,
url: typing.Union[str, URL],
*,
+ data: RequestData = b"",
query_params: QueryParamTypes = None,
headers: HeaderTypes = None,
- content: ByteOrByteStream = b"",
):
self.method = method.upper()
self.url = URL(url, query_params=query_params)
- if isinstance(content, bytes):
+ if isinstance(data, bytes):
self.is_streaming = False
- self.content = content
+ self.content = data
else:
self.is_streaming = True
- self.content_aiter = content
+ self.content_aiter = data
self.headers = Headers(headers)
async def read(self) -> bytes:
reason_phrase: str = None,
protocol: str = None,
headers: HeaderTypes = None,
- content: ByteOrByteStream = b"",
+ content: ResponseContent = b"",
on_close: typing.Callable = None,
request: Request = None,
history: typing.List["Response"] = None,
async def test_body_redirect():
client = RedirectAdapter(MockDispatch())
url = "https://example.org/redirect_body"
- content = b"Example request body"
- response = await client.request("POST", url, content=content)
+ data = b"Example request body"
+ response = await client.request("POST", url, data=data)
data = json.loads(response.content.decode())
assert response.url == URL("https://example.org/redirect_body_target")
assert data == {"body": "Example request body"}
yield b"Example request body"
with pytest.raises(RedirectBodyUnavailable):
- await client.request("POST", url, content=streaming_body())
+ await client.request("POST", url, data=streaming_body())
async def test_http2_post_request():
server = MockServer()
async with httpcore.HTTP2Connection(reader=server, writer=server) as conn:
- response = await conn.request("POST", "http://example.org", content=b"<data>")
+ response = await conn.request("POST", "http://example.org", data=b"<data>")
assert response.status_code == 200
assert json.loads(response.content) == {
"method": "POST",
def test_content_length_header():
- request = httpcore.Request("POST", "http://example.org", content=b"test 123")
+ request = httpcore.Request("POST", "http://example.org", data=b"test 123")
request.prepare()
assert request.headers == httpcore.Headers(
[
async def streaming_body(data):
yield data # pragma: nocover
- content = streaming_body(b"test 123")
+ data = streaming_body(b"test 123")
- request = httpcore.Request("POST", "http://example.org", content=content)
+ request = httpcore.Request("POST", "http://example.org", data=data)
request.prepare()
assert request.headers == httpcore.Headers(
[
async def streaming_body(data):
yield data # pragma: nocover
- content = streaming_body(b"test 123")
+ data = streaming_body(b"test 123")
headers = [(b"content-length", b"8")]
request = httpcore.Request(
- "POST", "http://example.org", content=content, headers=headers
+ "POST", "http://example.org", data=data, headers=headers
)
request.prepare()
assert request.headers == httpcore.Headers(
async def test_post(server):
url = "http://127.0.0.1:8000/"
async with httpcore.Client() as client:
- response = await client.post(url, content=b"Hello, world!")
+ response = await client.post(url, data=b"Hello, world!")
assert response.status_code == 200
async with httpcore.Client() as client:
response = await client.request(
- "POST", "http://127.0.0.1:8000/", content=hello_world()
+ "POST", "http://127.0.0.1:8000/", data=hello_world()
)
assert response.status_code == 200
@threadpool
def test_post(server):
with httpcore.SyncClient() as http:
- response = http.post("http://127.0.0.1:8000/", content=b"Hello, world!")
+ response = http.post("http://127.0.0.1:8000/", data=b"Hello, world!")
assert response.status_code == 200
assert response.reason_phrase == "OK"