+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_connection_async.py'
+# DO NOT CHANGE! Change the original file instead.
import sys
import time
import pytest
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
+from .utils import gc_collect, is_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
-from ._test_connection import conninfo_params_timeout
+from ._test_connection import conninfo_params_timeout, conn_set
from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
# TODO: the INERROR started failing in the C implementation in Python 3.12a7
# compiled with Cython-3.0.0b3, not before.
+
+
@pytest.mark.xfail(
- (pq.__impl__ in ("c", "binary") and sys.version_info[:2] == (3, 12)),
+ pq.__impl__ in ("c", "binary")
+ and sys.version_info[:2] == (3, 12)
+ and (not is_async(__name__)),
reason="Something with Exceptions, C, Python 3.12",
)
def test_connection_warn_close(conn_cls, dsn, recwarn):
with conn_cls.connect(dsn) as conn2:
conn2.execute("select 1")
conn.execute(
- "select pg_terminate_backend(%s::int)",
- [conn2.pgconn.backend_pid],
+ "select pg_terminate_backend(%s::int)", [conn2.pgconn.backend_pid]
)
1 / 0
@pytest.mark.crdb_skip("deferrable")
def test_commit_error(conn):
- conn.execute(
- """
- drop table if exists selfref;
- create table selfref (
- x serial primary key,
- y int references selfref (x) deferrable initially deferred)
- """
- )
+ sql = [
+ "drop table if exists selfref;",
+ "create table selfref (",
+ "x serial primary key,",
+ "y int references selfref (x) deferrable initially deferred)",
+ ]
+
+ conn.execute("".join(sql))
conn.commit()
conn.execute("insert into selfref (y) values (-1)")
conn.commit()
assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
- assert cur.execute("select * from foo").fetchone() == (1,)
+ cur.execute("select * from foo")
+ assert cur.fetchone() == (1,)
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
conn.commit()
assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
- assert cur.execute("select * from foo").fetchone() is None
+ cur.execute("select * from foo")
+ assert cur.fetchone() is None
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+def test_autocommit_readonly_property(conn):
+ with pytest.raises(AttributeError):
+ conn.autocommit = True
+ assert not conn.autocommit
+
+
def test_autocommit(conn):
assert conn.autocommit is False
+
conn.autocommit = True
assert conn.autocommit
cur = conn.cursor()
- assert cur.execute("select 1").fetchone() == (1,)
+ cur.execute("select 1")
+ assert cur.fetchone() == (1,)
assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
conn.autocommit = ""
- assert conn.autocommit is False # type: ignore[comparison-overlap]
+ assert isinstance(conn.autocommit, bool)
+ assert conn.autocommit is False
+
conn.autocommit = "yeah"
+ assert isinstance(conn.autocommit, bool)
assert conn.autocommit is True
def test_autocommit_intrans(conn):
cur = conn.cursor()
- assert cur.execute("select 1").fetchone() == (1,)
+ cur.execute("select 1")
+ assert cur.fetchone() == (1,)
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
with pytest.raises(psycopg.ProgrammingError):
conn.autocommit = True
[
((), {}, ""),
(("",), {}, ""),
- (("host=foo user=bar",), {}, "host=foo user=bar"),
- (("host=foo",), {"user": "baz"}, "host=foo user=baz"),
+ (("dbname=foo user=bar",), {}, "dbname=foo user=bar"),
+ (("dbname=foo",), {"user": "baz"}, "dbname=foo user=baz"),
(
- ("host=foo port=5432",),
- {"host": "qux", "user": "joe"},
- "host=qux user=joe port=5432",
+ ("dbname=foo port=5432",),
+ {"dbname": "qux", "user": "joe"},
+ "dbname=qux user=joe port=5432",
),
- (("host=foo",), {"user": None}, "host=foo"),
+ (("dbname=foo",), {"user": None}, "dbname=foo"),
],
)
-def test_connect_args(conn_cls, monkeypatch, pgconn, args, kwargs, want):
+def test_connect_args(conn_cls, monkeypatch, setpgenv, pgconn, args, kwargs, want):
the_conninfo: str
def fake_connect(conninfo):
return pgconn
yield
+ setpgenv({})
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
conn = conn_cls.connect(*args, **kwargs)
assert conninfo_to_dict(the_conninfo) == conninfo_to_dict(want)
@pytest.mark.parametrize("param", tx_params)
def test_transaction_param_default(conn, param):
assert getattr(conn, param.name) is None
- current, default = conn.execute(
+ cur = conn.execute(
"select current_setting(%s), current_setting(%s)",
[f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
- ).fetchone()
+ )
+ (current, default) = cur.fetchone()
assert current == default
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+@pytest.mark.parametrize("param", tx_params)
+def test_transaction_param_readonly_property(conn, param):
+ with pytest.raises(AttributeError):
+ setattr(conn, param.name, None)
+
+
@pytest.mark.parametrize("autocommit", [True, False])
@pytest.mark.parametrize("param", tx_params_isolation)
def test_set_transaction_param_implicit(conn, param, autocommit):
conn.autocommit = autocommit
for value in param.values:
- setattr(conn, param.name, value)
- pgval, default = conn.execute(
+ conn_set(conn, param.name, value)
+ cur = conn.execute(
"select current_setting(%s), current_setting(%s)",
[f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
- ).fetchone()
+ )
+ (pgval, default) = cur.fetchone()
if autocommit:
assert pgval == default
else:
conn.commit()
for value in param.values:
- setattr(conn, param.name, value)
- (pgval,) = conn.execute(
- "select current_setting(%s)", [f"transaction_{param.guc}"]
- ).fetchone()
+ conn_set(conn, param.name, value)
+ cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
+ (pgval,) = cur.fetchone()
assert tx_values_map[pgval] == value
conn.rollback()
- setattr(conn, param.name, None)
- (pgval,) = conn.execute(
- "select current_setting(%s)", [f"transaction_{param.guc}"]
- ).fetchone()
+ conn_set(conn, param.name, None)
+ cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
+ (pgval,) = cur.fetchone()
assert tx_values_map[pgval] == tx_values_map[param.non_default]
conn.rollback()
def test_set_transaction_param_block(conn, param, autocommit):
conn.autocommit = autocommit
for value in param.values:
- setattr(conn, param.name, value)
+ conn_set(conn, param.name, value)
with conn.transaction():
- pgval = conn.execute(
+ cur = conn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
- ).fetchone()[0]
+ )
+ pgval = cur.fetchone()[0]
assert tx_values_map[pgval] == value
@pytest.mark.parametrize("param", tx_params)
def test_set_transaction_param_not_intrans_implicit(conn, param):
conn.execute("select 1")
+ value = param.values[0]
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, param.name, param.values[0])
+ conn_set(conn, param.name, value)
@pytest.mark.parametrize("param", tx_params)
def test_set_transaction_param_not_intrans_block(conn, param):
+ value = param.values[0]
with conn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, param.name, param.values[0])
+ conn_set(conn, param.name, value)
@pytest.mark.parametrize("param", tx_params)
def test_set_transaction_param_not_intrans_external(conn, param):
+ value = param.values[0]
+
conn.autocommit = True
conn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, param.name, param.values[0])
+ conn_set(conn, param.name, value)
@pytest.mark.crdb("skip", reason="transaction isolation")
for param in params:
value = param.values[0]
- setattr(conn, param.name, value)
+ conn_set(conn, param.name, value)
for param in params:
- pgval = conn.execute(
- "select current_setting(%s)", [f"transaction_{param.guc}"]
- ).fetchone()[0]
+ cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
+ pgval = cur.fetchone()[0]
assert tx_values_map[pgval] == value
@pytest.mark.parametrize("dsn, kwargs, exp", conninfo_params_timeout)
-def test_get_connection_params(conn_cls, dsn, kwargs, exp):
+def test_get_connection_params(conn_cls, dsn, kwargs, exp, setpgenv):
+ setpgenv({})
params = conn_cls._get_connection_params(dsn, **kwargs)
conninfo = make_conninfo(**params)
assert conninfo_to_dict(conninfo) == exp[0]
- assert params.get("connect_timeout") == exp[1]
+ assert params["connect_timeout"] == exp[1]
-def test_connect_context(conn_cls, dsn):
+def test_connect_context_adapters(conn_cls, dsn):
ctx = psycopg.adapt.AdaptersMap(psycopg.adapters)
ctx.register_dumper(str, make_bin_dumper("b"))
ctx.register_dumper(str, make_dumper("t"))
+import sys
import time
import pytest
import logging
import weakref
-from typing import List, Any
+from typing import Any, List
import psycopg
-from psycopg import Notify, errors as e
+from psycopg import Notify, pq, errors as e
from psycopg.rows import tuple_row
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from .utils import gc_collect
+from .utils import gc_collect, is_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
-from ._test_connection import conninfo_params_timeout
+from ._test_connection import conninfo_params_timeout, aconn_set
from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
-from .test_conninfo import fake_resolve # noqa: F401 # fixture
async def test_connect(aconn_cls, dsn):
aconn.cursor()
+# TODO: the INERROR started failing in the C implementation in Python 3.12a7
+# compiled with Cython-3.0.0b3, not before.
+@pytest.mark.xfail(
+ pq.__impl__ in ("c", "binary")
+ and sys.version_info[:2] == (3, 12)
+ and not is_async(__name__),
+ reason="Something with Exceptions, C, Python 3.12",
+)
async def test_connection_warn_close(aconn_cls, dsn, recwarn):
conn = await aconn_cls.connect(dsn)
await conn.close()
@pytest.mark.crdb_skip("deferrable")
async def test_commit_error(aconn):
- await aconn.execute(
- """
- drop table if exists selfref;
- create table selfref (
- x serial primary key,
- y int references selfref (x) deferrable initially deferred)
- """
- )
+ sql = [
+ "drop table if exists selfref;",
+ "create table selfref (",
+ "x serial primary key,",
+ "y int references selfref (x) deferrable initially deferred)",
+ ]
+
+ await aconn.execute("".join(sql))
await aconn.commit()
await aconn.execute("insert into selfref (y) values (-1)")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-async def test_autocommit(aconn):
- assert aconn.autocommit is False
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
+async def test_autocommit_readonly_property(aconn):
with pytest.raises(AttributeError):
aconn.autocommit = True
assert not aconn.autocommit
+
+async def test_autocommit(aconn):
+ assert aconn.autocommit is False
await aconn.set_autocommit(True)
assert aconn.autocommit
cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
await aconn.set_autocommit("")
+ assert isinstance(aconn.autocommit, bool)
assert aconn.autocommit is False
+
await aconn.set_autocommit("yeah")
+ assert isinstance(aconn.autocommit, bool)
assert aconn.autocommit is True
assert n.channel == "foo"
assert n.payload == "n2"
assert n.pid == aconn.pgconn.backend_pid
+ assert hash(n)
with pytest.raises(ValueError):
aconn.remove_notify_handler(cb1)
assert current == default
+@pytest.mark.skipif(not is_async(__name__), reason="async test only")
@pytest.mark.parametrize("param", tx_params)
async def test_transaction_param_readonly_property(aconn, param):
with pytest.raises(AttributeError):
async def test_set_transaction_param_implicit(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
for value in param.values:
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
cur = await aconn.execute(
"select current_setting(%s), current_setting(%s)",
[f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
await aconn.commit()
for value in param.values:
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
)
assert tx_values_map[pgval] == value
await aconn.rollback()
- await getattr(aconn, f"set_{param.name}")(None)
+ await aconn_set(aconn, param.name, None)
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
)
async def test_set_transaction_param_block(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
for value in param.values:
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
async with aconn.transaction():
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
await aconn.execute("select 1")
value = param.values[0]
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
@pytest.mark.parametrize("param", tx_params)
value = param.values[0]
async with aconn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
@pytest.mark.parametrize("param", tx_params)
await aconn.set_autocommit(True)
await aconn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
@pytest.mark.crdb("skip", reason="transaction isolation")
for param in params:
value = param.values[0]
- await getattr(aconn, f"set_{param.name}")(value)
+ await aconn_set(aconn, param.name, value)
for param in params:
cur = await aconn.execute(
aconn.adapters.register_dumper(str, make_bin_dumper("b"))
aconn.adapters.register_dumper(str, make_dumper("t"))
- aconn2 = await aconn_cls.connect(dsn, context=aconn)
+ conn2 = await aconn_cls.connect(dsn, context=aconn)
- cur = await aconn2.execute("select %s", ["hello"])
+ cur = await conn2.execute("select %s", ["hello"])
assert (await cur.fetchone())[0] == "hellot"
- cur = await aconn2.execute("select %b", ["hello"])
+ cur = await conn2.execute("select %b", ["hello"])
assert (await cur.fetchone())[0] == "hellob"
- await aconn2.close()
+ await conn2.close()
async def test_cancel_closed(aconn):
await aconn.close()
aconn.cancel()
-
-
-@pytest.mark.usefixtures("fake_resolve")
-async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch):
- got = []
-
- def fake_connect_gen(conninfo, **kwargs):
- got.append(conninfo)
- 1 / 0
-
- monkeypatch.setattr(aconn_cls, "_connect_gen", fake_connect_gen)
-
- with pytest.raises(ZeroDivisionError):
- await aconn_cls.connect("host=foo.com")
-
- assert len(got) == 1
- want = {"host": "foo.com", "hostaddr": "1.1.1.1"}
- assert conninfo_to_dict(got[0]) == want