--- /dev/null
+"""
+Tests dealing with concurrency issues.
+"""
+
+import time
+import queue
+import pytest
+import threading
+
+import psycopg3
+
+
+@pytest.mark.slow
+def test_concurrent_execution(dsn):
+ def worker():
+ cnn = psycopg3.connect(dsn)
+ cur = cnn.cursor()
+ cur.execute("select pg_sleep(0.5)")
+ cur.close()
+ cnn.close()
+
+ t1 = threading.Thread(target=worker)
+ t2 = threading.Thread(target=worker)
+ t0 = time.time()
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
+
+
+@pytest.mark.slow
+def test_commit_concurrency(conn):
+ # Check the condition reported in psycopg2#103
+ # Because of bad status check, we commit even when a commit is already on
+ # its way. We can detect this condition by the warnings.
+ notices = queue.Queue()
+ conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
+ stop = False
+
+ def committer():
+ nonlocal stop
+ while not stop:
+ conn.commit()
+
+ cur = conn.cursor()
+ t1 = threading.Thread(target=committer)
+ t1.start()
+ for i in range(1000):
+ cur.execute("select %s;", (i,))
+ conn.commit()
+
+ # Stop the committer thread
+ stop = True
+
+ assert notices.empty(), "%d notices raised" % notices.qsize()
--- /dev/null
+import time
+import pytest
+import asyncio
+from asyncio.queues import Queue
+
+import psycopg3
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.mark.slow
+@pytest.mark.skip # TODO: sometimes this test hangs?
+async def test_commit_concurrency(aconn):
+ # Check the condition reported in psycopg2#103
+ # Because of bad status check, we commit even when a commit is already on
+ # its way. We can detect this condition by the warnings.
+ notices = Queue()
+ aconn.add_notice_handler(
+ lambda diag: notices.put_nowait(diag.message_primary)
+ )
+ stop = False
+
+ async def committer():
+ nonlocal stop
+ while not stop:
+ await aconn.commit()
+
+ async def runner():
+ nonlocal stop
+ cur = aconn.cursor()
+ for i in range(1000):
+ await cur.execute("select %s;", (i,))
+ await aconn.commit()
+
+ # Stop the committer thread
+ stop = True
+
+ await asyncio.wait([committer(), runner()])
+ assert notices.empty(), "%d notices raised" % notices.qsize()
+
+
+@pytest.mark.slow
+async def test_concurrent_execution(dsn):
+ async def worker():
+ cnn = await psycopg3.AsyncConnection.connect(dsn)
+ cur = cnn.cursor()
+ await cur.execute("select pg_sleep(0.5)")
+ await cur.close()
+ await cnn.close()
+
+ workers = [worker(), worker()]
+ t0 = time.time()
+ await asyncio.wait(workers)
+ assert time.time() - t0 < 0.8, "something broken in concurrency"
import gc
-import time
-import queue
import pytest
import logging
import weakref
-import threading
import psycopg3
from psycopg3 import Connection
conn.rollback()
-@pytest.mark.slow
-def test_commit_concurrency(conn):
- # Check the condition reported in psycopg2#103
- # Because of bad status check, we commit even when a commit is already on
- # its way. We can detect this condition by the warnings.
- notices = queue.Queue()
- conn.add_notice_handler(lambda diag: notices.put(diag.message_primary))
- stop = False
-
- def committer():
- nonlocal stop
- while not stop:
- conn.commit()
-
- cur = conn.cursor()
- t1 = threading.Thread(target=committer)
- t1.start()
- for i in range(1000):
- cur.execute("select %s;", (i,))
- conn.commit()
-
- # Stop the committer thread
- stop = True
-
- assert notices.empty(), "%d notices raised" % notices.qsize()
-
-
-@pytest.mark.slow
-def test_concurrent_execution(dsn):
- def worker():
- cnn = psycopg3.connect(dsn)
- cur = cnn.cursor()
- cur.execute("select pg_sleep(0.5)")
- cur.close()
- cnn.close()
-
- t1 = threading.Thread(target=worker)
- t2 = threading.Thread(target=worker)
- t0 = time.time()
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- assert time.time() - t0 < 0.8, "something broken in concurrency"
-
-
def test_auto_transaction(conn):
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
import gc
-import time
import pytest
-import asyncio
import logging
import weakref
-from asyncio.queues import Queue
import psycopg3
from psycopg3 import AsyncConnection
await aconn.rollback()
-@pytest.mark.slow
-@pytest.mark.skip # TODO: sometimes this test hangs?
-async def test_commit_concurrency(aconn):
- # Check the condition reported in psycopg2#103
- # Because of bad status check, we commit even when a commit is already on
- # its way. We can detect this condition by the warnings.
- notices = Queue()
- aconn.add_notice_handler(
- lambda diag: notices.put_nowait(diag.message_primary)
- )
- stop = False
-
- async def committer():
- nonlocal stop
- while not stop:
- await aconn.commit()
-
- async def runner():
- nonlocal stop
- cur = aconn.cursor()
- for i in range(1000):
- await cur.execute("select %s;", (i,))
- await aconn.commit()
-
- # Stop the committer thread
- stop = True
-
- await asyncio.wait([committer(), runner()])
- assert notices.empty(), "%d notices raised" % notices.qsize()
-
-
-@pytest.mark.slow
-async def test_concurrent_execution(dsn):
- async def worker():
- cnn = await psycopg3.AsyncConnection.connect(dsn)
- cur = cnn.cursor()
- await cur.execute("select pg_sleep(0.5)")
- await cur.close()
- await cnn.close()
-
- workers = [worker(), worker()]
- t0 = time.time()
- await asyncio.wait(workers)
- assert time.time() - t0 < 0.8, "something broken in concurrency"
-
-
async def test_auto_transaction(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")