from __future__ import annotations
import json
+import logging
+from types import CodeType
from typing import Any, Callable
+from warnings import warn
+from threading import Lock
from .. import _oids, abc
from .. import errors as e
from ..pq import Format
from ..adapt import AdaptersMap, Buffer, Dumper, Loader, PyFormat
from ..errors import DataError
-from .._compat import TypeAlias, cache
+from .._compat import TypeAlias
JsonDumpsFunction: TypeAlias = Callable[[Any], "str | bytes"]
JsonLoadsFunction: TypeAlias = Callable[["str | bytes"], Any]
+_AdapterKey: TypeAlias = tuple[type, CodeType]
+
+logger = logging.getLogger("psycopg")
def set_json_dumps(
# Cache all dynamically-generated types to avoid leaks in case the types
# cannot be GC'd.
+_dumpers_cache: dict[_AdapterKey, type[abc.Dumper]] = {}
+_loaders_cache: dict[_AdapterKey, type[abc.Loader]] = {}
+
+
+def _make_dumper(
+ base: type[abc.Dumper], dumps: JsonDumpsFunction, __lock: Lock = Lock()
+) -> type[abc.Dumper]:
+ with __lock:
+ if key := _get_adapter_key(base, dumps):
+ try:
+ return _dumpers_cache[key]
+ except KeyError:
+ pass
+
+ if not (name := base.__name__).startswith("Custom"):
+ name = f"Custom{name}"
+ rv = type(name, (base,), {"_dumps": dumps})
+
+ if key:
+ _dumpers_cache[key] = rv
+
+ return rv
+
-@cache
-def _make_dumper(base: type[abc.Dumper], dumps: JsonDumpsFunction) -> type[abc.Dumper]:
- if not (name := base.__name__).startswith("Custom"):
- name = f"Custom{name}"
- return type(name, (base,), {"_dumps": dumps})
+def _make_loader(
+ base: type[Loader], loads: JsonLoadsFunction, __lock: Lock = Lock()
+) -> type[abc.Loader]:
+ with __lock:
+ if key := _get_adapter_key(base, loads):
+ try:
+ return _loaders_cache[key]
+ except KeyError:
+ pass
+ if not (name := base.__name__).startswith("Custom"):
+ name = f"Custom{name}"
+ rv = type(name, (base,), {"_loads": loads})
-@cache
-def _make_loader(base: type[Loader], loads: JsonLoadsFunction) -> type[Loader]:
- if not (name := base.__name__).startswith("Custom"):
- name = f"Custom{name}"
- return type(name, (base,), {"_loads": loads})
+ if key:
+ _loaders_cache[key] = rv
+
+ return rv
+
+
+def _get_adapter_key(t: type, f: Callable[..., Any]) -> _AdapterKey | None:
+ """
+ Return an adequate caching key for a dumps/loads function and a base type.
+
+ We can't use just the function, even if it is hashable, because different
+ lambda expression will have a different hash. The code, instead, will be
+ the same if a lambda if defined in a function, so we can use it as a more
+ stable hash key.
+ """
+ # Check if there's an unexpected Python implementation that doesn't define
+ # these dunder attributes. If thta's the case, raise a warning, which will
+ # crash our test suite and/or hopefully will be detected by the user.
+ try:
+ f.__code__
+ f.__closure__
+ except AttributeError:
+ warn(f"function {f} has no __code__ or __closure__.", RuntimeWarning)
+ return None
+
+ # If there is a closure, the same code might have different effects
+ # according to the closure arguments. We could do something funny like
+ # using the closure values to build a cache key, but I am not 100% sure
+ # about whether the closure objects are always `cell` (the type says it's
+ # `cell | Any`) and the solution would be partial anyway because of
+ # non-hashable closure objects, therefore let's just give a warning (which
+ # can be detected via logging) and avoid to create a leak.
+ if f.__closure__:
+ logger.warning(
+ "using a closure in a dumps/loads function may cause a resource leak"
+ )
+ return None
+
+ return (t, f.__code__)
class _JsonWrapper:
import json
+import logging
from copy import deepcopy
+from typing import Any
import pytest
assert got["answer"] == 42
+@pytest.mark.parametrize("binary", [True, False])
+@pytest.mark.parametrize("pgtype", ["json", "jsonb"])
+def test_dump_leak_with_local_functions(dsn, binary, pgtype, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ # Note: private implementation, it might change
+ from psycopg.types.json import _dumpers_cache
+
+ # A function with no closure is cached on the code, so lambdas are not
+ # different items.
+
+ def register(conn: psycopg.Connection) -> None:
+ set_json_dumps(lambda x: json.dumps(x), conn)
+
+ with psycopg.connect(dsn) as conn1:
+ register(conn1)
+ assert (size1 := len(_dumpers_cache))
+
+ with psycopg.connect(dsn) as conn2:
+ register(conn2)
+ size2 = len(_dumpers_cache)
+
+ assert size1 == size2
+ assert not caplog.records
+
+ # A function with a closure won't be cached, but will cause a warning
+
+ def register2(conn: psycopg.Connection, skipkeys: bool) -> None:
+ def f(x: Any) -> str:
+ return json.dumps(x, skipkeys=skipkeys)
+
+ set_json_dumps(f, conn)
+
+ with psycopg.connect(dsn) as conn3:
+ register2(conn3, False)
+ size3 = len(_dumpers_cache)
+
+ assert size2 == size3
+ assert caplog.records
+
+
+@pytest.mark.parametrize("binary", [True, False])
+@pytest.mark.parametrize("pgtype", ["json", "jsonb"])
+def test_load_leak_with_local_functions(dsn, binary, pgtype, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ # Note: private implementation, it might change
+ from psycopg.types.json import _loaders_cache
+
+ # A function with no closure is cached on the code, so lambdas are not
+ # different items.
+
+ def register(conn: psycopg.Connection) -> None:
+
+ def f(x: str | bytes) -> Any:
+ return json.loads(x)
+
+ set_json_loads(f, conn)
+
+ with psycopg.connect(dsn) as conn1:
+ register(conn1)
+ assert (size1 := len(_loaders_cache))
+
+ with psycopg.connect(dsn) as conn2:
+ register(conn2)
+ size2 = len(_loaders_cache)
+
+ assert size1 == size2
+ assert not caplog.records
+
+ # A function with a closure won't be cached, but will cause a warning
+
+ def register2(conn: psycopg.Connection, parse_float: Any) -> None:
+ set_json_dumps(lambda x: json.dumps(x, parse_float=parse_float), conn)
+
+ with psycopg.connect(dsn) as conn3:
+ register2(conn3, None)
+ size3 = len(_loaders_cache)
+
+ assert size2 == size3
+ assert caplog.records
+
+
def my_dumps(obj):
obj = deepcopy(obj)
obj["baz"] = "qux"