]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added tests for PGresult
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 15 Mar 2020 13:24:11 +0000 (02:24 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 15 Mar 2020 13:24:11 +0000 (02:24 +1300)
psycopg3/pq/__init__.py
tests/pq/test_exec.py
tests/pq/test_pgresult.py [new file with mode: 0644]

index 0d3aec5e5d4862b000ec3d6e5e4e081af61089b2..641610137efe30fb4a90364b30d3718b7645da55 100644 (file)
@@ -15,6 +15,7 @@ from .enums import (
     ExecStatus,
     TransactionStatus,
     Ping,
+    DiagnosticField,
 )
 
 from . import pq_ctypes as pq_module
@@ -29,6 +30,7 @@ __all__ = (
     "TransactionStatus",
     "ExecStatus",
     "Ping",
+    "DiagnosticField",
     "PGconn",
     "Conninfo",
     "PQerror",
index 226ded78a35d0fe06dd752f54b16220d087544a9..cfbe6de2dd0d55c0e4aaccaece3e794a2fbff0ba 100644 (file)
@@ -8,42 +8,26 @@ def test_exec_none(pq, pgconn):
         pgconn.exec_(None)
 
 
-def test_exec_empty(pq, pgconn):
-    res = pgconn.exec_(b"")
-    assert res.status == pq.ExecStatus.PGRES_EMPTY_QUERY
-
-
-def test_exec_command(pq, pgconn):
-    res = pgconn.exec_(b"set timezone to utc")
-    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
-
-
-def test_exec_error(pq, pgconn):
-    res = pgconn.exec_(b"wat")
-    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+def test_exec(pq, pgconn):
+    res = pgconn.exec_(b"select 'hel' || 'lo'")
+    assert res.get_value(0, 0) == b"hello"
 
 
 def test_exec_params(pq, pgconn):
     res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
     assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
-    assert res.nfields == 1
-    assert res.ntuples == 1
     assert res.get_value(0, 0) == b"8"
 
 
 def test_exec_params_empty(pq, pgconn):
     res = pgconn.exec_params(b"select 8::int", [])
     assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
-    assert res.nfields == 1
-    assert res.ntuples == 1
     assert res.get_value(0, 0) == b"8"
 
 
 def test_exec_params_types(pq, pgconn):
     res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
     assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
-    assert res.nfields == 2
-    assert res.ntuples == 1
     assert res.get_value(0, 0) == b"8"
     assert res.ftype(0) == 1700
     assert res.get_value(0, 1) == b"8"
@@ -56,8 +40,6 @@ def test_exec_params_types(pq, pgconn):
 def test_exec_params_nulls(pq, pgconn):
     res = pgconn.exec_params(b"select $1, $2, $3", [b"hi", b"", None])
     assert res.status == pq.ExecStatus.PGRES_TUPLES_OK
-    assert res.nfields == 3
-    assert res.ntuples == 1
     assert res.get_value(0, 0) == b"hi"
     assert res.get_value(0, 1) == b""
     assert res.get_value(0, 2) is None
diff --git a/tests/pq/test_pgresult.py b/tests/pq/test_pgresult.py
new file mode 100644 (file)
index 0000000..530a974
--- /dev/null
@@ -0,0 +1,113 @@
+import pytest
+
+
+@pytest.mark.parametrize(
+    "command, status",
+    [
+        (b"", "PGRES_EMPTY_QUERY"),
+        (b"select 1", "PGRES_TUPLES_OK"),
+        (b"set timezone to utc", "PGRES_COMMAND_OK"),
+        (b"wat", "PGRES_FATAL_ERROR"),
+    ],
+)
+def test_status(pq, pgconn, command, status):
+    res = pgconn.exec_(command)
+    assert res.status == getattr(pq.ExecStatus, status)
+
+
+def test_error_message(pgconn):
+    res = pgconn.exec_(b"select 1")
+    assert res.error_message == b""
+    res = pgconn.exec_(b"select wat")
+    assert b"wat" in res.error_message
+
+
+def test_error_field(pq, pgconn):
+    res = pgconn.exec_(b"select wat")
+    assert res.error_field(pq.DiagnosticField.PG_DIAG_SEVERITY) == b"ERROR"
+    assert res.error_field(pq.DiagnosticField.PG_DIAG_SQLSTATE) == b"42703"
+    assert b"wat" in res.error_field(
+        pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY
+    )
+
+
+@pytest.mark.parametrize("n", range(4))
+def test_ntuples(pgconn, n):
+    res = pgconn.exec_params(
+        b"select generate_series(1, $1)", [str(n).encode("ascii")]
+    )
+    assert res.ntuples == n
+
+
+def test_nfields(pgconn):
+    res = pgconn.exec_(b"select 1, 2, 3")
+    assert res.nfields == 3
+    res = pgconn.exec_(b"select wat")
+    assert res.nfields == 0
+
+
+def test_fname(pgconn):
+    res = pgconn.exec_(b'select 1 as foo, 2 as "BAR"')
+    assert res.fname(0) == b"foo"
+    assert res.fname(1) == b"BAR"
+
+
+def test_ftable_and_col(pq, pgconn):
+    res = pgconn.exec_(
+        b"""
+        drop table if exists t1, t2;
+        create table t1 as select 1 as f1;
+        create table t2 as select 2 as f2, 3 as f3;
+        """
+    )
+    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message
+
+    res = pgconn.exec_(
+        b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2"
+    )
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+
+    assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii"))
+    assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii"))
+    assert res.ftablecol(0) == 1
+    assert res.ftablecol(1) == 2
+
+
+@pytest.mark.parametrize("fmt", (0, 1))
+def test_fformat(pq, pgconn, fmt):
+    res = pgconn.exec_params(b"select 1", [], result_format=fmt)
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.fformat(0) == fmt
+    assert res.binary_tuples == fmt
+
+
+def test_ftype(pq, pgconn):
+    res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text")
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.ftype(0) == 23
+    assert res.ftype(1) == 1700
+    assert res.ftype(2) == 25
+
+
+def test_fmod(pq, pgconn):
+    res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.fmod(0) == -1
+    assert res.fmod(1) == 0xA0004
+    assert res.fmod(2) == 0xA0006
+
+
+def test_fsize(pq, pgconn):
+    res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text")
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.fsize(0) == 4
+    assert res.fsize(1) == 8
+    assert res.fsize(2) == -1
+
+
+def test_get_value(pq, pgconn):
+    res = pgconn.exec_(b"select 'a', '', NULL")
+    assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message
+    assert res.get_value(0, 0) == b"a"
+    assert res.get_value(0, 1) == b""
+    assert res.get_value(0, 2) is None