Psycopg 3.1.11 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
+- Don't cache the parsing results of large queries, to avoid excessive memory
+ usage (:ticket:`#628`).
- Fix integer overflow in C/binary extension with OID > 2^31 (:ticket:`#630`).
- Fix building on Solaris and derivatives (:ticket:`#632`).
- Fix possible lack of critical section guard in async
# Copyright (C) 2020 The Psycopg Team
import re
-from typing import Any, Dict, List, Mapping, Match, NamedTuple, Optional
+from typing import Any, Callable, Dict, List, Mapping, Match, NamedTuple, Optional
from typing import Sequence, Tuple, Union, TYPE_CHECKING
from functools import lru_cache
+from typing_extensions import TypeAlias
from . import pq
from . import errors as e
if TYPE_CHECKING:
from .abc import Transformer
+MAX_CACHED_STATEMENT_LENGTH = 4096
+MAX_CACHED_STATEMENT_PARAMS = 50
+
class QueryPart(NamedTuple):
pre: bytes
bquery = query
if vars is not None:
- (
- self.query,
- self._want_formats,
- self._order,
- self._parts,
- ) = _query2pg(bquery, self._encoding)
+            # Avoid caching extremely long queries or queries with a huge
+            # number of parameters. They are usually generated by ORMs and
+            # have poor cacheability (e.g. INSERT ... VALUES (...), (...)
+            # with varying numbers of tuples).
+ # see https://github.com/psycopg/psycopg/discussions/628
+ if (
+ len(bquery) <= MAX_CACHED_STATEMENT_LENGTH
+ and len(vars) <= MAX_CACHED_STATEMENT_PARAMS
+ ):
+ f: _Query2Pg = _query2pg
+ else:
+ f = _query2pg_nocache
+
+ (self.query, self._want_formats, self._order, self._parts) = f(
+ bquery, self._encoding
+ )
else:
self.query = bquery
self._want_formats = self._order = None
bquery = query
if vars is not None:
- (self.template, self._order, self._parts) = _query2pg_client(
- bquery, self._encoding
- )
+ if (
+ len(bquery) <= MAX_CACHED_STATEMENT_LENGTH
+ and len(vars) <= MAX_CACHED_STATEMENT_PARAMS
+ ):
+ f: _Query2PgClient = _query2pg_client
+ else:
+ f = _query2pg_client_nocache
+ (self.template, self._order, self._parts) = f(bquery, self._encoding)
else:
self.query = bquery
self._order = None
self.params = None
-@lru_cache()
-def _query2pg(
+_Query2Pg: TypeAlias = Callable[
+ [bytes, str], Tuple[bytes, List[PyFormat], Optional[List[str]], List[QueryPart]]
+]
+
+
+def _query2pg_nocache(
query: bytes, encoding: str
) -> Tuple[bytes, List[PyFormat], Optional[List[str]], List[QueryPart]]:
"""
return b"".join(chunks), formats, order, parts
-@lru_cache()
-def _query2pg_client(
+# Note: the cache size is 128 items, but someone has reported issuing ~12k
+# queries (of type `INSERT ... VALUES (...), (...)` with a varying number of
+# records), and the resulting cache size is >100MB. So, we will avoid caching
+# large queries or queries with a large number of params. See
+# https://github.com/sqlalchemy/sqlalchemy/discussions/10270
+_query2pg = lru_cache()(_query2pg_nocache)
+
+
+_Query2PgClient: TypeAlias = Callable[
+ [bytes, str], Tuple[bytes, Optional[List[str]], List[QueryPart]]
+]
+
+
+def _query2pg_client_nocache(
query: bytes, encoding: str
) -> Tuple[bytes, Optional[List[str]], List[QueryPart]]:
"""
return b"".join(chunks), order, parts
+_query2pg_client = lru_cache()(_query2pg_client_nocache)
+
+
def _validate_and_reorder_params(
parts: List[QueryPart], vars: Params, order: Optional[List[str]]
) -> Sequence[Any]:
assert cur.statusmessage is None
+def test_query_parse_cache_size():
+ # Warning: testing internal structures. Test might need refactoring with the code.
+ cache = psycopg._queries._query2pg_client
+ cache.cache_clear()
+ ci = cache.cache_info()
+ h0, m0 = ci.hits, ci.misses
+ tests = [
+ (f"select 1 -- {'x' * 3500}", (), h0, m0 + 1),
+ (f"select 1 -- {'x' * 3500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'x' * 4500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'x' * 4500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'%s' * 40}", ("x",) * 40, h0 + 1, m0 + 2),
+ (f"select 1 -- {'%s' * 40}", ("x",) * 40, h0 + 2, m0 + 2),
+ (f"select 1 -- {'%s' * 60}", ("x",) * 60, h0 + 2, m0 + 2),
+ (f"select 1 -- {'%s' * 60}", ("x",) * 60, h0 + 2, m0 + 2),
+ ]
+ for i, (query, params, hits, misses) in enumerate(tests):
+ pq = psycopg._queries.PostgresClientQuery(psycopg.adapt.Transformer())
+ pq.convert(query, params)
+ ci = cache.cache_info()
+ assert ci.hits == hits, f"at {i}"
+ assert ci.misses == misses, f"at {i}"
+
+
def test_execute_sql(conn):
cur = conn.cursor()
cur.execute(sql.SQL("select {value}").format(value="hello"))
assert cur.statusmessage is None
+def test_query_parse_cache_size():
+ # Warning: testing internal structures. Test might need refactoring with the code.
+ cache = psycopg._queries._query2pg
+ cache.cache_clear()
+ ci = cache.cache_info()
+ h0, m0 = ci.hits, ci.misses
+ tests = [
+ (f"select 1 -- {'x' * 3500}", (), h0, m0 + 1),
+ (f"select 1 -- {'x' * 3500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'x' * 4500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'x' * 4500}", (), h0 + 1, m0 + 1),
+ (f"select 1 -- {'%s' * 40}", ("x",) * 40, h0 + 1, m0 + 2),
+ (f"select 1 -- {'%s' * 40}", ("x",) * 40, h0 + 2, m0 + 2),
+ (f"select 1 -- {'%s' * 60}", ("x",) * 60, h0 + 2, m0 + 2),
+ (f"select 1 -- {'%s' * 60}", ("x",) * 60, h0 + 2, m0 + 2),
+ ]
+ for i, (query, params, hits, misses) in enumerate(tests):
+ pq = psycopg._queries.PostgresQuery(psycopg.adapt.Transformer())
+ pq.convert(query, params)
+ ci = cache.cache_info()
+ assert ci.hits == hits, f"at {i}"
+ assert ci.misses == misses, f"at {i}"
+
+
def test_execute_many_results(conn):
cur = conn.cursor()
assert cur.nextset() is None