import logging
-from typing import Any
import concurrent.futures
+from typing import Any
+from operator import attrgetter
+from itertools import groupby
import pytest
assert cur.nextset() is None
+def test_executemany_trace(conn, trace):
+ conn.autocommit = True
+ cur = conn.cursor()
+ cur.execute("create temp table trace (id int)")
+ t = trace.trace(conn)
+ with conn.pipeline():
+ cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ conn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"]
+ assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+def test_executemany_trace_returning(conn, trace):
+ conn.autocommit = True
+ cur = conn.cursor()
+ cur.execute("create temp table trace (id int)")
+ t = trace.trace(conn)
+ with conn.pipeline():
+ cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ conn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"] * 3
+ assert items[-2].direction == "F" # last 2 items are F B
+ assert len([i for i in items if i.type == "Sync"]) == 3
+
+
def test_prepared(conn):
conn.autocommit = True
with conn.pipeline():
import asyncio
import logging
from typing import Any
+from operator import attrgetter
+from itertools import groupby
import pytest
assert cur.nextset() is None
+async def test_executemany_trace(aconn, trace):
+ await aconn.set_autocommit(True)
+ cur = aconn.cursor()
+ await cur.execute("create temp table trace (id int)")
+ t = trace.trace(aconn)
+ async with aconn.pipeline():
+ await cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ await cur.executemany("insert into trace (id) values (%s)", [(10,), (20,)])
+ await aconn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"]
+ assert len([i for i in items if i.type == "Sync"]) == 1
+
+
+async def test_executemany_trace_returning(aconn, trace):
+ await aconn.set_autocommit(True)
+ cur = aconn.cursor()
+ await cur.execute("create temp table trace (id int)")
+ t = trace.trace(aconn)
+ async with aconn.pipeline():
+ await cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ await cur.executemany(
+ "insert into trace (id) values (%s)", [(10,), (20,)], returning=True
+ )
+ await aconn.close()
+ items = list(t)
+ assert items[-1].type == "Terminate"
+ del items[-1]
+ roundtrips = [k for k, g in groupby(items, key=attrgetter("direction"))]
+ assert roundtrips == ["F", "B"] * 3
+ assert items[-2].direction == "F" # last 2 items are F B
+ assert len([i for i in items if i.type == "Sync"]) == 3
+
+
async def test_prepared(aconn):
await aconn.set_autocommit(True)
async with aconn.pipeline():