From b24352734565e4fc11ab9c811e24e76eb0299176 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 27 Nov 2021 23:30:53 +0100 Subject: [PATCH] Make sure to close selectors after usage Failing to do so leaks file descriptors and may lead to "Too many open files" errors (experienced on macOS, using kqueue selector). Close #158 --- docs/news.rst | 7 ++++ psycopg/psycopg/waiting.py | 72 +++++++++++++++++++------------------- tests/test_concurrency.py | 24 ++++++------- 3 files changed, 55 insertions(+), 48 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 590f7c3ff..8da0b656f 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -10,6 +10,13 @@ Current release --------------- +Psycopg 3.0.5 +^^^^^^^^^^^^^ + +- Fix possible "Too many open files" OS error, reported on macOS but possible + on other platforms too (:ticket:`#158`). + + Psycopg 3.0.4 ^^^^^^^^^^^^^ diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 3c2185464..f236102c9 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -51,17 +51,17 @@ def wait_selector( """ try: s = next(gen) - sel = DefaultSelector() - while 1: - sel.register(fileno, s) - rlist = None - while not rlist: - rlist = sel.select(timeout=timeout) - sel.unregister(fileno) - # note: this line should require a cast, but mypy doesn't complain - ready: Ready = rlist[0][1] - assert s & ready - s = gen.send(ready) + with DefaultSelector() as sel: + while 1: + sel.register(fileno, s) + rlist = None + while not rlist: + rlist = sel.select(timeout=timeout) + sel.unregister(fileno) + # note: this line should require a cast, but mypy doesn't complain + ready: Ready = rlist[0][1] + assert s & ready + s = gen.send(ready) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None @@ -85,15 +85,15 @@ def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV: timeout = timeout or None try: fileno, s = next(gen) - sel = DefaultSelector() - while 1: - sel.register(fileno, s) - rlist = sel.select(timeout=timeout) - sel.unregister(fileno) - if not rlist: - raise e.OperationalError("timeout expired") - ready: Ready = rlist[0][1] # type: ignore[assignment] - fileno, s = gen.send(ready) + with DefaultSelector() as sel: + while 1: + sel.register(fileno, s) + rlist = sel.select(timeout=timeout) + sel.unregister(fileno) + if not rlist: + raise e.OperationalError("timeout expired") + ready: Ready = rlist[0][1] # type: ignore[assignment] + fileno, s = gen.send(ready) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None @@ -226,23 +226,23 @@ def wait_epoll( try: s = next(gen) - epoll = select.epoll() - evmask = poll_evmasks[s] - epoll.register(fileno, evmask) - while 1: - fileevs = None - while not fileevs: - fileevs = epoll.poll(timeout) - ev = fileevs[0][1] - ready = 0 - if ev & ~select.EPOLLOUT: - ready = Ready.R - if ev & ~select.EPOLLIN: - ready |= Ready.W - assert s & ready - s = gen.send(ready) + with select.epoll() as epoll: evmask = poll_evmasks[s] - epoll.modify(fileno, evmask) + epoll.register(fileno, evmask) + while 1: + fileevs = None + while not fileevs: + fileevs = epoll.poll(timeout) + ev = fileevs[0][1] + ready = 0 + if ev & ~select.EPOLLOUT: + ready = Ready.R + if ev & ~select.EPOLLIN: + ready |= Ready.W + assert s & ready + s = gen.send(ready) + evmask = poll_evmasks[s] + epoll.modify(fileno, evmask) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 981d8e262..a17f0f4d3 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -198,18 +198,18 @@ def test_identify_closure(dsn, retries): conn2 = psycopg.connect(dsn) try: t0 = time.time() - sel = selectors.DefaultSelector() - sel.register(conn, selectors.EVENT_READ) - t = threading.Thread(target=closer) - t.start() - try: - assert sel.select(timeout=1.0) - with pytest.raises(psycopg.OperationalError): - conn.execute("select 1") - t1 = time.time() - assert 0.3 < t1 - t0 < 0.6 - finally: - t.join() + with selectors.DefaultSelector() as sel: + sel.register(conn, selectors.EVENT_READ) + t = threading.Thread(target=closer) + t.start() + try: + assert sel.select(timeout=1.0) + with pytest.raises(psycopg.OperationalError): + conn.execute("select 1") + t1 = time.time() + assert 0.3 < t1 - t0 < 0.6 + finally: + t.join() finally: conn.close() conn2.close() -- 2.47.2