Psycopg 3.1.12 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
+- Fix hanging if an async connection is closed while querying (:ticket:`#608`).
- Fix memory leak when `~register_*()` functions are called repeatedly
(:ticket:`#647`).
if self.closed:
return
self._closed = True
+
+ # TODO: maybe send a cancel on close, if the connection is ACTIVE?
+
self.pgconn.finish()
@overload
if self.closed:
return
self._closed = True
+
+ # TODO: maybe send a cancel on close, if the connection is ACTIVE?
+
self.pgconn.finish()
@overload
assert pipeline is self._pipeline
self._pipeline = None
- async def wait(self, gen: PQGen[RV]) -> RV:
+ async def wait(self, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV:
try:
- return await waiting.wait_async(gen, self.pgconn.socket)
+ return await waiting.wait_async(gen, self.pgconn.socket, timeout=timeout)
except (asyncio.CancelledError, KeyboardInterrupt):
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
self._try_cancel(self.pgconn)
try:
- await waiting.wait_async(gen, self.pgconn.socket)
+ await waiting.wait_async(gen, self.pgconn.socket, timeout=timeout)
except e.QueryCanceled:
pass # as expected
raise
return rv
-async def wait_async(gen: PQGen[RV], fileno: int) -> RV:
+async def wait_async(
+ gen: PQGen[RV], fileno: int, timeout: Optional[float] = None
+) -> RV:
"""
Coroutine waiting for a generator to complete.
if writer:
loop.add_writer(fileno, wakeup, READY_W)
try:
- await ev.wait()
+ if timeout is None:
+ await ev.wait()
+ else:
+ try:
+ await wait_for(ev.wait(), timeout)
+ except TimeoutError:
+ pass
finally:
if reader:
loop.remove_reader(fileno)
env["PYTHONFAULTHANDLER"] = "1"
out = sp.check_output([sys.executable, "-s", "-c", script], env=env)
assert out.decode().rstrip() == "[1, 1]"
+
+
+@pytest.mark.slow
+@pytest.mark.crdb("skip")
+@pytest.mark.skipif(
+ sys.platform == "win32",
+ reason="Fails with: An operation was attempted on something that is not a socket",
+)
+def test_concurrent_close(dsn, conn):
+ # Verify something similar to the problem in #608, which doesn't affect
+ # sync connections anyway.
+ pid = conn.info.backend_pid
+ conn.autocommit = True
+
+ def worker():
+ try:
+ conn.execute("select pg_sleep(3)")
+ except psycopg.OperationalError:
+ pass # expected
+
+ t0 = time.time()
+ th = threading.Thread(target=worker)
+ th.start()
+ time.sleep(0.5)
+ with psycopg.connect(dsn, autocommit=True) as conn1:
+ cur = conn1.execute("select query from pg_stat_activity where pid = %s", [pid])
+ assert cur.fetchone()
+ conn.close()
+ th.join()
+ time.sleep(0.5)
+ t = time.time()
+ # TODO: this check can pass if we issue a cancel on close, which is
+ # a change in behaviour to be considered better.
+ # cur = conn1.execute(
+ # "select query from pg_stat_activity where pid = %s",
+ # [pid],
+ # )
+ # assert not cur.fetchone()
+ assert t - t0 < 2
t1 = time.time()
assert t1 - t0 < 1.0
+
+
+@pytest.mark.slow
+@pytest.mark.crdb("skip")
+@pytest.mark.skipif(
+ sys.platform == "win32",
+ reason="Fails with: An operation was attempted on something that is not a socket",
+)
+async def test_concurrent_close(dsn, aconn):
+ # Test issue #608: concurrent closing shouldn't hang the server
+ # (although, at the moment, it doesn't cancel a running query).
+ pid = aconn.info.backend_pid
+ await aconn.set_autocommit(True)
+
+ async def worker():
+ try:
+ await aconn.execute("select pg_sleep(3)")
+ except psycopg.OperationalError:
+ pass # expected
+
+ t0 = time.time()
+ task = create_task(worker())
+ await asyncio.sleep(0.5)
+
+ async def test():
+ async with await psycopg.AsyncConnection.connect(dsn, autocommit=True) as conn1:
+ cur = await conn1.execute(
+ "select query from pg_stat_activity where pid = %s", [pid]
+ )
+ assert await cur.fetchone()
+ await aconn.close()
+ await asyncio.gather(task)
+ await asyncio.sleep(0.5)
+ t = time.time()
+ # TODO: this statement can pass only if we send cancel on close
+ # but because async cancelling is not available in the libpq,
+ # we'd rather not do it.
+ # cur = await conn1.execute(
+ # "select query from pg_stat_activity where pid = %s", [pid]
+ # )
+ # assert not await cur.fetchone()
+ assert t - t0 < 2
+
+ await asyncio.wait_for(test(), 5.0)