conn = pq.PGconn.connect_start(conninfo)
logger.debug("connection started, status %s", conn.status.name)
while 1:
- if conn.status == pq.ConnStatus.CONNECTION_BAD:
+ if conn.status == pq.ConnStatus.BAD:
raise exc.OperationalError(
f"connection is bad: {pq.error_message(conn)}"
)
status = conn.connect_poll()
logger.debug("connection polled, status %s", conn.status.name)
- if status == pq.PollingStatus.PGRES_POLLING_OK:
+ if status == pq.PollingStatus.OK:
break
- elif status == pq.PollingStatus.PGRES_POLLING_READING:
+ elif status == pq.PollingStatus.READING:
yield conn.socket, Wait.R
- elif status == pq.PollingStatus.PGRES_POLLING_WRITING:
+ elif status == pq.PollingStatus.WRITING:
yield conn.socket, Wait.W
- elif status == pq.PollingStatus.PGRES_POLLING_FAILED:
+ elif status == pq.PollingStatus.FAILED:
raise exc.OperationalError(
f"connection failed: {pq.error_message(conn)}"
)
def _exec_commit_rollback(self, command):
with self.lock:
status = self.pgconn.transaction_status
- if status == pq.TransactionStatus.PQTRANS_IDLE:
+ if status == pq.TransactionStatus.IDLE:
return
self.pgconn.send_query(command)
(pgres,) = wait_select(self._exec_gen(self.pgconn))
- if pgres.status != pq.ExecStatus.PGRES_COMMAND_OK:
+ if pgres.status != pq.ExecStatus.COMMAND_OK:
raise exc.OperationalError(
f"error on {command.decode('utf8')}:"
f" {pq.error_message(pgres)}"
async def _exec_commit_rollback(self, command):
with self.lock:
status = self.pgconn.transaction_status
- if status == pq.TransactionStatus.PQTRANS_IDLE:
+ if status == pq.TransactionStatus.IDLE:
return
self.pgconn.send_query(command)
(pgres,) = await wait_async(self._exec_gen(self.pgconn))
- if pgres.status != pq.ExecStatus.PGRES_COMMAND_OK:
+ if pgres.status != pq.ExecStatus.COMMAND_OK:
raise exc.OperationalError(
f"error on {command.decode('utf8')}:"
f" {pq.error_message(pgres)}"
class ConnStatus(IntEnum):
- CONNECTION_OK = 0
- CONNECTION_BAD = auto()
-
- CONNECTION_STARTED = auto()
- CONNECTION_MADE = auto()
- CONNECTION_AWAITING_RESPONSE = auto()
- CONNECTION_AUTH_OK = auto()
- CONNECTION_SETENV = auto()
- CONNECTION_SSL_STARTUP = auto()
- CONNECTION_NEEDED = auto()
- CONNECTION_CHECK_WRITABLE = auto()
- CONNECTION_CONSUME = auto()
- CONNECTION_GSS_STARTUP = auto()
- CONNECTION_CHECK_TARGET = auto()
+ OK = 0
+ BAD = auto()
+ STARTED = auto()
+ MADE = auto()
+ AWAITING_RESPONSE = auto()
+ AUTH_OK = auto()
+ SETENV = auto()
+ SSL_STARTUP = auto()
+ NEEDED = auto()
+ CHECK_WRITABLE = auto()
+ CONSUME = auto()
+ GSS_STARTUP = auto()
+ CHECK_TARGET = auto()
class PollingStatus(IntEnum):
- PGRES_POLLING_FAILED = 0
- PGRES_POLLING_READING = auto()
- PGRES_POLLING_WRITING = auto()
- PGRES_POLLING_OK = auto()
- PGRES_POLLING_ACTIVE = auto()
+ FAILED = 0
+ READING = auto()
+ WRITING = auto()
+ OK = auto()
+ ACTIVE = auto()
class ExecStatus(IntEnum):
- PGRES_EMPTY_QUERY = 0
- PGRES_COMMAND_OK = auto()
- PGRES_TUPLES_OK = auto()
- PGRES_COPY_OUT = auto()
- PGRES_COPY_IN = auto()
- PGRES_BAD_RESPONSE = auto()
- PGRES_NONFATAL_ERROR = auto()
- PGRES_FATAL_ERROR = auto()
- PGRES_COPY_BOTH = auto()
- PGRES_SINGLE_TUPLE = auto()
+ EMPTY_QUERY = 0
+ COMMAND_OK = auto()
+ TUPLES_OK = auto()
+ COPY_OUT = auto()
+ COPY_IN = auto()
+ BAD_RESPONSE = auto()
+ NONFATAL_ERROR = auto()
+ FATAL_ERROR = auto()
+ COPY_BOTH = auto()
+ SINGLE_TUPLE = auto()
class TransactionStatus(IntEnum):
- PQTRANS_IDLE = 0
- PQTRANS_ACTIVE = auto()
- PQTRANS_INTRANS = auto()
- PQTRANS_INERROR = auto()
- PQTRANS_UNKNOWN = auto()
+ IDLE = 0
+ ACTIVE = auto()
+ INTRANS = auto()
+ INERROR = auto()
+ UNKNOWN = auto()
class Ping(IntEnum):
- PQPING_OK = 0
- PQPING_REJECT = auto()
- PQPING_NO_RESPONSE = auto()
- PQPING_NO_ATTEMPT = auto()
+ OK = 0
+ REJECT = auto()
+ NO_RESPONSE = auto()
+ NO_ATTEMPT = auto()
class DiagnosticField(IntEnum):
# from postgres_ext.h
- PG_DIAG_SEVERITY = ord("S")
- PG_DIAG_SEVERITY_NONLOCALIZED = ord("V")
- PG_DIAG_SQLSTATE = ord("C")
- PG_DIAG_MESSAGE_PRIMARY = ord("M")
- PG_DIAG_MESSAGE_DETAIL = ord("D")
- PG_DIAG_MESSAGE_HINT = ord("H")
- PG_DIAG_STATEMENT_POSITION = ord("P")
- PG_DIAG_INTERNAL_POSITION = ord("p")
- PG_DIAG_INTERNAL_QUERY = ord("q")
- PG_DIAG_CONTEXT = ord("W")
- PG_DIAG_SCHEMA_NAME = ord("s")
- PG_DIAG_TABLE_NAME = ord("t")
- PG_DIAG_COLUMN_NAME = ord("c")
- PG_DIAG_DATATYPE_NAME = ord("d")
- PG_DIAG_CONSTRAINT_NAME = ord("n")
- PG_DIAG_SOURCE_FILE = ord("F")
- PG_DIAG_SOURCE_LINE = ord("L")
- PG_DIAG_SOURCE_FUNCTION = ord("R")
+ SEVERITY = ord("S")
+ SEVERITY_NONLOCALIZED = ord("V")
+ SQLSTATE = ord("C")
+ MESSAGE_PRIMARY = ord("M")
+ MESSAGE_DETAIL = ord("D")
+ MESSAGE_HINT = ord("H")
+ STATEMENT_POSITION = ord("P")
+ INTERNAL_POSITION = ord("p")
+ INTERNAL_QUERY = ord("q")
+ CONTEXT = ord("W")
+ SCHEMA_NAME = ord("s")
+ TABLE_NAME = ord("t")
+ COLUMN_NAME = ord("c")
+ DATATYPE_NAME = ord("d")
+ CONSTRAINT_NAME = ord("n")
+ SOURCE_FILE = ord("F")
+ SOURCE_LINE = ord("L")
+ SOURCE_FUNCTION = ord("R")
msg = msg.splitlines()[0].split(b":", 1)[-1].strip()
elif isinstance(obj, pq.PGresult):
- msg = obj.error_field(pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY)
+ msg = obj.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
if not msg:
msg = obj.error_message
res = pgconn.get_result()
if res is None:
break
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
results.append(res)
assert len(results) == 2
def test_exec_params(pq, pgconn):
res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
def test_exec_params_empty(pq, pgconn):
res = pgconn.exec_params(b"select 8::int", [])
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
def test_exec_params_types(pq, pgconn):
res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
assert res.ftype(0) == 1700
assert res.get_value(0, 1) == b"8"
def test_exec_params_nulls(pq, pgconn):
res = pgconn.exec_params(b"select $1, $2, $3", [b"hi", b"", None])
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"hi"
assert res.get_value(0, 1) == b""
assert res.get_value(0, 2) is None
[val, val],
param_formats=[0, 1],
)
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"3"
assert res.get_value(0, 1) == b"7"
res = pgconn.exec_params(
b"select $1::bytea", [val], param_formats=[1], result_format=fmt
)
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == out
def test_prepare(pq, pgconn):
res = pgconn.prepare(b"prep", b"select $1::int + $2::int")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
assert res.get_value(0, 0) == b"8"
def test_prepare_types(pq, pgconn):
res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
assert res.get_value(0, 0) == b"8"
res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)")
res = pgconn.exec_prepared(b"", [val, val], param_formats=[0, 1])
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"3"
assert res.get_value(0, 1) == b"7"
def test_exec_prepared_binary_out(pq, pgconn, fmt, out):
val = b"foo\00bar"
res = pgconn.prepare(b"", b"select $1::bytea")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.exec_prepared(
b"", [val], param_formats=[1], result_format=fmt
)
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+ assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == out
declare cur cursor for select * from generate_series(1,10) foo;
"""
)
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.describe_portal(b"cur")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.nfields == 1
assert res.fname(0) == b"foo"
def test_error_message(pq, pgconn):
res = pgconn.exec_(b"wat")
- assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+ assert res.status == pq.ExecStatus.FATAL_ERROR
msg = pq.error_message(pgconn)
assert msg == 'syntax error at or near "wat"'
assert msg == pq.error_message(res)
- assert msg == res.error_field(
- pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY
- ).decode("ascii")
+ assert msg == res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY).decode(
+ "ascii"
+ )
with pytest.raises(TypeError):
pq.error_message(None)
def test_connectdb(pq, dsn):
conn = pq.PGconn.connect(dsn.encode("utf8"))
- assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
+ assert conn.status == pq.ConnStatus.OK, conn.error_message
def test_connectdb_error(pq):
conn = pq.PGconn.connect(b"dbname=psycopg3_test_not_for_real")
- assert conn.status == pq.ConnStatus.CONNECTION_BAD
+ assert conn.status == pq.ConnStatus.BAD
@pytest.mark.parametrize("baddsn", [None, 42])
conn = pq.PGconn.connect_start(dsn.encode("utf8"))
conn.nonblocking = 1
while 1:
- assert conn.status != pq.ConnStatus.CONNECTION_BAD
+ assert conn.status != pq.ConnStatus.BAD
rv = conn.connect_poll()
- if rv == pq.PollingStatus.PGRES_POLLING_OK:
+ if rv == pq.PollingStatus.OK:
break
- elif rv == pq.PollingStatus.PGRES_POLLING_READING:
+ elif rv == pq.PollingStatus.READING:
select([conn.socket], [], [])
- elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.WRITING:
select([], [conn.socket], [])
else:
assert False, rv
- assert conn.status == pq.ConnStatus.CONNECTION_OK
+ assert conn.status == pq.ConnStatus.OK
def test_connect_async_bad(pq, dsn):
conn = pq.PGconn.connect_start(b"dbname=psycopg3_test_not_for_real")
while 1:
- assert conn.status != pq.ConnStatus.CONNECTION_BAD
+ assert conn.status != pq.ConnStatus.BAD
rv = conn.connect_poll()
- if rv == pq.PollingStatus.PGRES_POLLING_FAILED:
+ if rv == pq.PollingStatus.FAILED:
break
- elif rv == pq.PollingStatus.PGRES_POLLING_READING:
+ elif rv == pq.PollingStatus.READING:
select([conn.socket], [], [])
- elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.WRITING:
select([], [conn.socket], [])
else:
assert False, rv
- assert conn.status == pq.ConnStatus.CONNECTION_BAD
+ assert conn.status == pq.ConnStatus.BAD
def test_info(pq, dsn, pgconn):
def test_reset(pq, pgconn):
- assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert pgconn.status == pq.ConnStatus.OK
# TODO: break it
pgconn.reset()
- assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert pgconn.status == pq.ConnStatus.OK
def test_reset_async(pq, pgconn):
- assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert pgconn.status == pq.ConnStatus.OK
# TODO: break it
pgconn.reset_start()
while 1:
rv = pgconn.connect_poll()
- if rv == pq.PollingStatus.PGRES_POLLING_READING:
+ if rv == pq.PollingStatus.READING:
select([pgconn.socket], [], [])
- elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+ elif rv == pq.PollingStatus.WRITING:
select([], [pgconn.socket], [])
else:
break
- assert rv == pq.PollingStatus.PGRES_POLLING_OK
- assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert rv == pq.PollingStatus.OK
+ assert pgconn.status == pq.ConnStatus.OK
def test_ping(pq, dsn):
rv = pq.PGconn.ping(dsn.encode("utf8"))
- assert rv == pq.Ping.PQPING_OK
+ assert rv == pq.Ping.OK
rv = pq.PGconn.ping(b"port=99999")
- assert rv == pq.Ping.PQPING_NO_RESPONSE
+ assert rv == pq.Ping.NO_RESPONSE
def test_db(pgconn):
def test_transaction_status(pq, pgconn):
- assert pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+ assert pgconn.transaction_status == pq.TransactionStatus.IDLE
# TODO: test other states
pgconn.finish()
- assert pgconn.transaction_status == pq.TransactionStatus.PQTRANS_UNKNOWN
+ assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
def test_parameter_status(pq, dsn, tempenv):
def test_encoding(pq, pgconn):
res = pgconn.exec_(b"set client_encoding to latin1")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+ assert res.status == pq.ExecStatus.COMMAND_OK
assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
res = pgconn.exec_(b"set client_encoding to 'utf-8'")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+ assert res.status == pq.ExecStatus.COMMAND_OK
assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
res = pgconn.exec_(b"set client_encoding to wat")
- assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+ assert res.status == pq.ExecStatus.FATAL_ERROR
assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
def test_error_message(pq, pgconn):
res = pgconn.exec_(b"wat")
- assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+ assert res.status == pq.ExecStatus.FATAL_ERROR
msg = pgconn.error_message
assert b"wat" in msg
def test_make_empty_result(pq, pgconn):
pgconn.exec_(b"wat")
- res = pgconn.make_empty_result(pq.ExecStatus.PGRES_FATAL_ERROR)
- assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+ res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
+ assert res.status == pq.ExecStatus.FATAL_ERROR
assert b"wat" in res.error_message
@pytest.mark.parametrize(
"command, status",
[
- (b"", "PGRES_EMPTY_QUERY"),
- (b"select 1", "PGRES_TUPLES_OK"),
- (b"set timezone to utc", "PGRES_COMMAND_OK"),
- (b"wat", "PGRES_FATAL_ERROR"),
+ (b"", "EMPTY_QUERY"),
+ (b"select 1", "TUPLES_OK"),
+ (b"set timezone to utc", "COMMAND_OK"),
+ (b"wat", "FATAL_ERROR"),
],
)
def test_status(pq, pgconn, command, status):
def test_error_field(pq, pgconn):
res = pgconn.exec_(b"select wat")
- assert res.error_field(pq.DiagnosticField.PG_DIAG_SEVERITY) == b"ERROR"
- assert res.error_field(pq.DiagnosticField.PG_DIAG_SQLSTATE) == b"42703"
- assert b"wat" in res.error_field(
- pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY
- )
+ assert res.error_field(pq.DiagnosticField.SEVERITY) == b"ERROR"
+ assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703"
+ assert b"wat" in res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
@pytest.mark.parametrize("n", range(4))
create table t2 as select 2 as f2, 3 as f3;
"""
)
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.exec_(
b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2"
)
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii"))
assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii"))
@pytest.mark.parametrize("fmt", (0, 1))
def test_fformat(pq, pgconn, fmt):
res = pgconn.exec_params(b"select 1", [], result_format=fmt)
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fformat(0) == fmt
assert res.binary_tuples == fmt
def test_ftype(pq, pgconn):
res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text")
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.ftype(0) == 23
assert res.ftype(1) == 1700
assert res.ftype(2) == 25
def test_fmod(pq, pgconn):
res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fmod(0) == -1
assert res.fmod(1) == 0xA0004
assert res.fmod(2) == 0xA0006
def test_fsize(pq, pgconn):
res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text")
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fsize(0) == 4
assert res.fsize(1) == 8
assert res.fsize(2) == -1
def test_get_value(pq, pgconn):
res = pgconn.exec_(b"select 'a', '', NULL")
- assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+ assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.get_value(0, 0) == b"a"
assert res.get_value(0, 1) == b""
assert res.get_value(0, 2) is None
def test_nparams_types(pq, pgconn):
res = pgconn.prepare(b"", b"select $1::int, $2::text")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.describe_prepared(b"")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.nparams == 2
assert res.param_type(0) == 23
def test_connect(pq, dsn, loop):
conn = loop.run_until_complete(AsyncConnection.connect(dsn))
- assert conn.pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
def test_connect_bad(loop):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
- assert (
- aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
- )
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
res = aconn.pgconn.exec_(b"insert into foo values (1)")
loop.run_until_complete(aconn.commit())
- assert aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
aconn.pgconn.exec_(b"begin")
- assert (
- aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
- )
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
res = aconn.pgconn.exec_(b"insert into foo values (1)")
loop.run_until_complete(aconn.rollback())
- assert aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) is None
def test_connect(pq, dsn):
conn = Connection.connect(dsn)
- assert conn.pgconn.status == pq.ConnStatus.CONNECTION_OK
+ assert conn.pgconn.status == pq.ConnStatus.OK
def test_connect_bad():
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
conn.pgconn.exec_(b"begin")
- assert (
- conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
- )
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
res = conn.pgconn.exec_(b"insert into foo values (1)")
conn.commit()
- assert conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
conn.pgconn.exec_(b"begin")
- assert (
- conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
- )
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
res = conn.pgconn.exec_(b"insert into foo values (1)")
conn.rollback()
- assert conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) is None