]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Enum values simplified
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 09:40:34 +0000 (22:40 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 21 Mar 2020 09:40:34 +0000 (22:40 +1300)
In C the scope is global so the enum values need a healty dose of
prefixing. In Python you can't import the value without separating from
the class so even if there is more than one called ``OK`` it's... ok.

psycopg3/connection.py
psycopg3/pq/enums.py
psycopg3/pq/misc.py
tests/pq/test_async.py
tests/pq/test_exec.py
tests/pq/test_misc.py
tests/pq/test_pgconn.py
tests/pq/test_pgresult.py
tests/test_async_connection.py
tests/test_connection.py

index 5a5c5d59b9ec5ca7a851d97849daedb95579a9ee..c93d14f46cdc29fa9735e3d6cbb780b7c72cf11e 100644 (file)
@@ -41,20 +41,20 @@ class BaseConnection:
         conn = pq.PGconn.connect_start(conninfo)
         logger.debug("connection started, status %s", conn.status.name)
         while 1:
-            if conn.status == pq.ConnStatus.CONNECTION_BAD:
+            if conn.status == pq.ConnStatus.BAD:
                 raise exc.OperationalError(
                     f"connection is bad: {pq.error_message(conn)}"
                 )
 
             status = conn.connect_poll()
             logger.debug("connection polled, status %s", conn.status.name)
-            if status == pq.PollingStatus.PGRES_POLLING_OK:
+            if status == pq.PollingStatus.OK:
                 break
-            elif status == pq.PollingStatus.PGRES_POLLING_READING:
+            elif status == pq.PollingStatus.READING:
                 yield conn.socket, Wait.R
-            elif status == pq.PollingStatus.PGRES_POLLING_WRITING:
+            elif status == pq.PollingStatus.WRITING:
                 yield conn.socket, Wait.W
-            elif status == pq.PollingStatus.PGRES_POLLING_FAILED:
+            elif status == pq.PollingStatus.FAILED:
                 raise exc.OperationalError(
                     f"connection failed: {pq.error_message(conn)}"
                 )
@@ -129,12 +129,12 @@ class Connection(BaseConnection):
     def _exec_commit_rollback(self, command):
         with self.lock:
             status = self.pgconn.transaction_status
-            if status == pq.TransactionStatus.PQTRANS_IDLE:
+            if status == pq.TransactionStatus.IDLE:
                 return
 
             self.pgconn.send_query(command)
             (pgres,) = wait_select(self._exec_gen(self.pgconn))
-            if pgres.status != pq.ExecStatus.PGRES_COMMAND_OK:
+            if pgres.status != pq.ExecStatus.COMMAND_OK:
                 raise exc.OperationalError(
                     f"error on {command.decode('utf8')}:"
                     f" {pq.error_message(pgres)}"
@@ -165,12 +165,12 @@ class AsyncConnection(BaseConnection):
     async def _exec_commit_rollback(self, command):
         with self.lock:
             status = self.pgconn.transaction_status
-            if status == pq.TransactionStatus.PQTRANS_IDLE:
+            if status == pq.TransactionStatus.IDLE:
                 return
 
             self.pgconn.send_query(command)
             (pgres,) = await wait_async(self._exec_gen(self.pgconn))
-            if pgres.status != pq.ExecStatus.PGRES_COMMAND_OK:
+            if pgres.status != pq.ExecStatus.COMMAND_OK:
                 raise exc.OperationalError(
                     f"error on {command.decode('utf8')}:"
                     f" {pq.error_message(pgres)}"
index cdc1288fdc8b260d04bcd87bc9ad83f56e6c03ca..a8bbab7af18c0961acc3fa03f70e042093531541 100644 (file)
@@ -8,75 +8,74 @@ from enum import IntEnum, auto
 
 
 class ConnStatus(IntEnum):
-    CONNECTION_OK = 0
-    CONNECTION_BAD = auto()
-
-    CONNECTION_STARTED = auto()
-    CONNECTION_MADE = auto()
-    CONNECTION_AWAITING_RESPONSE = auto()
-    CONNECTION_AUTH_OK = auto()
-    CONNECTION_SETENV = auto()
-    CONNECTION_SSL_STARTUP = auto()
-    CONNECTION_NEEDED = auto()
-    CONNECTION_CHECK_WRITABLE = auto()
-    CONNECTION_CONSUME = auto()
-    CONNECTION_GSS_STARTUP = auto()
-    CONNECTION_CHECK_TARGET = auto()
+    OK = 0
+    BAD = auto()
+    STARTED = auto()
+    MADE = auto()
+    AWAITING_RESPONSE = auto()
+    AUTH_OK = auto()
+    SETENV = auto()
+    SSL_STARTUP = auto()
+    NEEDED = auto()
+    CHECK_WRITABLE = auto()
+    CONSUME = auto()
+    GSS_STARTUP = auto()
+    CHECK_TARGET = auto()
 
 
 class PollingStatus(IntEnum):
-    PGRES_POLLING_FAILED = 0
-    PGRES_POLLING_READING = auto()
-    PGRES_POLLING_WRITING = auto()
-    PGRES_POLLING_OK = auto()
-    PGRES_POLLING_ACTIVE = auto()
+    FAILED = 0
+    READING = auto()
+    WRITING = auto()
+    OK = auto()
+    ACTIVE = auto()
 
 
 class ExecStatus(IntEnum):
-    PGRES_EMPTY_QUERY = 0
-    PGRES_COMMAND_OK = auto()
-    PGRES_TUPLES_OK = auto()
-    PGRES_COPY_OUT = auto()
-    PGRES_COPY_IN = auto()
-    PGRES_BAD_RESPONSE = auto()
-    PGRES_NONFATAL_ERROR = auto()
-    PGRES_FATAL_ERROR = auto()
-    PGRES_COPY_BOTH = auto()
-    PGRES_SINGLE_TUPLE = auto()
+    EMPTY_QUERY = 0
+    COMMAND_OK = auto()
+    TUPLES_OK = auto()
+    COPY_OUT = auto()
+    COPY_IN = auto()
+    BAD_RESPONSE = auto()
+    NONFATAL_ERROR = auto()
+    FATAL_ERROR = auto()
+    COPY_BOTH = auto()
+    SINGLE_TUPLE = auto()
 
 
 class TransactionStatus(IntEnum):
-    PQTRANS_IDLE = 0
-    PQTRANS_ACTIVE = auto()
-    PQTRANS_INTRANS = auto()
-    PQTRANS_INERROR = auto()
-    PQTRANS_UNKNOWN = auto()
+    IDLE = 0
+    ACTIVE = auto()
+    INTRANS = auto()
+    INERROR = auto()
+    UNKNOWN = auto()
 
 
 class Ping(IntEnum):
-    PQPING_OK = 0
-    PQPING_REJECT = auto()
-    PQPING_NO_RESPONSE = auto()
-    PQPING_NO_ATTEMPT = auto()
+    OK = 0
+    REJECT = auto()
+    NO_RESPONSE = auto()
+    NO_ATTEMPT = auto()
 
 
 class DiagnosticField(IntEnum):
     # from postgres_ext.h
-    PG_DIAG_SEVERITY = ord("S")
-    PG_DIAG_SEVERITY_NONLOCALIZED = ord("V")
-    PG_DIAG_SQLSTATE = ord("C")
-    PG_DIAG_MESSAGE_PRIMARY = ord("M")
-    PG_DIAG_MESSAGE_DETAIL = ord("D")
-    PG_DIAG_MESSAGE_HINT = ord("H")
-    PG_DIAG_STATEMENT_POSITION = ord("P")
-    PG_DIAG_INTERNAL_POSITION = ord("p")
-    PG_DIAG_INTERNAL_QUERY = ord("q")
-    PG_DIAG_CONTEXT = ord("W")
-    PG_DIAG_SCHEMA_NAME = ord("s")
-    PG_DIAG_TABLE_NAME = ord("t")
-    PG_DIAG_COLUMN_NAME = ord("c")
-    PG_DIAG_DATATYPE_NAME = ord("d")
-    PG_DIAG_CONSTRAINT_NAME = ord("n")
-    PG_DIAG_SOURCE_FILE = ord("F")
-    PG_DIAG_SOURCE_LINE = ord("L")
-    PG_DIAG_SOURCE_FUNCTION = ord("R")
+    SEVERITY = ord("S")
+    SEVERITY_NONLOCALIZED = ord("V")
+    SQLSTATE = ord("C")
+    MESSAGE_PRIMARY = ord("M")
+    MESSAGE_DETAIL = ord("D")
+    MESSAGE_HINT = ord("H")
+    STATEMENT_POSITION = ord("P")
+    INTERNAL_POSITION = ord("p")
+    INTERNAL_QUERY = ord("q")
+    CONTEXT = ord("W")
+    SCHEMA_NAME = ord("s")
+    TABLE_NAME = ord("t")
+    COLUMN_NAME = ord("c")
+    DATATYPE_NAME = ord("d")
+    CONSTRAINT_NAME = ord("n")
+    SOURCE_FILE = ord("F")
+    SOURCE_LINE = ord("L")
+    SOURCE_FUNCTION = ord("R")
index 9385cc7ac228601f7846bf09af8b30e741e83e65..ed1e6cca4c87b7ab760207af898252b49439e902 100644 (file)
@@ -21,7 +21,7 @@ def error_message(obj):
             msg = msg.splitlines()[0].split(b":", 1)[-1].strip()
 
     elif isinstance(obj, pq.PGresult):
-        msg = obj.error_field(pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY)
+        msg = obj.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
         if not msg:
             msg = obj.error_message
 
index 730011dc2ae9d8bea83d3fc35900cd3964c8f809..f91cfff7f931306fc0f53b0c98c819abb58ee698 100644 (file)
@@ -40,7 +40,7 @@ def test_send_query(pq, pgconn):
         res = pgconn.get_result()
         if res is None:
             break
-        assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+        assert res.status == pq.ExecStatus.TUPLES_OK
         results.append(res)
 
     assert len(results) == 2
index 16f09964887487113d3968070a605d902cb4bcf4..e991d753c3cb413a26d28fb0d60a57b772f042ea 100644 (file)
@@ -15,19 +15,19 @@ def test_exec(pq, pgconn):
 
 def test_exec_params(pq, pgconn):
     res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
 
 
 def test_exec_params_empty(pq, pgconn):
     res = pgconn.exec_params(b"select 8::int", [])
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
 
 
 def test_exec_params_types(pq, pgconn):
     res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
     assert res.ftype(0) == 1700
     assert res.get_value(0, 1) == b"8"
@@ -39,7 +39,7 @@ def test_exec_params_types(pq, pgconn):
 
 def test_exec_params_nulls(pq, pgconn):
     res = pgconn.exec_params(b"select $1, $2, $3", [b"hi", b"", None])
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"hi"
     assert res.get_value(0, 1) == b""
     assert res.get_value(0, 2) is None
@@ -52,7 +52,7 @@ def test_exec_params_binary_in(pq, pgconn):
         [val, val],
         param_formats=[0, 1],
     )
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"3"
     assert res.get_value(0, 1) == b"7"
 
@@ -68,13 +68,13 @@ def test_exec_params_binary_out(pq, pgconn, fmt, out):
     res = pgconn.exec_params(
         b"select $1::bytea", [val], param_formats=[1], result_format=fmt
     )
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == out
 
 
 def test_prepare(pq, pgconn):
     res = pgconn.prepare(b"prep", b"select $1::int + $2::int")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
     assert res.get_value(0, 0) == b"8"
@@ -82,7 +82,7 @@ def test_prepare(pq, pgconn):
 
 def test_prepare_types(pq, pgconn):
     res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.exec_prepared(b"prep", [b"3", b"5"])
     assert res.get_value(0, 0) == b"8"
@@ -93,7 +93,7 @@ def test_exec_prepared_binary_in(pq, pgconn):
     res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)")
 
     res = pgconn.exec_prepared(b"", [val, val], param_formats=[0, 1])
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"3"
     assert res.get_value(0, 1) == b"7"
 
@@ -107,12 +107,12 @@ def test_exec_prepared_binary_in(pq, pgconn):
 def test_exec_prepared_binary_out(pq, pgconn, fmt, out):
     val = b"foo\00bar"
     res = pgconn.prepare(b"", b"select $1::bytea")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.exec_prepared(
         b"", [val], param_formats=[1], result_format=fmt
     )
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
+    assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == out
 
 
@@ -123,9 +123,9 @@ def test_describe_portal(pq, pgconn):
         declare cur cursor for select * from generate_series(1,10) foo;
         """
     )
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.describe_portal(b"cur")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
     assert res.nfields == 1
     assert res.fname(0) == b"foo"
index c193c87f0e5ecad9aa9d4151497246864f3ad1ad..86c936061b965dd7935e0eb40abeca9aae7e0982 100644 (file)
@@ -3,13 +3,13 @@ import pytest
 
 def test_error_message(pq, pgconn):
     res = pgconn.exec_(b"wat")
-    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+    assert res.status == pq.ExecStatus.FATAL_ERROR
     msg = pq.error_message(pgconn)
     assert msg == 'syntax error at or near "wat"'
     assert msg == pq.error_message(res)
-    assert msg == res.error_field(
-        pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY
-    ).decode("ascii")
+    assert msg == res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY).decode(
+        "ascii"
+    )
 
     with pytest.raises(TypeError):
         pq.error_message(None)
index 3d65d0e4cf73027e64c47c478b62600bbbfcd2be..91fa3fdd161286ae96d5a3b1f9c7d66464be0dfd 100644 (file)
@@ -5,12 +5,12 @@ import pytest
 
 def test_connectdb(pq, dsn):
     conn = pq.PGconn.connect(dsn.encode("utf8"))
-    assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
+    assert conn.status == pq.ConnStatus.OK, conn.error_message
 
 
 def test_connectdb_error(pq):
     conn = pq.PGconn.connect(b"dbname=psycopg3_test_not_for_real")
-    assert conn.status == pq.ConnStatus.CONNECTION_BAD
+    assert conn.status == pq.ConnStatus.BAD
 
 
 @pytest.mark.parametrize("baddsn", [None, 42])
@@ -23,35 +23,35 @@ def test_connect_async(pq, dsn):
     conn = pq.PGconn.connect_start(dsn.encode("utf8"))
     conn.nonblocking = 1
     while 1:
-        assert conn.status != pq.ConnStatus.CONNECTION_BAD
+        assert conn.status != pq.ConnStatus.BAD
         rv = conn.connect_poll()
-        if rv == pq.PollingStatus.PGRES_POLLING_OK:
+        if rv == pq.PollingStatus.OK:
             break
-        elif rv == pq.PollingStatus.PGRES_POLLING_READING:
+        elif rv == pq.PollingStatus.READING:
             select([conn.socket], [], [])
-        elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+        elif rv == pq.PollingStatus.WRITING:
             select([], [conn.socket], [])
         else:
             assert False, rv
 
-    assert conn.status == pq.ConnStatus.CONNECTION_OK
+    assert conn.status == pq.ConnStatus.OK
 
 
 def test_connect_async_bad(pq, dsn):
     conn = pq.PGconn.connect_start(b"dbname=psycopg3_test_not_for_real")
     while 1:
-        assert conn.status != pq.ConnStatus.CONNECTION_BAD
+        assert conn.status != pq.ConnStatus.BAD
         rv = conn.connect_poll()
-        if rv == pq.PollingStatus.PGRES_POLLING_FAILED:
+        if rv == pq.PollingStatus.FAILED:
             break
-        elif rv == pq.PollingStatus.PGRES_POLLING_READING:
+        elif rv == pq.PollingStatus.READING:
             select([conn.socket], [], [])
-        elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+        elif rv == pq.PollingStatus.WRITING:
             select([], [conn.socket], [])
         else:
             assert False, rv
 
-    assert conn.status == pq.ConnStatus.CONNECTION_BAD
+    assert conn.status == pq.ConnStatus.BAD
 
 
 def test_info(pq, dsn, pgconn):
@@ -69,35 +69,35 @@ def test_info(pq, dsn, pgconn):
 
 
 def test_reset(pq, pgconn):
-    assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert pgconn.status == pq.ConnStatus.OK
     # TODO: break it
     pgconn.reset()
-    assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert pgconn.status == pq.ConnStatus.OK
 
 
 def test_reset_async(pq, pgconn):
-    assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert pgconn.status == pq.ConnStatus.OK
     # TODO: break it
     pgconn.reset_start()
     while 1:
         rv = pgconn.connect_poll()
-        if rv == pq.PollingStatus.PGRES_POLLING_READING:
+        if rv == pq.PollingStatus.READING:
             select([pgconn.socket], [], [])
-        elif rv == pq.PollingStatus.PGRES_POLLING_WRITING:
+        elif rv == pq.PollingStatus.WRITING:
             select([], [pgconn.socket], [])
         else:
             break
 
-    assert rv == pq.PollingStatus.PGRES_POLLING_OK
-    assert pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert rv == pq.PollingStatus.OK
+    assert pgconn.status == pq.ConnStatus.OK
 
 
 def test_ping(pq, dsn):
     rv = pq.PGconn.ping(dsn.encode("utf8"))
-    assert rv == pq.Ping.PQPING_OK
+    assert rv == pq.Ping.OK
 
     rv = pq.PGconn.ping(b"port=99999")
-    assert rv == pq.Ping.PQPING_NO_RESPONSE
+    assert rv == pq.Ping.NO_RESPONSE
 
 
 def test_db(pgconn):
@@ -131,10 +131,10 @@ def test_tty(pgconn):
 
 
 def test_transaction_status(pq, pgconn):
-    assert pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+    assert pgconn.transaction_status == pq.TransactionStatus.IDLE
     # TODO: test other states
     pgconn.finish()
-    assert pgconn.transaction_status == pq.TransactionStatus.PQTRANS_UNKNOWN
+    assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
 
 
 def test_parameter_status(pq, dsn, tempenv):
@@ -146,15 +146,15 @@ def test_parameter_status(pq, dsn, tempenv):
 
 def test_encoding(pq, pgconn):
     res = pgconn.exec_(b"set client_encoding to latin1")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+    assert res.status == pq.ExecStatus.COMMAND_OK
     assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
 
     res = pgconn.exec_(b"set client_encoding to 'utf-8'")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+    assert res.status == pq.ExecStatus.COMMAND_OK
     assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
 
     res = pgconn.exec_(b"set client_encoding to wat")
-    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+    assert res.status == pq.ExecStatus.FATAL_ERROR
     assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
 
 
@@ -168,7 +168,7 @@ def test_server_version(pgconn):
 
 def test_error_message(pq, pgconn):
     res = pgconn.exec_(b"wat")
-    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+    assert res.status == pq.ExecStatus.FATAL_ERROR
     msg = pgconn.error_message
     assert b"wat" in msg
 
@@ -214,6 +214,6 @@ def test_ssl_in_use(pgconn):
 
 def test_make_empty_result(pq, pgconn):
     pgconn.exec_(b"wat")
-    res = pgconn.make_empty_result(pq.ExecStatus.PGRES_FATAL_ERROR)
-    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+    res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
+    assert res.status == pq.ExecStatus.FATAL_ERROR
     assert b"wat" in res.error_message
index 6bb55e2a552005a1ba4754db2c62d47718d166ab..3c3641ac04d3b13ee6203b55321d5aa7d846a53d 100644 (file)
@@ -4,10 +4,10 @@ import pytest
 @pytest.mark.parametrize(
     "command, status",
     [
-        (b"", "PGRES_EMPTY_QUERY"),
-        (b"select 1", "PGRES_TUPLES_OK"),
-        (b"set timezone to utc", "PGRES_COMMAND_OK"),
-        (b"wat", "PGRES_FATAL_ERROR"),
+        (b"", "EMPTY_QUERY"),
+        (b"select 1", "TUPLES_OK"),
+        (b"set timezone to utc", "COMMAND_OK"),
+        (b"wat", "FATAL_ERROR"),
     ],
 )
 def test_status(pq, pgconn, command, status):
@@ -24,11 +24,9 @@ def test_error_message(pgconn):
 
 def test_error_field(pq, pgconn):
     res = pgconn.exec_(b"select wat")
-    assert res.error_field(pq.DiagnosticField.PG_DIAG_SEVERITY) == b"ERROR"
-    assert res.error_field(pq.DiagnosticField.PG_DIAG_SQLSTATE) == b"42703"
-    assert b"wat" in res.error_field(
-        pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY
-    )
+    assert res.error_field(pq.DiagnosticField.SEVERITY) == b"ERROR"
+    assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703"
+    assert b"wat" in res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
 
 
 @pytest.mark.parametrize("n", range(4))
@@ -60,12 +58,12 @@ def test_ftable_and_col(pq, pgconn):
         create table t2 as select 2 as f2, 3 as f3;
         """
     )
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.exec_(
         b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2"
     )
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
 
     assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii"))
     assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii"))
@@ -76,14 +74,14 @@ def test_ftable_and_col(pq, pgconn):
 @pytest.mark.parametrize("fmt", (0, 1))
 def test_fformat(pq, pgconn, fmt):
     res = pgconn.exec_params(b"select 1", [], result_format=fmt)
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fformat(0) == fmt
     assert res.binary_tuples == fmt
 
 
 def test_ftype(pq, pgconn):
     res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text")
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.ftype(0) == 23
     assert res.ftype(1) == 1700
     assert res.ftype(2) == 25
@@ -91,7 +89,7 @@ def test_ftype(pq, pgconn):
 
 def test_fmod(pq, pgconn):
     res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fmod(0) == -1
     assert res.fmod(1) == 0xA0004
     assert res.fmod(2) == 0xA0006
@@ -99,7 +97,7 @@ def test_fmod(pq, pgconn):
 
 def test_fsize(pq, pgconn):
     res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text")
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fsize(0) == 4
     assert res.fsize(1) == 8
     assert res.fsize(2) == -1
@@ -107,7 +105,7 @@ def test_fsize(pq, pgconn):
 
 def test_get_value(pq, pgconn):
     res = pgconn.exec_(b"select 'a', '', NULL")
-    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.get_value(0, 0) == b"a"
     assert res.get_value(0, 1) == b""
     assert res.get_value(0, 2) is None
@@ -115,10 +113,10 @@ def test_get_value(pq, pgconn):
 
 def test_nparams_types(pq, pgconn):
     res = pgconn.prepare(b"", b"select $1::int, $2::text")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     res = pgconn.describe_prepared(b"")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
     assert res.nparams == 2
     assert res.param_type(0) == 23
index 992760d089eddf57b16d24de396e0f63c2dcfc49..ddf39ed36b23e2da3fd935623658c37350b3c38a 100644 (file)
@@ -6,7 +6,7 @@ from psycopg3 import AsyncConnection
 
 def test_connect(pq, dsn, loop):
     conn = loop.run_until_complete(AsyncConnection.connect(dsn))
-    assert conn.pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert conn.pgconn.status == pq.ConnStatus.OK
 
 
 def test_connect_bad(loop):
@@ -18,12 +18,10 @@ def test_commit(loop, pq, aconn):
     aconn.pgconn.exec_(b"drop table if exists foo")
     aconn.pgconn.exec_(b"create table foo (id int primary key)")
     aconn.pgconn.exec_(b"begin")
-    assert (
-        aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
-    )
+    assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
     res = aconn.pgconn.exec_(b"insert into foo values (1)")
     loop.run_until_complete(aconn.commit())
-    assert aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+    assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
     res = aconn.pgconn.exec_(b"select id from foo where id = 1")
     assert res.get_value(0, 0) == b"1"
 
@@ -32,11 +30,9 @@ def test_rollback(loop, pq, aconn):
     aconn.pgconn.exec_(b"drop table if exists foo")
     aconn.pgconn.exec_(b"create table foo (id int primary key)")
     aconn.pgconn.exec_(b"begin")
-    assert (
-        aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
-    )
+    assert aconn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
     res = aconn.pgconn.exec_(b"insert into foo values (1)")
     loop.run_until_complete(aconn.rollback())
-    assert aconn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+    assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
     res = aconn.pgconn.exec_(b"select id from foo where id = 1")
     assert res.get_value(0, 0) is None
index 69e0532c6c5eeacc545d34248d8cebe595cb983c..c009c16fc5e9e4ba7679273f6de014b143c555df 100644 (file)
@@ -6,7 +6,7 @@ from psycopg3 import Connection
 
 def test_connect(pq, dsn):
     conn = Connection.connect(dsn)
-    assert conn.pgconn.status == pq.ConnStatus.CONNECTION_OK
+    assert conn.pgconn.status == pq.ConnStatus.OK
 
 
 def test_connect_bad():
@@ -18,12 +18,10 @@ def test_commit(pq, conn):
     conn.pgconn.exec_(b"drop table if exists foo")
     conn.pgconn.exec_(b"create table foo (id int primary key)")
     conn.pgconn.exec_(b"begin")
-    assert (
-        conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
-    )
+    assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
     res = conn.pgconn.exec_(b"insert into foo values (1)")
     conn.commit()
-    assert conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+    assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
     res = conn.pgconn.exec_(b"select id from foo where id = 1")
     assert res.get_value(0, 0) == b"1"
 
@@ -32,11 +30,9 @@ def test_rollback(pq, conn):
     conn.pgconn.exec_(b"drop table if exists foo")
     conn.pgconn.exec_(b"create table foo (id int primary key)")
     conn.pgconn.exec_(b"begin")
-    assert (
-        conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_INTRANS
-    )
+    assert conn.pgconn.transaction_status == pq.TransactionStatus.INTRANS
     res = conn.pgconn.exec_(b"insert into foo values (1)")
     conn.rollback()
-    assert conn.pgconn.transaction_status == pq.TransactionStatus.PQTRANS_IDLE
+    assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
     res = conn.pgconn.exec_(b"select id from foo where id = 1")
     assert res.get_value(0, 0) is None