]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Connections
authorTom Christie <tom@tomchristie.com>
Fri, 5 Apr 2019 16:49:59 +0000 (17:49 +0100)
committerTom Christie <tom@tomchristie.com>
Fri, 5 Apr 2019 16:49:59 +0000 (17:49 +0100)
httpcore/api.py
httpcore/connections.py [new file with mode: 0644]
httpcore/models.py [new file with mode: 0644]
tests/test_api.py
tests/test_responses.py [new file with mode: 0644]

index e8bb5d98efad2c7c6bc108be826b689a44ca1f19..24c4fec0076203e8c7b63a9053324f6b96e9ee31 100644 (file)
@@ -9,8 +9,7 @@ from .config import (
     SSLConfig,
     TimeoutConfig,
 )
-from .decoders import IdentityDecoder
-from .exceptions import ResponseClosed, StreamConsumed
+from .models import Response
 
 
 async def request(
@@ -22,7 +21,7 @@ async def request(
     stream: bool = False,
     ssl: SSLConfig = DEFAULT_SSL_CONFIG,
     timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
-) -> "Response":
+) -> Response:
     async with PoolManager(ssl=ssl, timeout=timeout) as pool:
        return await pool.request(
            method=method, url=url, headers=headers, body=body, stream=stream
@@ -50,13 +49,8 @@ class PoolManager:
         headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
         body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
         stream: bool = False,
-    ) -> "Response":
-        if stream:
-            async def streaming_body():
-                yield b"Hello, "
-                yield b"world!"
-            return Response(200, body=streaming_body)
-        return Response(200, body=b"Hello, world!")
+    ) -> Response:
+        raise NotImplementedError()
 
     async def close(self) -> None:
         self.is_closed = True
@@ -71,67 +65,3 @@ class PoolManager:
         traceback: TracebackType = None,
     ) -> None:
         await self.close()
-
-
-class Response:
-    def __init__(
-        self,
-        status_code: int,
-        *,
-        headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
-        body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
-        on_close: typing.Callable = None,
-    ):
-        self.status_code = status_code
-        self.headers = list(headers)
-        self.on_close = on_close
-        self.is_closed = False
-        self.is_streamed = False
-        self.decoder = IdentityDecoder()
-        if isinstance(body, bytes):
-            self.is_closed = True
-            self.body = body
-        else:
-            self.body_aiter = body
-
-    async def read(self) -> bytes:
-        """
-        Read and return the response content.
-        """
-        if not hasattr(self, "body"):
-            body = b""
-            async for part in self.stream():
-                body += part
-            self.body = body
-        return self.body
-
-    async def stream(self):
-        """
-        A byte-iterator over the decoded response content.
-        This will allow us to handle gzip, deflate, and brotli encoded responses.
-        """
-        if hasattr(self, "body"):
-            yield self.body
-        else:
-            async for chunk in self.raw():
-                yield self.decoder.decode(chunk)
-            yield self.decoder.flush()
-
-    async def raw(self) -> typing.AsyncIterator[bytes]:
-        """
-        A byte-iterator over the raw response content.
-        """
-        if self.is_streamed:
-            raise StreamConsumed()
-        if self.is_closed:
-            raise ResponseClosed()
-        self.is_streamed = True
-        async for part in self.body_aiter():
-            yield part
-        await self.close()
-
-    async def close(self) -> None:
-        if not self.is_closed:
-            self.is_closed = True
-            if self.on_close is not None:
-                await self.on_close()
diff --git a/httpcore/connections.py b/httpcore/connections.py
new file mode 100644 (file)
index 0000000..dc9c4ff
--- /dev/null
@@ -0,0 +1,70 @@
+from config import TimeoutConfig
+
+import asyncio
+import h11
+import ssl
+
+
+class Connection:
+    def __init__(self):
+        self.reader = None
+        self.writer = None
+        self.state = h11.Connection(our_role=h11.CLIENT)
+
+    async def open(self, host: str, port: int, ssl: ssl.SSLContext):
+        try:
+            self.reader, self.writer = await asyncio.wait_for(
+                asyncio.open_connection(host, port, ssl=ssl), timeout
+            )
+        except asyncio.TimeoutError:
+            raise ConnectTimeout()
+
+    async def send(self, request: Request) -> Response:
+        method = request.method
+
+        target = request.url.path
+        if request.url.query:
+            target += "?" + request.url.query
+
+        headers = [
+            ("host", request.url.netloc)
+        ] += request.headers
+
+        # Send the request method, path/query, and headers.
+        event = h11.Request(method=method, target=target, headers=headers)
+        await self._send_event(event)
+
+        # Send the request body.
+        if request.is_streaming:
+            async for data in request.raw():
+                event = h11.Data(data=data)
+                await self._send_event(event)
+        else:
+            event = h11.Data(data=request.body)
+            await self._send_event(event)
+
+        # Finalize sending the request.
+        event = h11.EndOfMessage()
+        await connection.send_event(event)
+
+    async def _send_event(self, message):
+        data = self.state.send(message)
+        self.writer.write(data)
+
+    async def _receive_event(self, timeout):
+        event = self.state.next_event()
+
+        while type(event) is h11.NEED_DATA:
+            try:
+                data = await asyncio.wait_for(self.reader.read(2048), timeout)
+            except asyncio.TimeoutError:
+                raise ReadTimeout()
+            self.state.receive_data(data)
+            event = self.state.next_event()
+
+        return event
+
+    async def close(self):
+        self.writer.close()
+        if hasattr(self.writer, "wait_closed"):
+            await self.writer.wait_closed()
diff --git a/httpcore/models.py b/httpcore/models.py
new file mode 100644 (file)
index 0000000..edf174b
--- /dev/null
@@ -0,0 +1,68 @@
+import typing
+
+from .decoders import IdentityDecoder
+from .exceptions import ResponseClosed, StreamConsumed
+
+
+class Response:
+    def __init__(
+        self,
+        status_code: int,
+        *,
+        headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
+        body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
+        on_close: typing.Callable = None,
+    ):
+        self.status_code = status_code
+        self.headers = list(headers)
+        self.on_close = on_close
+        self.is_closed = False
+        self.is_streamed = False
+        self.decoder = IdentityDecoder()
+        if isinstance(body, bytes):
+            self.is_closed = True
+            self.body = body
+        else:
+            self.body_aiter = body
+
+    async def read(self) -> bytes:
+        """
+        Read and return the response content.
+        """
+        if not hasattr(self, "body"):
+            body = b""
+            async for part in self.stream():
+                body += part
+            self.body = body
+        return self.body
+
+    async def stream(self):
+        """
+        A byte-iterator over the decoded response content.
+        This will allow us to handle gzip, deflate, and brotli encoded responses.
+        """
+        if hasattr(self, "body"):
+            yield self.body
+        else:
+            async for chunk in self.raw():
+                yield self.decoder.decode(chunk)
+            yield self.decoder.flush()
+
+    async def raw(self) -> typing.AsyncIterator[bytes]:
+        """
+        A byte-iterator over the raw response content.
+        """
+        if self.is_streamed:
+            raise StreamConsumed()
+        if self.is_closed:
+            raise ResponseClosed()
+        self.is_streamed = True
+        async for part in self.body_aiter():
+            yield part
+        await self.close()
+
+    async def close(self) -> None:
+        if not self.is_closed:
+            self.is_closed = True
+            if self.on_close is not None:
+                await self.on_close()
index 2d5df0f37373e08e850897d7a5b229559b158aff..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 (file)
@@ -1,96 +0,0 @@
-import pytest
-
-import httpcore
-
-
-@pytest.mark.asyncio
-async def test_request():
-    response = await httpcore.request("GET", "http://example.com")
-    assert response.status_code == 200
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_read_response():
-    response = await httpcore.request("GET", "http://example.com")
-
-    assert response.status_code == 200
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-    body = await response.read()
-
-    assert body == b"Hello, world!"
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_stream_response():
-    response = await httpcore.request("GET", "http://example.com")
-
-    assert response.status_code == 200
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-    body = b''
-    async for part in response.stream():
-        body += part
-
-    assert body == b"Hello, world!"
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_read_streaming_response():
-    response = await httpcore.request("GET", "http://example.com", stream=True)
-
-    assert response.status_code == 200
-    assert not hasattr(response, 'body')
-    assert not response.is_closed
-
-    body = await response.read()
-
-    assert body == b"Hello, world!"
-    assert response.body == b"Hello, world!"
-    assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_stream_streaming_response():
-    response = await httpcore.request("GET", "http://example.com", stream=True)
-
-    assert response.status_code == 200
-    assert not hasattr(response, 'body')
-    assert not response.is_closed
-
-    body = b''
-    async for part in response.stream():
-        body += part
-
-    assert body == b"Hello, world!"
-    assert not hasattr(response, 'body')
-    assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_cannot_read_after_stream_consumed():
-    response = await httpcore.request("GET", "http://example.com", stream=True)
-
-    body = b''
-    async for part in response.stream():
-        body += part
-
-    with pytest.raises(httpcore.StreamConsumed):
-        await response.read()
-
-@pytest.mark.asyncio
-async def test_cannot_read_after_response_closed():
-    response = await httpcore.request("GET", "http://example.com", stream=True)
-
-    await response.close()
-
-    with pytest.raises(httpcore.ResponseClosed):
-        await response.read()
diff --git a/tests/test_responses.py b/tests/test_responses.py
new file mode 100644 (file)
index 0000000..19a387b
--- /dev/null
@@ -0,0 +1,109 @@
+import pytest
+
+import httpcore
+
+
+class MockRequests(httpcore.PoolManager):
+    async def request(self, method, url, *, headers = (), body = b'', stream = False) -> httpcore.Response:
+        if stream:
+            async def streaming_body():
+                yield b"Hello, "
+                yield b"world!"
+            return httpcore.Response(200, body=streaming_body)
+        return httpcore.Response(200, body=b"Hello, world!")
+
+
+http = MockRequests()
+
+
+@pytest.mark.asyncio
+async def test_request():
+    response = await http.request("GET", "http://example.com")
+    assert response.status_code == 200
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_read_response():
+    response = await http.request("GET", "http://example.com")
+
+    assert response.status_code == 200
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+    body = await response.read()
+
+    assert body == b"Hello, world!"
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_stream_response():
+    response = await http.request("GET", "http://example.com")
+
+    assert response.status_code == 200
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+    body = b''
+    async for part in response.stream():
+        body += part
+
+    assert body == b"Hello, world!"
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_read_streaming_response():
+    response = await http.request("GET", "http://example.com", stream=True)
+
+    assert response.status_code == 200
+    assert not hasattr(response, 'body')
+    assert not response.is_closed
+
+    body = await response.read()
+
+    assert body == b"Hello, world!"
+    assert response.body == b"Hello, world!"
+    assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_stream_streaming_response():
+    response = await http.request("GET", "http://example.com", stream=True)
+
+    assert response.status_code == 200
+    assert not hasattr(response, 'body')
+    assert not response.is_closed
+
+    body = b''
+    async for part in response.stream():
+        body += part
+
+    assert body == b"Hello, world!"
+    assert not hasattr(response, 'body')
+    assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_cannot_read_after_stream_consumed():
+    response = await http.request("GET", "http://example.com", stream=True)
+
+    body = b''
+    async for part in response.stream():
+        body += part
+
+    with pytest.raises(httpcore.StreamConsumed):
+        await response.read()
+
+@pytest.mark.asyncio
+async def test_cannot_read_after_response_closed():
+    response = await http.request("GET", "http://example.com", stream=True)
+
+    await response.close()
+
+    with pytest.raises(httpcore.ResponseClosed):
+        await response.read()