"tests.fix_db",
"tests.fix_pq",
)
+
+
+def pytest_configure(config):
+ # register slow marker
+ config.addinivalue_line(
+ "markers", "slow: this test is kinda slow (skip with -m 'not slow')"
+ )
import gc
+import time
import pytest
+import asyncio
import logging
import weakref
+from asyncio.queues import Queue
import psycopg3
from psycopg3 import AsyncConnection
await aconn.rollback()
+@pytest.mark.slow
+@pytest.mark.skip # TODO: sometimes this test hangs?
+async def test_commit_concurrency(aconn):
+ # Check the condition reported in psycopg2#103
+ # Because of bad status check, we commit even when a commit is already on
+ # its way. We can detect this condition by the warnings.
+ notices = Queue()
+ aconn.add_notice_handler(
+ lambda diag: notices.put_nowait(diag.message_primary)
+ )
+ stop = False
+
+ async def committer():
+ nonlocal stop
+ while not stop:
+ await aconn.commit()
+
+ async def runner():
+ nonlocal stop
+ cur = aconn.cursor()
+ for i in range(1000):
+ await cur.execute("select %s;", (i,))
+ await aconn.commit()
+
+ # Stop the committer thread
+ stop = True
+
+ await asyncio.wait([committer(), runner()])
+ assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+async def test_concurrent_execution(dsn):
+ async def worker():
+ cnn = await psycopg3.AsyncConnection.connect(dsn)
+ cur = cnn.cursor()
+ await cur.execute("select pg_sleep(0.5)")
+ await cur.close()
+ await cnn.close()
+
+ workers = [worker(), worker()]
+ t0 = time.time()
+ await asyncio.wait(workers)
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
async def test_auto_transaction(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
import gc
+import time
+import queue
import pytest
import logging
import weakref
+import threading
import psycopg3
from psycopg3 import Connection
conn.rollback()
+@pytest.mark.slow
+def test_commit_concurrency(conn):
+ # Check the condition reported in psycopg2#103
+ # Because of bad status check, we commit even when a commit is already on
+ # its way. We can detect this condition by the warnings.
+ notices = queue.Queue()
+ conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
+ stop = False
+
+ def committer():
+ nonlocal stop
+ while not stop:
+ conn.commit()
+
+ cur = conn.cursor()
+ t1 = threading.Thread(target=committer)
+ t1.start()
+ for i in range(1000):
+ cur.execute("select %s;", (i,))
+ conn.commit()
+
+ # Stop the committer thread
+ stop = True
+
+ assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+def test_concurrent_execution(dsn):
+ def worker():
+ cnn = psycopg3.connect(dsn)
+ cur = cnn.cursor()
+ cur.execute("select pg_sleep(0.5)")
+ cur.close()
+ cnn.close()
+
+ t1 = threading.Thread(target=worker)
+ t2 = threading.Thread(target=worker)
+ t0 = time.time()
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
def test_auto_transaction(conn):
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")