]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Prepared statements management moved to a purpose class
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 25 Dec 2020 17:43:56 +0000 (18:43 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 25 Dec 2020 17:43:56 +0000 (18:43 +0100)
docs/connection.rst
psycopg3/psycopg3/_preparing.py [new file with mode: 0644]
psycopg3/psycopg3/connection.py
psycopg3/psycopg3/cursor.py
tests/test_prepared.py
tests/test_prepared_async.py

index d1d325095c14bd2444012a48ce2b940cb97c03a6..567461cf638b638ae798ce8a8d5fc596d3fa81ef 100644 (file)
@@ -140,19 +140,13 @@ The `!Connection` class
 
 
     .. autoattribute:: prepare_threshold
-
-        Number of times a query is executed before it is prepared.
-
-        If it is set to 0, every query is prepared the first time is executed.
-        If it is set to `!None`, prepared statements are disabled on the
-        connection.
+        :annotation: Optional[int]
 
         See :ref:`prepared-statements` for details.
 
 
     .. autoattribute:: prepared_max
-
-        Maximum number of prepared statements on the connection.
+        :annotation: int
 
         If more queries need to be prepared, old ones are deallocated__.
 
diff --git a/psycopg3/psycopg3/_preparing.py b/psycopg3/psycopg3/_preparing.py
new file mode 100644 (file)
index 0000000..af2603c
--- /dev/null
@@ -0,0 +1,120 @@
+"""
+Support for prepared statements
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from enum import IntEnum, auto
+from typing import Optional, Sequence, Tuple, TYPE_CHECKING, Union
+from collections import OrderedDict
+
+from .pq import ExecStatus
+from ._queries import PostgresQuery
+
+if TYPE_CHECKING:
+    from .pq.proto import PGresult
+
+
+class Prepare(IntEnum):
+    NO = auto()
+    YES = auto()
+    SHOULD = auto()
+
+
+class PrepareManager:
+    # Number of times a query is executed before it is prepared.
+    prepare_threshold: Optional[int] = 5
+
+    # Maximum number of prepared statements on the connection.
+    prepared_max: int = 100
+
+    def __init__(self) -> None:
+        # Number of times each query was seen in order to prepare it.
+        # Map (query, types) -> name or number of times seen
+        #
+        # Note: with this implementation we keep the tally of up to 100
+        # queries, but most likely we will prepare way less than that. We might
+        # change that if we think it would be better.
+        self._prepared: OrderedDict[
+            Tuple[bytes, Tuple[int, ...]], Union[int, bytes]
+        ] = OrderedDict()
+
+        # Counter to generate prepared statements names
+        self._prepared_idx = 0
+
+    def get(
+        self, query: PostgresQuery, prepare: Optional[bool] = None
+    ) -> Tuple[Prepare, bytes]:
+        """
+        Check if a query is prepared, tell back whether to prepare it.
+        """
+        if prepare is False or self.prepare_threshold is None:
+            # The user doesn't want this query to be prepared
+            return Prepare.NO, b""
+
+        key = (query.query, query.types)
+        value: Union[bytes, int] = self._prepared.get(key, 0)
+        if isinstance(value, bytes):
+            # The query was already prepared in this session
+            return Prepare.YES, value
+
+        if value >= self.prepare_threshold or prepare:
+            # The query has been executed enough times and needs to be prepared
+            name = f"_pg3_{self._prepared_idx}".encode("utf-8")
+            self._prepared_idx += 1
+            return Prepare.SHOULD, name
+        else:
+            # The query is not to be prepared yet
+            return Prepare.NO, b""
+
+    def maintain(
+        self,
+        query: PostgresQuery,
+        results: Sequence["PGresult"],
+        prep: Prepare,
+        name: bytes,
+    ) -> Optional[bytes]:
+        """Maintain the cache of the prepared statements."""
+        # don't do anything if prepared statements are disabled
+        if self.prepare_threshold is None:
+            return None
+
+        key = (query.query, query.types)
+
+        # If we know the query already the cache size won't change
+        # So just update the count and record as last used
+        if key in self._prepared:
+            if isinstance(self._prepared[key], int):
+                if prep is Prepare.SHOULD:
+                    self._prepared[key] = name
+                else:
+                    self._prepared[key] += 1  # type: ignore  # operator
+            self._prepared.move_to_end(key)
+            return None
+
+        # The query is not in cache. Let's see if we must add it
+        if len(results) != 1:
+            # We cannot prepare a multiple statement
+            return None
+
+        result = results[0]
+        if (
+            result.status != ExecStatus.TUPLES_OK
+            and result.status != ExecStatus.COMMAND_OK
+        ):
+            # We don't prepare failed queries or other weird results
+            return None
+
+        # Ok, we got to the conclusion that this query is genuinely to prepare
+        self._prepared[key] = name if prep is Prepare.SHOULD else 1
+
+        # Evict an old value from the cache; if it was prepared, deallocate it
+        # Do it only once: if the cache was resized, deallocate gradually
+        if len(self._prepared) <= self.prepared_max:
+            return None
+
+        old_val = self._prepared.popitem(last=False)[1]
+        if isinstance(old_val, bytes):
+            return b"DEALLOCATE " + old_val
+        else:
+            return None
index b41e93f9b3cfcc35cbc3688e2f98b5d715368ad0..ee40d6b7de6a35ed5feb0768a167032930dcda23 100644 (file)
@@ -10,11 +10,10 @@ import logging
 import threading
 from types import TracebackType
 from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple
-from typing import Optional, Tuple, Type, TYPE_CHECKING, TypeVar, Union
+from typing import Optional, Type, TYPE_CHECKING, TypeVar
 from weakref import ref, ReferenceType
 from functools import partial
 from contextlib import contextmanager
-from collections import OrderedDict
 
 if sys.version_info >= (3, 7):
     from contextlib import asynccontextmanager
@@ -32,6 +31,7 @@ from .proto import DumpersMap, LoadersMap, PQGen, PQGenConn, RV, Query, Params
 from .conninfo import make_conninfo
 from .generators import notifies
 from .transaction import Transaction, AsyncTransaction
+from ._preparing import PrepareManager
 
 logger = logging.getLogger(__name__)
 package_logger = logging.getLogger("psycopg3")
@@ -102,12 +102,6 @@ class BaseConnection:
 
     cursor_factory: Type["BaseCursor[Any]"]
 
-    # Number of times a query is executed before it is prepared.
-    prepare_threshold: Optional[int] = 5
-
-    # Maximum number of prepared statements on the connection.
-    prepared_max = 100
-
     def __init__(self, pgconn: "PGconn"):
         self.pgconn = pgconn  # TODO: document this
         self._autocommit = False
@@ -121,14 +115,7 @@ class BaseConnection:
         # only a begin/commit and not a savepoint.
         self._savepoints: List[str] = []
 
-        # Number of times each query was seen in order to prepare it.
-        # Map (query, types) -> name or number of times seen
-        self._prepared_statements: OrderedDict[
-            Tuple[bytes, Tuple[int, ...]], Union[int, bytes]
-        ] = OrderedDict()
-
-        # Counter to generate prepared statements names
-        self._prepared_idx = 0
+        self._prepared = PrepareManager()
 
         wself = ref(self)
 
@@ -249,6 +236,37 @@ class BaseConnection:
         for cb in self._notify_handlers:
             cb(n)
 
+    @property
+    def prepare_threshold(self) -> Optional[int]:
+        """
+        Number of times a query is executed before it is prepared.
+
+        - If it is set to 0, every query is prepared the first time is
+          executed.
+        - If it is set to `!None`, prepared statements are disabled on the
+          connection.
+
+        Default value: 5
+        """
+        return self._prepared.prepare_threshold
+
+    @prepare_threshold.setter
+    def prepare_threshold(self, value: Optional[int]) -> None:
+        self._prepared.prepare_threshold = value
+
+    @property
+    def prepared_max(self) -> int:
+        """
+        Maximum number of prepared statements on the connection.
+
+        Default value: 100
+        """
+        return self._prepared.prepared_max
+
+    @prepared_max.setter
+    def prepared_max(self, value: int) -> None:
+        self._prepared.prepared_max = value
+
     # Generators to perform high-level operations on the connection
     #
     # These operations are expressed in terms of non-blocking generators
index 4c97eccd4039a9176e42ba44537535e50ffb0ee7..b5301d3a27ba9bdb29c83f2571b259442c8e6bdb 100644 (file)
@@ -5,10 +5,9 @@ psycopg3 cursor objects
 # Copyright (C) 2020 The Psycopg Team
 
 import sys
-from enum import IntEnum, auto
 from types import TracebackType
 from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
-from typing import Optional, Sequence, Tuple, Type, TYPE_CHECKING, Union
+from typing import Optional, Sequence, Type, TYPE_CHECKING
 from contextlib import contextmanager
 
 from . import errors as e
@@ -18,6 +17,7 @@ from .copy import Copy, AsyncCopy
 from .proto import ConnectionType, Query, Params, DumpersMap, LoadersMap, PQGen
 from ._column import Column
 from ._queries import PostgresQuery
+from ._preparing import Prepare
 
 if sys.version_info >= (3, 7):
     from contextlib import asynccontextmanager
@@ -42,12 +42,6 @@ else:
     execute = generators.execute
 
 
-class Prepare(IntEnum):
-    NO = auto()
-    YES = auto()
-    SHOULD = auto()
-
-
 class BaseCursor(Generic[ConnectionType]):
     ExecStatus = pq.ExecStatus
 
@@ -174,7 +168,7 @@ class BaseCursor(Generic[ConnectionType]):
         pgq = self._convert_query(query, params)
 
         # Check if the query is prepared or needs preparing
-        prep, name = self._get_prepared(pgq, prepare)
+        prep, name = self._conn._prepared.get(pgq, prepare)
         if prep is Prepare.YES:
             # The query is already prepared
             self._send_query_prepared(name, pgq)
@@ -198,91 +192,12 @@ class BaseCursor(Generic[ConnectionType]):
 
         # Update the prepare state of the query
         if prepare is not False:
-            cmd = self._maintain_prepared(pgq, results, prep, name)
+            cmd = self._conn._prepared.maintain(pgq, results, prep, name)
             if cmd:
                 yield from self._conn._exec_command(cmd)
 
         self._execute_results(results)
 
-    def _get_prepared(
-        self, query: PostgresQuery, prepare: Optional[bool] = None
-    ) -> Tuple[Prepare, bytes]:
-        """
-        Check if a query is prepared, tell back whether to prepare it.
-        """
-        conn = self._conn
-        if prepare is False or conn.prepare_threshold is None:
-            # The user doesn't want this query to be prepared
-            return Prepare.NO, b""
-
-        key = (query.query, query.types)
-        value: Union[bytes, int] = conn._prepared_statements.get(key, 0)
-        if isinstance(value, bytes):
-            # The query was already prepared in this session
-            return Prepare.YES, value
-
-        if value >= conn.prepare_threshold or prepare:
-            # The query has been executed enough times and needs to be prepared
-            name = f"_pg3_{conn._prepared_idx}".encode("utf-8")
-            conn._prepared_idx += 1
-            return Prepare.SHOULD, name
-        else:
-            # The query is not to be prepared yet
-            return Prepare.NO, b""
-
-    def _maintain_prepared(
-        self,
-        query: PostgresQuery,
-        results: Sequence["PGresult"],
-        prep: Prepare,
-        name: bytes,
-    ) -> Optional[bytes]:
-        """Maintain the cache of he prepared statements."""
-        # don't do anything if prepared statements are disabled
-        if self._conn.prepare_threshold is None:
-            return None
-
-        cache = self._conn._prepared_statements
-        key = (query.query, query.types)
-
-        # If we know the query already the cache size won't change
-        # So just update the count and record as last used
-        if key in cache:
-            if isinstance(cache[key], int):
-                if prep is Prepare.SHOULD:
-                    cache[key] = name
-                else:
-                    cache[key] += 1  # type: ignore  # operator
-            cache.move_to_end(key)
-            return None
-
-        # The query is not in cache. Let's see if we must add it
-        if len(results) != 1:
-            # We cannot prepare a multiple statement
-            return None
-
-        result = results[0]
-        if (
-            result.status != ExecStatus.TUPLES_OK
-            and result.status != ExecStatus.COMMAND_OK
-        ):
-            # We don't prepare failed queries or other weird results
-            return None
-
-        # Ok, we got to the conclusion that this query is genuinely to prepare
-        cache[key] = name if prep is Prepare.SHOULD else 1
-
-        # Evict an old value from the cache; if it was prepared, deallocate it
-        # Do it only once: if the cache was resized, deallocate gradually
-        if len(cache) <= self._conn.prepared_max:
-            return None
-
-        old_val = cache.popitem(last=False)[1]
-        if isinstance(old_val, bytes):
-            return b"DEALLOCATE " + old_val
-        else:
-            return None
-
     def _executemany_gen(
         self, query: Query, params_seq: Sequence[Params]
     ) -> PQGen[None]:
index 0ed57b41980435c6f4f27dc8a72b41546798c559..a97894c52bc533cd58dd64caf0161dfbdceee228 100644 (file)
@@ -77,7 +77,7 @@ def test_prepare_disable(conn):
         res.append(cur.fetchone()[0])
 
     assert res == [0] * 10
-    assert not conn._prepared_statements
+    assert not conn._prepared._prepared
 
 
 def test_no_prepare_multi(conn):
@@ -140,10 +140,10 @@ def test_evict_lru(conn):
         conn.execute("select 'a'")
         conn.execute(f"select {i}")
 
-    assert len(conn._prepared_statements) == 5
-    assert conn._prepared_statements[b"select 'a'", ()] == b"_pg3_0"
+    assert len(conn._prepared._prepared) == 5
+    assert conn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
     for i in [9, 8, 7, 6]:
-        assert conn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1
+        assert conn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
 
     cur = conn.execute("select statement from pg_prepared_statements")
     assert cur.fetchall() == [("select 'a'",)]
@@ -156,9 +156,9 @@ def test_evict_lru_deallocate(conn):
         conn.execute("select 'a'")
         conn.execute(f"select {i}")
 
-    assert len(conn._prepared_statements) == 5
+    assert len(conn._prepared._prepared) == 5
     for i in [9, 8, 7, 6, "'a'"]:
-        assert conn._prepared_statements[
+        assert conn._prepared._prepared[
             f"select {i}".encode("utf8"), ()
         ].startswith(b"_pg3_")
 
index a6663d63c6ce502f92cec3392b79ff53d2e74230..a855f5bf780a9e838caec4151fc313dd1899bcbb 100644 (file)
@@ -83,7 +83,7 @@ async def test_prepare_disable(aconn):
         res.append((await cur.fetchone())[0])
 
     assert res == [0] * 10
-    assert not aconn._prepared_statements
+    assert not aconn._prepared._prepared
 
 
 async def test_no_prepare_multi(aconn):
@@ -148,12 +148,10 @@ async def test_evict_lru(aconn):
         await aconn.execute("select 'a'")
         await aconn.execute(f"select {i}")
 
-    assert len(aconn._prepared_statements) == 5
-    assert aconn._prepared_statements[b"select 'a'", ()] == b"_pg3_0"
+    assert len(aconn._prepared._prepared) == 5
+    assert aconn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
     for i in [9, 8, 7, 6]:
-        assert (
-            aconn._prepared_statements[f"select {i}".encode("utf8"), ()] == 1
-        )
+        assert aconn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
 
     cur = await aconn.execute("select statement from pg_prepared_statements")
     assert await cur.fetchall() == [("select 'a'",)]
@@ -166,9 +164,9 @@ async def test_evict_lru_deallocate(aconn):
         await aconn.execute("select 'a'")
         await aconn.execute(f"select {i}")
 
-    assert len(aconn._prepared_statements) == 5
+    assert len(aconn._prepared._prepared) == 5
     for i in [9, 8, 7, 6, "'a'"]:
-        assert aconn._prepared_statements[
+        assert aconn._prepared._prepared[
             f"select {i}".encode("utf8"), ()
         ].startswith(b"_pg3_")