From 267e7154ce3b8fcc309ff7a8989d0d63c6a1f4f4 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 23 May 2020 16:15:08 +1200 Subject: [PATCH] Moving concurrency tests in their own files These tests are verbose, slow, and don't test the interface, rather the behaviour, in peculiar cases. Some test modules names moved to keep related tested objects together. --- tests/test_concurrency.py | 56 +++++++++++++++++++ tests/test_concurrency_async.py | 54 ++++++++++++++++++ tests/test_connection.py | 49 ---------------- ...connection.py => test_connection_async.py} | 49 ---------------- ...t_async_cursor.py => test_cursor_async.py} | 0 5 files changed, 110 insertions(+), 98 deletions(-) create mode 100644 tests/test_concurrency.py create mode 100644 tests/test_concurrency_async.py rename tests/{test_async_connection.py => test_connection_async.py} (87%) rename tests/{test_async_cursor.py => test_cursor_async.py} (100%) diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 000000000..66cdd9c6e --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,56 @@ +""" +Tests dealing with concurrency issues. +""" + +import time +import queue +import pytest +import threading + +import psycopg3 + + +@pytest.mark.slow +def test_concurrent_execution(dsn): + def worker(): + cnn = psycopg3.connect(dsn) + cur = cnn.cursor() + cur.execute("select pg_sleep(0.5)") + cur.close() + cnn.close() + + t1 = threading.Thread(target=worker) + t2 = threading.Thread(target=worker) + t0 = time.time() + t1.start() + t2.start() + t1.join() + t2.join() + assert time.time() - t0 < 0.8, "something broken in concurrency" + + +@pytest.mark.slow +def test_commit_concurrency(conn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = queue.Queue() + conn.add_notice_handler(lambda diag: notices.put(diag.message_primary)) + stop = False + + def committer(): + nonlocal stop + while not stop: + conn.commit() + + cur = conn.cursor() + t1 = threading.Thread(target=committer) + t1.start() + for i in range(1000): + cur.execute("select %s;", (i,)) + conn.commit() + + # Stop the committer thread + stop = True + + assert notices.empty(), "%d notices raised" % notices.qsize() diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py new file mode 100644 index 000000000..c91667432 --- /dev/null +++ b/tests/test_concurrency_async.py @@ -0,0 +1,54 @@ +import time +import pytest +import asyncio +from asyncio.queues import Queue + +import psycopg3 + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.slow +@pytest.mark.skip # TODO: sometimes this test hangs? +async def test_commit_concurrency(aconn): + # Check the condition reported in psycopg2#103 + # Because of bad status check, we commit even when a commit is already on + # its way. We can detect this condition by the warnings. + notices = Queue() + aconn.add_notice_handler( + lambda diag: notices.put_nowait(diag.message_primary) + ) + stop = False + + async def committer(): + nonlocal stop + while not stop: + await aconn.commit() + + async def runner(): + nonlocal stop + cur = aconn.cursor() + for i in range(1000): + await cur.execute("select %s;", (i,)) + await aconn.commit() + + # Stop the committer thread + stop = True + + await asyncio.wait([committer(), runner()]) + assert notices.empty(), "%d notices raised" % notices.qsize() + + +@pytest.mark.slow +async def test_concurrent_execution(dsn): + async def worker(): + cnn = await psycopg3.AsyncConnection.connect(dsn) + cur = cnn.cursor() + await cur.execute("select pg_sleep(0.5)") + await cur.close() + await cnn.close() + + workers = [worker(), worker()] + t0 = time.time() + await asyncio.wait(workers) + assert time.time() - t0 < 0.8, "something broken in concurrency" diff --git a/tests/test_connection.py b/tests/test_connection.py index 0a83d2f9a..7bcaa60c0 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,10 +1,7 @@ import gc -import time -import queue import pytest import logging import weakref -import threading import psycopg3 from psycopg3 import Connection @@ -80,52 +77,6 @@ def test_rollback(conn): conn.rollback() -@pytest.mark.slow -def test_commit_concurrency(conn): - # Check the condition reported in psycopg2#103 - # Because of bad status check, we commit even when a commit is already on - # its way. We can detect this condition by the warnings. - notices = queue.Queue() - conn.add_notice_handler(lambda diag: notices.put(diag.message_primary)) - stop = False - - def committer(): - nonlocal stop - while not stop: - conn.commit() - - cur = conn.cursor() - t1 = threading.Thread(target=committer) - t1.start() - for i in range(1000): - cur.execute("select %s;", (i,)) - conn.commit() - - # Stop the committer thread - stop = True - - assert notices.empty(), "%d notices raised" % notices.qsize() - - -@pytest.mark.slow -def test_concurrent_execution(dsn): - def worker(): - cnn = psycopg3.connect(dsn) - cur = cnn.cursor() - cur.execute("select pg_sleep(0.5)") - cur.close() - cnn.close() - - t1 = threading.Thread(target=worker) - t2 = threading.Thread(target=worker) - t0 = time.time() - t1.start() - t2.start() - t1.join() - t2.join() - assert time.time() - t0 < 0.8, "something broken in concurrency" - - def test_auto_transaction(conn): conn.pgconn.exec_(b"drop table if exists foo") conn.pgconn.exec_(b"create table foo (id int primary key)") diff --git a/tests/test_async_connection.py b/tests/test_connection_async.py similarity index 87% rename from tests/test_async_connection.py rename to tests/test_connection_async.py index 4ea0ee068..585cb7738 100644 --- a/tests/test_async_connection.py +++ b/tests/test_connection_async.py @@ -1,10 +1,7 @@ import gc -import time import pytest -import asyncio import logging import weakref -from asyncio.queues import Queue import psycopg3 from psycopg3 import AsyncConnection @@ -83,52 +80,6 @@ async def test_rollback(aconn): await aconn.rollback() -@pytest.mark.slow -@pytest.mark.skip # TODO: sometimes this test hangs? -async def test_commit_concurrency(aconn): - # Check the condition reported in psycopg2#103 - # Because of bad status check, we commit even when a commit is already on - # its way. We can detect this condition by the warnings. - notices = Queue() - aconn.add_notice_handler( - lambda diag: notices.put_nowait(diag.message_primary) - ) - stop = False - - async def committer(): - nonlocal stop - while not stop: - await aconn.commit() - - async def runner(): - nonlocal stop - cur = aconn.cursor() - for i in range(1000): - await cur.execute("select %s;", (i,)) - await aconn.commit() - - # Stop the committer thread - stop = True - - await asyncio.wait([committer(), runner()]) - assert notices.empty(), "%d notices raised" % notices.qsize() - - -@pytest.mark.slow -async def test_concurrent_execution(dsn): - async def worker(): - cnn = await psycopg3.AsyncConnection.connect(dsn) - cur = cnn.cursor() - await cur.execute("select pg_sleep(0.5)") - await cur.close() - await cnn.close() - - workers = [worker(), worker()] - t0 = time.time() - await asyncio.wait(workers) - assert time.time() - t0 < 0.8, "something broken in concurrency" - - async def test_auto_transaction(aconn): aconn.pgconn.exec_(b"drop table if exists foo") aconn.pgconn.exec_(b"create table foo (id int primary key)") diff --git a/tests/test_async_cursor.py b/tests/test_cursor_async.py similarity index 100% rename from tests/test_async_cursor.py rename to tests/test_cursor_async.py -- 2.47.2