From: Daniele Varrazzo Date: Sun, 15 Mar 2020 13:24:11 +0000 (+1300) Subject: Added tests for PGresult X-Git-Tag: 3.0.dev0~708 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=e93b626c0a5500c89e8752be15df36789ac43c2b;p=thirdparty%2Fpsycopg.git Added tests for PGresult --- diff --git a/psycopg3/pq/__init__.py b/psycopg3/pq/__init__.py index 0d3aec5e5..641610137 100644 --- a/psycopg3/pq/__init__.py +++ b/psycopg3/pq/__init__.py @@ -15,6 +15,7 @@ from .enums import ( ExecStatus, TransactionStatus, Ping, + DiagnosticField, ) from . import pq_ctypes as pq_module @@ -29,6 +30,7 @@ __all__ = ( "TransactionStatus", "ExecStatus", "Ping", + "DiagnosticField", "PGconn", "Conninfo", "PQerror", diff --git a/tests/pq/test_exec.py b/tests/pq/test_exec.py index 226ded78a..cfbe6de2d 100644 --- a/tests/pq/test_exec.py +++ b/tests/pq/test_exec.py @@ -8,42 +8,26 @@ def test_exec_none(pq, pgconn): pgconn.exec_(None) -def test_exec_empty(pq, pgconn): - res = pgconn.exec_(b"") - assert res.status == pq.ExecStatus.PGRES_EMPTY_QUERY - - -def test_exec_command(pq, pgconn): - res = pgconn.exec_(b"set timezone to utc") - assert res.status == pq.ExecStatus.PGRES_COMMAND_OK - - -def test_exec_error(pq, pgconn): - res = pgconn.exec_(b"wat") - assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR +def test_exec(pq, pgconn): + res = pgconn.exec_(b"select 'hel' || 'lo'") + assert res.get_value(0, 0) == b"hello" def test_exec_params(pq, pgconn): res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"]) assert res.status == pq.ExecStatus.PGRES_TUPLES_OK - assert res.nfields == 1 - assert res.ntuples == 1 assert res.get_value(0, 0) == b"8" def test_exec_params_empty(pq, pgconn): res = pgconn.exec_params(b"select 8::int", []) assert res.status == pq.ExecStatus.PGRES_TUPLES_OK - assert res.nfields == 1 - assert res.ntuples == 1 assert res.get_value(0, 0) == b"8" def test_exec_params_types(pq, pgconn): res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23]) assert res.status == pq.ExecStatus.PGRES_TUPLES_OK - assert res.nfields == 2 - assert res.ntuples == 1 assert res.get_value(0, 0) == b"8" assert res.ftype(0) == 1700 assert res.get_value(0, 1) == b"8" @@ -56,8 +40,6 @@ def test_exec_params_types(pq, pgconn): def test_exec_params_nulls(pq, pgconn): res = pgconn.exec_params(b"select $1, $2, $3", [b"hi", b"", None]) assert res.status == pq.ExecStatus.PGRES_TUPLES_OK - assert res.nfields == 3 - assert res.ntuples == 1 assert res.get_value(0, 0) == b"hi" assert res.get_value(0, 1) == b"" assert res.get_value(0, 2) is None diff --git a/tests/pq/test_pgresult.py b/tests/pq/test_pgresult.py new file mode 100644 index 000000000..530a9748b --- /dev/null +++ b/tests/pq/test_pgresult.py @@ -0,0 +1,113 @@ +import pytest + + +@pytest.mark.parametrize( + "command, status", + [ + (b"", "PGRES_EMPTY_QUERY"), + (b"select 1", "PGRES_TUPLES_OK"), + (b"set timezone to utc", "PGRES_COMMAND_OK"), + (b"wat", "PGRES_FATAL_ERROR"), + ], +) +def test_status(pq, pgconn, command, status): + res = pgconn.exec_(command) + assert res.status == getattr(pq.ExecStatus, status) + + +def test_error_message(pgconn): + res = pgconn.exec_(b"select 1") + assert res.error_message == b"" + res = pgconn.exec_(b"select wat") + assert b"wat" in res.error_message + + +def test_error_field(pq, pgconn): + res = pgconn.exec_(b"select wat") + assert res.error_field(pq.DiagnosticField.PG_DIAG_SEVERITY) == b"ERROR" + assert res.error_field(pq.DiagnosticField.PG_DIAG_SQLSTATE) == b"42703" + assert b"wat" in res.error_field( + pq.DiagnosticField.PG_DIAG_MESSAGE_PRIMARY + ) + + +@pytest.mark.parametrize("n", range(4)) +def test_ntuples(pgconn, n): + res = pgconn.exec_params( + b"select generate_series(1, $1)", [str(n).encode("ascii")] + ) + assert res.ntuples == n + + +def test_nfields(pgconn): + res = pgconn.exec_(b"select 1, 2, 3") + assert res.nfields == 3 + res = pgconn.exec_(b"select wat") + assert res.nfields == 0 + + +def test_fname(pgconn): + res = pgconn.exec_(b'select 1 as foo, 2 as "BAR"') + assert res.fname(0) == b"foo" + assert res.fname(1) == b"BAR" + + +def test_ftable_and_col(pq, pgconn): + res = pgconn.exec_( + b""" + drop table if exists t1, t2; + create table t1 as select 1 as f1; + create table t2 as select 2 as f2, 3 as f3; + """ + ) + assert res.status == pq.ExecStatus.PGRES_COMMAND_OK, res.error_message + + res = pgconn.exec_( + b"select f1, f3, 't1'::regclass::oid, 't2'::regclass::oid from t1, t2" + ) + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + + assert res.ftable(0) == int(res.get_value(0, 2).decode("ascii")) + assert res.ftable(1) == int(res.get_value(0, 3).decode("ascii")) + assert res.ftablecol(0) == 1 + assert res.ftablecol(1) == 2 + + +@pytest.mark.parametrize("fmt", (0, 1)) +def test_fformat(pq, pgconn, fmt): + res = pgconn.exec_params(b"select 1", [], result_format=fmt) + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + assert res.fformat(0) == fmt + assert res.binary_tuples == fmt + + +def test_ftype(pq, pgconn): + res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text") + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + assert res.ftype(0) == 23 + assert res.ftype(1) == 1700 + assert res.ftype(2) == 25 + + +def test_fmod(pq, pgconn): + res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)") + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + assert res.fmod(0) == -1 + assert res.fmod(1) == 0xA0004 + assert res.fmod(2) == 0xA0006 + + +def test_fsize(pq, pgconn): + res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text") + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + assert res.fsize(0) == 4 + assert res.fsize(1) == 8 + assert res.fsize(2) == -1 + + +def test_get_value(pq, pgconn): + res = pgconn.exec_(b"select 'a', '', NULL") + assert res.status == pq.ExecStatus.PGRES_TUPLES_OK, res.error_message + assert res.get_value(0, 0) == b"a" + assert res.get_value(0, 1) == b"" + assert res.get_value(0, 2) is None