]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: add test to verify the number of roundtrips in pipelined executemany
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 29 Mar 2022 19:36:46 +0000 (21:36 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:23:22 +0000 (01:23 +0200)
Add fixture to access the trace data as Python object. I think it's cute <3

tests/test_pipeline.py
tests/test_pipeline_async.py

index dc68062c302d3dfed9d5245e5d285cc0eac7db21..6af8b44d060912610b68c205c394f2d03dd1dcd7 100644 (file)
@@ -1,6 +1,8 @@
 import logging
-from typing import Any
 import concurrent.futures
+from typing import Any
+from operator import attrgetter
+from itertools import groupby
 
 import pytest
 
@@ -197,6 +199,45 @@ def test_executemany_no_returning(conn):
         assert cur.nextset() is None
 
 
+def test_executemany_trace(conn, trace):
+    conn.autocommit = True
+    cur = conn.cursor()
+    cur.execute("create temp table trace (id int)")
+    t = trace.trace(conn)
+    with conn.pipeline():
+        cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+        cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+    conn.close()
+    items = list(t)
+    assert items[-1].type == "Terminate"
+    del items[-1]
+    roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+    assert roundtrips == ["F", "B"]
+    assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+def test_executemany_trace_returning(conn, trace):
+    conn.autocommit = True
+    cur = conn.cursor()
+    cur.execute("create temp table trace (id int)")
+    t = trace.trace(conn)
+    with conn.pipeline():
+        cur.executemany(
+            "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+        )
+        cur.executemany(
+            "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+        )
+    conn.close()
+    items = list(t)
+    assert items[-1].type == "Terminate"
+    del items[-1]
+    roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+    assert roundtrips == ["F", "B"] * 3
+    assert items[-2].direction == "F"  # last 2 items are F B
+    assert len([i for i in items if i.type == "Sync"]) == 3
+
+
 def test_prepared(conn):
     conn.autocommit = True
     with conn.pipeline():
index d11992b6e65e4bd0995be2c550d9d4625b6ab0de..b79ab8728c7536197f2a9e60851b773d335483b6 100644 (file)
@@ -1,6 +1,8 @@
 import asyncio
 import logging
 from typing import Any
+from operator import attrgetter
+from itertools import groupby
 
 import pytest
 
@@ -200,6 +202,45 @@ async def test_executemany_no_returning(aconn):
         assert cur.nextset() is None
 
 
+async def test_executemany_trace(aconn, trace):
+    await aconn.set_autocommit(True)
+    cur = aconn.cursor()
+    await cur.execute("create temp table trace (id int)")
+    t = trace.trace(aconn)
+    async with aconn.pipeline():
+        await cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+        await cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+    await aconn.close()
+    items = list(t)
+    assert items[-1].type == "Terminate"
+    del items[-1]
+    roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+    assert roundtrips == ["F", "B"]
+    assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+async def test_executemany_trace_returning(aconn, trace):
+    await aconn.set_autocommit(True)
+    cur = aconn.cursor()
+    await cur.execute("create temp table trace (id int)")
+    t = trace.trace(aconn)
+    async with aconn.pipeline():
+        await cur.executemany(
+            "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+        )
+        await cur.executemany(
+            "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+        )
+    await aconn.close()
+    items = list(t)
+    assert items[-1].type == "Terminate"
+    del items[-1]
+    roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+    assert roundtrips == ["F", "B"] * 3
+    assert items[-2].direction == "F"  # last 2 items are F B
+    assert len([i for i in items if i.type == "Sync"]) == 3
+
+
 async def test_prepared(aconn):
     await aconn.set_autocommit(True)
     async with aconn.pipeline():