-import gc
import os
import ctypes
import logging
from psycopg3 import pq
import psycopg3.generators
+from ..utils import gc_collect
+
def test_connectdb(dsn):
conn = pq.PGconn.connect(dsn.encode("utf8"))
w = weakref.ref(conn)
conn.finish()
del conn
- gc.collect()
+ gc_collect()
assert w() is None
-import gc
import sys
import time
import socket
from psycopg3.rows import tuple_row
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
+
+from .utils import gc_collect
from .test_cursor import my_row_factory
w = weakref.ref(conn)
conn.close()
del conn
- gc.collect()
+ gc_collect()
assert w() is None
-import gc
import time
import socket
import pytest
from psycopg3.rows import tuple_row
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
+
+from .utils import gc_collect
from .test_cursor import my_row_factory
pytestmark = pytest.mark.asyncio
w = weakref.ref(conn)
await conn.close()
del conn
- gc.collect()
+ gc_collect()
assert w() is None
from psycopg3.adapt import Format as PgFormat
from psycopg3.types import Int4
+from .utils import gc_collect
+
eur = "\u20ac"
sample_records = [(Int4(10), Int4(20), "hello"), (Int4(40), None, "world")]
n = []
for i in range(3):
work()
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
n = []
for i in range(3):
work()
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
from psycopg3.pq import Format
from psycopg3.adapt import Format as PgFormat
+from .utils import gc_collect
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from .test_copy import eur, sample_values, sample_records, sample_tabledef
from .test_copy import py_to_raw
n = []
for i in range(3):
await work()
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
n = []
for i in range(3):
await work()
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
from psycopg3.oids import postgres_types as builtins
from psycopg3.adapt import Format
+from .utils import gc_collect
+
def test_close(conn):
cur = conn.cursor()
w = weakref.ref(cur)
cur.close()
del cur
- gc.collect()
+ gc_collect()
assert w() is None
tmp = None
del cur, conn
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
import psycopg3
from psycopg3 import sql, rows
from psycopg3.adapt import Format
+
+from .utils import gc_collect
from .test_cursor import my_row_factory
pytestmark = pytest.mark.asyncio
w = weakref.ref(cur)
await cur.close()
del cur
- gc.collect()
+ gc_collect()
assert w() is None
tmp = None
del cur, conn
- gc.collect()
- gc.collect()
+ gc_collect()
n.append(len(gc.get_objects()))
assert (
-import gc
import pickle
from weakref import ref
from psycopg3 import pq
from psycopg3 import errors as e
+from .utils import gc_collect
+
eur = "\u20ac"
del exc
w = ref(cur)
del cur
- gc.collect()
+ gc_collect()
assert w() is None
assert diag.sqlstate == "42P01"
+import gc
import re
import operator
f"skipping test: {whose_version} version is {'.'.join(map(str, got))}"
f" {revops[m.group(1)]} {'.'.join(map(str, want))}"
)
+
+
+def gc_collect():
+ """
+ gc.collect(), but more insisting.
+ """
+ for i in range(3):
+ gc.collect()