From: Daniele Varrazzo Date: Thu, 30 Nov 2023 09:05:33 +0000 (+0100) Subject: fix: ensure gevent collaboration X-Git-Tag: 3.1.14~2^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ada18239db60275d35c9b346e53891b9691845f3;p=thirdparty%2Fpsycopg.git fix: ensure gevent collaboration Close #527 --- diff --git a/docs/advanced/async.rst b/docs/advanced/async.rst index 3620ab681..f81c1a23b 100644 --- a/docs/advanced/async.rst +++ b/docs/advanced/async.rst @@ -139,6 +139,31 @@ emulate what normally happens with blocking connections, you can use .. __: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_signal_handler +.. index:: gevent + +.. _gevent: + +Gevent support +-------------- + +Psycopg 3 supports `gevent `__ out of the box. If the +`socket` module is found patched by functions such as +`gevent.monkey.patch_select()`__ or `patch_all()`__, psycopg will behave in a +collaborative way. + +Unlike with `!psycopg2`, using the `!psycogreen` module is not required. + +.. __: http://www.gevent.org/api/gevent.monkey.html#gevent.monkey.patch_select +.. __: http://www.gevent.org/api/gevent.monkey.html#gevent.monkey.patch_all + +.. warning:: + + gevent support was initially accidental, and was accidentally broken in + psycopg 3.1.4. + + gevent is officially supported only starting from psycopg 3.1.14. + + .. index:: pair: Asynchronous; Notifications pair: LISTEN; SQL command diff --git a/docs/news.rst b/docs/news.rst index 3824a73f7..814c244bc 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -7,6 +7,17 @@ ``psycopg`` release notes ========================= +Future releases +--------------- + +Psycopg 3.1.14 (unreleased) +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Fix :ref:`interaction with gevent ` (:ticket:`#527`). + +.. _gevent: https://www.gevent.org/ + + Current release --------------- diff --git a/psycopg/psycopg/waiting.py b/psycopg/psycopg/waiting.py index 3416633b9..56ff562f1 100644 --- a/psycopg/psycopg/waiting.py +++ b/psycopg/psycopg/waiting.py @@ -12,6 +12,7 @@ These functions are designed to consume the generators returned by the import os import sys import select +import logging import selectors from typing import Optional from asyncio import get_event_loop, wait_for, Event, TimeoutError @@ -29,6 +30,8 @@ READY_R = Ready.R READY_W = Ready.W READY_RW = Ready.RW +logger = logging.getLogger(__name__) + def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV: """ @@ -353,6 +356,27 @@ def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> R return rv +def _is_select_patched() -> bool: + """ + Detect if some greenlet library has patched the select library. + + If this is the case, avoid to use the wait_c function as it doesn't behave + in a collaborative way. + + Currently supported: gevent. + """ + # If not imported, don't import it. + m = sys.modules.get("gevent.monkey") + if m: + try: + if m.is_module_patched("select"): + return True + except Exception as ex: + logger.warning("failed to detect gevent monkey-patching: %s", ex) + + return False + + if _psycopg: wait_c = _psycopg.wait_c @@ -377,7 +401,7 @@ if "PSYCOPG_WAIT_FUNC" in os.environ: # On Windows, for the moment, avoid using wait_c, because it was reported to # use excessive CPU (see #645). # TODO: investigate why. -elif _psycopg and sys.platform != "win32": +elif _psycopg and sys.platform != "win32" and not _is_select_patched(): wait = wait_c elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None): diff --git a/tests/test_gevent.py b/tests/test_gevent.py new file mode 100644 index 000000000..99027aee3 --- /dev/null +++ b/tests/test_gevent.py @@ -0,0 +1,82 @@ +import sys +import json +import subprocess as sp + +import pytest +import psycopg + +pytest.importorskip("gevent") + + +@pytest.mark.slow +@pytest.mark.timing +def test_gevent(dsn): + TICK = 0.1 + script = f"""\ +import gevent.monkey +gevent.monkey.patch_all() + +import json +import time +import gevent +import psycopg + +TICK = {TICK!r} +dts = [] +queried = False + +def ticker(): + t0 = time.time() + for i in range(5): + time.sleep(TICK) + t = time.time() + dts.append(t - t0) + t0 = t + +def querier(): + time.sleep(TICK * 2) + with psycopg.connect({dsn!r}) as conn: + conn.execute("select pg_sleep(0.3)") + + global queried + queried = True + +jobs = [gevent.spawn(ticker), gevent.spawn(querier)] +gevent.joinall(jobs, timeout=3) +print(json.dumps(dts)) +""" + cmdline = [sys.executable, "-c", script] + rv = sp.run(cmdline, check=True, text=True, stdout=sp.PIPE) + dts = json.loads(rv.stdout) + + for dt in dts: + assert TICK <= dt < TICK * 1.1 + + +@pytest.mark.skipif("not psycopg._cmodule._psycopg") +def test_patched_dont_use_wait_c(): + if psycopg.waiting.wait is not psycopg.waiting.wait_c: + pytest.skip("wait_c not normally in use") + + script = """ +import gevent.monkey +gevent.monkey.patch_all() + +import psycopg +assert psycopg.waiting.wait is not psycopg.waiting.wait_c +""" + sp.check_call([sys.executable, "-c", script]) + + +@pytest.mark.skipif("not psycopg._cmodule._psycopg") +def test_unpatched_still_use_wait_c(): + if psycopg.waiting.wait is not psycopg.waiting.wait_c: + pytest.skip("wait_c not normally in use") + + script = """ +import gevent.monkey + +import psycopg +assert psycopg.waiting.wait is psycopg.waiting.wait_c +""" + sp.check_call([sys.executable, "-c", script])