.. autoattribute:: prepare_threshold
-
- Number of times a query is executed before it is prepared.
-
- If it is set to 0, every query is prepared the first time is executed.
- If it is set to `!None`, prepared statements are disabled on the
- connection.
+ :annotation: Optional[int]
See :ref:`prepared-statements` for details.
.. autoattribute:: prepared_max
-
- Maximum number of prepared statements on the connection.
+ :annotation: int
If more queries need to be prepared, old ones are deallocated__.
--- /dev/null
+"""
+Support for prepared statements
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from enum import IntEnum, auto
+from typing import Optional, Sequence, Tuple, TYPE_CHECKING, Union
+from collections import OrderedDict
+
+from .pq import ExecStatus
+from ._queries import PostgresQuery
+
+if TYPE_CHECKING:
+ from .pq.proto import PGresult
+
+
+class Prepare(IntEnum):
+ NO = auto()
+ YES = auto()
+ SHOULD = auto()
+
+
+class PrepareManager:
+ # Number of times a query is executed before it is prepared.
+ prepare_threshold: Optional[int] = 5
+
+ # Maximum number of prepared statements on the connection.
+ prepared_max: int = 100
+
+ def __init__(self) -> None:
+ # Number of times each query was seen in order to prepare it.
+ # Map (query, types) -> name or number of times seen
+ #
+ # Note: with this implementation we keep the tally of up to 100
+ # queries, but most likely we will prepare way less than that. We might
+ # change that if we think it would be better.
+ self._prepared: OrderedDict[
+ Tuple[bytes, Tuple[int, ...]], Union[int, bytes]
+ ] = OrderedDict()
+
+ # Counter to generate prepared statements names
+ self._prepared_idx = 0
+
+ def get(
+ self, query: PostgresQuery, prepare: Optional[bool] = None
+ ) -> Tuple[Prepare, bytes]:
+ """
+ Check if a query is prepared, tell back whether to prepare it.
+ """
+ if prepare is False or self.prepare_threshold is None:
+ # The user doesn't want this query to be prepared
+ return Prepare.NO, b""
+
+ key = (query.query, query.types)
+ value: Union[bytes, int] = self._prepared.get(key, 0)
+ if isinstance(value, bytes):
+ # The query was already prepared in this session
+ return Prepare.YES, value
+
+ if value >= self.prepare_threshold or prepare:
+ # The query has been executed enough times and needs to be prepared
+ name = f"_pg3_{self._prepared_idx}".encode("utf-8")
+ self._prepared_idx += 1
+ return Prepare.SHOULD, name
+ else:
+ # The query is not to be prepared yet
+ return Prepare.NO, b""
+
+ def maintain(
+ self,
+ query: PostgresQuery,
+ results: Sequence["PGresult"],
+ prep: Prepare,
+ name: bytes,
+ ) -> Optional[bytes]:
+ """Maintain the cache of the prepared statements."""
+ # don't do anything if prepared statements are disabled
+ if self.prepare_threshold is None:
+ return None
+
+ key = (query.query, query.types)
+
+ # If we know the query already the cache size won't change
+ # So just update the count and record as last used
+ if key in self._prepared:
+ if isinstance(self._prepared[key], int):
+ if prep is Prepare.SHOULD:
+ self._prepared[key] = name
+ else:
+ self._prepared[key] += 1 # type: ignore # operator
+ self._prepared.move_to_end(key)
+ return None
+
+ # The query is not in cache. Let's see if we must add it
+ if len(results) != 1:
+ # We cannot prepare a multiple statement
+ return None
+
+ result = results[0]
+ if (
+ result.status != ExecStatus.TUPLES_OK
+ and result.status != ExecStatus.COMMAND_OK
+ ):
+ # We don't prepare failed queries or other weird results
+ return None
+
+ # Ok, we got to the conclusion that this query is genuinely to prepare
+ self._prepared[key] = name if prep is Prepare.SHOULD else 1
+
+ # Evict an old value from the cache; if it was prepared, deallocate it
+ # Do it only once: if the cache was resized, deallocate gradually
+ if len(self._prepared) <= self.prepared_max:
+ return None
+
+ old_val = self._prepared.popitem(last=False)[1]
+ if isinstance(old_val, bytes):
+ return b"DEALLOCATE " + old_val
+ else:
+ return None
import threading
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple
-from typing import Optional, Tuple, Type, TYPE_CHECKING, TypeVar, Union
+from typing import Optional, Type, TYPE_CHECKING, TypeVar
from weakref import ref, ReferenceType
from functools import partial
from contextlib import contextmanager
-from collections import OrderedDict
if sys.version_info >= (3, 7):
from contextlib import asynccontextmanager
from .conninfo import make_conninfo
from .generators import notifies
from .transaction import Transaction, AsyncTransaction
+from ._preparing import PrepareManager
logger = logging.getLogger(__name__)
package_logger = logging.getLogger("psycopg3")
cursor_factory: Type["BaseCursor[Any]"]
- # Number of times a query is executed before it is prepared.
- prepare_threshold: Optional[int] = 5
-
- # Maximum number of prepared statements on the connection.
- prepared_max = 100
-
def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn # TODO: document this
self._autocommit = False
# only a begin/commit and not a savepoint.
self._savepoints: List[str] = []
- # Number of times each query was seen in order to prepare it.
- # Map (query, types) -> name or number of times seen
- self._prepared_statements: OrderedDict[
- Tuple[bytes, Tuple[int, ...]], Union[int, bytes]
- ] = OrderedDict()
-
- # Counter to generate prepared statements names
- self._prepared_idx = 0
+ self._prepared = PrepareManager()
wself = ref(self)
for cb in self._notify_handlers:
cb(n)
+ @property
+ def prepare_threshold(self) -> Optional[int]:
+ """
+ Number of times a query is executed before it is prepared.
+
+ - If it is set to 0, every query is prepared the first time is
+ executed.
+ - If it is set to `!None`, prepared statements are disabled on the
+ connection.
+
+ Default value: 5
+ """
+ return self._prepared.prepare_threshold
+
+ @prepare_threshold.setter
+ def prepare_threshold(self, value: Optional[int]) -> None:
+ self._prepared.prepare_threshold = value
+
+ @property
+ def prepared_max(self) -> int:
+ """
+ Maximum number of prepared statements on the connection.
+
+ Default value: 100
+ """
+ return self._prepared.prepared_max
+
+ @prepared_max.setter
+ def prepared_max(self, value: int) -> None:
+ self._prepared.prepared_max = value
+
# Generators to perform high-level operations on the connection
#
# These operations are expressed in terms of non-blocking generators
# Copyright (C) 2020 The Psycopg Team
import sys
-from enum import IntEnum, auto
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
-from typing import Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union
+from typing import Optional, Sequence, Type, TYPE_CHECKING
from contextlib import contextmanager
from . import errors as e
from .proto import ConnectionType, Query, Params, DumpersMap, LoadersMap, PQGen
from ._column import Column
from ._queries import PostgresQuery
+from ._preparing import Prepare
if sys.version_info >= (3, 7):
from contextlib import asynccontextmanager
execute = generators.execute
-class Prepare(IntEnum):
- NO = auto()
- YES = auto()
- SHOULD = auto()
-
-
class BaseCursor(Generic[ConnectionType]):
ExecStatus = pq.ExecStatus
pgq = self._convert_query(query, params)
# Check if the query is prepared or needs preparing
- prep, name = self._get_prepared(pgq, prepare)
+ prep, name = self._conn._prepared.get(pgq, prepare)
if prep is Prepare.YES:
# The query is already prepared
self._send_query_prepared(name, pgq)
# Update the prepare state of the query
if prepare is not False:
- cmd = self._maintain_prepared(pgq, results, prep, name)
+ cmd = self._conn._prepared.maintain(pgq, results, prep, name)
if cmd:
yield from self._conn._exec_command(cmd)
self._execute_results(results)
- def _get_prepared(
- self, query: PostgresQuery, prepare: Optional[bool] = None
- ) -> Tuple[Prepare, bytes]:
- """
- Check if a query is prepared, tell back whether to prepare it.
- """
- conn = self._conn
- if prepare is False or conn.prepare_threshold is None:
- # The user doesn't want this query to be prepared
- return Prepare.NO, b""
-
- key = (query.query, query.types)
- value: Union[bytes, int] = conn._prepared_statements.get(key, 0)
- if isinstance(value, bytes):
- # The query was already prepared in this session
- return Prepare.YES, value
-
- if value >= conn.prepare_threshold or prepare:
- # The query has been executed enough times and needs to be prepared
- name = f"_pg3_{conn._prepared_idx}".encode("utf-8")
- conn._prepared_idx += 1
- return Prepare.SHOULD, name
- else:
- # The query is not to be prepared yet
- return Prepare.NO, b""
-
- def _maintain_prepared(
- self,
- query: PostgresQuery,
- results: Sequence["PGresult"],
- prep: Prepare,
- name: bytes,
- ) -> Optional[bytes]:
- """Maintain the cache of he prepared statements."""
- # don't do anything if prepared statements are disabled
- if self._conn.prepare_threshold is None:
- return None
-
- cache = self._conn._prepared_statements
- key = (query.query, query.types)
-
- # If we know the query already the cache size won't change
- # So just update the count and record as last used
- if key in cache:
- if isinstance(cache[key], int):
- if prep is Prepare.SHOULD:
- cache[key] = name
- else:
- cache[key] += 1 # type: ignore # operator
- cache.move_to_end(key)
- return None
-
- # The query is not in cache. Let's see if we must add it
- if len(results) != 1:
- # We cannot prepare a multiple statement
- return None
-
- result = results[0]
- if (
- result.status != ExecStatus.TUPLES_OK
- and result.status != ExecStatus.COMMAND_OK
- ):
- # We don't prepare failed queries or other weird results
- return None
-
- # Ok, we got to the conclusion that this query is genuinely to prepare
- cache[key] = name if prep is Prepare.SHOULD else 1
-
- # Evict an old value from the cache; if it was prepared, deallocate it
- # Do it only once: if the cache was resized, deallocate gradually
- if len(cache) <= self._conn.prepared_max:
- return None
-
- old_val = cache.popitem(last=False)[1]
- if isinstance(old_val, bytes):
- return b"DEALLOCATE " + old_val
- else:
- return None
-
def _executemany_gen(
self, query: Query, params_seq: Sequence[Params]
) -> PQGen[None]:
res.append(cur.fetchone()[0])
assert res == [0] * 10
- assert not conn._prepared_statements
+ assert not conn._prepared._prepared
def test_no_prepare_multi(conn):
conn.execute("select 'a'")
conn.execute(f"select {i}")
- assert len(conn._prepared_statements) == 5
- assert conn._prepared_statements[b"select 'a'", ()] == b"_pg3_0"
+ assert len(conn._prepared._prepared) == 5
+ assert conn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
for i in [9, 8, 7, 6]:
- assert conn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1
+ assert conn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
cur = conn.execute("select statement from pg_prepared_statements")
assert cur.fetchall() == [("select 'a'",)]
conn.execute("select 'a'")
conn.execute(f"select {i}")
- assert len(conn._prepared_statements) == 5
+ assert len(conn._prepared._prepared) == 5
for i in [9, 8, 7, 6, "'a'"]:
- assert conn._prepared_statements[
+ assert conn._prepared._prepared[
f"select {i}".encode("utf8"), ()
].startswith(b"_pg3_")
res.append((await cur.fetchone())[0])
assert res == [0] * 10
- assert not aconn._prepared_statements
+ assert not aconn._prepared._prepared
async def test_no_prepare_multi(aconn):
await aconn.execute("select 'a'")
await aconn.execute(f"select {i}")
- assert len(aconn._prepared_statements) == 5
- assert aconn._prepared_statements[b"select 'a'", ()] == b"_pg3_0"
+ assert len(aconn._prepared._prepared) == 5
+ assert aconn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
for i in [9, 8, 7, 6]:
- assert (
- aconn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1
- )
+ assert aconn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
cur = await aconn.execute("select statement from pg_prepared_statements")
assert await cur.fetchall() == [("select 'a'",)]
await aconn.execute("select 'a'")
await aconn.execute(f"select {i}")
- assert len(aconn._prepared_statements) == 5
+ assert len(aconn._prepared._prepared) == 5
for i in [9, 8, 7, 6, "'a'"]:
- assert aconn._prepared_statements[
+ assert aconn._prepared._prepared[
f"select {i}".encode("utf8"), ()
].startswith(b"_pg3_")