From: Daniele Varrazzo Date: Sat, 23 May 2020 04:15:08 +0000 (+1200) Subject: Moving concurrency tests in their own files X-Git-Tag: 3.0.dev0~493 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=267e7154ce3b8fcc309ff7a8989d0d63c6a1f4f4;p=thirdparty%2Fpsycopg.git Moving concurrency tests in their own files These tests are verbose, slow, and don't test the interface, rather the behaviour, in peculiar cases. Some test modules names moved to keep related tested objects together. --- diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 000000000..66cdd9c6e --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,56 @@ +""" +Tests dealing with concurrency issues. +""" + +import time +import queue +import pytest +import threading + +import psycopg3 + + +@pytest.mark.slow +def test_concurrent_execution(dsn): + def worker(): + cnn = psycopg3.connect(dsn) + cur = cnn.cursor() + cur.execute("select pg_sleep(0.5)") + cur.close() + cnn.close() + + t1 = threading.Thread(target=worker) + t2 = threading.Thread(target=worker) + t0 = time.time() + t1.start() + t2.start() + t1.join() + t2.join() + assert time.time() - t0 < 0.8, "something broken in concurrency" + + +@pytest.mark.slow +def test_commit_concurrency(conn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = queue.Queue() + conn.add_notice_handler(lambda diag: notices.put(diag.message_primary)) + stop = False + + def committer(): + nonlocal stop + while not stop: + conn.commit() + + cur = conn.cursor() + t1 = threading.Thread(target=committer) + t1.start() + for i in range(1000): + cur.execute("select %s;", (i,)) + conn.commit() + + # Stop the committer thread + stop = True + + assert notices.empty(), "%d notices raised" % notices.qsize() diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py new file mode 100644 index 000000000..c91667432 --- /dev/null +++ b/tests/test_concurrency_async.py @@ -0,0 +1,54 @@ +import time +import pytest +import asyncio +from asyncio.queues import Queue + +import psycopg3 + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.slow +@pytest.mark.skip # TODO: sometimes this test hangs? +async def test_commit_concurrency(aconn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = Queue() + aconn.add_notice_handler( + lambda diag: notices.put_nowait(diag.message_primary) + ) + stop = False + + async def committer(): + nonlocal stop + while not stop: + await aconn.commit() + + async def runner(): + nonlocal stop + cur = aconn.cursor() + for i in range(1000): + await cur.execute("select %s;", (i,)) + await aconn.commit() + + # Stop the committer thread + stop = True + + await asyncio.wait([committer(), runner()]) + assert notices.empty(), "%d notices raised" % notices.qsize() + + +@pytest.mark.slow +async def test_concurrent_execution(dsn): + async def worker(): + cnn = await psycopg3.AsyncConnection.connect(dsn) + cur = cnn.cursor() + await cur.execute("select pg_sleep(0.5)") + await cur.close() + await cnn.close() + + workers = [worker(), worker()] + t0 = time.time() + await asyncio.wait(workers) + assert time.time() - t0 < 0.8, "something broken in concurrency" diff --git a/tests/test_connection.py b/tests/test_connection.py index 0a83d2f9a..7bcaa60c0 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,10 +1,7 @@ import gc -import time -import queue import pytest import logging import weakref -import threading import psycopg3 from psycopg3 import Connection @@ -80,52 +77,6 @@ def test_rollback(conn): conn.rollback() -@pytest.mark.slow -def test_commit_concurrency(conn): - # Check the condition reported in psycopg2#103 - # Because of bad status check, we commit even when a commit is already on - # its way. We can detect this condition by the warnings. - notices = queue.Queue() - conn.add_notice_handler(lambda diag: notices.put(diag.message_primary)) - stop = False - - def committer(): - nonlocal stop - while not stop: - conn.commit() - - cur = conn.cursor() - t1 = threading.Thread(target=committer) - t1.start() - for i in range(1000): - cur.execute("select %s;", (i,)) - conn.commit() - - # Stop the committer thread - stop = True - - assert notices.empty(), "%d notices raised" % notices.qsize() - - -@pytest.mark.slow -def test_concurrent_execution(dsn): - def worker(): - cnn = psycopg3.connect(dsn) - cur = cnn.cursor() - cur.execute("select pg_sleep(0.5)") - cur.close() - cnn.close() - - t1 = threading.Thread(target=worker) - t2 = threading.Thread(target=worker) - t0 = time.time() - t1.start() - t2.start() - t1.join() - t2.join() - assert time.time() - t0 < 0.8, "something broken in concurrency" - - def test_auto_transaction(conn): conn.pgconn.exec_(b"drop table if exists foo") conn.pgconn.exec_(b"create table foo (id int primary key)") diff --git a/tests/test_async_connection.py b/tests/test_connection_async.py similarity index 87% rename from tests/test_async_connection.py rename to tests/test_connection_async.py index 4ea0ee068..585cb7738 100644 --- a/tests/test_async_connection.py +++ b/tests/test_connection_async.py @@ -1,10 +1,7 @@ import gc -import time import pytest -import asyncio import logging import weakref -from asyncio.queues import Queue import psycopg3 from psycopg3 import AsyncConnection @@ -83,52 +80,6 @@ async def test_rollback(aconn): await aconn.rollback() -@pytest.mark.slow -@pytest.mark.skip # TODO: sometimes this test hangs? -async def test_commit_concurrency(aconn): - # Check the condition reported in psycopg2#103 - # Because of bad status check, we commit even when a commit is already on - # its way. We can detect this condition by the warnings. - notices = Queue() - aconn.add_notice_handler( - lambda diag: notices.put_nowait(diag.message_primary) - ) - stop = False - - async def committer(): - nonlocal stop - while not stop: - await aconn.commit() - - async def runner(): - nonlocal stop - cur = aconn.cursor() - for i in range(1000): - await cur.execute("select %s;", (i,)) - await aconn.commit() - - # Stop the committer thread - stop = True - - await asyncio.wait([committer(), runner()]) - assert notices.empty(), "%d notices raised" % notices.qsize() - - -@pytest.mark.slow -async def test_concurrent_execution(dsn): - async def worker(): - cnn = await psycopg3.AsyncConnection.connect(dsn) - cur = cnn.cursor() - await cur.execute("select pg_sleep(0.5)") - await cur.close() - await cnn.close() - - workers = [worker(), worker()] - t0 = time.time() - await asyncio.wait(workers) - assert time.time() - t0 < 0.8, "something broken in concurrency" - - async def test_auto_transaction(aconn): aconn.pgconn.exec_(b"drop table if exists foo") aconn.pgconn.exec_(b"create table foo (id int primary key)") diff --git a/tests/test_async_cursor.py b/tests/test_cursor_async.py similarity index 100% rename from tests/test_async_cursor.py rename to tests/test_cursor_async.py