fields, self.table_name
)
- def choose_schema(self, types=None, nfields=20):
+ def choose_schema(self, types=None, ncols=20):
if not types:
types = self.get_supported_types()
types_list = sorted(types, key=lambda cls: cls.__name__)
- schema = [choice(types_list) for i in range(nfields)]
+ schema = [choice(types_list) for i in range(ncols)]
for i, cls in enumerate(schema):
# choose the type of the array
if cls is list:
break
schema[i] = [scls]
elif cls is tuple:
- schema[i] = tuple(
- self.choose_schema(types=types, nfields=nfields)
- )
+ schema[i] = tuple(self.choose_schema(types=types, ncols=ncols))
return schema
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
-def test_leak_fetchall(dsn, faker, fmt):
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+def test_leak(dsn, faker, fmt, fetch):
if fmt != Format.BINARY:
pytest.xfail("faker to extend to all text dumpers")
faker.format = fmt
- faker.choose_schema()
- faker.make_records(100)
+ faker.choose_schema(ncols=5)
+ faker.make_records(10)
n = []
for i in range(3):
cur.execute(faker.create_stmt)
cur.executemany(faker.insert_stmt, faker.records)
cur.execute(faker.select_stmt)
- for got, want in zip(cur.fetchall(), faker.records):
+
+ recs = []
+ if fetch == "one":
+ while 1:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ recs.append(tmp)
+ elif fetch == "many":
+ while 1:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ recs.extend(tmp)
+ elif fetch == "all":
+ recs.extend(cur.fetchall())
+ elif fetch == "iter":
+ for rec in cur:
+ recs.append(rec)
+
+ for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
+
+ recs = tmp = None
+
del cur, conn
gc.collect()
gc.collect()