]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Moving concurrency tests in their own files
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 23 May 2020 04:15:08 +0000 (16:15 +1200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 23 May 2020 04:15:08 +0000 (16:15 +1200)
These tests are verbose, slow, and don't test the interface, rather the
behaviour, in peculiar cases.

Some test modules names moved to keep related tested objects together.

tests/test_concurrency.py [new file with mode: 0644]
tests/test_concurrency_async.py [new file with mode: 0644]
tests/test_connection.py
tests/test_connection_async.py [moved from tests/test_async_connection.py with 87% similarity]
tests/test_cursor_async.py [moved from tests/test_async_cursor.py with 100% similarity]

diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
new file mode 100644 (file)
index 0000000..66cdd9c
--- /dev/null
@@ -0,0 +1,56 @@
+"""
+Tests dealing with concurrency issues.
+"""
+
+import time
+import queue
+import pytest
+import threading
+
+import psycopg3
+
+
+@pytest.mark.slow
+def test_concurrent_execution(dsn):
+    def worker():
+        cnn = psycopg3.connect(dsn)
+        cur = cnn.cursor()
+        cur.execute("select pg_sleep(0.5)")
+        cur.close()
+        cnn.close()
+
+    t1 = threading.Thread(target=worker)
+    t2 = threading.Thread(target=worker)
+    t0 = time.time()
+    t1.start()
+    t2.start()
+    t1.join()
+    t2.join()
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
+@pytest.mark.slow
+def test_commit_concurrency(conn):
+    # Check the condition reported in psycopg2#103
+    # Because of bad status check, we commit even when a commit is already on
+    # its way. We can detect this condition by the warnings.
+    notices = queue.Queue()
+    conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
+    stop = False
+
+    def committer():
+        nonlocal stop
+        while not stop:
+            conn.commit()
+
+    cur = conn.cursor()
+    t1 = threading.Thread(target=committer)
+    t1.start()
+    for i in range(1000):
+        cur.execute("select %s;", (i,))
+        conn.commit()
+
+    # Stop the committer thread
+    stop = True
+
+    assert notices.empty(), "%d notices raised" % notices.qsize()
diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py
new file mode 100644 (file)
index 0000000..c916674
--- /dev/null
@@ -0,0 +1,54 @@
+import time
+import pytest
+import asyncio
+from asyncio.queues import Queue
+
+import psycopg3
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.mark.slow
+@pytest.mark.skip  # TODO: sometimes this test hangs?
+async def test_commit_concurrency(aconn):
+    # Check the condition reported in psycopg2#103
+    # Because of bad status check, we commit even when a commit is already on
+    # its way. We can detect this condition by the warnings.
+    notices = Queue()
+    aconn.add_notice_handler(
+        lambda diag: notices.put_nowait(diag.message_primary)
+    )
+    stop = False
+
+    async def committer():
+        nonlocal stop
+        while not stop:
+            await aconn.commit()
+
+    async def runner():
+        nonlocal stop
+        cur = aconn.cursor()
+        for i in range(1000):
+            await cur.execute("select %s;", (i,))
+            await aconn.commit()
+
+        # Stop the committer thread
+        stop = True
+
+    await asyncio.wait([committer(), runner()])
+    assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+async def test_concurrent_execution(dsn):
+    async def worker():
+        cnn = await psycopg3.AsyncConnection.connect(dsn)
+        cur = cnn.cursor()
+        await cur.execute("select pg_sleep(0.5)")
+        await cur.close()
+        await cnn.close()
+
+    workers = [worker(), worker()]
+    t0 = time.time()
+    await asyncio.wait(workers)
+    assert time.time() - t0 < 0.8, "something broken in concurrency"
index 0a83d2f9affda8bab66e050c83780107544ded99..7bcaa60c09e3b1b5db85af816d8a1105f9c8838f 100644 (file)
@@ -1,10 +1,7 @@
 import gc
-import time
-import queue
 import pytest
 import logging
 import weakref
-import threading
 
 import psycopg3
 from psycopg3 import Connection
@@ -80,52 +77,6 @@ def test_rollback(conn):
         conn.rollback()
 
 
-@pytest.mark.slow
-def test_commit_concurrency(conn):
-    # Check the condition reported in psycopg2#103
-    # Because of bad status check, we commit even when a commit is already on
-    # its way. We can detect this condition by the warnings.
-    notices = queue.Queue()
-    conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
-    stop = False
-
-    def committer():
-        nonlocal stop
-        while not stop:
-            conn.commit()
-
-    cur = conn.cursor()
-    t1 = threading.Thread(target=committer)
-    t1.start()
-    for i in range(1000):
-        cur.execute("select %s;", (i,))
-        conn.commit()
-
-    # Stop the committer thread
-    stop = True
-
-    assert notices.empty(), "%d notices raised" % notices.qsize()
-
-
-@pytest.mark.slow
-def test_concurrent_execution(dsn):
-    def worker():
-        cnn = psycopg3.connect(dsn)
-        cur = cnn.cursor()
-        cur.execute("select pg_sleep(0.5)")
-        cur.close()
-        cnn.close()
-
-    t1 = threading.Thread(target=worker)
-    t2 = threading.Thread(target=worker)
-    t0 = time.time()
-    t1.start()
-    t2.start()
-    t1.join()
-    t2.join()
-    assert time.time() - t0 < 0.8, "something broken in concurrency"
-
-
 def test_auto_transaction(conn):
     conn.pgconn.exec_(b"drop table if exists foo")
     conn.pgconn.exec_(b"create table foo (id int primary key)")
similarity index 87%
rename from tests/test_async_connection.py
rename to tests/test_connection_async.py
index 4ea0ee0688158bc78436618ffda3230c245ba205..585cb77385be74a4b64f2356f94f710e53777436 100644 (file)
@@ -1,10 +1,7 @@
 import gc
-import time
 import pytest
-import asyncio
 import logging
 import weakref
-from asyncio.queues import Queue
 
 import psycopg3
 from psycopg3 import AsyncConnection
@@ -83,52 +80,6 @@ async def test_rollback(aconn):
         await aconn.rollback()
 
 
-@pytest.mark.slow
-@pytest.mark.skip  # TODO: sometimes this test hangs?
-async def test_commit_concurrency(aconn):
-    # Check the condition reported in psycopg2#103
-    # Because of bad status check, we commit even when a commit is already on
-    # its way. We can detect this condition by the warnings.
-    notices = Queue()
-    aconn.add_notice_handler(
-        lambda diag: notices.put_nowait(diag.message_primary)
-    )
-    stop = False
-
-    async def committer():
-        nonlocal stop
-        while not stop:
-            await aconn.commit()
-
-    async def runner():
-        nonlocal stop
-        cur = aconn.cursor()
-        for i in range(1000):
-            await cur.execute("select %s;", (i,))
-            await aconn.commit()
-
-        # Stop the committer thread
-        stop = True
-
-    await asyncio.wait([committer(), runner()])
-    assert notices.empty(), "%d notices raised" % notices.qsize()
-
-
-@pytest.mark.slow
-async def test_concurrent_execution(dsn):
-    async def worker():
-        cnn = await psycopg3.AsyncConnection.connect(dsn)
-        cur = cnn.cursor()
-        await cur.execute("select pg_sleep(0.5)")
-        await cur.close()
-        await cnn.close()
-
-    workers = [worker(), worker()]
-    t0 = time.time()
-    await asyncio.wait(workers)
-    assert time.time() - t0 < 0.8, "something broken in concurrency"
-
-
 async def test_auto_transaction(aconn):
     aconn.pgconn.exec_(b"drop table if exists foo")
     aconn.pgconn.exec_(b"create table foo (id int primary key)")