IDLE = pq.TransactionStatus.IDLE
INTRANS = pq.TransactionStatus.INTRANS
+_HAS_SEND_CLOSE = pq.__build_version__ >= 170000
+
logger = logging.getLogger("psycopg")
)
return result
+ def _deallocate(self, name: Optional[bytes]) -> PQGen[None]:
+ """
+ Deallocate one, or all, prepared statement in the session.
+
+ ``name == None`` stands for DEALLOCATE ALL.
+
+ If possible, use protocol-level commands; otherwise use SQL statements.
+
+ Note that PgBouncer doesn't support DEALLOCATE name, but it supports
+ protocol-level Close from 1.21 and DEALLOCATE ALL from 1.22.
+ """
+ if name is None or not _HAS_SEND_CLOSE:
+ stmt = b"DEALLOCATE " + name if name is not None else b"DEALLOCATE ALL"
+ yield from self._exec_command(stmt)
+ return
+
+ self._check_connection_ok()
+
+ if self._pipeline:
+ cmd = partial(
+ self.pgconn.send_close_prepared,
+ name,
+ )
+ self._pipeline.command_queue.append(cmd)
+ self._pipeline.result_queue.append(None)
+ return
+
+ self.pgconn.send_close_prepared(name)
+
+ result = (yield from generators.execute(self.pgconn))[-1]
+ if result.status != COMMAND_OK:
+ if result.status == FATAL_ERROR:
+ raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+ else:
+ raise e.InterfaceError(
+ f"unexpected result {pq.ExecStatus(result.status).name}"
+ " from sending closing prepared statement message"
+ )
+
def _check_connection_ok(self) -> None:
if self.pgconn.status == OK:
return
# Counter to generate prepared statements names
self._prepared_idx = 0
- self._to_flush = Deque[bytes]()
+ self._to_flush = Deque[Optional[bytes]]()
@staticmethod
def key(query: PostgresQuery) -> Key:
if self._names:
self._names.clear()
self._to_flush.clear()
- self._to_flush.append(b"ALL")
+ self._to_flush.append(None)
return True
else:
return False
"""
while self._to_flush:
name = self._to_flush.popleft()
- yield from conn._exec_command(b"DEALLOCATE " + name)
+ yield from conn._deallocate(name)
Prepared statements tests
"""
+import logging
import datetime as dt
from decimal import Decimal
import pytest
+import psycopg
from psycopg.rows import namedtuple_row
+from psycopg.pq._debug import PGconnDebug
@pytest.mark.parametrize("value", [None, 0, 3])
assert got == [f"select {i}" for i in ["'a'", 6, 7, 8, 9]]
+@pytest.mark.skipif("psycopg._cmodule._psycopg", reason="Python-only debug conn")
+def test_deallocate_or_close(conn, caplog):
+ conn.pgconn = PGconnDebug(conn.pgconn)
+ caplog.set_level(logging.INFO, logger="psycopg.debug")
+
+ conn.set_autocommit(True)
+ conn.prepare_threshold = 0
+ conn.prepared_max = 1
+
+ conn.execute("select 1::bigint")
+ conn.execute("select 1::text")
+
+ msgs = "\n".join(rec.message for rec in caplog.records)
+ if psycopg.pq.__build_version__ >= 170000:
+ assert "PGconn.send_close_prepared" in msgs
+ assert "DEALLOCATE" not in msgs
+ else:
+ assert "PGconn.send_close_prepared" not in msgs
+ assert "DEALLOCATE" in msgs
+
+
def test_different_types(conn):
conn.prepare_threshold = 0
conn.execute("select %s", [None])