+import gc
import sys
import asyncio
import selectors
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Tuple
import pytest
terminalreporter.section("failed tests ignored")
for msg in allow_fail_messages:
terminalreporter.line(msg)
+
+
+NO_COUNT_TYPES: Tuple[type, ...] = ()
+
+if sys.version_info[:2] == (3, 10):
+ # On my laptop there are occasional creations of a single one of these objects
+ # with empty content, which might be some Decimal caching.
+ # Keeping the guard as strict as possible, to be extended if other types
+ # or versions are necessary.
+ try:
+ from _contextvars import Context # type: ignore
+ except ImportError:
+ pass
+ else:
+ NO_COUNT_TYPES += (Context,)
+
+
+class GCFixture:
+ __slots__ = ()
+
+ @staticmethod
+ def collect() -> None:
+ """
+ gc.collect(), but more insisting.
+ """
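+        # Why more than one pass (assumed rationale, not stated in the original):
+        # finalizers and weakref callbacks run by one collection can release
+        # further objects, which only a following pass reclaims.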
+ for i in range(3):
+ gc.collect()
+
+ @staticmethod
+ def count() -> int:
+ """
+ len(gc.get_objects()), with subtleties.
+ """
+
+ if not NO_COUNT_TYPES:
+ return len(gc.get_objects())
+
+ # Note: not using a list comprehension because it pollutes the objects list.
+ rv = 0
+ for obj in gc.get_objects():
+ if isinstance(obj, NO_COUNT_TYPES):
+ continue
+ rv += 1
+
+ return rv
+
+
+@pytest.fixture(name="gc")
+def fixture_gc():
+ """
+ Provides a consistent way to run garbage collection and count references.
+
+ **Note:** This will skip tests on PyPy.
+ """
+ if sys.implementation.name == "pypy":
+ pytest.skip(reason="depends on refcount semantics")
+ return GCFixture()
+
+
+@pytest.fixture
+def gc_collect():
+ """
+ Provides a consistent way to run garbage collection.
+
+ **Note:** This will *not* skip tests on PyPy.
+ """
+ return GCFixture.collect
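+
+
+# Minimal usage sketch (illustrative only; ``make_some_objects`` is a
+# hypothetical workload, not part of this module): leak tests request the
+# ``gc`` fixture, run the workload a few times, and assert that the object
+# count stays flat between runs.
+#
+#   def test_no_leak(gc):
+#       counts = []
+#       for _ in range(3):
+#           make_some_objects()
+#           gc.collect()
+#           counts.append(gc.count())
+#       assert counts[0] == counts[1] == counts[2]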
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur, gc_collect, gc_count
+from ..utils import eur
from .._test_copy import sample_text, sample_binary # noqa
from .._test_copy import ensure_table, sample_records
from .._test_copy import sample_tabledef as sample_tabledef_pg
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
-def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
+def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur, gc_collect, gc_count
+from ..utils import eur
from .._test_copy import sample_text, sample_binary # noqa
from .._test_copy import ensure_table_async, sample_records
from .test_copy import sample_tabledef, copyopt
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
-async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types):
+async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from ..utils import assert_type, Counter, gc_collect, set_autocommit
+from ..utils import assert_type, Counter, set_autocommit
from ..acompat import Event, spawn, gather, sleep, skip_sync
from .test_pool_common import delay_connection
assert "BAD" in caplog.records[2].message
-def test_del_no_warning(dsn, recwarn):
+def test_del_no_warning(dsn, recwarn, gc_collect):
p = pool.ConnectionPool(dsn, min_size=2, open=False)
p.open()
with p.connection() as conn:
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from ..utils import assert_type, Counter, gc_collect, set_autocommit
+from ..utils import assert_type, Counter, set_autocommit
from ..acompat import AEvent, spawn, gather, asleep, skip_sync
from .test_pool_common_async import delay_connection
assert "BAD" in caplog.records[2].message
-async def test_del_no_warning(dsn, recwarn):
+async def test_del_no_warning(dsn, recwarn, gc_collect):
p = pool.AsyncConnectionPool(dsn, min_size=2, open=False)
await p.open()
async with p.connection() as conn:
import pytest
-from ..utils import gc_collect
-
try:
import psycopg_pool as pool
except ImportError:
@pytest.fixture
-def asyncio_run(recwarn):
+def asyncio_run(recwarn, gc_collect):
"""Fixture reuturning asyncio.run, but managing resources at exit.
In certain runs, fd objects are leaked and the error will only be caught
import psycopg
-from ..utils import gc_collect, set_autocommit
+from ..utils import set_autocommit
from ..acompat import Event, spawn, gather, sleep, is_alive, skip_async, skip_sync
try:
@skip_async
@pytest.mark.slow
-def test_del_stops_threads(pool_cls, dsn):
+def test_del_stops_threads(pool_cls, dsn, gc):
p = pool_cls(dsn)
assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
- gc_collect()
+ gc.collect()
sleep(0.1)
for t in ts:
assert not is_alive(t), t
import psycopg
-from ..utils import gc_collect, set_autocommit
+from ..utils import set_autocommit
from ..acompat import AEvent, spawn, gather, asleep, is_alive, skip_async, skip_sync
try:
@skip_async
@pytest.mark.slow
-async def test_del_stops_threads(pool_cls, dsn):
+async def test_del_stops_threads(pool_cls, dsn, gc):
p = pool_cls(dsn)
assert p._sched_runner is not None
ts = [p._sched_runner] + p._workers
del p
- gc_collect()
+ gc.collect()
await asleep(0.1)
for t in ts:
assert not is_alive(t), t
from psycopg import pq
import psycopg.generators
-from ..utils import gc_collect
-
def test_connectdb(dsn):
conn = pq.PGconn.connect(dsn.encode())
@pytest.mark.slow
-def test_weakref(dsn):
+def test_weakref(dsn, gc_collect):
conn = pq.PGconn.connect(dsn.encode())
w = weakref.ref(conn)
conn.finish()
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
from .acompat import is_async, skip_sync, skip_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
and (not is_async(__name__)),
reason="Something with Exceptions, C, Python 3.12",
)
-def test_connection_warn_close(conn_cls, dsn, recwarn):
+def test_connection_warn_close(conn_cls, dsn, recwarn, gc_collect):
conn = conn_cls.connect(dsn)
conn.close()
del conn
@pytest.mark.slow
-def test_weakref(conn_cls, dsn):
+def test_weakref(conn_cls, dsn, gc_collect):
conn = conn_cls.connect(dsn)
w = weakref.ref(conn)
conn.close()
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
from .acompat import is_async, skip_sync, skip_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
and not is_async(__name__),
reason="Something with Exceptions, C, Python 3.12",
)
-async def test_connection_warn_close(aconn_cls, dsn, recwarn):
+async def test_connection_warn_close(aconn_cls, dsn, recwarn, gc_collect):
conn = await aconn_cls.connect(dsn)
await conn.close()
del conn
@pytest.mark.slow
-async def test_weakref(aconn_cls, dsn):
+async def test_weakref(aconn_cls, dsn, gc_collect):
conn = await aconn_cls.connect(dsn)
w = weakref.ref(conn)
await conn.close()
# DO NOT CHANGE! Change the original file instead.
import string
import hashlib
-import sys
from io import BytesIO, StringIO
from random import choice, randrange
from itertools import cycle
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
-from .utils import eur, gc_collect, gc_count
+from .utils import eur
from ._test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from ._test_copy import sample_values, sample_records, sample_tabledef
from ._test_copy import ensure_table, py_to_raw, special_chars, FileWriter
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize(
"fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method):
+def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
elif method == "rows":
list(copy.rows())
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize(
"fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
)
-def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
+def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import string
import hashlib
-import sys
from io import BytesIO, StringIO
from random import choice, randrange
from itertools import cycle
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
-from .utils import eur, gc_collect, gc_count
+from .utils import eur
from .acompat import alist
from ._test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from ._test_copy import sample_values, sample_records, sample_tabledef
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize(
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-async def test_copy_to_leaks(aconn_cls, dsn, faker, fmt, set_types, method):
+async def test_copy_to_leaks(aconn_cls, dsn, faker, fmt, set_types, method, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
elif method == "rows":
await alist(copy.rows())
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize(
"fmt, set_types",
[(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
)
-async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types):
+async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- gc_collect()
+ gc.collect()
n = []
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import pytest
import psycopg
-import sys
from psycopg import pq, rows, errors as e
from psycopg.adapt import PyFormat
-from .utils import gc_collect, gc_count
-
def test_default_cursor(conn):
cur = conn.cursor()
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fmt", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import pytest
import psycopg
-import sys
from psycopg import pq, rows, errors as e
from psycopg.adapt import PyFormat
-from .utils import gc_collect, gc_count
-
async def test_default_cursor(aconn):
cur = aconn.cursor()
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fmt", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import pytest
import psycopg
-import sys
from psycopg import rows
-from .utils import gc_collect, gc_count
from .fix_crdb import crdb_encoding
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-def test_leak(conn_cls, dsn, faker, fetch, row_factory):
+def test_leak(conn_cls, dsn, faker, fetch, row_factory, gc):
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import pytest
import psycopg
-import sys
from psycopg import rows
-from .utils import gc_collect, gc_count
from .fix_crdb import crdb_encoding
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fetch, row_factory):
+async def test_leak(aconn_cls, dsn, faker, fetch, row_factory, gc):
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
-from .utils import gc_collect, raiseif
+from .utils import raiseif
from .acompat import closing
from .fix_crdb import crdb_encoding
from ._test_cursor import my_row_factory, ph
@pytest.mark.slow
-def test_weakref(conn):
+def test_weakref(conn, gc_collect):
cur = conn.cursor()
w = weakref.ref(cur)
cur.close()
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
-from .utils import gc_collect, raiseif
+from .utils import raiseif
from .acompat import aclosing, alist, anext
from .fix_crdb import crdb_encoding
from ._test_cursor import my_row_factory, ph
@pytest.mark.slow
-async def test_weakref(aconn):
+async def test_weakref(aconn, gc_collect):
cur = aconn.cursor()
w = weakref.ref(cur)
await cur.close()
# DO NOT CHANGE! Change the original file instead.
import pytest
import psycopg
-import sys
from psycopg import pq, rows, errors as e
from psycopg.adapt import PyFormat
from ._test_cursor import ph
-from .utils import gc_collect, gc_count
@pytest.fixture
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fmt", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+def test_leak(conn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
import pytest
import psycopg
-import sys
from psycopg import pq, rows, errors as e
from psycopg.adapt import PyFormat
from ._test_cursor import ph
-from .utils import gc_collect, gc_count
@pytest.fixture
@pytest.mark.slow
-@pytest.mark.skipif(
- sys.implementation.name == "pypy", reason="depends on refcount semantics"
-)
@pytest.mark.parametrize("fmt", PyFormat)
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
-async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory):
+async def test_leak(aconn_cls, dsn, faker, fmt, fmt_out, fetch, row_factory, gc):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
pass
n = []
- gc_collect()
+ gc.collect()
for i in range(3):
await work()
- gc_collect()
- n.append(gc_count())
+ gc.collect()
+ n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
from psycopg import rows, errors as e
from psycopg.pq import Format
-from .utils import gc_collect
pytestmark = pytest.mark.crdb_skip("server-side cursor")
cur.fetchall()
-def test_warn_close(conn, recwarn):
+def test_warn_close(conn, recwarn, gc_collect):
recwarn.clear()
cur = conn.cursor("foo")
cur.execute("select generate_series(1, 10) as bar")
from psycopg.pq import Format
from .acompat import alist
-from .utils import gc_collect
+
pytestmark = pytest.mark.crdb_skip("server-side cursor")
await cur.fetchall()
-async def test_warn_close(aconn, recwarn):
+async def test_warn_close(aconn, recwarn, gc_collect):
recwarn.clear()
cur = aconn.cursor("foo")
await cur.execute("select generate_series(1, 10) as bar")
from psycopg import pq
from psycopg import errors as e
-from .utils import eur, gc_collect
+from .utils import eur
from .fix_crdb import is_crdb
(pq.__impl__ in ("c", "binary") and sys.version_info[:2] == (3, 12)),
reason="Something with Exceptions, C, Python 3.12",
)
-def test_diag_survives_cursor(conn):
+def test_diag_survives_cursor(conn, gc_collect):
cur = conn.cursor()
with pytest.raises(e.Error) as exc:
cur.execute("select * from nosuchtable")
from psycopg.postgres import types as builtins
from psycopg.types.array import register_array
-from ..utils import gc_collect
-
tests_str = [
([[[[[["a"]]]]]], "{{{{{{a}}}}}}"),
@pytest.mark.slow
-def test_register_array_leak(conn):
+def test_register_array_leak(conn, gc_collect):
info = TypeInfo.fetch(conn, "date")
ntypes = []
for i in range(2):
-import gc
import re
import sys
import operator
return (ver_maj, ver_min, ver_fix)
-def gc_collect():
- """
- gc.collect(), but more insisting.
- """
- for i in range(3):
- gc.collect()
-
-
-NO_COUNT_TYPES: Tuple[type, ...] = ()
-
-if sys.version_info[:2] == (3, 10):
- # On my laptop there are occasional creations of a single one of these objects
- # with empty content, which might be some Decimal caching.
- # Keeping the guard as strict as possible, to be extended if other types
- # or versions are necessary.
- try:
- from _contextvars import Context # type: ignore
- except ImportError:
- pass
- else:
- NO_COUNT_TYPES += (Context,)
-
-
-def gc_count() -> int:
- """
- len(gc.get_objects()), with subtleties.
- """
- if not NO_COUNT_TYPES:
- return len(gc.get_objects())
-
- # Note: not using a list comprehension because it pollutes the objects list.
- rv = 0
- for obj in gc.get_objects():
- if isinstance(obj, NO_COUNT_TYPES):
- continue
- rv += 1
-
- return rv
-
-
@contextmanager
def raiseif(cond, *args, **kwargs):
"""