]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added a few concurrency tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 22 May 2020 09:03:58 +0000 (21:03 +1200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 22 May 2020 18:15:43 +0000 (06:15 +1200)
tests/conftest.py
tests/test_async_connection.py
tests/test_connection.py

index 24a74550064d0594f8bf29079b396a57f4a618cf..6598cabdf2f92fbf1fc6c69fea0c1df142107c91 100644 (file)
@@ -2,3 +2,10 @@ pytest_plugins = (
     "tests.fix_db",
     "tests.fix_pq",
 )
+
+
+def pytest_configure(config):
+    # register slow marker
+    config.addinivalue_line(
+        "markers", "slow: this test is kinda slow (skip with -m 'not slow')"
+    )
index dac3afbd05037772c619da047e31de1362260d28..fa84c2bdd301b9f738c77617ea54f33e1fbff20f 100644 (file)
@@ -1,7 +1,10 @@
 import gc
+import time
 import pytest
+import asyncio
 import logging
 import weakref
+from asyncio.queues import Queue
 
 import psycopg3
 from psycopg3 import AsyncConnection
@@ -72,6 +75,52 @@ async def test_rollback(aconn):
         await aconn.rollback()
 
 
+@pytest.mark.slow
+@pytest.mark.skip  # TODO: sometimes this test hangs?
+async def test_commit_concurrency(aconn):
+    # Check the condition reported in psycopg2#103
+    # Because of bad status check, we commit even when a commit is already on
+    # its way. We can detect this condition by the warnings.
+    notices = Queue()
+    aconn.add_notice_handler(
+        lambda diag: notices.put_nowait(diag.message_primary)
+    )
+    stop = False
+
+    async def committer():
+        nonlocal stop
+        while not stop:
+            await aconn.commit()
+
+    async def runner():
+        nonlocal stop
+        cur = aconn.cursor()
+        for i in range(1000):
+            await cur.execute("select %s;", (i,))
+            await aconn.commit()
+
+        # Stop the committer thread
+        stop = True
+
+    await asyncio.wait([committer(), runner()])
+    assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+async def test_concurrent_execution(dsn):
+    async def worker():
+        cnn = await psycopg3.AsyncConnection.connect(dsn)
+        cur = cnn.cursor()
+        await cur.execute("select pg_sleep(0.5)")
+        await cur.close()
+        await cnn.close()
+
+    workers = [worker(), worker()]
+    t0 = time.time()
+    await asyncio.wait(workers)
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
 async def test_auto_transaction(aconn):
     aconn.pgconn.exec_(b"drop table if exists foo")
     aconn.pgconn.exec_(b"create table foo (id int primary key)")
index 67a450c15f2861783c59a7f583aeec285565e545..794295b55dd85892b87983f9ecb511284c3a371c 100644 (file)
@@ -1,7 +1,10 @@
 import gc
+import time
+import queue
 import pytest
 import logging
 import weakref
+import threading
 
 import psycopg3
 from psycopg3 import Connection
@@ -69,6 +72,52 @@ def test_rollback(conn):
         conn.rollback()
 
 
+@pytest.mark.slow
+def test_commit_concurrency(conn):
+    # Check the condition reported in psycopg2#103
+    # Because of bad status check, we commit even when a commit is already on
+    # its way. We can detect this condition by the warnings.
+    notices = queue.Queue()
+    conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
+    stop = False
+
+    def committer():
+        nonlocal stop
+        while not stop:
+            conn.commit()
+
+    cur = conn.cursor()
+    t1 = threading.Thread(target=committer)
+    t1.start()
+    for i in range(1000):
+        cur.execute("select %s;", (i,))
+        conn.commit()
+
+    # Stop the committer thread
+    stop = True
+
+    assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+def test_concurrent_execution(dsn):
+    def worker():
+        cnn = psycopg3.connect(dsn)
+        cur = cnn.cursor()
+        cur.execute("select pg_sleep(0.5)")
+        cur.close()
+        cnn.close()
+
+    t1 = threading.Thread(target=worker)
+    t2 = threading.Thread(target=worker)
+    t0 = time.time()
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
 def test_auto_transaction(conn):
     conn.pgconn.exec_(b"drop table if exists foo")
     conn.pgconn.exec_(b"create table foo (id int primary key)")