with pool.ConnectionPool(dsn, min_size=2) as p:
with p.connection() as conn:
with conn.execute("select pg_backend_pid()") as cur:
- (pid1,) = cur.fetchone()
+ (pid1,) = cur.fetchone() # type: ignore[misc]
with p.connection() as conn2:
with conn2.execute("select pg_backend_pid()") as cur:
- (pid2,) = cur.fetchone()
+ (pid2,) = cur.fetchone() # type: ignore[misc]
with p.connection() as conn:
assert conn.pgconn.backend_pid in (pid1, pid2)
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on"
+ assert res.fetchone()[0] == "on" # type: ignore[index]
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on"
+ assert res.fetchone()[0] == "on" # type: ignore[index]
conn.close()
with p.connection() as conn:
assert inits == 2
res = conn.execute("show default_transaction_read_only")
- assert res.fetchone()[0] == "on"
+ assert res.fetchone()[0] == "on" # type: ignore[index]
@pytest.mark.slow
with p.connection() as conn:
(pid,) = conn.execute(
"select pg_backend_pid() from pg_sleep(0.2)"
- ).fetchone()
+ ).fetchone() # type: ignore[misc]
t1 = time()
results.append((n, t1 - t0, pid))
t0 = time()
try:
with p.connection() as conn:
- (pid,) = conn.execute(
+ (pid,) = conn.execute( # type: ignore[misc]
"select pg_backend_pid() from pg_sleep(0.2)"
).fetchone()
except pool.PoolTimeout as e:
timeout = 0.25 if n == 3 else None
try:
with p.connection(timeout=timeout) as conn:
- (pid,) = conn.execute(
+ (pid,) = conn.execute( # type: ignore[misc]
"select pg_backend_pid() from pg_sleep(0.2)"
).fetchone()
except pool.PoolTimeout as e:
with pool.ConnectionPool(dsn, min_size=1) as p:
with p.connection() as conn:
with conn.execute("select pg_backend_pid()") as cur:
- (pid1,) = cur.fetchone()
+ (pid1,) = cur.fetchone() # type: ignore[misc]
conn.close()
with p.connection() as conn2:
with conn2.execute("select pg_backend_pid()") as cur:
- (pid2,) = cur.fetchone()
+ (pid2,) = cur.fetchone() # type: ignore[misc]
assert pid1 != pid2
def test_closed_queue(dsn, retries):
def w1():
with p.connection() as conn:
- assert (
- conn.execute("select 1 from pg_sleep(0.2)").fetchone()[0] == 1
- )
+ cur = conn.execute("select 1 from pg_sleep(0.2)")
+ assert cur.fetchone()[0] == 1 # type: ignore[index]
success.append("w1")
def w2():
async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
async with p.connection() as conn:
cur = await conn.execute("select pg_backend_pid()")
- (pid1,) = await cur.fetchone()
+ (pid1,) = await cur.fetchone() # type: ignore[misc]
async with p.connection() as conn2:
cur = await conn2.execute("select pg_backend_pid()")
- (pid2,) = await cur.fetchone()
+ (pid2,) = await cur.fetchone() # type: ignore[misc]
async with p.connection() as conn:
assert conn.pgconn.backend_pid in (pid1, pid2)
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on"
+ assert (await res.fetchone())[0] == "on" # type: ignore[index]
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on"
+ assert (await res.fetchone())[0] == "on" # type: ignore[index]
await conn.close()
async with p.connection() as conn:
assert inits == 2
res = await conn.execute("show default_transaction_read_only")
- assert (await res.fetchone())[0] == "on"
+ assert (await res.fetchone())[0] == "on" # type: ignore[index]
@pytest.mark.slow
cur = await conn.execute(
"select pg_backend_pid() from pg_sleep(0.2)"
)
- (pid,) = await cur.fetchone()
+ (pid,) = await cur.fetchone() # type: ignore[misc]
t1 = time()
results.append((n, t1 - t0, pid))
cur = await conn.execute(
"select pg_backend_pid() from pg_sleep(0.2)"
)
- (pid,) = await cur.fetchone()
+ (pid,) = await cur.fetchone() # type: ignore[misc]
except pool.PoolTimeout as e:
t1 = time()
errors.append((n, t1 - t0, e))
cur = await conn.execute(
"select pg_backend_pid() from pg_sleep(0.2)"
)
- (pid,) = await cur.fetchone()
+ (pid,) = await cur.fetchone() # type: ignore[misc]
except pool.PoolTimeout as e:
t1 = time()
errors.append((n, t1 - t0, e))
async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
async with p.connection() as conn:
cur = await conn.execute("select pg_backend_pid()")
- (pid1,) = await cur.fetchone()
+ (pid1,) = await cur.fetchone() # type: ignore[misc]
await conn.close()
async with p.connection() as conn2:
cur = await conn2.execute("select pg_backend_pid()")
- (pid2,) = await cur.fetchone()
+ (pid2,) = await cur.fetchone() # type: ignore[misc]
assert pid1 != pid2