# Test with minimum dependency versions
- {impl: c, python: "3.7", ext: min, postgres: "postgres:15"}
+ # Test with PyPy.
+ - {impl: python, python: "pypy3.9", postgres: "postgres:13"}
+ - {impl: python, python: "pypy3.10", postgres: "postgres:14"}
+
env:
PSYCOPG_IMPL: ${{ matrix.impl }}
DEPS: ./psycopg[test] ./psycopg_pool
echo "DEPS=$DEPS shapely" >> $GITHUB_ENV
echo "MARKERS=$MARKERS postgis" >> $GITHUB_ENV
+ - name: Exclude certain tests from pypy
+ if: ${{ startsWith(matrix.python, 'pypy') }}
+ run: |
+ echo "NOT_MARKERS=$NOT_MARKERS timing" >> $GITHUB_ENV
+
- name: Configure to use the oldest dependencies
if: ${{ matrix.ext == 'min' }}
run: |
In short, if you use a :ref:`supported system<supported-systems>`::
pip install --upgrade pip # upgrade pip to at least 20.3
- pip install "psycopg[binary]"
+ pip install "psycopg[binary]" # remove [binary] for PyPy
and you should be :ref:`ready to start <module-usage>`. Read further for
alternative ways to install.
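
For a quick smoke test of the installation (the connection string below is only
an example; point it at a database you can reach)::

    import psycopg

    with psycopg.connect("dbname=test") as conn:
        print(conn.execute("select version()").fetchone())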
- Python 3.6 supported before Psycopg 3.1
+- PyPy: from version 3.9 to 3.10
+
+ - **Note:** Only the pure Python version is supported.
+
- PostgreSQL: from version 10 to 16
- OS: Linux, macOS, Windows
For further information about the differences between the packages see
:ref:`pq-impl`.
+.. warning::
+
+ The binary installation is not supported by PyPy.
+
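You can check at runtime which implementation has been loaded (on PyPy this
should report the pure Python one)::

    >>> import psycopg
    >>> psycopg.pq.__impl__
    'python'
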
.. _local-installation:
pip install "psycopg[c]"
+.. warning::
+
+ The local installation is not supported by PyPy.
+
.. _pure-python-installation:
^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Fix :ref:`interaction with gevent <gevent>` (:ticket:`#527`).
+- Add support for PyPy (:ticket:`#686`).
.. _gevent: https://www.gevent.org/
@impl.PQnoticeReceiver # type: ignore
def notice_receiver(arg: c_void_p, result_ptr: impl.PGresult_struct) -> None:
- pgconn = cast(arg, POINTER(py_object)).contents.value()
+ pgconn = cast(arg, POINTER(py_object)).contents.value
+ if callable(pgconn): # Not a weak reference on PyPy.
+ pgconn = pgconn()
+
if not (pgconn and pgconn.notice_handler):
return
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
+ Programming Language :: Python :: Implementation :: CPython
+ Programming Language :: Python :: Implementation :: PyPy
Topic :: Database
Topic :: Database :: Front-Ends
Topic :: Software Development
extras_require = {
# Install the C extension module (requires dev tools)
"c": [
- f"psycopg-c == {version}",
+ f"psycopg-c == {version}; implementation_name != 'pypy'",
],
# Install the stand-alone C extension module
"binary": [
- f"psycopg-binary == {version}",
+ f"psycopg-binary == {version}; implementation_name != 'pypy'",
],
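+    # On PyPy the markers above make the "c" and "binary" extras no-ops, so e.g.
+    # 'pip install "psycopg[binary]"' installs just the pure Python package there.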
# Install the connection pool
"pool": [
Operating System :: MacOS :: MacOS X
Operating System :: Microsoft :: Windows
Operating System :: POSIX
+ Programming Language :: Cython
Programming Language :: Python :: 3
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
+ Programming Language :: Python :: Implementation :: CPython
Topic :: Database
Topic :: Database :: Front-Ends
Topic :: Software Development
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
Programming Language :: Python :: 3.12
+ Programming Language :: Python :: Implementation :: CPython
+ Programming Language :: Python :: Implementation :: PyPy
Topic :: Database
Topic :: Database :: Front-Ends
Topic :: Software Development
+import gc
import sys
import asyncio
import selectors
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Tuple
import pytest
terminalreporter.section("failed tests ignored")
for msg in allow_fail_messages:
terminalreporter.line(msg)
+
+
+NO_COUNT_TYPES: Tuple[type, ...] = ()
+
+if sys.version_info[:2] == (3, 10):
+ # On my laptop there are occasional creations of a single one of these objects
+ # with empty content, which might be some Decimal caching.
+ # Keeping the guard as strict as possible, to be extended if other types
+ # or versions are necessary.
+ try:
+ from _contextvars import Context # type: ignore
+ except ImportError:
+ pass
+ else:
+ NO_COUNT_TYPES += (Context,)
+
+
+class GCFixture:
+ __slots__ = ()
+
+ @staticmethod
+ def collect() -> None:
+ """
+        gc.collect(), but more insistent.
+ """
+ for i in range(3):
+ gc.collect()
+
+ @staticmethod
+ def count() -> int:
+ """
+ len(gc.get_objects()), with subtleties.
+ """
+
+ if not NO_COUNT_TYPES:
+ return len(gc.get_objects())
+
+ # Note: not using a list comprehension because it pollutes the objects list.
+ rv = 0
+ for obj in gc.get_objects():
+ if isinstance(obj, NO_COUNT_TYPES):
+ continue
+ rv += 1
+
+ return rv
+
+
+@pytest.fixture(name="gc")
+def fixture_gc():
+ """
+    Provides a consistent way to run garbage collection and count tracked objects.
+
+    Note: this fixture skips tests on PyPy, which doesn't use refcount semantics.
+ """
+ if sys.implementation.name == "pypy":
+ pytest.skip(reason="depends on refcount semantics")
+ return GCFixture()
+
+
+@pytest.fixture
+def gc_collect():
+ """
+ Provides a consistent way to run garbage collection.
+
+    Note: unlike the `gc` fixture, this one does not skip tests on PyPy.
+ """
+ return GCFixture.collect
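+
+
+# Illustrative usage sketch (not a real test in this suite): tests request the
+# fixtures by argument name, e.g.
+#
+#   def test_no_leaks(conn, gc):
+#       gc.collect()
+#       before = gc.count()
+#       ...  # exercise the code under test
+#       gc.collect()
+#       assert gc.count() == before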
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur, gc_collect, gc_count
+from ..utils import eur
from ..test_copy import sample_text, sample_binary # noqa
from ..test_copy import ensure_table, sample_records
from ..test_copy import sample_tabledef as sample_tabledef_pg
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
-def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
+def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur, gc_collect, gc_count
+from ..utils import eur
from ..test_copy import sample_text, sample_binary # noqa
from ..test_copy import sample_records
from ..test_copy_async import ensure_table
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
-async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types):
+async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
# Not available when testing the binary package
libname = find_libpq_full_path()
assert libname, "libpq libname not found"
- return ctypes.pydll.LoadLibrary(libname)
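+    # Load with cdll rather than pydll: libpq is a plain C library, while pydll
+    # assumes functions that use the Python C API and never releases the GIL.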
+ return ctypes.cdll.LoadLibrary(libname)
except Exception as e:
if pq.__impl__ == "binary":
pytest.skip(f"can't load libpq for testing: {e}")
@pytest.mark.slow
-def test_del_stop_threads(dsn):
+def test_del_stop_threads(dsn, gc):
p = NullConnectionPool(dsn)
assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
sleep(0.1)
+ gc.collect()
for t in ts:
assert not t.is_alive()
p2.putconn(conn)
-def test_del_no_warning(dsn, recwarn):
+def test_del_no_warning(dsn, recwarn, gc_collect):
p = pool.ConnectionPool(dsn, min_size=2)
with p.connection() as conn:
conn.execute("select 1")
p.wait()
ref = weakref.ref(p)
del p
+ gc_collect()
assert not ref()
assert not recwarn, [str(w.message) for w in recwarn.list]
@pytest.mark.slow
-def test_del_stop_threads(dsn):
+def test_del_stop_threads(dsn, gc):
p = pool.ConnectionPool(dsn)
assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
sleep(0.1)
+ gc.collect()
for t in ts:
assert not t.is_alive()
import pytest
-from ..utils import gc_collect
-
try:
import psycopg_pool as pool
except ImportError:
@pytest.fixture
-def asyncio_run(anyio_backend_options, recwarn):
+def asyncio_run(anyio_backend_options, recwarn, gc_collect):
"""Fixture reuturning asyncio.run, but managing resources at exit.
In certain runs, fd objects are leaked and the error will only be caught
from psycopg import pq
import psycopg.generators
-from ..utils import gc_collect
-
def test_connectdb(dsn):
conn = pq.PGconn.connect(dsn.encode())
@pytest.mark.slow
-def test_weakref(dsn):
+def test_weakref(dsn, gc_collect):
conn = pq.PGconn.connect(dsn.encode())
w = weakref.ref(conn)
conn.finish()
from psycopg.postgres import types as builtins
from psycopg.types import TypeInfo
-from .utils import gc_collect, gc_count
from .test_cursor import my_row_factory
from .fix_crdb import is_crdb, crdb_encoding, crdb_time_precision
@pytest.mark.slow
-def test_weakref(conn):
+def test_weakref(conn, gc_collect):
cur = conn.cursor()
w = weakref.ref(cur)
cur.close()
@pytest.mark.slow
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-def test_leak(conn_cls, dsn, faker, fetch, row_factory):
+def test_leak(conn_cls, dsn, faker, fetch, row_factory, gc):
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
-from .utils import alist, gc_collect, gc_count
+from .utils import alist
from .test_cursor import my_row_factory
from .test_cursor import execmany, _execmany # noqa: F401
from .fix_crdb import crdb_encoding
@pytest.mark.slow
-async def test_weakref(aconn):
+async def test_weakref(aconn, gc_collect):
cur = aconn.cursor()
w = weakref.ref(cur)
await cur.close()
@pytest.mark.slow
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fetch, row_factory):
+async def test_leak(aconn_cls, dsn, faker, fetch, row_factory, gc):
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
from .test_cursor import my_row_factory
from .test_adapt import make_bin_dumper, make_dumper
(pq.__impl__ in ("c", "binary") and sys.version_info[:2] == (3, 12)),
reason="Something with Exceptions, C, Python 3.12",
)
-def test_connection_warn_close(conn_cls, dsn, recwarn):
+def test_connection_warn_close(conn_cls, dsn, recwarn, gc_collect):
conn = conn_cls.connect(dsn)
conn.close()
del conn
+ gc_collect()
assert not recwarn, [str(w.message) for w in recwarn.list]
conn = conn_cls.connect(dsn)
del conn
+ gc_collect()
assert "IDLE" in str(recwarn.pop(ResourceWarning).message)
conn = conn_cls.connect(dsn)
conn.execute("select 1")
del conn
+ gc_collect()
assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
conn = conn_cls.connect(dsn)
with conn_cls.connect(dsn) as conn:
pass
del conn
+ gc_collect()
assert not recwarn, [str(w.message) for w in recwarn.list]
@pytest.mark.slow
-def test_weakref(conn_cls, dsn):
+def test_weakref(conn_cls, dsn, gc_collect):
conn = conn_cls.connect(dsn)
w = weakref.ref(conn)
conn.close()
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
from .test_cursor import my_row_factory
from .test_connection import tx_params, tx_params_isolation, tx_values_map
from .test_connection import conninfo_params_timeout, drop_default_args_from_conninfo
aconn.cursor()
-async def test_connection_warn_close(aconn_cls, dsn, recwarn):
+async def test_connection_warn_close(aconn_cls, dsn, recwarn, gc_collect):
conn = await aconn_cls.connect(dsn)
await conn.close()
del conn
+ gc_collect()
assert not recwarn, [str(w.message) for w in recwarn.list]
conn = await aconn_cls.connect(dsn)
del conn
+ gc_collect()
assert "IDLE" in str(recwarn.pop(ResourceWarning).message)
conn = await aconn_cls.connect(dsn)
await conn.execute("select 1")
del conn
+ gc_collect()
assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
conn = await aconn_cls.connect(dsn)
except Exception:
pass
del conn
+ gc_collect()
assert "INERROR" in str(recwarn.pop(ResourceWarning).message)
async with await aconn_cls.connect(dsn) as conn:
pass
del conn
+ gc_collect()
assert not recwarn, [str(w.message) for w in recwarn.list]
@pytest.mark.slow
-async def test_weakref(aconn_cls, dsn):
+async def test_weakref(aconn_cls, dsn, gc_collect):
conn = await aconn_cls.connect(dsn)
w = weakref.ref(conn)
await conn.close()
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
-from .utils import eur, gc_collect, gc_count
+from .utils import eur
pytestmark = pytest.mark.crdb_skip("copy")
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method):
+def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
elif method == "rows":
list(copy.rows())
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
-def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
+def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
-from .utils import alist, eur, gc_collect, gc_count
+from .utils import alist, eur
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from .test_copy import sample_values, sample_records, sample_tabledef
from .test_copy import py_to_raw, special_chars
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-async def test_copy_to_leaks(aconn_cls, dsn, faker, fmt, set_types, method):
+async def test_copy_to_leaks(aconn_cls, dsn, faker, fmt, set_types, method, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
elif method == "rows":
await alist(copy.rows())
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
-async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types):
+async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.postgres import types as builtins
from psycopg.rows import RowMaker
-from .utils import gc_collect, gc_count
from .fix_crdb import is_crdb, crdb_encoding, crdb_time_precision
@pytest.mark.slow
-def test_weakref(conn):
+def test_weakref(conn, gc_collect):
cur = conn.cursor()
w = weakref.ref(cur)
cur.close()
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
+
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg import pq, sql, rows
from psycopg.adapt import PyFormat
-from .utils import gc_collect, gc_count
from .test_cursor import my_row_factory
from .test_cursor import execmany, _execmany # noqa: F401
from .fix_crdb import crdb_encoding
@pytest.mark.slow
-async def test_weakref(aconn):
+async def test_weakref(aconn, gc_collect):
cur = aconn.cursor()
w = weakref.ref(cur)
await cur.close()
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg import pq
from psycopg import errors as e
-from .utils import eur, gc_collect
+from .utils import eur
from .fix_crdb import is_crdb
(pq.__impl__ in ("c", "binary") and sys.version_info[:2] == (3, 12)),
reason="Something with Exceptions, C, Python 3.12",
)
-def test_diag_survives_cursor(conn):
+def test_diag_survives_cursor(conn, gc_collect):
cur = conn.cursor()
with pytest.raises(e.Error) as exc:
cur.execute("select * from nosuchtable")
cur.fetchall()
-def test_warn_close(conn, recwarn):
+def test_warn_close(conn, recwarn, gc_collect):
recwarn.clear()
cur = conn.cursor("foo")
cur.execute("select generate_series(1, 10) as bar")
del cur
+ gc_collect()
assert ".close()" in str(recwarn.pop(ResourceWarning).message)
from psycopg import rows, errors as e
from psycopg.pq import Format
-pytestmark = [
- pytest.mark.crdb_skip("server-side cursor"),
-]
+pytestmark = pytest.mark.crdb_skip("server-side cursor")
async def test_init_row_factory(aconn):
await cur.fetchall()
-async def test_warn_close(aconn, recwarn):
+async def test_warn_close(aconn, recwarn, gc_collect):
recwarn.clear()
cur = aconn.cursor("foo")
await cur.execute("select generate_series(1, 10) as bar")
del cur
+ gc_collect()
assert ".close()" in str(recwarn.pop(ResourceWarning).message)
from psycopg.postgres import types as builtins
from psycopg.types.array import register_array
-from ..utils import gc_collect
-
tests_str = [
([[[[[["a"]]]]]], "{{{{{{a}}}}}}"),
@pytest.mark.slow
-def test_register_array_leak(conn):
+def test_register_array_leak(conn, gc_collect):
info = TypeInfo.fetch(conn, "date")
ntypes = []
for i in range(2):
-import gc
import re
-import sys
import operator
from typing import Callable, Optional, Tuple
return (ver_maj, ver_min, ver_fix)
-def gc_collect():
- """
- gc.collect(), but more insisting.
- """
- for i in range(3):
- gc.collect()
-
-
-NO_COUNT_TYPES: Tuple[type, ...] = ()
-
-if sys.version_info[:2] == (3, 10):
- # On my laptop there are occasional creations of a single one of these objects
- # with empty content, which might be some Decimal caching.
- # Keeping the guard as strict as possible, to be extended if other types
- # or versions are necessary.
- try:
- from _contextvars import Context # type: ignore
- except ImportError:
- pass
- else:
- NO_COUNT_TYPES += (Context,)
-
-
-def gc_count() -> int:
- """
- len(gc.get_objects()), with subtleties.
- """
- if not NO_COUNT_TYPES:
- return len(gc.get_objects())
-
- # Note: not using a list comprehension because it pollutes the objects list.
- rv = 0
- for obj in gc.get_objects():
- if isinstance(obj, NO_COUNT_TYPES):
- continue
- rv += 1
-
- return rv
-
-
async def alist(it):
return [i async for i in it]