From: Tom Christie Date: Fri, 5 Apr 2019 16:49:59 +0000 (+0100) Subject: Connections X-Git-Tag: 0.0.3~1^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6a4376b202b339b384c728b6753f8f852b42cac8;p=thirdparty%2Fhttpx.git Connections --- diff --git a/httpcore/api.py b/httpcore/api.py index e8bb5d98..24c4fec0 100644 --- a/httpcore/api.py +++ b/httpcore/api.py @@ -9,8 +9,7 @@ from .config import ( SSLConfig, TimeoutConfig, ) -from .decoders import IdentityDecoder -from .exceptions import ResponseClosed, StreamConsumed +from .models import Response async def request( @@ -22,7 +21,7 @@ async def request( stream: bool = False, ssl: SSLConfig = DEFAULT_SSL_CONFIG, timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG, -) -> "Response": +) -> Response: async with PoolManager(ssl=ssl, timeout=timeout) as pool: return await pool.request( method=method, url=url, headers=headers, body=body, stream=stream @@ -50,13 +49,8 @@ class PoolManager: headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (), body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"", stream: bool = False, - ) -> "Response": - if stream: - async def streaming_body(): - yield b"Hello, " - yield b"world!" - return Response(200, body=streaming_body) - return Response(200, body=b"Hello, world!") + ) -> Response: + raise NotImplementedError() async def close(self) -> None: self.is_closed = True @@ -71,67 +65,3 @@ class PoolManager: traceback: TracebackType = None, ) -> None: await self.close() - - -class Response: - def __init__( - self, - status_code: int, - *, - headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (), - body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"", - on_close: typing.Callable = None, - ): - self.status_code = status_code - self.headers = list(headers) - self.on_close = on_close - self.is_closed = False - self.is_streamed = False - self.decoder = IdentityDecoder() - if isinstance(body, bytes): - self.is_closed = True - self.body = body - else: - self.body_aiter = body - - async def read(self) -> bytes: - """ - Read and return the response content. - """ - if not hasattr(self, "body"): - body = b"" - async for part in self.stream(): - body += part - self.body = body - return self.body - - async def stream(self): - """ - A byte-iterator over the decoded response content. - This will allow us to handle gzip, deflate, and brotli encoded responses. - """ - if hasattr(self, "body"): - yield self.body - else: - async for chunk in self.raw(): - yield self.decoder.decode(chunk) - yield self.decoder.flush() - - async def raw(self) -> typing.AsyncIterator[bytes]: - """ - A byte-iterator over the raw response content. - """ - if self.is_streamed: - raise StreamConsumed() - if self.is_closed: - raise ResponseClosed() - self.is_streamed = True - async for part in self.body_aiter(): - yield part - await self.close() - - async def close(self) -> None: - if not self.is_closed: - self.is_closed = True - if self.on_close is not None: - await self.on_close() diff --git a/httpcore/connections.py b/httpcore/connections.py new file mode 100644 index 00000000..dc9c4ff0 --- /dev/null +++ b/httpcore/connections.py @@ -0,0 +1,70 @@ +from config import TimeoutConfig + +import asyncio +import h11 +import ssl + + +class Connection: + def __init__(self): + self.reader = None + self.writer = None + self.state = h11.Connection(our_role=h11.CLIENT) + + async def open(self, host: str, port: int, ssl: ssl.SSLContext): + try: + self.reader, self.writer = await asyncio.wait_for( + asyncio.open_connection(host, port, ssl=ssl), timeout + ) + except asyncio.TimeoutError: + raise ConnectTimeout() + + async def send(self, request: Request) -> Response: + method = request.method + + target = request.url.path + if request.url.query: + target += "?" + request.url.query + + headers = [ + ("host", request.url.netloc) + ] += request.headers + + # Send the request method, path/query, and headers. + event = h11.Request(method=method, target=target, headers=headers) + await self._send_event(event) + + # Send the request body. + if request.is_streaming: + async for data in request.raw(): + event = h11.Data(data=data) + await self._send_event(event) + else: + event = h11.Data(data=request.body) + await self._send_event(event) + + # Finalize sending the request. + event = h11.EndOfMessage() + await connection.send_event(event) + + async def _send_event(self, message): + data = self.state.send(message) + self.writer.write(data) + + async def _receive_event(self, timeout): + event = self.state.next_event() + + while type(event) is h11.NEED_DATA: + try: + data = await asyncio.wait_for(self.reader.read(2048), timeout) + except asyncio.TimeoutError: + raise ReadTimeout() + self.state.receive_data(data) + event = self.state.next_event() + + return event + + async def close(self): + self.writer.close() + if hasattr(self.writer, "wait_closed"): + await self.writer.wait_closed() diff --git a/httpcore/models.py b/httpcore/models.py new file mode 100644 index 00000000..edf174b1 --- /dev/null +++ b/httpcore/models.py @@ -0,0 +1,68 @@ +import typing + +from .decoders import IdentityDecoder +from .exceptions import ResponseClosed, StreamConsumed + + +class Response: + def __init__( + self, + status_code: int, + *, + headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (), + body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"", + on_close: typing.Callable = None, + ): + self.status_code = status_code + self.headers = list(headers) + self.on_close = on_close + self.is_closed = False + self.is_streamed = False + self.decoder = IdentityDecoder() + if isinstance(body, bytes): + self.is_closed = True + self.body = body + else: + self.body_aiter = body + + async def read(self) -> bytes: + """ + Read and return the response content. + """ + if not hasattr(self, "body"): + body = b"" + async for part in self.stream(): + body += part + self.body = body + return self.body + + async def stream(self): + """ + A byte-iterator over the decoded response content. + This will allow us to handle gzip, deflate, and brotli encoded responses. + """ + if hasattr(self, "body"): + yield self.body + else: + async for chunk in self.raw(): + yield self.decoder.decode(chunk) + yield self.decoder.flush() + + async def raw(self) -> typing.AsyncIterator[bytes]: + """ + A byte-iterator over the raw response content. + """ + if self.is_streamed: + raise StreamConsumed() + if self.is_closed: + raise ResponseClosed() + self.is_streamed = True + async for part in self.body_aiter(): + yield part + await self.close() + + async def close(self) -> None: + if not self.is_closed: + self.is_closed = True + if self.on_close is not None: + await self.on_close() diff --git a/tests/test_api.py b/tests/test_api.py index 2d5df0f3..e69de29b 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,96 +0,0 @@ -import pytest - -import httpcore - - -@pytest.mark.asyncio -async def test_request(): - response = await httpcore.request("GET", "http://example.com") - assert response.status_code == 200 - assert response.body == b"Hello, world!" - assert response.is_closed - - -@pytest.mark.asyncio -async def test_read_response(): - response = await httpcore.request("GET", "http://example.com") - - assert response.status_code == 200 - assert response.body == b"Hello, world!" - assert response.is_closed - - body = await response.read() - - assert body == b"Hello, world!" - assert response.body == b"Hello, world!" - assert response.is_closed - - -@pytest.mark.asyncio -async def test_stream_response(): - response = await httpcore.request("GET", "http://example.com") - - assert response.status_code == 200 - assert response.body == b"Hello, world!" - assert response.is_closed - - body = b'' - async for part in response.stream(): - body += part - - assert body == b"Hello, world!" - assert response.body == b"Hello, world!" - assert response.is_closed - - -@pytest.mark.asyncio -async def test_read_streaming_response(): - response = await httpcore.request("GET", "http://example.com", stream=True) - - assert response.status_code == 200 - assert not hasattr(response, 'body') - assert not response.is_closed - - body = await response.read() - - assert body == b"Hello, world!" - assert response.body == b"Hello, world!" - assert response.is_closed - - -@pytest.mark.asyncio -async def test_stream_streaming_response(): - response = await httpcore.request("GET", "http://example.com", stream=True) - - assert response.status_code == 200 - assert not hasattr(response, 'body') - assert not response.is_closed - - body = b'' - async for part in response.stream(): - body += part - - assert body == b"Hello, world!" - assert not hasattr(response, 'body') - assert response.is_closed - - -@pytest.mark.asyncio -async def test_cannot_read_after_stream_consumed(): - response = await httpcore.request("GET", "http://example.com", stream=True) - - body = b'' - async for part in response.stream(): - body += part - - with pytest.raises(httpcore.StreamConsumed): - await response.read() - -@pytest.mark.asyncio -async def test_cannot_read_after_response_closed(): - response = await httpcore.request("GET", "http://example.com", stream=True) - - await response.close() - - with pytest.raises(httpcore.ResponseClosed): - await response.read() diff --git a/tests/test_responses.py b/tests/test_responses.py new file mode 100644 index 00000000..19a387ba --- /dev/null +++ b/tests/test_responses.py @@ -0,0 +1,109 @@ +import pytest + +import httpcore + + +class MockRequests(httpcore.PoolManager): + async def request(self, method, url, *, headers = (), body = b'', stream = False) -> httpcore.Response: + if stream: + async def streaming_body(): + yield b"Hello, " + yield b"world!" + return httpcore.Response(200, body=streaming_body) + return httpcore.Response(200, body=b"Hello, world!") + + +http = MockRequests() + + +@pytest.mark.asyncio +async def test_request(): + response = await http.request("GET", "http://example.com") + assert response.status_code == 200 + assert response.body == b"Hello, world!" + assert response.is_closed + + +@pytest.mark.asyncio +async def test_read_response(): + response = await http.request("GET", "http://example.com") + + assert response.status_code == 200 + assert response.body == b"Hello, world!" + assert response.is_closed + + body = await response.read() + + assert body == b"Hello, world!" + assert response.body == b"Hello, world!" + assert response.is_closed + + +@pytest.mark.asyncio +async def test_stream_response(): + response = await http.request("GET", "http://example.com") + + assert response.status_code == 200 + assert response.body == b"Hello, world!" + assert response.is_closed + + body = b'' + async for part in response.stream(): + body += part + + assert body == b"Hello, world!" + assert response.body == b"Hello, world!" + assert response.is_closed + + +@pytest.mark.asyncio +async def test_read_streaming_response(): + response = await http.request("GET", "http://example.com", stream=True) + + assert response.status_code == 200 + assert not hasattr(response, 'body') + assert not response.is_closed + + body = await response.read() + + assert body == b"Hello, world!" + assert response.body == b"Hello, world!" + assert response.is_closed + + +@pytest.mark.asyncio +async def test_stream_streaming_response(): + response = await http.request("GET", "http://example.com", stream=True) + + assert response.status_code == 200 + assert not hasattr(response, 'body') + assert not response.is_closed + + body = b'' + async for part in response.stream(): + body += part + + assert body == b"Hello, world!" + assert not hasattr(response, 'body') + assert response.is_closed + + +@pytest.mark.asyncio +async def test_cannot_read_after_stream_consumed(): + response = await http.request("GET", "http://example.com", stream=True) + + body = b'' + async for part in response.stream(): + body += part + + with pytest.raises(httpcore.StreamConsumed): + await response.read() + +@pytest.mark.asyncio +async def test_cannot_read_after_response_closed(): + response = await http.request("GET", "http://example.com", stream=True) + + await response.close() + + with pytest.raises(httpcore.ResponseClosed): + await response.read()