]> git.ipfire.org Git - thirdparty/httpx.git/commitdiff
Close outstanding connections on pool.close()
authorTom Christie <tom@tomchristie.com>
Wed, 24 Apr 2019 10:42:45 +0000 (11:42 +0100)
committerTom Christie <tom@tomchristie.com>
Wed, 24 Apr 2019 10:42:45 +0000 (11:42 +0100)
.travis.yml [new file with mode: 0644]
httpcore/__init__.py
httpcore/compat.py [deleted file]
httpcore/connectionpool.py
httpcore/exceptions.py
httpcore/http11.py
httpcore/sync.py
requirements.txt

diff --git a/.travis.yml b/.travis.yml
new file mode 100644 (file)
index 0000000..5554990
--- /dev/null
@@ -0,0 +1,19 @@
+sudo: required
+dist: xenial
+language: python
+
+cache: pip
+
+python:
+    - "3.6"
+    - "3.7"
+
+install:
+    - pip install -U -r requirements.txt
+
+script:
+    - scripts/test
+
+after_script:
+    - pip install codecov
+    - codecov
index c68510035f28551692d7cb2284b8b661b23ffaf3..9824e854e54223571979e8ac7ffcbf3606be02e4 100644 (file)
@@ -2,9 +2,9 @@ from .config import PoolLimits, SSLConfig, TimeoutConfig
 from .connectionpool import ConnectionPool
 from .datastructures import URL, Origin, Request, Response
 from .exceptions import (
-    BadResponse,
     ConnectTimeout,
     PoolTimeout,
+    ProtocolError,
     ReadTimeout,
     ResponseClosed,
     StreamConsumed,
@@ -13,4 +13,4 @@ from .exceptions import (
 from .http11 import HTTP11Connection
 from .sync import SyncClient, SyncConnectionPool
 
-__version__ = "0.2.0"
+__version__ = "0.2.1"
diff --git a/httpcore/compat.py b/httpcore/compat.py
deleted file mode 100644 (file)
index 35c31d6..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-import asyncio
-
-if hasattr(asyncio, "run"):
-    asyncio_run = asyncio.run
-
-else:  # pragma: nocover
-
-    def asyncio_run(main, *, debug=False):  # type: ignore
-        if asyncio._get_running_loop() is not None:
-            raise RuntimeError(
-                "asyncio.run() cannot be called from a running event loop"
-            )
-
-        if not asyncio.iscoroutine(main):
-            raise ValueError("a coroutine was expected, got {!r}".format(main))
-
-        loop = asyncio.new_event_loop()
-        try:
-            asyncio.set_event_loop(loop)
-            loop.set_debug(debug)
-            return loop.run_until_complete(main)
-        finally:
-            try:
-                _cancel_all_tasks(loop)
-                loop.run_until_complete(loop.shutdown_asyncgens())
-            finally:
-                asyncio.set_event_loop(None)
-                loop.close()
-
-    def _cancel_all_tasks(loop):  # type: ignore
-        to_cancel = asyncio.all_tasks(loop)
-        if not to_cancel:
-            return
-
-        for task in to_cancel:
-            task.cancel()
-
-        loop.run_until_complete(
-            tasks.gather(*to_cancel, loop=loop, return_exceptions=True)
-        )
-
-        for task in to_cancel:
-            if task.cancelled():
-                continue
-            if task.exception() is not None:
-                loop.call_exception_handler(
-                    {
-                        "message": "unhandled exception during asyncio.run() shutdown",
-                        "exception": task.exception(),
-                        "task": task,
-                    }
-                )
index d815919b6a02f3b6b1c9f1b7be0fd2b860901ed2..a2040dbc410ca6671bfdcec645d16fefa13d0101 100644 (file)
@@ -10,9 +10,9 @@ from .config import (
     SSLConfig,
     TimeoutConfig,
 )
-from .http11 import HTTP11Connection
 from .datastructures import Client, Origin, Request, Response
 from .exceptions import PoolTimeout
+from .http11 import HTTP11Connection
 
 
 class ConnectionPool(Client):
@@ -102,6 +102,12 @@ class ConnectionPool(Client):
 
     async def close(self) -> None:
         self.is_closed = True
+        all_connections = []
+        for connections in self._keepalive_connections.values():
+            all_connections.extend(list(connections))
+        self._keepalive_connections.clear()
+        for connection in all_connections:
+            await connection.close()
 
 
 class ConnectionSemaphore:
index 69892afea7f6cdc8497cc5ca6944bc698881fed4..30814332c3732b6e2bddd55aaf25f28f71b5501d 100644 (file)
@@ -22,9 +22,9 @@ class PoolTimeout(Timeout):
     """
 
 
-class BadResponse(Exception):
+class ProtocolError(Exception):
     """
-    A malformed HTTP response.
+    Malformed HTTP.
     """
 
 
index 23cc27ceca14e3130148a8aa9fa6fd17b89101c2..7c38c6032f9e0f8f9b079089c26e77a524a9cc66 100644 (file)
@@ -161,3 +161,4 @@ class HTTP11Connection(Client):
 
         if self._writer is not None:
             self._writer.close()
+            await self._writer.wait_closed()
index 8ca8b189677b815562977f5dfbcfe4591510cb74..1d560faf91b2c31498d13071221d5b989f73e4a6 100644 (file)
@@ -1,15 +1,16 @@
+import asyncio
 import typing
 from types import TracebackType
 
-from .compat import asyncio_run
 from .config import SSLConfig, TimeoutConfig
 from .connectionpool import ConnectionPool
 from .datastructures import URL, Client, Response
 
 
 class SyncResponse:
-    def __init__(self, response: Response):
+    def __init__(self, response: Response, loop: asyncio.AbstractEventLoop):
         self._response = response
+        self._loop = loop
 
     @property
     def status_code(self) -> int:
@@ -28,23 +29,24 @@ class SyncResponse:
         return self._response.body
 
     def read(self) -> bytes:
-        return asyncio_run(self._response.read())
+        return self._loop.run_until_complete(self._response.read())
 
     def stream(self) -> typing.Iterator[bytes]:
         inner = self._response.stream()
         while True:
             try:
-                yield asyncio_run(inner.__anext__())
+                yield self._loop.run_until_complete(inner.__anext__())
             except StopAsyncIteration as exc:
                 break
 
     def close(self) -> None:
-        return asyncio_run(self._response.close())
+        return self._loop.run_until_complete(self._response.close())
 
 
 class SyncClient:
     def __init__(self, client: Client):
         self._client = client
+        self._loop = asyncio.new_event_loop()
 
     def request(
         self,
@@ -57,7 +59,7 @@ class SyncClient:
         timeout: typing.Optional[TimeoutConfig] = None,
         stream: bool = False,
     ) -> SyncResponse:
-        response = asyncio_run(
+        response = self._loop.run_until_complete(
             self._client.request(
                 method,
                 url,
@@ -68,10 +70,10 @@ class SyncClient:
                 stream=stream,
             )
         )
-        return SyncResponse(response)
+        return SyncResponse(response, self._loop)
 
     def close(self) -> None:
-        asyncio_run(self._client.close())
+        self._loop.run_until_complete(self._client.close())
 
     def __enter__(self) -> "SyncClient":
         return self
index 563558e9e268e9d619011a7e9d2ba254504aabe8..5108a8d6d38d4f9c2116fd33c917c95c5c3f74ba 100644 (file)
@@ -8,7 +8,6 @@ brotlipy
 # Testing
 autoflake
 black
-codecov
 isort
 mypy
 pytest