This allows to test psycopg pool 3.2 tests with psycopg 3.1 installed.
from typing_extensions import TypeGuard
if sys.version_info >= (3, 11):
- from typing import LiteralString, assert_type
+ from typing import LiteralString
else:
- from typing_extensions import LiteralString, assert_type
+ from typing_extensions import LiteralString
__all__ = [
"Counter",
"LiteralString",
"TypeGuard",
"ZoneInfo",
- "assert_type",
"cache",
]
import psycopg
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from psycopg._compat import assert_type, Counter
+from ..utils import assert_type, Counter, set_autocommit
from ..acompat import Event, spawn, gather, sleep, skip_sync
from .test_pool_common import delay_connection
def test_generic_connection_type(dsn):
- def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.set_autocommit(True)
+ def configure(conn: psycopg.Connection[Any]) -> None:
+ set_autocommit(conn, True)
class MyConnection(psycopg.Connection[Row]):
pass
dsn,
connection_class=MyConnection[MyRow],
kwargs=dict(row_factory=class_row(MyRow)),
- configure=set_autocommit,
+ configure=configure,
) as p1:
with p1.connection() as conn1:
cur1 = conn1.execute("select 1 as x")
def test_non_generic_connection_type(dsn):
- def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.set_autocommit(True)
+ def configure(conn: psycopg.Connection[Any]) -> None:
+ set_autocommit(conn, True)
class MyConnection(psycopg.Connection[MyRow]):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
with pool.ConnectionPool(
- dsn, connection_class=MyConnection, configure=set_autocommit
+ dsn, connection_class=MyConnection, configure=configure
) as p1:
with p1.connection() as conn1:
cur1 = conn1.execute("select 1 as x")
import psycopg
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from psycopg._compat import assert_type, Counter
+from ..utils import assert_type, Counter, set_autocommit
from ..acompat import AEvent, spawn, gather, asleep, skip_sync
from .test_pool_common_async import delay_connection
async def test_generic_connection_type(dsn):
- async def set_autocommit(conn: psycopg.AsyncConnection[Any]) -> None:
- await conn.set_autocommit(True)
+ async def configure(conn: psycopg.AsyncConnection[Any]) -> None:
+ await set_autocommit(conn, True)
class MyConnection(psycopg.AsyncConnection[Row]):
pass
dsn,
connection_class=MyConnection[MyRow],
kwargs=dict(row_factory=class_row(MyRow)),
- configure=set_autocommit,
+ configure=configure,
) as p1:
async with p1.connection() as conn1:
cur1 = await conn1.execute("select 1 as x")
async def test_non_generic_connection_type(dsn):
- async def set_autocommit(conn: psycopg.AsyncConnection[Any]) -> None:
- await conn.set_autocommit(True)
+ async def configure(conn: psycopg.AsyncConnection[Any]) -> None:
+ await set_autocommit(conn, True)
class MyConnection(psycopg.AsyncConnection[MyRow]):
def __init__(self, *args: Any, **kwargs: Any):
async with pool.AsyncConnectionPool(
dsn,
connection_class=MyConnection,
- configure=set_autocommit,
+ configure=configure,
) as p1:
async with p1.connection() as conn1:
cur1 = await conn1.execute("select 1 as x")
import psycopg
+from ..utils import set_autocommit
from ..acompat import Event, spawn, gather, sleep, is_alive, skip_async, skip_sync
try:
@pytest.mark.parametrize("autocommit", [True, False])
def test_check_connection(pool_cls, conn_cls, dsn, autocommit):
conn = conn_cls.connect(dsn)
- conn.set_autocommit(autocommit)
+ set_autocommit(conn, autocommit)
pool_cls.check_connection(conn)
assert not conn.closed
assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
import psycopg
+from ..utils import set_autocommit
from ..acompat import AEvent, spawn, gather, asleep, is_alive, skip_async, skip_sync
try:
@pytest.mark.parametrize("autocommit", [True, False])
async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit):
conn = await aconn_cls.connect(dsn)
- await conn.set_autocommit(autocommit)
+ await set_autocommit(conn, autocommit)
await pool_cls.check_connection(conn)
assert not conn.closed
assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
import psycopg
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from psycopg._compat import assert_type
+from ..utils import assert_type, set_autocommit
from ..acompat import Event, sleep, spawn, gather, skip_sync
from .test_pool_common import delay_connection, ensure_waiting
def test_generic_connection_type(dsn):
- def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.set_autocommit(True)
+ def configure(conn: psycopg.Connection[Any]) -> None:
+ set_autocommit(conn, True)
class MyConnection(psycopg.Connection[Row]):
pass
dsn,
connection_class=MyConnection[MyRow],
kwargs={"row_factory": class_row(MyRow)},
- configure=set_autocommit,
+ configure=configure,
) as p1:
with p1.connection() as conn1:
cur1 = conn1.execute("select 1 as x")
def test_non_generic_connection_type(dsn):
- def set_autocommit(conn: psycopg.Connection[Any]) -> None:
- conn.set_autocommit(True)
+ def configure(conn: psycopg.Connection[Any]) -> None:
+ set_autocommit(conn, True)
class MyConnection(psycopg.Connection[MyRow]):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
with pool.NullConnectionPool(
- dsn, connection_class=MyConnection, configure=set_autocommit
+ dsn, connection_class=MyConnection, configure=configure
) as p1:
with p1.connection() as conn1:
(row1,) = conn1.execute("select 1 as x").fetchall()
import psycopg
from psycopg.pq import TransactionStatus
from psycopg.rows import class_row, Row, TupleRow
-from psycopg._compat import assert_type
+from ..utils import assert_type, set_autocommit
from ..acompat import AEvent, asleep, spawn, gather, skip_sync
from .test_pool_common_async import delay_connection, ensure_waiting
async def test_generic_connection_type(dsn):
- async def set_autocommit(conn: psycopg.AsyncConnection[Any]) -> None:
- await conn.set_autocommit(True)
+ async def configure(conn: psycopg.AsyncConnection[Any]) -> None:
+ await set_autocommit(conn, True)
class MyConnection(psycopg.AsyncConnection[Row]):
pass
dsn,
connection_class=MyConnection[MyRow],
kwargs={"row_factory": class_row(MyRow)},
- configure=set_autocommit,
+ configure=configure,
) as p1:
async with p1.connection() as conn1:
cur1 = await conn1.execute("select 1 as x")
async def test_non_generic_connection_type(dsn):
- async def set_autocommit(conn: psycopg.AsyncConnection[Any]) -> None:
- await conn.set_autocommit(True)
+ async def configure(conn: psycopg.AsyncConnection[Any]) -> None:
+ await set_autocommit(conn, True)
class MyConnection(psycopg.AsyncConnection[MyRow]):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
async with pool.AsyncNullConnectionPool(
- dsn, connection_class=MyConnection, configure=set_autocommit
+ dsn, connection_class=MyConnection, configure=configure
) as p1:
async with p1.connection() as conn1:
(row1,) = await (await conn1.execute("select 1 as x")).fetchall()
import weakref
import datetime as dt
from typing import Any, List
+from packaging.version import parse as ver
import pytest
execmany = execmany # avoid F811 underneath
+cursor_classes = [psycopg.Cursor, psycopg.ClientCursor]
+# Allow to import (not necessarily to run) the module with psycopg 3.1.
+# Needed to test psycopg_pool 3.2 tests with psycopg 3.1 imported, i.e. to run
+# `pytest -m pool`. (which might happen when releasing pool packages).
+if ver(psycopg.__version__) >= ver("3.2.0.dev0"):
+ cursor_classes.append(psycopg.RawCursor)
-@pytest.fixture(params=[psycopg.Cursor, psycopg.ClientCursor, psycopg.RawCursor])
+
+@pytest.fixture(params=cursor_classes)
def conn(conn, request, anyio_backend):
conn.cursor_factory = request.param
return conn
import weakref
import datetime as dt
from typing import Any, List
+from packaging.version import parse as ver
import pytest
execmany = execmany # avoid F811 underneath
-@pytest.fixture(
- params=[psycopg.AsyncCursor, psycopg.AsyncClientCursor, psycopg.AsyncRawCursor]
-)
+cursor_classes = [psycopg.AsyncCursor, psycopg.AsyncClientCursor]
+# Allow to import (not necessarily to run) the module with psycopg 3.1.
+# Needed to test psycopg_pool 3.2 tests with psycopg 3.1 imported, i.e. to run
+# `pytest -m pool`. (which might happen when releasing pool packages).
+if ver(psycopg.__version__) >= ver("3.2.0.dev0"):
+ cursor_classes.append(psycopg.AsyncRawCursor)
+
+
+@pytest.fixture(params=cursor_classes)
async def aconn(aconn, request, anyio_backend):
aconn.cursor_factory = request.param
return aconn
from typing import Callable, Optional, Tuple
from contextlib import contextmanager
+
+if sys.version_info >= (3, 9):
+ import collections
+
+ Counter = collections.Counter
+else:
+ import typing
+
+ Counter = typing.Counter
+
+if sys.version_info >= (3, 11):
+ import typing
+
+ assert_type = typing.assert_type
+else:
+ import typing_extensions
+
+ assert_type = typing_extensions.assert_type
+
+
import pytest
eur = "\u20ac"
with pytest.raises(*args, **kwargs) as ex:
yield ex
return
+
+
+def set_autocommit(conn, value):
+ """
+ Set autocommit on a connection.
+
+ Give an uniform interface to both sync and async connection for psycopg
+ < 3.2, in order to run psycopg_pool 3.2 tests using psycopg 3.1.
+ """
+ import psycopg
+
+ if isinstance(conn, psycopg.Connection):
+ conn.autocommit = value
+ elif isinstance(conn, psycopg.AsyncConnection):
+ return conn.set_autocommit(value)
+ else:
+ raise TypeError(f"not a connection: {conn}")