]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: ensure gevent collaboration
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 30 Nov 2023 09:05:33 +0000 (10:05 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 1 Dec 2023 12:56:18 +0000 (13:56 +0100)
Close #527

docs/advanced/async.rst
docs/news.rst
psycopg/psycopg/waiting.py
tests/test_gevent.py [new file with mode: 0644]

index 3620ab6814118a50f3d9b2a44a8661f5179ba62b..f81c1a23b8de150f3bc77eb2764166e917ee34a7 100644 (file)
@@ -139,6 +139,31 @@ emulate what normally happens with blocking connections, you can use
 .. __: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_signal_handler
 
 
+.. index:: gevent
+
+.. _gevent:
+
+Gevent support
+--------------
+
+Psycopg 3 supports `gevent <https://www.gevent.org/>`__ out of the box. If the
+`socket` module is found patched by functions such as
+`gevent.monkey.patch_select()`__ or `patch_all()`__, psycopg will behave in a
+collaborative way.
+
+Unlike with `!psycopg2`, using the `!psycogreen` module is not required.
+
+.. __: http://www.gevent.org/api/gevent.monkey.html#gevent.monkey.patch_select
+.. __: http://www.gevent.org/api/gevent.monkey.html#gevent.monkey.patch_all
+
+.. warning::
+
+    gevent support was initially accidental, and was accidentally broken in
+    psycopg 3.1.4.
+
+    gevent is officially supported only starting from psycopg 3.1.14.
+
+
 .. index::
     pair: Asynchronous; Notifications
     pair: LISTEN; SQL command
index 3824a73f7f37c949eb3fe78556cddaec9d16121b..814c244bc35e314ff75f88287ba6f29baec78064 100644 (file)
@@ -7,6 +7,17 @@
 ``psycopg`` release notes
 =========================
 
+Future releases
+---------------
+
+Psycopg 3.1.14 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix :ref:`interaction with gevent <gevent>` (:ticket:`#527`).
+
+.. _gevent: https://www.gevent.org/
+
+
 Current release
 ---------------
 
index 3416633b9190adf0f92079dee1a8fe76ac5e038a..56ff562f1224a518889190ca6edbf3b52d0d1eef 100644 (file)
@@ -12,6 +12,7 @@ These functions are designed to consume the generators returned by the
 import os
 import sys
 import select
+import logging
 import selectors
 from typing import Optional
 from asyncio import get_event_loop, wait_for, Event, TimeoutError
@@ -29,6 +30,8 @@ READY_R = Ready.R
 READY_W = Ready.W
 READY_RW = Ready.RW
 
+logger = logging.getLogger(__name__)
+
 
 def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
     """
@@ -353,6 +356,27 @@ def wait_poll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> R
         return rv
 
 
+def _is_select_patched() -> bool:
+    """
+    Detect if some greenlet library has patched the select library.
+
+    If this is the case, avoid to use the wait_c function as it doesn't behave
+    in a collaborative way.
+
+    Currently supported: gevent.
+    """
+    # If not imported, don't import it.
+    m = sys.modules.get("gevent.monkey")
+    if m:
+        try:
+            if m.is_module_patched("select"):
+                return True
+        except Exception as ex:
+            logger.warning("failed to detect gevent monkey-patching: %s", ex)
+
+    return False
+
+
 if _psycopg:
     wait_c = _psycopg.wait_c
 
@@ -377,7 +401,7 @@ if "PSYCOPG_WAIT_FUNC" in os.environ:
 # On Windows, for the moment, avoid using wait_c, because it was reported to
 # use excessive CPU (see #645).
 # TODO: investigate why.
-elif _psycopg and sys.platform != "win32":
+elif _psycopg and sys.platform != "win32" and not _is_select_patched():
     wait = wait_c
 
 elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
diff --git a/tests/test_gevent.py b/tests/test_gevent.py
new file mode 100644 (file)
index 0000000..99027ae
--- /dev/null
@@ -0,0 +1,82 @@
+import sys
+import json
+import subprocess as sp
+
+import pytest
+import psycopg
+
+pytest.importorskip("gevent")
+
+
+@pytest.mark.slow
+@pytest.mark.timing
+def test_gevent(dsn):
+    TICK = 0.1
+    script = f"""\
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import json
+import time
+import gevent
+import psycopg
+
+TICK = {TICK!r}
+dts = []
+queried = False
+
+def ticker():
+    t0 = time.time()
+    for i in range(5):
+        time.sleep(TICK)
+        t = time.time()
+        dts.append(t - t0)
+        t0 = t
+
+def querier():
+    time.sleep(TICK * 2)
+    with psycopg.connect({dsn!r}) as conn:
+        conn.execute("select pg_sleep(0.3)")
+
+    global queried
+    queried = True
+
+jobs = [gevent.spawn(ticker), gevent.spawn(querier)]
+gevent.joinall(jobs, timeout=3)
+print(json.dumps(dts))
+"""
+    cmdline = [sys.executable, "-c", script]
+    rv = sp.run(cmdline, check=True, text=True, stdout=sp.PIPE)
+    dts = json.loads(rv.stdout)
+
+    for dt in dts:
+        assert TICK <= dt < TICK * 1.1
+
+
+@pytest.mark.skipif("not psycopg._cmodule._psycopg")
+def test_patched_dont_use_wait_c():
+    if psycopg.waiting.wait is not psycopg.waiting.wait_c:
+        pytest.skip("wait_c not normally in use")
+
+    script = """
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import psycopg
+assert psycopg.waiting.wait is not psycopg.waiting.wait_c
+"""
+    sp.check_call([sys.executable, "-c", script])
+
+
+@pytest.mark.skipif("not psycopg._cmodule._psycopg")
+def test_unpatched_still_use_wait_c():
+    if psycopg.waiting.wait is not psycopg.waiting.wait_c:
+        pytest.skip("wait_c not normally in use")
+
+    script = """
+import gevent.monkey
+
+import psycopg
+assert psycopg.waiting.wait is psycopg.waiting.wait_c
+"""
+    sp.check_call([sys.executable, "-c", script])