from uuid import UUID
from random import choice, random, randrange
from decimal import Decimal
+from contextlib import contextmanager
from collections import deque
import pytest
import psycopg
from psycopg import sql
from psycopg.adapt import PyFormat
+from psycopg.compat import asynccontextmanager
from psycopg.types.range import Range
from psycopg.types.numeric import Int4, Int8
sql.SQL(", ").join(phs),
)
- def insert_field_stmt(self, i):
- ph = sql.Placeholder(format=self.format)
- return sql.SQL("insert into {} ({}) values ({})").format(
- self.table_name, self.fields_names[i], ph
- )
-
@property
def select_stmt(self):
fields = sql.SQL(", ").join(self.fields_names)
fields, self.table_name
)
+ @contextmanager
+ def find_insert_problem(self, conn):
+ """Context manager to help finding a problematic vaule."""
+ try:
+ yield
+ except psycopg.DatabaseError:
+ conn.rollback()
+ cur = conn.cursor()
+ # Repeat insert one field at time, until finding the wrong one
+ cur.execute(self.drop_stmt)
+ cur.execute(self.create_stmt)
+ for i, rec in enumerate(self.records):
+ for j, val in enumerate(rec):
+ try:
+ cur.execute(self._insert_field_stmt(j), (val,))
+ except psycopg.DatabaseError as e:
+ r = repr(val)
+ if len(r) > 200:
+ r = f"{r[:200]}... ({len(r)} chars)"
+ raise Exception(
+ f"value {r!r} at record {i} column0 {j}"
+ f" failed insert: {e}"
+ ) from None
+
+ # just in case, but hopefully we should have triggered the problem
+ raise
+
+ @asynccontextmanager
+ async def find_insert_problem_async(self, aconn):
+ try:
+ yield
+ except psycopg.DatabaseError:
+ await aconn.rollback()
+ acur = aconn.cursor()
+ # Repeat insert one field at time, until finding the wrong one
+ await acur.execute(self.drop_stmt)
+ await acur.execute(self.create_stmt)
+ for i, rec in enumerate(self.records):
+ for j, val in enumerate(rec):
+ try:
+ await acur.execute(self._insert_field_stmt(j), (val,))
+ except psycopg.DatabaseError as e:
+ r = repr(val)
+ if len(r) > 200:
+ r = f"{r[:200]}... ({len(r)} chars)"
+ raise Exception(
+ f"value {r!r} at record {i} column0 {j}"
+ f" failed insert: {e}"
+ ) from None
+
+ # just in case, but hopefully we should have triggered the problem
+ raise
+
+ def _insert_field_stmt(self, i):
+ ph = sql.Placeholder(format=self.format)
+ return sql.SQL("insert into {} ({}) values ({})").format(
+ self.table_name, self.fields_names[i], ph
+ )
+
def choose_schema(self, ncols=20):
schema = []
while len(schema) < ncols:
with conn.cursor(binary=fmt_out) as cur:
cur.execute(faker.drop_stmt)
cur.execute(faker.create_stmt)
- try:
+ with faker.find_insert_problem(conn):
cur.executemany(faker.insert_stmt, faker.records)
- except psycopg.DatabaseError:
- # Insert one by one to find problematic values
- conn.rollback()
- cur.execute(faker.drop_stmt)
- cur.execute(faker.create_stmt)
- for rec in faker.records:
- for i, val in enumerate(rec):
- cur.execute(faker.insert_field_stmt(i), (val,))
-
- # just in case, but hopefully we should have triggered the problem
- raise
cur.execute(faker.select_stmt)
recs = cur.fetchall()
with conn.cursor(binary=fmt) as cur:
cur.execute(faker.drop_stmt)
cur.execute(faker.create_stmt)
- cur.executemany(faker.insert_stmt, faker.records)
+ with faker.find_insert_problem(conn):
+ cur.executemany(faker.insert_stmt, faker.records)
stmt = sql.SQL(
"copy (select {} from {} order by id) to stdout (format {})"
async with conn.cursor(binary=fmt) as cur:
await cur.execute(faker.drop_stmt)
await cur.execute(faker.create_stmt)
- await cur.executemany(faker.insert_stmt, faker.records)
+ async with faker.find_insert_problem_async(conn):
+ await cur.executemany(faker.insert_stmt, faker.records)
stmt = sql.SQL(
"copy (select {} from {} order by id) to stdout (format {})"
faker.make_records(10)
row_factory = getattr(rows, row_factory)
+ def work():
+ with psycopg.connect(dsn) as conn:
+ with conn.cursor(binary=fmt_out, row_factory=row_factory) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ with faker.find_insert_problem(conn):
+ cur.executemany(faker.insert_stmt, faker.records)
+
+ cur.execute(faker.select_stmt)
+
+ if fetch == "one":
+ while 1:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while 1:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ cur.fetchall()
+ elif fetch == "iter":
+ for rec in cur:
+ pass
+
for retry in retries:
with retry:
n = []
gc_collect()
for i in range(3):
- with psycopg.connect(dsn) as conn:
- with conn.cursor(
- binary=fmt_out, row_factory=row_factory
- ) as cur:
- cur.execute(faker.drop_stmt)
- cur.execute(faker.create_stmt)
- cur.executemany(faker.insert_stmt, faker.records)
- cur.execute(faker.select_stmt)
-
- if fetch == "one":
- while 1:
- tmp = cur.fetchone()
- if tmp is None:
- break
- elif fetch == "many":
- while 1:
- tmp = cur.fetchmany(3)
- if not tmp:
- break
- elif fetch == "all":
- cur.fetchall()
- elif fetch == "iter":
- for rec in cur:
- pass
-
- tmp = None
-
- del cur, conn
+ work()
gc_collect()
n.append(len(gc.get_objects()))
-
assert (
n[0] == n[1] == n[2]
), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
faker.make_records(10)
row_factory = getattr(rows, row_factory)
+ async def work():
+ async with await psycopg.AsyncConnection.connect(dsn) as conn:
+ async with conn.cursor(
+ binary=fmt_out, row_factory=row_factory
+ ) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+ async with faker.find_insert_problem_async(conn):
+ await cur.executemany(faker.insert_stmt, faker.records)
+ await cur.execute(faker.select_stmt)
+
+ if fetch == "one":
+ while 1:
+ tmp = await cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while 1:
+ tmp = await cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ await cur.fetchall()
+ elif fetch == "iter":
+ async for rec in cur:
+ pass
+
async for retry in retries:
with retry:
n = []
gc_collect()
for i in range(3):
- async with await psycopg.AsyncConnection.connect(dsn) as conn:
- async with conn.cursor(
- binary=fmt_out, row_factory=row_factory
- ) as cur:
- await cur.execute(faker.drop_stmt)
- await cur.execute(faker.create_stmt)
- await cur.executemany(faker.insert_stmt, faker.records)
- await cur.execute(faker.select_stmt)
-
- if fetch == "one":
- while 1:
- tmp = await cur.fetchone()
- if tmp is None:
- break
- elif fetch == "many":
- while 1:
- tmp = await cur.fetchmany(3)
- if not tmp:
- break
- elif fetch == "all":
- await cur.fetchall()
- elif fetch == "iter":
- async for rec in cur:
- pass
-
- tmp = None
-
- del cur, conn
+ await work()
gc_collect()
n.append(len(gc.get_objects()))