SSLConfig,
TimeoutConfig,
)
-from .decoders import IdentityDecoder
-from .exceptions import ResponseClosed, StreamConsumed
+from .models import Response
async def request(
stream: bool = False,
ssl: SSLConfig = DEFAULT_SSL_CONFIG,
timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
-) -> "Response":
+) -> Response:
async with PoolManager(ssl=ssl, timeout=timeout) as pool:
return await pool.request(
method=method, url=url, headers=headers, body=body, stream=stream
headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
stream: bool = False,
- ) -> "Response":
- if stream:
- async def streaming_body():
- yield b"Hello, "
- yield b"world!"
- return Response(200, body=streaming_body)
- return Response(200, body=b"Hello, world!")
+ ) -> Response:
+ raise NotImplementedError()
async def close(self) -> None:
self.is_closed = True
traceback: TracebackType = None,
) -> None:
await self.close()
-
-
-class Response:
- def __init__(
- self,
- status_code: int,
- *,
- headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
- body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
- on_close: typing.Callable = None,
- ):
- self.status_code = status_code
- self.headers = list(headers)
- self.on_close = on_close
- self.is_closed = False
- self.is_streamed = False
- self.decoder = IdentityDecoder()
- if isinstance(body, bytes):
- self.is_closed = True
- self.body = body
- else:
- self.body_aiter = body
-
- async def read(self) -> bytes:
- """
- Read and return the response content.
- """
- if not hasattr(self, "body"):
- body = b""
- async for part in self.stream():
- body += part
- self.body = body
- return self.body
-
- async def stream(self):
- """
- A byte-iterator over the decoded response content.
- This will allow us to handle gzip, deflate, and brotli encoded responses.
- """
- if hasattr(self, "body"):
- yield self.body
- else:
- async for chunk in self.raw():
- yield self.decoder.decode(chunk)
- yield self.decoder.flush()
-
- async def raw(self) -> typing.AsyncIterator[bytes]:
- """
- A byte-iterator over the raw response content.
- """
- if self.is_streamed:
- raise StreamConsumed()
- if self.is_closed:
- raise ResponseClosed()
- self.is_streamed = True
- async for part in self.body_aiter():
- yield part
- await self.close()
-
- async def close(self) -> None:
- if not self.is_closed:
- self.is_closed = True
- if self.on_close is not None:
- await self.on_close()
--- /dev/null
+from config import TimeoutConfig
+
+import asyncio
+import h11
+import ssl
+
+
+class Connection:
+ def __init__(self):
+ self.reader = None
+ self.writer = None
+ self.state = h11.Connection(our_role=h11.CLIENT)
+
+ async def open(self, host: str, port: int, ssl: ssl.SSLContext):
+ try:
+ self.reader, self.writer = await asyncio.wait_for(
+ asyncio.open_connection(host, port, ssl=ssl), timeout
+ )
+ except asyncio.TimeoutError:
+ raise ConnectTimeout()
+
+ async def send(self, request: Request) -> Response:
+ method = request.method
+
+ target = request.url.path
+ if request.url.query:
+ target += "?" + request.url.query
+
+ headers = [
+ ("host", request.url.netloc)
+ ] += request.headers
+
+ # Send the request method, path/query, and headers.
+ event = h11.Request(method=method, target=target, headers=headers)
+ await self._send_event(event)
+
+ # Send the request body.
+ if request.is_streaming:
+ async for data in request.raw():
+ event = h11.Data(data=data)
+ await self._send_event(event)
+ else:
+ event = h11.Data(data=request.body)
+ await self._send_event(event)
+
+ # Finalize sending the request.
+ event = h11.EndOfMessage()
+ await connection.send_event(event)
+
+ async def _send_event(self, message):
+ data = self.state.send(message)
+ self.writer.write(data)
+
+ async def _receive_event(self, timeout):
+ event = self.state.next_event()
+
+ while type(event) is h11.NEED_DATA:
+ try:
+ data = await asyncio.wait_for(self.reader.read(2048), timeout)
+ except asyncio.TimeoutError:
+ raise ReadTimeout()
+ self.state.receive_data(data)
+ event = self.state.next_event()
+
+ return event
+
+ async def close(self):
+ self.writer.close()
+ if hasattr(self.writer, "wait_closed"):
+ await self.writer.wait_closed()
--- /dev/null
+import typing
+
+from .decoders import IdentityDecoder
+from .exceptions import ResponseClosed, StreamConsumed
+
+
+class Response:
+ def __init__(
+ self,
+ status_code: int,
+ *,
+ headers: typing.Sequence[typing.Tuple[bytes, bytes]] = (),
+ body: typing.Union[bytes, typing.AsyncIterator[bytes]] = b"",
+ on_close: typing.Callable = None,
+ ):
+ self.status_code = status_code
+ self.headers = list(headers)
+ self.on_close = on_close
+ self.is_closed = False
+ self.is_streamed = False
+ self.decoder = IdentityDecoder()
+ if isinstance(body, bytes):
+ self.is_closed = True
+ self.body = body
+ else:
+ self.body_aiter = body
+
+ async def read(self) -> bytes:
+ """
+ Read and return the response content.
+ """
+ if not hasattr(self, "body"):
+ body = b""
+ async for part in self.stream():
+ body += part
+ self.body = body
+ return self.body
+
+ async def stream(self):
+ """
+ A byte-iterator over the decoded response content.
+ This will allow us to handle gzip, deflate, and brotli encoded responses.
+ """
+ if hasattr(self, "body"):
+ yield self.body
+ else:
+ async for chunk in self.raw():
+ yield self.decoder.decode(chunk)
+ yield self.decoder.flush()
+
+ async def raw(self) -> typing.AsyncIterator[bytes]:
+ """
+ A byte-iterator over the raw response content.
+ """
+ if self.is_streamed:
+ raise StreamConsumed()
+ if self.is_closed:
+ raise ResponseClosed()
+ self.is_streamed = True
+ async for part in self.body_aiter():
+ yield part
+ await self.close()
+
+ async def close(self) -> None:
+ if not self.is_closed:
+ self.is_closed = True
+ if self.on_close is not None:
+ await self.on_close()
-import pytest
-
-import httpcore
-
-
-@pytest.mark.asyncio
-async def test_request():
- response = await httpcore.request("GET", "http://example.com")
- assert response.status_code == 200
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_read_response():
- response = await httpcore.request("GET", "http://example.com")
-
- assert response.status_code == 200
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
- body = await response.read()
-
- assert body == b"Hello, world!"
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_stream_response():
- response = await httpcore.request("GET", "http://example.com")
-
- assert response.status_code == 200
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
- body = b''
- async for part in response.stream():
- body += part
-
- assert body == b"Hello, world!"
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_read_streaming_response():
- response = await httpcore.request("GET", "http://example.com", stream=True)
-
- assert response.status_code == 200
- assert not hasattr(response, 'body')
- assert not response.is_closed
-
- body = await response.read()
-
- assert body == b"Hello, world!"
- assert response.body == b"Hello, world!"
- assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_stream_streaming_response():
- response = await httpcore.request("GET", "http://example.com", stream=True)
-
- assert response.status_code == 200
- assert not hasattr(response, 'body')
- assert not response.is_closed
-
- body = b''
- async for part in response.stream():
- body += part
-
- assert body == b"Hello, world!"
- assert not hasattr(response, 'body')
- assert response.is_closed
-
-
-@pytest.mark.asyncio
-async def test_cannot_read_after_stream_consumed():
- response = await httpcore.request("GET", "http://example.com", stream=True)
-
- body = b''
- async for part in response.stream():
- body += part
-
- with pytest.raises(httpcore.StreamConsumed):
- await response.read()
-
-@pytest.mark.asyncio
-async def test_cannot_read_after_response_closed():
- response = await httpcore.request("GET", "http://example.com", stream=True)
-
- await response.close()
-
- with pytest.raises(httpcore.ResponseClosed):
- await response.read()
--- /dev/null
+import pytest
+
+import httpcore
+
+
+class MockRequests(httpcore.PoolManager):
+ async def request(self, method, url, *, headers = (), body = b'', stream = False) -> httpcore.Response:
+ if stream:
+ async def streaming_body():
+ yield b"Hello, "
+ yield b"world!"
+ return httpcore.Response(200, body=streaming_body)
+ return httpcore.Response(200, body=b"Hello, world!")
+
+
+http = MockRequests()
+
+
+@pytest.mark.asyncio
+async def test_request():
+ response = await http.request("GET", "http://example.com")
+ assert response.status_code == 200
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_read_response():
+ response = await http.request("GET", "http://example.com")
+
+ assert response.status_code == 200
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+ body = await response.read()
+
+ assert body == b"Hello, world!"
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_stream_response():
+ response = await http.request("GET", "http://example.com")
+
+ assert response.status_code == 200
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+ body = b''
+ async for part in response.stream():
+ body += part
+
+ assert body == b"Hello, world!"
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_read_streaming_response():
+ response = await http.request("GET", "http://example.com", stream=True)
+
+ assert response.status_code == 200
+ assert not hasattr(response, 'body')
+ assert not response.is_closed
+
+ body = await response.read()
+
+ assert body == b"Hello, world!"
+ assert response.body == b"Hello, world!"
+ assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_stream_streaming_response():
+ response = await http.request("GET", "http://example.com", stream=True)
+
+ assert response.status_code == 200
+ assert not hasattr(response, 'body')
+ assert not response.is_closed
+
+ body = b''
+ async for part in response.stream():
+ body += part
+
+ assert body == b"Hello, world!"
+ assert not hasattr(response, 'body')
+ assert response.is_closed
+
+
+@pytest.mark.asyncio
+async def test_cannot_read_after_stream_consumed():
+ response = await http.request("GET", "http://example.com", stream=True)
+
+ body = b''
+ async for part in response.stream():
+ body += part
+
+ with pytest.raises(httpcore.StreamConsumed):
+ await response.read()
+
+@pytest.mark.asyncio
+async def test_cannot_read_after_response_closed():
+ response = await http.request("GET", "http://example.com", stream=True)
+
+ await response.close()
+
+ with pytest.raises(httpcore.ResponseClosed):
+ await response.read()