From: Daniele Varrazzo Date: Sat, 27 Nov 2021 22:30:53 +0000 (+0100) Subject: Make sure to close selectors after usage X-Git-Tag: 3.0.5~2^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b24352734565e4fc11ab9c811e24e76eb0299176;p=thirdparty%2Fpsycopg.git Make sure to close selectors after usage Failing to do so leaks file descriptors and may lead to "Too many open files" errors (experienced on macOS, using kqueue selector). Close #158 --- diff --git a/docs/news.rst b/docs/news.rst index 590f7c3ff..8da0b656f 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -10,6 +10,13 @@ Current release --------------- +Psycopg 3.0.5 +^^^^^^^^^^^^^ + +- Fix possible "Too many open files" OS error, reported on macOS but possible + on other platforms too (:ticket:`#158`). + + Psycopg 3.0.4 ^^^^^^^^^^^^^ diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 3c2185464..f236102c9 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -51,17 +51,17 @@ def wait_selector( """ try: s = next(gen) - sel = DefaultSelector() - while 1: - sel.register(fileno, s) - rlist = None - while not rlist: - rlist = sel.select(timeout=timeout) - sel.unregister(fileno) - # note: this line should require a cast, but mypy doesn't complain - ready: Ready = rlist[0][1] - assert s & ready - s = gen.send(ready) + with DefaultSelector() as sel: + while 1: + sel.register(fileno, s) + rlist = None + while not rlist: + rlist = sel.select(timeout=timeout) + sel.unregister(fileno) + # note: this line should require a cast, but mypy doesn't complain + ready: Ready = rlist[0][1] + assert s & ready + s = gen.send(ready) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None @@ -85,15 +85,15 @@ def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV: timeout = timeout or None try: fileno, s = next(gen) - sel = DefaultSelector() - while 1: - sel.register(fileno, s) - rlist = sel.select(timeout=timeout) - sel.unregister(fileno) - if not rlist: - raise e.OperationalError("timeout expired") - ready: Ready = rlist[0][1] # type: ignore[assignment] - fileno, s = gen.send(ready) + with DefaultSelector() as sel: + while 1: + sel.register(fileno, s) + rlist = sel.select(timeout=timeout) + sel.unregister(fileno) + if not rlist: + raise e.OperationalError("timeout expired") + ready: Ready = rlist[0][1] # type: ignore[assignment] + fileno, s = gen.send(ready) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None @@ -226,23 +226,23 @@ def wait_epoll( try: s = next(gen) - epoll = select.epoll() - evmask = poll_evmasks[s] - epoll.register(fileno, evmask) - while 1: - fileevs = None - while not fileevs: - fileevs = epoll.poll(timeout) - ev = fileevs[0][1] - ready = 0 - if ev & ~select.EPOLLOUT: - ready = Ready.R - if ev & ~select.EPOLLIN: - ready |= Ready.W - assert s & ready - s = gen.send(ready) + with select.epoll() as epoll: evmask = poll_evmasks[s] - epoll.modify(fileno, evmask) + epoll.register(fileno, evmask) + while 1: + fileevs = None + while not fileevs: + fileevs = epoll.poll(timeout) + ev = fileevs[0][1] + ready = 0 + if ev & ~select.EPOLLOUT: + ready = Ready.R + if ev & ~select.EPOLLIN: + ready |= Ready.W + assert s & ready + s = gen.send(ready) + evmask = poll_evmasks[s] + epoll.modify(fileno, evmask) except StopIteration as ex: rv: RV = ex.args[0] if ex.args else None diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 981d8e262..a17f0f4d3 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -198,18 +198,18 @@ def test_identify_closure(dsn, retries): conn2 = psycopg.connect(dsn) try: t0 = time.time() - sel = selectors.DefaultSelector() - sel.register(conn, selectors.EVENT_READ) - t = threading.Thread(target=closer) - t.start() - try: - assert sel.select(timeout=1.0) - with pytest.raises(psycopg.OperationalError): - conn.execute("select 1") - t1 = time.time() - assert 0.3 < t1 - t0 < 0.6 - finally: - t.join() + with selectors.DefaultSelector() as sel: + sel.register(conn, selectors.EVENT_READ) + t = threading.Thread(target=closer) + t.start() + try: + assert sel.select(timeout=1.0) + with pytest.raises(psycopg.OperationalError): + conn.execute("select 1") + t1 = time.time() + assert 0.3 < t1 - t0 < 0.6 + finally: + t.join() finally: conn.close() conn2.close()