Just use the module as a normal program would to.
import os
-
import pytest
+from psycopg3 import pq
+
def pytest_addoption(parser):
parser.addoption(
@pytest.fixture
-def pgconn(pq, dsn):
+def pgconn(dsn):
"""Return a PGconn connection open to `--test-dsn`."""
conn = pq.PGconn.connect(dsn.encode("utf8"))
if conn.status != pq.ConnStatus.OK:
@pytest.fixture
-async def aconn(dsn, pq):
+async def aconn(dsn):
"""Return an `AsyncConnection` connected to the ``--test-dsn`` database."""
from psycopg3 import AsyncConnection
import operator
import pytest
+from psycopg3 import pq
-def pytest_report_header(config):
- try:
- from psycopg3 import pq
- return [
- f"libpq available: {pq.version()}",
- f"libpq wrapper implementation: {pq.__impl__}",
- ]
- except Exception:
- # you will die of something else
- pass
+def pytest_report_header(config):
+ return [
+ f"libpq available: {pq.version()}",
+ f"libpq wrapper implementation: {pq.__impl__}",
+ ]
def pytest_configure(config):
)
-@pytest.fixture
-def pq(request):
- """The libpq module wrapper to test."""
- from psycopg3 import pq
-
- for m in request.node.iter_markers(name="libpq"):
+def pytest_runtest_setup(item):
+ for m in item.iter_markers(name="libpq"):
check_libpq_version(pq.version(), m.args)
- return pq
-
@pytest.fixture
def libpq():
import pytest
from select import select
import psycopg3
+from psycopg3 import pq
from psycopg3.generators import execute
-def test_send_query(pq, pgconn):
+def test_send_query(pgconn):
# This test shows how to process an async query in all its glory
pgconn.nonblocking = 1
assert results[1].get_value(0, 0) == b"1"
-def test_send_query_compact_test(pq, pgconn):
+def test_send_query_compact_test(pgconn):
# Like the above test but use psycopg3 facilities for compactness
pgconn.send_query(
b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
pgconn.send_query(b"select 1")
-def test_send_query_params(pq, pgconn):
+def test_send_query_params(pgconn):
pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
(res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.TUPLES_OK
pgconn.send_query_params(b"select $1", [b"1"])
-def test_send_prepare(pq, pgconn):
+def test_send_prepare(pgconn):
pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
(res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_query_prepared(b"prep", [b"3", b"5"])
-def test_send_prepare_types(pq, pgconn):
+def test_send_prepare_types(pgconn):
pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
(res,) = psycopg3.waiting.wait(execute(pgconn))
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.get_value(0, 0) == b"8"
-def test_send_prepared_binary_in(pq, pgconn):
+def test_send_prepared_binary_in(pgconn):
val = b"foo\00bar"
pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
(res,) = psycopg3.waiting.wait(execute(pgconn))
@pytest.mark.parametrize(
"fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
)
-def test_send_prepared_binary_out(pq, pgconn, fmt, out):
+def test_send_prepared_binary_out(pgconn, fmt, out):
val = b"foo\00bar"
pgconn.send_prepare(b"", b"select $1::bytea")
(res,) = psycopg3.waiting.wait(execute(pgconn))
import pytest
+from psycopg3 import pq
-def test_defaults(pq, monkeypatch):
+
+def test_defaults(monkeypatch):
monkeypatch.setenv("PGPORT", "15432")
defs = pq.Conninfo.get_defaults()
assert len(defs) > 20
assert port.dispsize == 6
-def test_conninfo_parse(pq):
+def test_conninfo_parse():
info = pq.Conninfo.parse(
b"postgresql://host1:123,host2:456/somedb"
b"?target_session_attrs=any&application_name=myapp"
assert info[b"application_name"] == b"myapp"
-def test_conninfo_parse_bad(pq):
+def test_conninfo_parse_bad():
with pytest.raises(pq.PQerror) as e:
pq.Conninfo.parse(b"bad_conninfo=")
assert "bad_conninfo" in str(e.value)
import pytest
import psycopg3
+from psycopg3 import pq
@pytest.mark.parametrize(
"data", [(b"hello\00world"), (b"\00\00\00\00")],
)
-def test_escape_bytea(pq, pgconn, data):
+def test_escape_bytea(pgconn, data):
exp = br"\x" + b"".join(b"%02x" % c for c in data)
esc = pq.Escaping(pgconn)
rv = esc.escape_bytea(data)
esc.escape_bytea(data)
-def test_escape_noconn(pq, pgconn):
+def test_escape_noconn(pgconn):
data = bytes(range(256))
esc = pq.Escaping()
escdata = esc.escape_bytea(data)
assert res.get_value(0, 0) == data
-def test_escape_1char(pq, pgconn):
+def test_escape_1char(pgconn):
esc = pq.Escaping(pgconn)
for c in range(256):
rv = esc.escape_bytea(bytes([c]))
@pytest.mark.parametrize(
"data", [(b"hello\00world"), (b"\00\00\00\00")],
)
-def test_unescape_bytea(pq, pgconn, data):
+def test_unescape_bytea(pgconn, data):
enc = br"\x" + b"".join(b"%02x" % c for c in data)
esc = pq.Escaping(pgconn)
rv = esc.unescape_bytea(enc)
import pytest
import psycopg3
+from psycopg3 import pq
-def test_exec_none(pq, pgconn):
+def test_exec_none(pgconn):
with pytest.raises(TypeError):
pgconn.exec_(None)
-def test_exec(pq, pgconn):
+def test_exec(pgconn):
res = pgconn.exec_(b"select 'hel' || 'lo'")
assert res.get_value(0, 0) == b"hello"
pgconn.finish()
pgconn.exec_(b"select 'hello'")
-def test_exec_params(pq, pgconn):
+def test_exec_params(pgconn):
res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
-def test_exec_params_empty(pq, pgconn):
+def test_exec_params_empty(pgconn):
res = pgconn.exec_params(b"select 8::int", [])
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
-def test_exec_params_types(pq, pgconn):
+def test_exec_params_types(pgconn):
res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
assert res.status == pq.ExecStatus.TUPLES_OK
assert res.get_value(0, 0) == b"8"
pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700])
-def test_exec_params_nulls(pq, pgconn):
+def test_exec_params_nulls(pgconn):
res = pgconn.exec_params(
b"select $1::text, $2::text, $3::text", [b"hi", b"", None]
)
assert res.get_value(0, 2) is None
-def test_exec_params_binary_in(pq, pgconn):
+def test_exec_params_binary_in(pgconn):
val = b"foo\00bar"
res = pgconn.exec_params(
b"select length($1::bytea), length($2::bytea)",
@pytest.mark.parametrize(
"fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
)
-def test_exec_params_binary_out(pq, pgconn, fmt, out):
+def test_exec_params_binary_out(pgconn, fmt, out):
val = b"foo\00bar"
res = pgconn.exec_params(
b"select $1::bytea", [val], param_formats=[1], result_format=fmt
assert res.get_value(0, 0) == out
-def test_prepare(pq, pgconn):
+def test_prepare(pgconn):
res = pgconn.prepare(b"prep", b"select $1::int + $2::int")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.exec_prepared(b"prep", [b"3", b"5"])
-def test_prepare_types(pq, pgconn):
+def test_prepare_types(pgconn):
res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.get_value(0, 0) == b"8"
-def test_exec_prepared_binary_in(pq, pgconn):
+def test_exec_prepared_binary_in(pgconn):
val = b"foo\00bar"
res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
@pytest.mark.parametrize(
"fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
)
-def test_exec_prepared_binary_out(pq, pgconn, fmt, out):
+def test_exec_prepared_binary_out(pgconn, fmt, out):
val = b"foo\00bar"
res = pgconn.prepare(b"", b"select $1::bytea")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.get_value(0, 0) == out
-def test_describe_portal(pq, pgconn):
+def test_describe_portal(pgconn):
res = pgconn.exec_(
b"""
begin;
import pytest
+from psycopg3 import pq
-def test_error_message(pq, pgconn):
+
+def test_error_message(pgconn):
res = pgconn.exec_(b"wat")
assert res.status == pq.ExecStatus.FATAL_ERROR
msg = pq.error_message(pgconn)
import pytest
import psycopg3
+from psycopg3 import pq
import psycopg3.generators
-def test_connectdb(pq, dsn):
+def test_connectdb(dsn):
conn = pq.PGconn.connect(dsn.encode("utf8"))
assert conn.status == pq.ConnStatus.OK, conn.error_message
-def test_connectdb_error(pq):
+def test_connectdb_error():
conn = pq.PGconn.connect(b"dbname=psycopg3_test_not_for_real")
assert conn.status == pq.ConnStatus.BAD
@pytest.mark.parametrize("baddsn", [None, 42])
-def test_connectdb_badtype(pq, baddsn):
+def test_connectdb_badtype(baddsn):
with pytest.raises(TypeError):
pq.PGconn.connect(baddsn)
-def test_connect_async(pq, dsn):
+def test_connect_async(dsn):
conn = pq.PGconn.connect_start(dsn.encode("utf8"))
conn.nonblocking = 1
while 1:
conn.connect_poll()
-def test_connect_async_bad(pq, dsn):
+def test_connect_async_bad(dsn):
conn = pq.PGconn.connect_start(b"dbname=psycopg3_test_not_for_real")
while 1:
assert conn.status != pq.ConnStatus.BAD
assert conn.status == pq.ConnStatus.BAD
-def test_finish(pgconn, pq):
+def test_finish(pgconn):
assert pgconn.status == pq.ConnStatus.OK
pgconn.finish()
assert pgconn.status == pq.ConnStatus.BAD
assert pgconn.status == pq.ConnStatus.BAD
-def test_weakref(pq, dsn):
+def test_weakref(dsn):
conn = pq.PGconn.connect(dsn.encode("utf8"))
w = weakref.ref(conn)
conn.finish()
assert pgconn.pgconn_ptr is None
-def test_info(pq, dsn, pgconn):
+def test_info(dsn, pgconn):
info = pgconn.info
assert len(info) > 20
dbname = [d for d in info if d.keyword == b"dbname"][0]
pgconn.info
-def test_reset(pq, pgconn):
+def test_reset(pgconn):
assert pgconn.status == pq.ConnStatus.OK
pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
assert pgconn.status == pq.ConnStatus.BAD
assert pgconn.status == pq.ConnStatus.BAD
-def test_reset_async(pq, pgconn):
+def test_reset_async(pgconn):
assert pgconn.status == pq.ConnStatus.OK
pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
assert pgconn.status == pq.ConnStatus.BAD
pgconn.reset_poll()
-def test_ping(pq, dsn):
+def test_ping(dsn):
rv = pq.PGconn.ping(dsn.encode("utf8"))
assert rv == pq.Ping.OK
pgconn.tty
-def test_transaction_status(pq, pgconn):
+def test_transaction_status(pgconn):
assert pgconn.transaction_status == pq.TransactionStatus.IDLE
pgconn.exec_(b"begin")
assert pgconn.transaction_status == pq.TransactionStatus.INTRANS
assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
-def test_parameter_status(pq, dsn, monkeypatch):
+def test_parameter_status(dsn, monkeypatch):
monkeypatch.setenv("PGAPPNAME", "psycopg3 tests")
pgconn = pq.PGconn.connect(dsn.encode("utf8"))
assert pgconn.parameter_status(b"application_name") == b"psycopg3 tests"
pgconn.parameter_status(b"application_name")
-def test_encoding(pq, pgconn):
+def test_encoding(pgconn):
res = pgconn.exec_(b"set client_encoding to latin1")
assert res.status == pq.ExecStatus.COMMAND_OK
assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
pgconn.server_version
-def test_error_message(pq, pgconn):
+def test_error_message(pgconn):
assert pgconn.error_message == b""
res = pgconn.exec_(b"wat")
assert res.status == pq.ExecStatus.FATAL_ERROR
pgconn.needs_password
-def test_used_password(pq, pgconn, dsn, monkeypatch):
+def test_used_password(pgconn, dsn, monkeypatch):
assert isinstance(pgconn.used_password, bool)
# Assume that if a password was passed then it was needed.
pgconn.ssl_in_use
-def test_make_empty_result(pq, pgconn):
+def test_make_empty_result(pgconn):
pgconn.exec_(b"wat")
res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
assert res.status == pq.ExecStatus.FATAL_ERROR
assert res.error_message == b""
-def test_notice_nohandler(pq, pgconn):
+def test_notice_nohandler(pgconn):
pgconn.exec_(b"set client_min_messages to notice")
res = pgconn.exec_(
b"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
assert res.status == pq.ExecStatus.COMMAND_OK
-def test_notice(pq, pgconn):
+def test_notice(pgconn):
msgs = []
def callback(res):
assert msgs and msgs[0] == b"hello notice"
-def test_notice_error(pq, pgconn, caplog):
+def test_notice_error(pgconn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3")
def callback(res):
import ctypes
import pytest
+from psycopg3 import pq
+
@pytest.mark.parametrize(
"command, status",
(b"wat", "FATAL_ERROR"),
],
)
-def test_status(pq, pgconn, command, status):
+def test_status(pgconn, command, status):
res = pgconn.exec_(command)
assert res.status == getattr(pq.ExecStatus, status)
-def test_clear(pq, pgconn):
+def test_clear(pgconn):
res = pgconn.exec_(b"select 1")
assert res.status == pq.ExecStatus.TUPLES_OK
res.clear()
assert res.error_message == b""
-def test_error_field(pq, pgconn):
+def test_error_field(pgconn):
res = pgconn.exec_(b"select wat")
assert res.error_field(pq.DiagnosticField.SEVERITY) == b"ERROR"
assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703"
assert res.fname(0) is None
-def test_ftable_and_col(pq, pgconn):
+def test_ftable_and_col(pgconn):
res = pgconn.exec_(
b"""
drop table if exists t1, t2;
@pytest.mark.parametrize("fmt", (0, 1))
-def test_fformat(pq, pgconn, fmt):
+def test_fformat(pgconn, fmt):
res = pgconn.exec_params(b"select 1", [], result_format=fmt)
assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fformat(0) == fmt
assert res.binary_tuples == 0
-def test_ftype(pq, pgconn):
+def test_ftype(pgconn):
res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text")
assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.ftype(0) == 23
assert res.ftype(0) == 0
-def test_fmod(pq, pgconn):
+def test_fmod(pgconn):
res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fmod(0) == -1
assert res.fmod(0) == 0
-def test_fsize(pq, pgconn):
+def test_fsize(pgconn):
res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text")
assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.fsize(0) == 4
assert res.fsize(0) == 0
-def test_get_value(pq, pgconn):
+def test_get_value(pgconn):
res = pgconn.exec_(b"select 'a', '', NULL")
assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
assert res.get_value(0, 0) == b"a"
assert res.get_value(0, 0) is None
-def test_nparams_types(pq, pgconn):
+def test_nparams_types(pgconn):
res = pgconn.prepare(b"", b"select $1::int, $2::text")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
assert res.param_type(0) == 0
-def test_command_status(pq, pgconn):
+def test_command_status(pgconn):
res = pgconn.exec_(b"select 1")
assert res.command_status == b"SELECT 1"
res = pgconn.exec_(b"set timezone to utf8")
assert res.command_status is None
-def test_command_tuples(pq, pgconn):
+def test_command_tuples(pgconn):
res = pgconn.exec_(b"set timezone to utf8")
assert res.command_tuples is None
res = pgconn.exec_(b"select * from generate_series(1, 10)")
assert res.command_tuples is None
-def test_oid_value(pq, pgconn):
+def test_oid_value(pgconn):
res = pgconn.exec_(b"select 1")
assert res.oid_value == 0
res.clear()
-def test_version(pq):
+from psycopg3 import pq
+
+
+def test_version():
rv = pq.version()
assert rv > 90500
assert rv < 200000 # you are good for a while
import pytest
+from psycopg3 import pq
from psycopg3 import errors as e
eur = "\u20ac"
assert diag.severity == "ERROR"
-def test_diag_all_attrs(pgconn, pq):
+def test_diag_all_attrs(pgconn):
res = pgconn.make_empty_result(pq.ExecStatus.NONFATAL_ERROR)
diag = e.Diagnostic(res)
for d in pq.DiagnosticField:
assert val is None or isinstance(val, str)
-def test_diag_right_attr(pgconn, pq, monkeypatch):
+def test_diag_right_attr(pgconn, monkeypatch):
res = pgconn.make_empty_result(pq.ExecStatus.NONFATAL_ERROR)
diag = e.Diagnostic(res)