]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat(prepare): send Close messages to deallocate when possible
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Apr 2024 19:26:18 +0000 (19:26 +0000)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 8 Apr 2024 21:13:28 +0000 (21:13 +0000)
This allows for compatibility with PgBouncer, but requires libpq v17 at
least.

psycopg/psycopg/_connection_base.py
psycopg/psycopg/_preparing.py
tests/test_prepared.py

index c5a6c2c1eca35b096ea4c7e838efff73a169a751..a5fe327da513c9e436756fc1e87471df5b513fd0 100644 (file)
@@ -50,6 +50,8 @@ FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
 IDLE = pq.TransactionStatus.IDLE
 INTRANS = pq.TransactionStatus.INTRANS
 
+_HAS_SEND_CLOSE = pq.__build_version__ >= 170000
+
 logger = logging.getLogger("psycopg")
 
 
@@ -467,6 +469,45 @@ class BaseConnection(Generic[Row]):
                 )
         return result
 
+    def _deallocate(self, name: Optional[bytes]) -> PQGen[None]:
+        """
+        Deallocate one, or all, prepared statement in the session.
+
+        ``name == None`` stands for DEALLOCATE ALL.
+
+        If possible, use protocol-level commands; otherwise use SQL statements.
+
+        Note that PgBouncer doesn't support DEALLOCATE name, but it supports
+        protocol-level Close from 1.21 and DEALLOCATE ALL from 1.22.
+        """
+        if name is None or not _HAS_SEND_CLOSE:
+            stmt = b"DEALLOCATE " + name if name is not None else b"DEALLOCATE ALL"
+            yield from self._exec_command(stmt)
+            return
+
+        self._check_connection_ok()
+
+        if self._pipeline:
+            cmd = partial(
+                self.pgconn.send_close_prepared,
+                name,
+            )
+            self._pipeline.command_queue.append(cmd)
+            self._pipeline.result_queue.append(None)
+            return
+
+        self.pgconn.send_close_prepared(name)
+
+        result = (yield from generators.execute(self.pgconn))[-1]
+        if result.status != COMMAND_OK:
+            if result.status == FATAL_ERROR:
+                raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+            else:
+                raise e.InterfaceError(
+                    f"unexpected result {pq.ExecStatus(result.status).name}"
+                    " from sending closing prepared statement message"
+                )
+
     def _check_connection_ok(self) -> None:
         if self.pgconn.status == OK:
             return
index 2433e38aac39b4eff41f4d284f8f611a673fcc52..92385c0295d3bc2d97b8350be7f7854ad6b405d9 100644 (file)
@@ -47,7 +47,7 @@ class PrepareManager:
         # Counter to generate prepared statements names
         self._prepared_idx = 0
 
-        self._to_flush = Deque[bytes]()
+        self._to_flush = Deque[Optional[bytes]]()
 
     @staticmethod
     def key(query: PostgresQuery) -> Key:
@@ -183,7 +183,7 @@ class PrepareManager:
         if self._names:
             self._names.clear()
             self._to_flush.clear()
-            self._to_flush.append(b"ALL")
+            self._to_flush.append(None)
             return True
         else:
             return False
@@ -197,4 +197,4 @@ class PrepareManager:
         """
         while self._to_flush:
             name = self._to_flush.popleft()
-            yield from conn._exec_command(b"DEALLOCATE " + name)
+            yield from conn._deallocate(name)
index c639f8ac336b2e58771034ecdde01b2310c12385..39dff56cc5f9aaa8c128c3d2179f824ac36dfb15 100644 (file)
@@ -5,12 +5,15 @@
 Prepared statements tests
 """
 
+import logging
 import datetime as dt
 from decimal import Decimal
 
 import pytest
 
+import psycopg
 from psycopg.rows import namedtuple_row
+from psycopg.pq._debug import PGconnDebug
 
 
 @pytest.mark.parametrize("value", [None, 0, 3])
@@ -175,6 +178,27 @@ def test_evict_lru_deallocate(conn):
     assert got == [f"select {i}" for i in ["'a'", 6, 7, 8, 9]]
 
 
+@pytest.mark.skipif("psycopg._cmodule._psycopg", reason="Python-only debug conn")
+def test_deallocate_or_close(conn, caplog):
+    conn.pgconn = PGconnDebug(conn.pgconn)
+    caplog.set_level(logging.INFO, logger="psycopg.debug")
+
+    conn.set_autocommit(True)
+    conn.prepare_threshold = 0
+    conn.prepared_max = 1
+
+    conn.execute("select 1::bigint")
+    conn.execute("select 1::text")
+
+    msgs = "\n".join(rec.message for rec in caplog.records)
+    if psycopg.pq.__build_version__ >= 170000:
+        assert "PGconn.send_close_prepared" in msgs
+        assert "DEALLOCATE" not in msgs
+    else:
+        assert "PGconn.send_close_prepared" not in msgs
+        assert "DEALLOCATE" in msgs
+
+
 def test_different_types(conn):
     conn.prepare_threshold = 0
     conn.execute("select %s", [None])