From: Daniele Varrazzo Date: Fri, 22 May 2020 09:03:58 +0000 (+1200) Subject: Added a few concurrency tests X-Git-Tag: 3.0.dev0~498 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=5ab888c016997131f1cfe55b2258ddb63d048d59;p=thirdparty%2Fpsycopg.git Added a few concurrency tests --- diff --git a/tests/conftest.py b/tests/conftest.py index 24a745500..6598cabdf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,3 +2,10 @@ pytest_plugins = ( "tests.fix_db", "tests.fix_pq", ) + + +def pytest_configure(config): + # register slow marker + config.addinivalue_line( + "markers", "slow: this test is kinda slow (skip with -m 'not slow')" + ) diff --git a/tests/test_async_connection.py b/tests/test_async_connection.py index dac3afbd0..fa84c2bdd 100644 --- a/tests/test_async_connection.py +++ b/tests/test_async_connection.py @@ -1,7 +1,10 @@ import gc +import time import pytest +import asyncio import logging import weakref +from asyncio.queues import Queue import psycopg3 from psycopg3 import AsyncConnection @@ -72,6 +75,52 @@ async def test_rollback(aconn): await aconn.rollback() +@pytest.mark.slow +@pytest.mark.skip # TODO: sometimes this test hangs? +async def test_commit_concurrency(aconn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = Queue() + aconn.add_notice_handler( + lambda diag: notices.put_nowait(diag.message_primary) + ) + stop = False + + async def committer(): + nonlocal stop + while not stop: + await aconn.commit() + + async def runner(): + nonlocal stop + cur = aconn.cursor() + for i in range(1000): + await cur.execute("select %s;", (i,)) + await aconn.commit() + + # Stop the committer thread + stop = True + + await asyncio.wait([committer(), runner()]) + assert notices.empty(), "%d notices raised" % notices.qsize() + + +@pytest.mark.slow +async def test_concurrent_execution(dsn): + async def worker(): + cnn = await psycopg3.AsyncConnection.connect(dsn) + cur = cnn.cursor() + await cur.execute("select pg_sleep(0.5)") + await cur.close() + await cnn.close() + + workers = [worker(), worker()] + t0 = time.time() + await asyncio.wait(workers) + assert time.time() - t0 < 0.8, "something broken in concurrency" + + async def test_auto_transaction(aconn): aconn.pgconn.exec_(b"drop table if exists foo") aconn.pgconn.exec_(b"create table foo (id int primary key)") diff --git a/tests/test_connection.py b/tests/test_connection.py index 67a450c15..794295b55 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,7 +1,10 @@ import gc +import time +import queue import pytest import logging import weakref +import threading import psycopg3 from psycopg3 import Connection @@ -69,6 +72,52 @@ def test_rollback(conn): conn.rollback() +@pytest.mark.slow +def test_commit_concurrency(conn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = queue.Queue() + conn.add_notice_handler(lambda diag: notices.put(diag.message_primary)) + stop = False + + def committer(): + nonlocal stop + while not stop: + conn.commit() + + cur = conn.cursor() + t1 = threading.Thread(target=committer) + t1.start() + for i in range(1000): + cur.execute("select %s;", (i,)) + conn.commit() + + # Stop the committer thread + stop = True + + assert notices.empty(), "%d notices raised" % notices.qsize() + + +@pytest.mark.slow +def test_concurrent_execution(dsn): + def worker(): + cnn = psycopg3.connect(dsn) + cur = cnn.cursor() + cur.execute("select pg_sleep(0.5)") + cur.close() + cnn.close() + + t1 = threading.Thread(target=worker) + t2 = threading.Thread(target=worker) + t0 = time.time() + t1.start() + t2.start() + t1.join() + t2.join() + assert time.time() - t0 < 0.8, "something broken in concurrency" + + def test_auto_transaction(conn): conn.pgconn.exec_(b"drop table if exists foo") conn.pgconn.exec_(b"create table foo (id int primary key)")