Unused in the codebase, and not documented. Only used in the test suite.
ProgrammingError = e.ProgrammingError
NotSupportedError = e.NotSupportedError
- # Enums useful for the connection
- ConnStatus = pq.ConnStatus
- TransactionStatus = pq.TransactionStatus
-
def __init__(self, pgconn: PGconn):
self.pgconn = pgconn
self._autocommit = False
__weakref__
""".split()
- ExecStatus = pq.ExecStatus
-
_tx: Transformer
_make_row: RowMaker[Row]
_pgconn: PGconn
import struct
-from psycopg.pq import Format
+from psycopg import pq
from psycopg.copy import AsyncWriter
from psycopg.copy import FileWriter as FileWriter # noqa: F401
def py_to_raw(item, fmt):
"""Convert from Python type to the expected result from the db"""
- if fmt == Format.TEXT:
+ if fmt == pq.Format.TEXT:
if isinstance(item, int):
return str(item)
else:
import sys
import pytest
import psycopg
+from psycopg import pq
# TODOCRDB: is this the expected behaviour?
def in_transaction(conn):
- if conn.pgconn.transaction_status == conn.TransactionStatus.IDLE:
+ if conn.pgconn.transaction_status == pq.TransactionStatus.IDLE:
return False
- elif conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS:
+ elif conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS:
return True
else:
assert False, conn.pgconn.transaction_status
from random import randrange, choice
from typing import Any # noqa: ignore
-from psycopg import sql, errors as e
-from psycopg.pq import Format
+from psycopg import pq, sql, errors as e
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
def test_copy_in_buffers(conn, format, buffer):
cur = conn.cursor()
with cur.copy("copy copy_in from stdin") as copy:
copy.write(sample_text)
copy.write(sample_text)
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_in_str(conn):
with cur.copy("copy copy_in from stdin with binary") as copy:
copy.write(sample_text.decode())
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_empty(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin {copyopt(format)}"):
pass
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
assert cur.rowcount == 0
copy.write(sample_text)
copy.write(sample_text)
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin {copyopt(format)}") as copy:
row: "tuple[Any, ...]"
for row in sample_records:
- if format == Format.BINARY:
+ if format == pq.Format.BINARY:
row = tuple((Int4(i) if isinstance(i, int) else i for i in row))
copy.write_row(row)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records_set_types(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records_binary(conn, format):
cur = conn.cursor()
ensure_table(cur, "col1 serial primary key, col2 int4, data text")
raise Exception("nuttengoggenio")
assert "nuttengoggenio" in str(exc.value)
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_in_allchars(conn):
@pytest.mark.slow
@pytest.mark.parametrize(
- "fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
+ "fmt, set_types",
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
def copyopt(format):
- return "with binary" if format == Format.BINARY else ""
+ return "with binary" if format == pq.Format.BINARY else ""
from random import randrange, choice
from typing import Any # noqa: ignore
-from psycopg import sql, errors as e
-from psycopg.pq import Format
+from psycopg import pq, sql, errors as e
from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
@pytest.mark.parametrize(
"format, buffer",
- [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")],
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
async def test_copy_in_buffers(aconn, format, buffer):
cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
await copy.write(sample_text)
await copy.write(sample_text)
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_in_str(aconn):
async with cur.copy("copy copy_in from stdin with binary") as copy:
await copy.write(sample_text.decode())
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_empty(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin {copyopt(format)}"):
pass
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
assert cur.rowcount == 0
await copy.write(sample_text)
await copy.write(sample_text)
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin {copyopt(format)}") as copy:
row: "tuple[Any, ...]"
for row in sample_records:
- if format == Format.BINARY:
+ if format == pq.Format.BINARY:
row = tuple(Int4(i) if isinstance(i, int) else i for i in row)
await copy.write_row(row)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records_set_types(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records_binary(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, "col1 serial primary key, col2 int4, data text")
raise Exception("nuttengoggenio")
assert "nuttengoggenio" in str(exc.value)
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_in_allchars(aconn):
@pytest.mark.slow
@pytest.mark.parametrize(
"fmt, set_types",
- [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
@pytest.mark.crdb_skip("copy array")
async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
def copyopt(format):
- return "with binary" if format == Format.BINARY else ""
+ return "with binary" if format == pq.Format.BINARY else ""
for row in cur.stream(f"experimental changefeed for {testfeed}"):
q.put_nowait(row)
except e.QueryCanceled:
- assert conn.info.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.info.transaction_status == pq.TransactionStatus.IDLE
q.put_nowait(None)
except Exception as ex:
q.put_nowait(ex)
):
q.put_nowait(row)
except e.QueryCanceled:
- assert conn.info.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.info.transaction_status == pq.TransactionStatus.IDLE
q.put_nowait(None)
except Exception as ex:
q.put_nowait(ex)
def test_connect(conn_cls, dsn):
conn = conn_cls.connect(dsn)
assert not conn.closed
- assert conn.pgconn.status == conn.ConnStatus.OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
conn.close()
conn = conn_cls.connect(MyString(dsn))
assert not conn.closed
- assert conn.pgconn.status == conn.ConnStatus.OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
conn.close()
conn.close()
assert conn.closed
assert not conn.broken
- assert conn.pgconn.status == conn.ConnStatus.BAD
+ assert conn.pgconn.status == pq.ConnStatus.BAD
conn.close()
assert conn.closed
- assert conn.pgconn.status == conn.ConnStatus.BAD
+ assert conn.pgconn.status == pq.ConnStatus.BAD
with pytest.raises(psycopg.OperationalError):
cur.execute("select 1")
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
assert not conn.pgconn.error_message
status = conn.info.transaction_status
- assert status == conn.TransactionStatus.ACTIVE
+ assert status == pq.TransactionStatus.ACTIVE
1 / 0
assert len(caplog.records) == 1
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
conn.pgconn.exec_(b"begin")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
conn.pgconn.exec_(b"insert into foo values (1)")
conn.commit()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
conn.execute("insert into selfref (y) values (-1)")
with pytest.raises(e.ForeignKeyViolation):
conn.commit()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur = conn.execute("select 1")
assert cur.fetchone() == (1,)
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
conn.pgconn.exec_(b"begin")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
conn.pgconn.exec_(b"insert into foo values (1)")
conn.rollback()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.ntuples == 0
conn.pgconn.exec_(b"create table foo (id int primary key)")
cur = conn.cursor()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur.execute("insert into foo values (1)")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
conn.commit()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur.execute("select * from foo")
assert cur.fetchone() == (1,)
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
def test_auto_transaction_fail(conn):
conn.pgconn.exec_(b"create table foo (id int primary key)")
cur = conn.cursor()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur.execute("insert into foo values (1)")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
with pytest.raises(psycopg.DatabaseError):
cur.execute("meh")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INERROR
with pytest.raises(psycopg.errors.InFailedSqlTransaction):
cur.execute("select 1")
conn.commit()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur.execute("select * from foo")
assert cur.fetchone() is None
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
@skip_sync
cur = conn.cursor()
cur.execute("select 1")
assert cur.fetchone() == (1,)
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
conn.set_autocommit("")
assert isinstance(conn.autocommit, bool)
cur = conn.cursor()
cur.execute("select 1")
assert cur.fetchone() == (1,)
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
conn.autocommit = ""
assert isinstance(conn.autocommit, bool)
cur = conn.cursor()
cur.execute("select 1")
assert cur.fetchone() == (1,)
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
with pytest.raises(psycopg.ProgrammingError):
conn.set_autocommit(True)
assert not conn.autocommit
cur = conn.cursor()
with pytest.raises(psycopg.DatabaseError):
cur.execute("meh")
- assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INERROR
with pytest.raises(psycopg.ProgrammingError):
conn.set_autocommit(True)
assert not conn.autocommit
def test_autocommit_unknown(conn):
conn.close()
- assert conn.pgconn.transaction_status == conn.TransactionStatus.UNKNOWN
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
with pytest.raises(psycopg.OperationalError):
conn.set_autocommit(True)
assert not conn.autocommit
async def test_connect(aconn_cls, dsn):
conn = await aconn_cls.connect(dsn)
assert not conn.closed
- assert conn.pgconn.status == conn.ConnStatus.OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
await conn.close()
conn = await aconn_cls.connect(MyString(dsn))
assert not conn.closed
- assert conn.pgconn.status == conn.ConnStatus.OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
await conn.close()
await aconn.close()
assert aconn.closed
assert not aconn.broken
- assert aconn.pgconn.status == aconn.ConnStatus.BAD
+ assert aconn.pgconn.status == pq.ConnStatus.BAD
await aconn.close()
assert aconn.closed
- assert aconn.pgconn.status == aconn.ConnStatus.BAD
+ assert aconn.pgconn.status == pq.ConnStatus.BAD
with pytest.raises(psycopg.OperationalError):
await cur.execute("select 1")
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
assert not conn.pgconn.error_message
status = conn.info.transaction_status
- assert status == conn.TransactionStatus.ACTIVE
+ assert status == pq.TransactionStatus.ACTIVE
1 / 0
assert len(caplog.records) == 1
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
aconn.pgconn.exec_(b"insert into foo values (1)")
await aconn.commit()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
await aconn.execute("insert into selfref (y) values (-1)")
with pytest.raises(e.ForeignKeyViolation):
await aconn.commit()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
cur = await aconn.execute("select 1")
assert await cur.fetchone() == (1,)
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
aconn.pgconn.exec_(b"insert into foo values (1)")
await aconn.rollback()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.ntuples == 0
aconn.pgconn.exec_(b"create table foo (id int primary key)")
cur = aconn.cursor()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
await cur.execute("insert into foo values (1)")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
await aconn.commit()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
await cur.execute("select * from foo")
assert await cur.fetchone() == (1,)
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
async def test_auto_transaction_fail(aconn):
aconn.pgconn.exec_(b"create table foo (id int primary key)")
cur = aconn.cursor()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
await cur.execute("insert into foo values (1)")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
with pytest.raises(psycopg.DatabaseError):
await cur.execute("meh")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INERROR
with pytest.raises(psycopg.errors.InFailedSqlTransaction):
await cur.execute("select 1")
await aconn.commit()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
await cur.execute("select * from foo")
assert await cur.fetchone() is None
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
@skip_sync
cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
await aconn.set_autocommit("")
assert isinstance(aconn.autocommit, bool)
cur = conn.cursor()
cur.execute("select 1")
assert cur.fetchone() == (1,)
- assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
conn.autocommit = ""
assert isinstance(conn.autocommit, bool)
cur = aconn.cursor()
await cur.execute("select 1")
assert await cur.fetchone() == (1,)
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
with pytest.raises(psycopg.ProgrammingError):
await aconn.set_autocommit(True)
assert not aconn.autocommit
cur = aconn.cursor()
with pytest.raises(psycopg.DatabaseError):
await cur.execute("meh")
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INERROR
with pytest.raises(psycopg.ProgrammingError):
await aconn.set_autocommit(True)
assert not aconn.autocommit
async def test_autocommit_unknown(aconn):
await aconn.close()
- assert aconn.pgconn.transaction_status == aconn.TransactionStatus.UNKNOWN
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
with pytest.raises(psycopg.OperationalError):
await aconn.set_autocommit(True)
assert not aconn.autocommit
import pytest
import psycopg
-from psycopg import pq
-from psycopg import sql
-from psycopg import errors as e
-from psycopg.pq import Format
+from psycopg import pq, sql, errors as e
from psycopg.copy import Copy, LibpqWriter, QueuedLibpqWriter
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
pytestmark = pytest.mark.crdb_skip("copy")
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_out_read(conn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
for row in want:
got = copy.read()
assert got == row
- assert conn.info.transaction_status == conn.TransactionStatus.ACTIVE
+ assert conn.info.transaction_status == pq.TransactionStatus.ACTIVE
assert copy.read() == b""
assert copy.read() == b""
assert copy.read() == b""
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
def test_copy_out_iter(conn, format, row_factory):
if format == pq.Format.TEXT:
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
assert list(copy) == want
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
def test_copy_out_no_result(conn, format, row_factory):
rf = getattr(psycopg.rows, row_factory)
copy.set_types(["int4"])
assert list(copy.rows()) == [(i + 1,) for i in range(10)]
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("typetype", ["names", "oids"])
def test_read_rows(conn, format, typetype):
cur = conn.cursor()
assert copy.read_row() is None
assert row == (10, "hello", [0.0, 1.0])
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_rows(conn, format):
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
rows = list(copy.rows())
assert rows == sample_records
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
def test_set_custom_type(conn, hstore):
assert rows == [({"a": "1", "b": "2"},)]
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_out_allchars(conn, format):
cur = conn.cursor()
chars = list(map(chr, range(1, 256))) + [eur]
assert rows == chars
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_read_row_notypes(conn, format):
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
assert rows == ref
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_rows_notypes(conn, format):
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
@pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_out_badntypes(conn, format, err):
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
def test_copy_in_buffers(conn, format, buffer):
cur = conn.cursor()
with cur.copy("copy copy_in from stdin (format text)") as copy:
copy.write(sample_text)
copy.write(sample_text)
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_bad_result(conn):
with cur.copy("copy copy_in from stdin (format binary)") as copy:
copy.write(sample_text.decode())
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_empty(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin (format {format.name})"):
pass
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
assert cur.rowcount == 0
assert cur.fetchone() == (data,)
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_subclass_adapter(conn, format):
- if format == Format.TEXT:
+ if format == pq.Format.TEXT:
from psycopg.types.string import StrDumper as BaseDumper
else:
from psycopg.types.string import StrBinaryDumper
assert rec[0] == "hellohello"
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_subclass_nulling_dumper(conn, format):
- Base: type = StrNoneDumper if format == Format.TEXT else StrNoneBinaryDumper
+ Base: type = StrNoneDumper if format == pq.Format.TEXT else StrNoneBinaryDumper
class MyStrDumper(Base): # type: ignore
assert recs == [("hello",), (None,)]
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_error_empty(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin (format {format.name})"):
raise ZeroDivisionError("mannaggiamiseria")
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_in_buffers_with_pg_error(conn):
copy.write(sample_text)
copy.write(sample_text)
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_in_buffers_with_py_error(conn):
copy.write(sample_text)
raise ZeroDivisionError("nuttengoggenio")
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_out_error_with_copy_finished(conn):
copy.read_row()
1 / 0
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
def test_copy_out_error_with_copy_not_finished(conn):
copy.read_row()
1 / 0
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_copy_out_server_error(conn):
for block in copy:
pass
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
for row in sample_records:
- if format == Format.BINARY:
+ if format == pq.Format.BINARY:
row2 = tuple((Int4(i) if isinstance(i, int) else i for i in row))
row = row2 # type: ignore[assignment]
copy.write_row(row)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records_set_types(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
def test_copy_in_records_binary(conn, format):
cur = conn.cursor()
ensure_table(cur, "col1 serial primary key, col2 int, data text")
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
def test_file_writer(conn, format, buffer):
file = BytesIO()
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
def test_worker_life(conn, format, buffer):
cur = conn.cursor()
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
def test_connection_writer(conn, format, buffer):
cur = conn.cursor()
@pytest.mark.slow
@pytest.mark.parametrize(
- "fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
+ "fmt, set_types",
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method, gc):
@pytest.mark.slow
@pytest.mark.parametrize(
- "fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
+ "fmt, set_types",
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
import pytest
import psycopg
-from psycopg import pq
-from psycopg import sql
-from psycopg import errors as e
-from psycopg.pq import Format
+from psycopg import pq, sql, errors as e
from psycopg.copy import AsyncCopy, AsyncLibpqWriter, AsyncQueuedLibpqWriter
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
pytestmark = pytest.mark.crdb_skip("copy")
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_out_read(aconn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
for row in want:
got = await copy.read()
assert got == row
- assert aconn.info.transaction_status == aconn.TransactionStatus.ACTIVE
+ assert aconn.info.transaction_status == pq.TransactionStatus.ACTIVE
assert await copy.read() == b""
assert await copy.read() == b""
assert await copy.read() == b""
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_copy_out_iter(aconn, format, row_factory):
if format == pq.Format.TEXT:
) as copy:
assert await alist(copy) == want
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
async def test_copy_out_no_result(aconn, format, row_factory):
rf = getattr(psycopg.rows, row_factory)
copy.set_types(["int4"])
assert await alist(copy.rows()) == [(i + 1,) for i in range(10)]
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
@pytest.mark.parametrize("typetype", ["names", "oids"])
async def test_read_rows(aconn, format, typetype):
cur = aconn.cursor()
assert (await copy.read_row()) is None
assert row == (10, "hello", [0.0, 1.0])
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_rows(aconn, format):
cur = aconn.cursor()
async with cur.copy(
rows = await alist(copy.rows())
assert rows == sample_records
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
async def test_set_custom_type(aconn, hstore):
assert rows == [({"a": "1", "b": "2"},)]
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_out_allchars(aconn, format):
cur = aconn.cursor()
chars = list(map(chr, range(1, 256))) + [eur]
assert rows == chars
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_read_row_notypes(aconn, format):
cur = aconn.cursor()
async with cur.copy(
assert rows == ref
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_rows_notypes(aconn, format):
cur = aconn.cursor()
async with cur.copy(
@pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_out_badntypes(aconn, format, err):
cur = aconn.cursor()
async with cur.copy(
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
async def test_copy_in_buffers(aconn, format, buffer):
cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin (format text)") as copy:
await copy.write(sample_text)
await copy.write(sample_text)
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_bad_result(aconn):
async with cur.copy("copy copy_in from stdin (format binary)") as copy:
await copy.write(sample_text.decode())
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_empty(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin (format {format.name})"):
pass
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
assert cur.rowcount == 0
assert await cur.fetchone() == (data,)
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_subclass_adapter(aconn, format):
- if format == Format.TEXT:
+ if format == pq.Format.TEXT:
from psycopg.types.string import StrDumper as BaseDumper
else:
from psycopg.types.string import StrBinaryDumper
assert rec[0] == "hellohello"
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_subclass_nulling_dumper(aconn, format):
- Base: type = StrNoneDumper if format == Format.TEXT else StrNoneBinaryDumper
+ Base: type = StrNoneDumper if format == pq.Format.TEXT else StrNoneBinaryDumper
class MyStrDumper(Base): # type: ignore
def dump(self, obj):
assert recs == [("hello",), (None,)]
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_error_empty(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin (format {format.name})"):
raise ZeroDivisionError("mannaggiamiseria")
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_in_buffers_with_pg_error(aconn):
await copy.write(sample_text)
await copy.write(sample_text)
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_in_buffers_with_py_error(aconn):
await copy.write(sample_text)
raise ZeroDivisionError("nuttengoggenio")
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_out_error_with_copy_finished(aconn):
await copy.read_row()
1 / 0
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
async def test_copy_out_error_with_copy_not_finished(aconn):
await copy.read_row()
1 / 0
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_copy_out_server_error(aconn):
async for block in copy:
pass
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
for row in sample_records:
- if format == Format.BINARY:
+ if format == pq.Format.BINARY:
row2 = tuple(Int4(i) if isinstance(i, int) else i for i in row)
row = row2 # type: ignore[assignment]
await copy.write_row(row)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records_set_types(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("format", pq.Format)
async def test_copy_in_records_binary(aconn, format):
cur = aconn.cursor()
await ensure_table_async(cur, "col1 serial primary key, col2 int, data text")
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
async def test_file_writer(aconn, format, buffer):
file = BytesIO()
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
async def test_worker_life(aconn, format, buffer):
cur = aconn.cursor()
@pytest.mark.parametrize(
- "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
+ "format, buffer",
+ [(pq.Format.TEXT, "sample_text"), (pq.Format.BINARY, "sample_binary")],
)
async def test_connection_writer(aconn, format, buffer):
cur = aconn.cursor()
@pytest.mark.slow
@pytest.mark.parametrize(
"fmt, set_types",
- [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
async def test_copy_to_leaks(aconn_cls, dsn, faker, fmt, set_types, method, gc):
@pytest.mark.slow
@pytest.mark.parametrize(
"fmt, set_types",
- [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+ [(pq.Format.TEXT, True), (pq.Format.TEXT, False), (pq.Format.BINARY, True)],
)
async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types, gc):
faker.format = PyFormat.from_pq(fmt)
import pytest
import psycopg
-from psycopg import sql, rows
+from psycopg import pq, sql, rows
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
def test_execute_empty_query(conn, query):
cur = conn.cursor()
cur.execute(query)
- assert cur.pgresult.status == cur.ExecStatus.EMPTY_QUERY
+ assert cur.pgresult.status == pq.ExecStatus.EMPTY_QUERY
with pytest.raises(psycopg.ProgrammingError):
cur.fetchone()
with pytest.raises(psycopg.ProgrammingError):
for rec in cur.stream("wat"):
pass
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
def test_stream_error_notx(conn):
with pytest.raises(psycopg.ProgrammingError):
for rec in cur.stream("wat"):
pass
- assert conn.info.transaction_status == conn.TransactionStatus.IDLE
+ assert conn.info.transaction_status == pq.TransactionStatus.IDLE
def test_stream_error_python_to_consume(conn):
for rec in gen:
1 / 0
assert conn.info.transaction_status in (
- conn.TransactionStatus.INTRANS,
- conn.TransactionStatus.INERROR,
+ pq.TransactionStatus.INTRANS,
+ pq.TransactionStatus.INERROR,
)
1 / 0
gen.close()
- assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+ assert conn.info.transaction_status == pq.TransactionStatus.INTRANS
@pytest.mark.parametrize("autocommit", [False, True])
import pytest
import psycopg
-from psycopg import sql, rows
+from psycopg import pq, sql, rows
from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
async def test_execute_empty_query(aconn, query):
cur = aconn.cursor()
await cur.execute(query)
- assert cur.pgresult.status == cur.ExecStatus.EMPTY_QUERY
+ assert cur.pgresult.status == pq.ExecStatus.EMPTY_QUERY
with pytest.raises(psycopg.ProgrammingError):
await cur.fetchone()
with pytest.raises(psycopg.ProgrammingError):
async for rec in cur.stream("wat"):
pass
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
async def test_stream_error_notx(aconn):
with pytest.raises(psycopg.ProgrammingError):
async for rec in cur.stream("wat"):
pass
- assert aconn.info.transaction_status == aconn.TransactionStatus.IDLE
+ assert aconn.info.transaction_status == pq.TransactionStatus.IDLE
async def test_stream_error_python_to_consume(aconn):
async for rec in gen:
1 / 0
assert aconn.info.transaction_status in (
- aconn.TransactionStatus.INTRANS,
- aconn.TransactionStatus.INERROR,
+ pq.TransactionStatus.INTRANS,
+ pq.TransactionStatus.INERROR,
)
1 / 0
await gen.aclose()
- assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+ assert aconn.info.transaction_status == pq.TransactionStatus.INTRANS
@pytest.mark.parametrize("autocommit", [False, True])
import pytest
import psycopg
-from psycopg import rows, errors as e
-from psycopg.pq import Format
+from psycopg import pq, rows, errors as e
pytestmark = pytest.mark.crdb_skip("server-side cursor")
def test_format(conn):
cur = conn.cursor("foo")
- assert cur.format == Format.TEXT
+ assert cur.format == pq.Format.TEXT
cur.close()
cur = conn.cursor("foo", binary=True)
- assert cur.format == Format.BINARY
+ assert cur.format == pq.Format.BINARY
cur.close()
def test_close(conn, recwarn):
- if conn.info.transaction_status == conn.TransactionStatus.INTRANS:
+ if conn.info.transaction_status == pq.TransactionStatus.INTRANS:
# connection dirty from previous failure
conn.execute("close foo")
recwarn.clear()
cur.execute("select 1")
with pytest.raises(e.ProgrammingError):
conn.execute("wat")
- assert conn.info.transaction_status == conn.TransactionStatus.INERROR
+ assert conn.info.transaction_status == pq.TransactionStatus.INERROR
cur.close()
import pytest
import psycopg
-from psycopg import rows, errors as e
-from psycopg.pq import Format
+from psycopg import pq, rows, errors as e
from .acompat import alist
async def test_format(aconn):
cur = aconn.cursor("foo")
- assert cur.format == Format.TEXT
+ assert cur.format == pq.Format.TEXT
await cur.close()
cur = aconn.cursor("foo", binary=True)
- assert cur.format == Format.BINARY
+ assert cur.format == pq.Format.BINARY
await cur.close()
async def test_close(aconn, recwarn):
- if aconn.info.transaction_status == aconn.TransactionStatus.INTRANS:
+ if aconn.info.transaction_status == pq.TransactionStatus.INTRANS:
# connection dirty from previous failure
await aconn.execute("close foo")
recwarn.clear()
await cur.execute("select 1")
with pytest.raises(e.ProgrammingError):
await aconn.execute("wat")
- assert aconn.info.transaction_status == aconn.TransactionStatus.INERROR
+ assert aconn.info.transaction_status == pq.TransactionStatus.INERROR
await cur.close()
import pytest
from psycopg import Rollback
-from psycopg import errors as e
+from psycopg import pq, errors as e
from ._test_transaction import in_transaction, insert_row, inserted, get_exc_info
from ._test_transaction import ExpectedException, crdb_skip_external_observer
with conn.transaction():
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
status = conn.info.transaction_status
- assert status == conn.TransactionStatus.ACTIVE
+ assert status == pq.TransactionStatus.ACTIVE
1 / 0
assert len(caplog.records) == 1
import pytest
from psycopg import Rollback
-from psycopg import errors as e
+from psycopg import pq, errors as e
from ._test_transaction import in_transaction, insert_row, inserted, get_exc_info
from ._test_transaction import ExpectedException, crdb_skip_external_observer
async with conn.transaction():
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
status = conn.info.transaction_status
- assert status == conn.TransactionStatus.ACTIVE
+ assert status == pq.TransactionStatus.ACTIVE
1 / 0
assert len(caplog.records) == 1