From: Daniele Varrazzo Date: Fri, 25 Dec 2020 17:43:56 +0000 (+0100) Subject: Prepared statements management moved to a purpose class X-Git-Tag: 3.0.dev0~247 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1d1960006e6588aff193576c3f872c3469f98d40;p=thirdparty%2Fpsycopg.git Prepared statements management moved to a purpose class --- diff --git a/docs/connection.rst b/docs/connection.rst index d1d325095..567461cf6 100644 --- a/docs/connection.rst +++ b/docs/connection.rst @@ -140,19 +140,13 @@ The `!Connection` class .. autoattribute:: prepare_threshold - - Number of times a query is executed before it is prepared. - - If it is set to 0, every query is prepared the first time is executed. - If it is set to `!None`, prepared statements are disabled on the - connection. + :annotation: Optional[int] See :ref:`prepared-statements` for details. .. autoattribute:: prepared_max - - Maximum number of prepared statements on the connection. + :annotation: int If more queries need to be prepared, old ones are deallocated__. diff --git a/psycopg3/psycopg3/_preparing.py b/psycopg3/psycopg3/_preparing.py new file mode 100644 index 000000000..af2603cf7 --- /dev/null +++ b/psycopg3/psycopg3/_preparing.py @@ -0,0 +1,120 @@ +""" +Support for prepared statements +""" + +# Copyright (C) 2020 The Psycopg Team + +from enum import IntEnum, auto +from typing import Optional, Sequence, Tuple, TYPE_CHECKING, Union +from collections import OrderedDict + +from .pq import ExecStatus +from ._queries import PostgresQuery + +if TYPE_CHECKING: + from .pq.proto import PGresult + + +class Prepare(IntEnum): + NO = auto() + YES = auto() + SHOULD = auto() + + +class PrepareManager: + # Number of times a query is executed before it is prepared. + prepare_threshold: Optional[int] = 5 + + # Maximum number of prepared statements on the connection. + prepared_max: int = 100 + + def __init__(self) -> None: + # Number of times each query was seen in order to prepare it. + # Map (query, types) -> name or number of times seen + # + # Note: with this implementation we keep the tally of up to 100 + # queries, but most likely we will prepare way less than that. We might + # change that if we think it would be better. + self._prepared: OrderedDict[ + Tuple[bytes, Tuple[int, ...]], Union[int, bytes] + ] = OrderedDict() + + # Counter to generate prepared statements names + self._prepared_idx = 0 + + def get( + self, query: PostgresQuery, prepare: Optional[bool] = None + ) -> Tuple[Prepare, bytes]: + """ + Check if a query is prepared, tell back whether to prepare it. + """ + if prepare is False or self.prepare_threshold is None: + # The user doesn't want this query to be prepared + return Prepare.NO, b"" + + key = (query.query, query.types) + value: Union[bytes, int] = self._prepared.get(key, 0) + if isinstance(value, bytes): + # The query was already prepared in this session + return Prepare.YES, value + + if value >= self.prepare_threshold or prepare: + # The query has been executed enough times and needs to be prepared + name = f"_pg3_{self._prepared_idx}".encode("utf-8") + self._prepared_idx += 1 + return Prepare.SHOULD, name + else: + # The query is not to be prepared yet + return Prepare.NO, b"" + + def maintain( + self, + query: PostgresQuery, + results: Sequence["PGresult"], + prep: Prepare, + name: bytes, + ) -> Optional[bytes]: + """Maintain the cache of the prepared statements.""" + # don't do anything if prepared statements are disabled + if self.prepare_threshold is None: + return None + + key = (query.query, query.types) + + # If we know the query already the cache size won't change + # So just update the count and record as last used + if key in self._prepared: + if isinstance(self._prepared[key], int): + if prep is Prepare.SHOULD: + self._prepared[key] = name + else: + self._prepared[key] += 1 # type: ignore # operator + self._prepared.move_to_end(key) + return None + + # The query is not in cache. Let's see if we must add it + if len(results) != 1: + # We cannot prepare a multiple statement + return None + + result = results[0] + if ( + result.status != ExecStatus.TUPLES_OK + and result.status != ExecStatus.COMMAND_OK + ): + # We don't prepare failed queries or other weird results + return None + + # Ok, we got to the conclusion that this query is genuinely to prepare + self._prepared[key] = name if prep is Prepare.SHOULD else 1 + + # Evict an old value from the cache; if it was prepared, deallocate it + # Do it only once: if the cache was resized, deallocate gradually + if len(self._prepared) <= self.prepared_max: + return None + + old_val = self._prepared.popitem(last=False)[1] + if isinstance(old_val, bytes): + return b"DEALLOCATE " + old_val + else: + return None diff --git a/psycopg3/psycopg3/connection.py b/psycopg3/psycopg3/connection.py index b41e93f9b..ee40d6b7d 100644 --- a/psycopg3/psycopg3/connection.py +++ b/psycopg3/psycopg3/connection.py @@ -10,11 +10,10 @@ import logging import threading from types import TracebackType from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple -from typing import Optional, Tuple, Type, TYPE_CHECKING, TypeVar, Union +from typing import Optional, Type, TYPE_CHECKING, TypeVar from weakref import ref, ReferenceType from functools import partial from contextlib import contextmanager -from collections import OrderedDict if sys.version_info >= (3, 7): from contextlib import asynccontextmanager @@ -32,6 +31,7 @@ from .proto import DumpersMap, LoadersMap, PQGen, PQGenConn, RV, Query, Params from .conninfo import make_conninfo from .generators import notifies from .transaction import Transaction, AsyncTransaction +from ._preparing import PrepareManager logger = logging.getLogger(__name__) package_logger = logging.getLogger("psycopg3") @@ -102,12 +102,6 @@ class BaseConnection: cursor_factory: Type["BaseCursor[Any]"] - # Number of times a query is executed before it is prepared. - prepare_threshold: Optional[int] = 5 - - # Maximum number of prepared statements on the connection. - prepared_max = 100 - def __init__(self, pgconn: "PGconn"): self.pgconn = pgconn # TODO: document this self._autocommit = False @@ -121,14 +115,7 @@ class BaseConnection: # only a begin/commit and not a savepoint. self._savepoints: List[str] = [] - # Number of times each query was seen in order to prepare it. - # Map (query, types) -> name or number of times seen - self._prepared_statements: OrderedDict[ - Tuple[bytes, Tuple[int, ...]], Union[int, bytes] - ] = OrderedDict() - - # Counter to generate prepared statements names - self._prepared_idx = 0 + self._prepared = PrepareManager() wself = ref(self) @@ -249,6 +236,37 @@ class BaseConnection: for cb in self._notify_handlers: cb(n) + @property + def prepare_threshold(self) -> Optional[int]: + """ + Number of times a query is executed before it is prepared. + + - If it is set to 0, every query is prepared the first time is + executed. + - If it is set to `!None`, prepared statements are disabled on the + connection. + + Default value: 5 + """ + return self._prepared.prepare_threshold + + @prepare_threshold.setter + def prepare_threshold(self, value: Optional[int]) -> None: + self._prepared.prepare_threshold = value + + @property + def prepared_max(self) -> int: + """ + Maximum number of prepared statements on the connection. + + Default value: 100 + """ + return self._prepared.prepared_max + + @prepared_max.setter + def prepared_max(self, value: int) -> None: + self._prepared.prepared_max = value + # Generators to perform high-level operations on the connection # # These operations are expressed in terms of non-blocking generators diff --git a/psycopg3/psycopg3/cursor.py b/psycopg3/psycopg3/cursor.py index 4c97eccd4..b5301d3a2 100644 --- a/psycopg3/psycopg3/cursor.py +++ b/psycopg3/psycopg3/cursor.py @@ -5,10 +5,9 @@ psycopg3 cursor objects # Copyright (C) 2020 The Psycopg Team import sys -from enum import IntEnum, auto from types import TracebackType from typing import Any, AsyncIterator, Callable, Generic, Iterator, List -from typing import Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union +from typing import Optional, Sequence, Type, TYPE_CHECKING from contextlib import contextmanager from . import errors as e @@ -18,6 +17,7 @@ from .copy import Copy, AsyncCopy from .proto import ConnectionType, Query, Params, DumpersMap, LoadersMap, PQGen from ._column import Column from ._queries import PostgresQuery +from ._preparing import Prepare if sys.version_info >= (3, 7): from contextlib import asynccontextmanager @@ -42,12 +42,6 @@ else: execute = generators.execute -class Prepare(IntEnum): - NO = auto() - YES = auto() - SHOULD = auto() - - class BaseCursor(Generic[ConnectionType]): ExecStatus = pq.ExecStatus @@ -174,7 +168,7 @@ class BaseCursor(Generic[ConnectionType]): pgq = self._convert_query(query, params) # Check if the query is prepared or needs preparing - prep, name = self._get_prepared(pgq, prepare) + prep, name = self._conn._prepared.get(pgq, prepare) if prep is Prepare.YES: # The query is already prepared self._send_query_prepared(name, pgq) @@ -198,91 +192,12 @@ class BaseCursor(Generic[ConnectionType]): # Update the prepare state of the query if prepare is not False: - cmd = self._maintain_prepared(pgq, results, prep, name) + cmd = self._conn._prepared.maintain(pgq, results, prep, name) if cmd: yield from self._conn._exec_command(cmd) self._execute_results(results) - def _get_prepared( - self, query: PostgresQuery, prepare: Optional[bool] = None - ) -> Tuple[Prepare, bytes]: - """ - Check if a query is prepared, tell back whether to prepare it. - """ - conn = self._conn - if prepare is False or conn.prepare_threshold is None: - # The user doesn't want this query to be prepared - return Prepare.NO, b"" - - key = (query.query, query.types) - value: Union[bytes, int] = conn._prepared_statements.get(key, 0) - if isinstance(value, bytes): - # The query was already prepared in this session - return Prepare.YES, value - - if value >= conn.prepare_threshold or prepare: - # The query has been executed enough times and needs to be prepared - name = f"_pg3_{conn._prepared_idx}".encode("utf-8") - conn._prepared_idx += 1 - return Prepare.SHOULD, name - else: - # The query is not to be prepared yet - return Prepare.NO, b"" - - def _maintain_prepared( - self, - query: PostgresQuery, - results: Sequence["PGresult"], - prep: Prepare, - name: bytes, - ) -> Optional[bytes]: - """Maintain the cache of he prepared statements.""" - # don't do anything if prepared statements are disabled - if self._conn.prepare_threshold is None: - return None - - cache = self._conn._prepared_statements - key = (query.query, query.types) - - # If we know the query already the cache size won't change - # So just update the count and record as last used - if key in cache: - if isinstance(cache[key], int): - if prep is Prepare.SHOULD: - cache[key] = name - else: - cache[key] += 1 # type: ignore # operator - cache.move_to_end(key) - return None - - # The query is not in cache. Let's see if we must add it - if len(results) != 1: - # We cannot prepare a multiple statement - return None - - result = results[0] - if ( - result.status != ExecStatus.TUPLES_OK - and result.status != ExecStatus.COMMAND_OK - ): - # We don't prepare failed queries or other weird results - return None - - # Ok, we got to the conclusion that this query is genuinely to prepare - cache[key] = name if prep is Prepare.SHOULD else 1 - - # Evict an old value from the cache; if it was prepared, deallocate it - # Do it only once: if the cache was resized, deallocate gradually - if len(cache) <= self._conn.prepared_max: - return None - - old_val = cache.popitem(last=False)[1] - if isinstance(old_val, bytes): - return b"DEALLOCATE " + old_val - else: - return None - def _executemany_gen( self, query: Query, params_seq: Sequence[Params] ) -> PQGen[None]: diff --git a/tests/test_prepared.py b/tests/test_prepared.py index 0ed57b419..a97894c52 100644 --- a/tests/test_prepared.py +++ b/tests/test_prepared.py @@ -77,7 +77,7 @@ def test_prepare_disable(conn): res.append(cur.fetchone()[0]) assert res == [0] * 10 - assert not conn._prepared_statements + assert not conn._prepared._prepared def test_no_prepare_multi(conn): @@ -140,10 +140,10 @@ def test_evict_lru(conn): conn.execute("select 'a'") conn.execute(f"select {i}") - assert len(conn._prepared_statements) == 5 - assert conn._prepared_statements[b"select 'a'", ()] == b"_pg3_0" + assert len(conn._prepared._prepared) == 5 + assert conn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0" for i in [9, 8, 7, 6]: - assert conn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1 + assert conn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1 cur = conn.execute("select statement from pg_prepared_statements") assert cur.fetchall() == [("select 'a'",)] @@ -156,9 +156,9 @@ def test_evict_lru_deallocate(conn): conn.execute("select 'a'") conn.execute(f"select {i}") - assert len(conn._prepared_statements) == 5 + assert len(conn._prepared._prepared) == 5 for i in [9, 8, 7, 6, "'a'"]: - assert conn._prepared_statements[ + assert conn._prepared._prepared[ f"select {i}".encode("utf8"), () ].startswith(b"_pg3_") diff --git a/tests/test_prepared_async.py b/tests/test_prepared_async.py index a6663d63c..a855f5bf7 100644 --- a/tests/test_prepared_async.py +++ b/tests/test_prepared_async.py @@ -83,7 +83,7 @@ async def test_prepare_disable(aconn): res.append((await cur.fetchone())[0]) assert res == [0] * 10 - assert not aconn._prepared_statements + assert not aconn._prepared._prepared async def test_no_prepare_multi(aconn): @@ -148,12 +148,10 @@ async def test_evict_lru(aconn): await aconn.execute("select 'a'") await aconn.execute(f"select {i}") - assert len(aconn._prepared_statements) == 5 - assert aconn._prepared_statements[b"select 'a'", ()] == b"_pg3_0" + assert len(aconn._prepared._prepared) == 5 + assert aconn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0" for i in [9, 8, 7, 6]: - assert ( - aconn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1 - ) + assert aconn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1 cur = await aconn.execute("select statement from pg_prepared_statements") assert await cur.fetchall() == [("select 'a'",)] @@ -166,9 +164,9 @@ async def test_evict_lru_deallocate(aconn): await aconn.execute("select 'a'") await aconn.execute(f"select {i}") - assert len(aconn._prepared_statements) == 5 + assert len(aconn._prepared._prepared) == 5 for i in [9, 8, 7, 6, "'a'"]: - assert aconn._prepared_statements[ + assert aconn._prepared._prepared[ f"select {i}".encode("utf8"), () ].startswith(b"_pg3_")