]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Make sure to close selectors after usage
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 27 Nov 2021 22:30:53 +0000 (23:30 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 28 Nov 2021 01:31:40 +0000 (02:31 +0100)
Failing to do so leaks file descriptors and may lead to "Too many open
files" errors (experienced on macOS, using kqueue selector).

Close #158

docs/news.rst
psycopg/psycopg/waiting.py
tests/test_concurrency.py

index 590f7c3ffe0d85cc29f7b3031b67209f384a812d..8da0b656ffe00cdfafce26232a9680ab81b7126c 100644 (file)
 Current release
 ---------------
 
+Psycopg 3.0.5
+^^^^^^^^^^^^^
+
+- Fix possible "Too many open files" OS error, reported on macOS but possible
+  on other platforms too (:ticket:`#158`).
+
+
 Psycopg 3.0.4
 ^^^^^^^^^^^^^
 
index 3c2185464caaa24a3c5be33ad2b7d9c17f8994c1..f236102c91f08360f0ec086f03a2204313908a16 100644 (file)
@@ -51,17 +51,17 @@ def wait_selector(
     """
     try:
         s = next(gen)
-        sel = DefaultSelector()
-        while 1:
-            sel.register(fileno, s)
-            rlist = None
-            while not rlist:
-                rlist = sel.select(timeout=timeout)
-            sel.unregister(fileno)
-            # note: this line should require a cast, but mypy doesn't complain
-            ready: Ready = rlist[0][1]
-            assert s & ready
-            s = gen.send(ready)
+        with DefaultSelector() as sel:
+            while 1:
+                sel.register(fileno, s)
+                rlist = None
+                while not rlist:
+                    rlist = sel.select(timeout=timeout)
+                sel.unregister(fileno)
+                # note: this line should require a cast, but mypy doesn't complain
+                ready: Ready = rlist[0][1]
+                assert s & ready
+                s = gen.send(ready)
 
     except StopIteration as ex:
         rv: RV = ex.args[0] if ex.args else None
@@ -85,15 +85,15 @@ def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
     timeout = timeout or None
     try:
         fileno, s = next(gen)
-        sel = DefaultSelector()
-        while 1:
-            sel.register(fileno, s)
-            rlist = sel.select(timeout=timeout)
-            sel.unregister(fileno)
-            if not rlist:
-                raise e.OperationalError("timeout expired")
-            ready: Ready = rlist[0][1]  # type: ignore[assignment]
-            fileno, s = gen.send(ready)
+        with DefaultSelector() as sel:
+            while 1:
+                sel.register(fileno, s)
+                rlist = sel.select(timeout=timeout)
+                sel.unregister(fileno)
+                if not rlist:
+                    raise e.OperationalError("timeout expired")
+                ready: Ready = rlist[0][1]  # type: ignore[assignment]
+                fileno, s = gen.send(ready)
 
     except StopIteration as ex:
         rv: RV = ex.args[0] if ex.args else None
@@ -226,23 +226,23 @@ def wait_epoll(
 
     try:
         s = next(gen)
-        epoll = select.epoll()
-        evmask = poll_evmasks[s]
-        epoll.register(fileno, evmask)
-        while 1:
-            fileevs = None
-            while not fileevs:
-                fileevs = epoll.poll(timeout)
-            ev = fileevs[0][1]
-            ready = 0
-            if ev & ~select.EPOLLOUT:
-                ready = Ready.R
-            if ev & ~select.EPOLLIN:
-                ready |= Ready.W
-            assert s & ready
-            s = gen.send(ready)
+        with select.epoll() as epoll:
             evmask = poll_evmasks[s]
-            epoll.modify(fileno, evmask)
+            epoll.register(fileno, evmask)
+            while 1:
+                fileevs = None
+                while not fileevs:
+                    fileevs = epoll.poll(timeout)
+                ev = fileevs[0][1]
+                ready = 0
+                if ev & ~select.EPOLLOUT:
+                    ready = Ready.R
+                if ev & ~select.EPOLLIN:
+                    ready |= Ready.W
+                assert s & ready
+                s = gen.send(ready)
+                evmask = poll_evmasks[s]
+                epoll.modify(fileno, evmask)
 
     except StopIteration as ex:
         rv: RV = ex.args[0] if ex.args else None
index 981d8e2627d5298808a3223dbb6e96b4b56147c8..a17f0f4d32b4d87b24d94e48417607331c9430e0 100644 (file)
@@ -198,18 +198,18 @@ def test_identify_closure(dsn, retries):
             conn2 = psycopg.connect(dsn)
             try:
                 t0 = time.time()
-                sel = selectors.DefaultSelector()
-                sel.register(conn, selectors.EVENT_READ)
-                t = threading.Thread(target=closer)
-                t.start()
-                try:
-                    assert sel.select(timeout=1.0)
-                    with pytest.raises(psycopg.OperationalError):
-                        conn.execute("select 1")
-                    t1 = time.time()
-                    assert 0.3 < t1 - t0 < 0.6
-                finally:
-                    t.join()
+                with selectors.DefaultSelector() as sel:
+                    sel.register(conn, selectors.EVENT_READ)
+                    t = threading.Thread(target=closer)
+                    t.start()
+                    try:
+                        assert sel.select(timeout=1.0)
+                        with pytest.raises(psycopg.OperationalError):
+                            conn.execute("select 1")
+                        t1 = time.time()
+                        assert 0.3 < t1 - t0 < 0.6
+                    finally:
+                        t.join()
             finally:
                 conn.close()
                 conn2.close()