]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Dropped pq fixture
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 23 May 2020 03:36:15 +0000 (15:36 +1200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 23 May 2020 03:36:15 +0000 (15:36 +1200)
Just use the module as a normal program would to.

tests/fix_db.py
tests/fix_pq.py
tests/pq/test_async.py
tests/pq/test_conninfo.py
tests/pq/test_escaping.py
tests/pq/test_exec.py
tests/pq/test_misc.py
tests/pq/test_pgconn.py
tests/pq/test_pgresult.py
tests/pq/test_pq.py
tests/test_errors.py

index 9d53b7868cf5c28fae16900f502796f7723082fd..2eb890259767ec694b4593b902b945a75894bdfd 100644 (file)
@@ -1,7 +1,8 @@
 import os
-
 import pytest
 
+from psycopg3 import pq
+
 
 def pytest_addoption(parser):
     parser.addoption(
@@ -23,7 +24,7 @@ def dsn(request):
 
 
 @pytest.fixture
-def pgconn(pq, dsn):
+def pgconn(dsn):
     """Return a PGconn connection open to `--test-dsn`."""
     conn = pq.PGconn.connect(dsn.encode("utf8"))
     if conn.status != pq.ConnStatus.OK:
@@ -45,7 +46,7 @@ def conn(dsn):
 
 
 @pytest.fixture
-async def aconn(dsn, pq):
+async def aconn(dsn):
     """Return an `AsyncConnection` connected to the ``--test-dsn`` database."""
     from psycopg3 import AsyncConnection
 
index aaf93304600c045b0cb8aa48a61f096a4177bde3..ae1e2a399f61ff265f864eaec43b9ed2e28eb7eb 100644 (file)
@@ -2,18 +2,14 @@ import re
 import operator
 import pytest
 
+from psycopg3 import pq
 
-def pytest_report_header(config):
-    try:
-        from psycopg3 import pq
 
-        return [
-            f"libpq available: {pq.version()}",
-            f"libpq wrapper implementation: {pq.__impl__}",
-        ]
-    except Exception:
-        # you will die of something else
-        pass
+def pytest_report_header(config):
+    return [
+        f"libpq available: {pq.version()}",
+        f"libpq wrapper implementation: {pq.__impl__}",
+    ]
 
 
 def pytest_configure(config):
@@ -25,16 +21,10 @@ def pytest_configure(config):
     )
 
 
-@pytest.fixture
-def pq(request):
-    """The libpq module wrapper to test."""
-    from psycopg3 import pq
-
-    for m in request.node.iter_markers(name="libpq"):
+def pytest_runtest_setup(item):
+    for m in item.iter_markers(name="libpq"):
         check_libpq_version(pq.version(), m.args)
 
-    return pq
-
 
 @pytest.fixture
 def libpq():
index 47378e464498d5492394906cc6fdef5bd174ae5f..9fddde267881a4a5fe20ba63e707dda427d0ec75 100644 (file)
@@ -1,10 +1,11 @@
 import pytest
 from select import select
 import psycopg3
+from psycopg3 import pq
 from psycopg3.generators import execute
 
 
-def test_send_query(pq, pgconn):
+def test_send_query(pgconn):
     # This test shows how to process an async query in all its glory
     pgconn.nonblocking = 1
 
@@ -56,7 +57,7 @@ def test_send_query(pq, pgconn):
     assert results[1].get_value(0, 0) == b"1"
 
 
-def test_send_query_compact_test(pq, pgconn):
+def test_send_query_compact_test(pgconn):
     # Like the above test but use psycopg3 facilities for compactness
     pgconn.send_query(
         b"/* %s */ select pg_sleep(0.01); select 1 as foo;"
@@ -77,7 +78,7 @@ def test_send_query_compact_test(pq, pgconn):
         pgconn.send_query(b"select 1")
 
 
-def test_send_query_params(pq, pgconn):
+def test_send_query_params(pgconn):
     pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
     (res,) = psycopg3.waiting.wait(execute(pgconn))
     assert res.status == pq.ExecStatus.TUPLES_OK
@@ -88,7 +89,7 @@ def test_send_query_params(pq, pgconn):
         pgconn.send_query_params(b"select $1", [b"1"])
 
 
-def test_send_prepare(pq, pgconn):
+def test_send_prepare(pgconn):
     pgconn.send_prepare(b"prep", b"select $1::int + $2::int")
     (res,) = psycopg3.waiting.wait(execute(pgconn))
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
@@ -104,7 +105,7 @@ def test_send_prepare(pq, pgconn):
         pgconn.send_query_prepared(b"prep", [b"3", b"5"])
 
 
-def test_send_prepare_types(pq, pgconn):
+def test_send_prepare_types(pgconn):
     pgconn.send_prepare(b"prep", b"select $1 + $2", [23, 23])
     (res,) = psycopg3.waiting.wait(execute(pgconn))
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
@@ -114,7 +115,7 @@ def test_send_prepare_types(pq, pgconn):
     assert res.get_value(0, 0) == b"8"
 
 
-def test_send_prepared_binary_in(pq, pgconn):
+def test_send_prepared_binary_in(pgconn):
     val = b"foo\00bar"
     pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
     (res,) = psycopg3.waiting.wait(execute(pgconn))
@@ -133,7 +134,7 @@ def test_send_prepared_binary_in(pq, pgconn):
 @pytest.mark.parametrize(
     "fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
 )
-def test_send_prepared_binary_out(pq, pgconn, fmt, out):
+def test_send_prepared_binary_out(pgconn, fmt, out):
     val = b"foo\00bar"
     pgconn.send_prepare(b"", b"select $1::bytea")
     (res,) = psycopg3.waiting.wait(execute(pgconn))
index 176d52f5433545eeb84249d17958079f25923692..729d9ab212d54914f45803fc0fa2e90d675417f2 100644 (file)
@@ -1,7 +1,9 @@
 import pytest
 
+from psycopg3 import pq
 
-def test_defaults(pq, monkeypatch):
+
+def test_defaults(monkeypatch):
     monkeypatch.setenv("PGPORT", "15432")
     defs = pq.Conninfo.get_defaults()
     assert len(defs) > 20
@@ -14,7 +16,7 @@ def test_defaults(pq, monkeypatch):
     assert port.dispsize == 6
 
 
-def test_conninfo_parse(pq):
+def test_conninfo_parse():
     info = pq.Conninfo.parse(
         b"postgresql://host1:123,host2:456/somedb"
         b"?target_session_attrs=any&application_name=myapp"
@@ -26,7 +28,7 @@ def test_conninfo_parse(pq):
     assert info[b"application_name"] == b"myapp"
 
 
-def test_conninfo_parse_bad(pq):
+def test_conninfo_parse_bad():
     with pytest.raises(pq.PQerror) as e:
         pq.Conninfo.parse(b"bad_conninfo=")
         assert "bad_conninfo" in str(e.value)
index 86062175f9a4bd37aefc163378880fecceeaffe2..3dd28210860152796ed9b47ce2d8b32f95f92982 100644 (file)
@@ -1,12 +1,13 @@
 import pytest
 
 import psycopg3
+from psycopg3 import pq
 
 
 @pytest.mark.parametrize(
     "data", [(b"hello\00world"), (b"\00\00\00\00")],
 )
-def test_escape_bytea(pq, pgconn, data):
+def test_escape_bytea(pgconn, data):
     exp = br"\x" + b"".join(b"%02x" % c for c in data)
     esc = pq.Escaping(pgconn)
     rv = esc.escape_bytea(data)
@@ -17,7 +18,7 @@ def test_escape_bytea(pq, pgconn, data):
         esc.escape_bytea(data)
 
 
-def test_escape_noconn(pq, pgconn):
+def test_escape_noconn(pgconn):
     data = bytes(range(256))
     esc = pq.Escaping()
     escdata = esc.escape_bytea(data)
@@ -28,7 +29,7 @@ def test_escape_noconn(pq, pgconn):
     assert res.get_value(0, 0) == data
 
 
-def test_escape_1char(pq, pgconn):
+def test_escape_1char(pgconn):
     esc = pq.Escaping(pgconn)
     for c in range(256):
         rv = esc.escape_bytea(bytes([c]))
@@ -39,7 +40,7 @@ def test_escape_1char(pq, pgconn):
 @pytest.mark.parametrize(
     "data", [(b"hello\00world"), (b"\00\00\00\00")],
 )
-def test_unescape_bytea(pq, pgconn, data):
+def test_unescape_bytea(pgconn, data):
     enc = br"\x" + b"".join(b"%02x" % c for c in data)
     esc = pq.Escaping(pgconn)
     rv = esc.unescape_bytea(enc)
index f090e0afd7214da112dab288c163c0f194c9e730..bb80bbfeb129f963a571650a5a628f92bbb76741 100644 (file)
@@ -3,14 +3,15 @@
 import pytest
 
 import psycopg3
+from psycopg3 import pq
 
 
-def test_exec_none(pq, pgconn):
+def test_exec_none(pgconn):
     with pytest.raises(TypeError):
         pgconn.exec_(None)
 
 
-def test_exec(pq, pgconn):
+def test_exec(pgconn):
     res = pgconn.exec_(b"select 'hel' || 'lo'")
     assert res.get_value(0, 0) == b"hello"
     pgconn.finish()
@@ -18,7 +19,7 @@ def test_exec(pq, pgconn):
         pgconn.exec_(b"select 'hello'")
 
 
-def test_exec_params(pq, pgconn):
+def test_exec_params(pgconn):
     res = pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
     assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
@@ -27,13 +28,13 @@ def test_exec_params(pq, pgconn):
         pgconn.exec_params(b"select $1::int + $2", [b"5", b"3"])
 
 
-def test_exec_params_empty(pq, pgconn):
+def test_exec_params_empty(pgconn):
     res = pgconn.exec_params(b"select 8::int", [])
     assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
 
 
-def test_exec_params_types(pq, pgconn):
+def test_exec_params_types(pgconn):
     res = pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700, 23])
     assert res.status == pq.ExecStatus.TUPLES_OK
     assert res.get_value(0, 0) == b"8"
@@ -45,7 +46,7 @@ def test_exec_params_types(pq, pgconn):
         pgconn.exec_params(b"select $1, $2", [b"8", b"8"], [1700])
 
 
-def test_exec_params_nulls(pq, pgconn):
+def test_exec_params_nulls(pgconn):
     res = pgconn.exec_params(
         b"select $1::text, $2::text, $3::text", [b"hi", b"", None]
     )
@@ -55,7 +56,7 @@ def test_exec_params_nulls(pq, pgconn):
     assert res.get_value(0, 2) is None
 
 
-def test_exec_params_binary_in(pq, pgconn):
+def test_exec_params_binary_in(pgconn):
     val = b"foo\00bar"
     res = pgconn.exec_params(
         b"select length($1::bytea), length($2::bytea)",
@@ -73,7 +74,7 @@ def test_exec_params_binary_in(pq, pgconn):
 @pytest.mark.parametrize(
     "fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
 )
-def test_exec_params_binary_out(pq, pgconn, fmt, out):
+def test_exec_params_binary_out(pgconn, fmt, out):
     val = b"foo\00bar"
     res = pgconn.exec_params(
         b"select $1::bytea", [val], param_formats=[1], result_format=fmt
@@ -82,7 +83,7 @@ def test_exec_params_binary_out(pq, pgconn, fmt, out):
     assert res.get_value(0, 0) == out
 
 
-def test_prepare(pq, pgconn):
+def test_prepare(pgconn):
     res = pgconn.prepare(b"prep", b"select $1::int + $2::int")
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
@@ -96,7 +97,7 @@ def test_prepare(pq, pgconn):
         pgconn.exec_prepared(b"prep", [b"3", b"5"])
 
 
-def test_prepare_types(pq, pgconn):
+def test_prepare_types(pgconn):
     res = pgconn.prepare(b"prep", b"select $1 + $2", [23, 23])
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
@@ -104,7 +105,7 @@ def test_prepare_types(pq, pgconn):
     assert res.get_value(0, 0) == b"8"
 
 
-def test_exec_prepared_binary_in(pq, pgconn):
+def test_exec_prepared_binary_in(pgconn):
     val = b"foo\00bar"
     res = pgconn.prepare(b"", b"select length($1::bytea), length($2::bytea)")
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
@@ -121,7 +122,7 @@ def test_exec_prepared_binary_in(pq, pgconn):
 @pytest.mark.parametrize(
     "fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")]
 )
-def test_exec_prepared_binary_out(pq, pgconn, fmt, out):
+def test_exec_prepared_binary_out(pgconn, fmt, out):
     val = b"foo\00bar"
     res = pgconn.prepare(b"", b"select $1::bytea")
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
@@ -133,7 +134,7 @@ def test_exec_prepared_binary_out(pq, pgconn, fmt, out):
     assert res.get_value(0, 0) == out
 
 
-def test_describe_portal(pq, pgconn):
+def test_describe_portal(pgconn):
     res = pgconn.exec_(
         b"""
         begin;
index d61457d3f037dec113ab364a72760c72692d86ba..3daed30459967a7ece887569f7037c42325d4efe 100644 (file)
@@ -1,7 +1,9 @@
 import pytest
 
+from psycopg3 import pq
 
-def test_error_message(pq, pgconn):
+
+def test_error_message(pgconn):
     res = pgconn.exec_(b"wat")
     assert res.status == pq.ExecStatus.FATAL_ERROR
     msg = pq.error_message(pgconn)
index 6dcace393079fc9a61e6faadbce2f2e0c2707cf2..a75fd70570f64ad0455ed7679d8023e0f16603de 100644 (file)
@@ -8,26 +8,27 @@ from select import select
 import pytest
 
 import psycopg3
+from psycopg3 import pq
 import psycopg3.generators
 
 
-def test_connectdb(pq, dsn):
+def test_connectdb(dsn):
     conn = pq.PGconn.connect(dsn.encode("utf8"))
     assert conn.status == pq.ConnStatus.OK, conn.error_message
 
 
-def test_connectdb_error(pq):
+def test_connectdb_error():
     conn = pq.PGconn.connect(b"dbname=psycopg3_test_not_for_real")
     assert conn.status == pq.ConnStatus.BAD
 
 
 @pytest.mark.parametrize("baddsn", [None, 42])
-def test_connectdb_badtype(pq, baddsn):
+def test_connectdb_badtype(baddsn):
     with pytest.raises(TypeError):
         pq.PGconn.connect(baddsn)
 
 
-def test_connect_async(pq, dsn):
+def test_connect_async(dsn):
     conn = pq.PGconn.connect_start(dsn.encode("utf8"))
     conn.nonblocking = 1
     while 1:
@@ -49,7 +50,7 @@ def test_connect_async(pq, dsn):
         conn.connect_poll()
 
 
-def test_connect_async_bad(pq, dsn):
+def test_connect_async_bad(dsn):
     conn = pq.PGconn.connect_start(b"dbname=psycopg3_test_not_for_real")
     while 1:
         assert conn.status != pq.ConnStatus.BAD
@@ -66,7 +67,7 @@ def test_connect_async_bad(pq, dsn):
     assert conn.status == pq.ConnStatus.BAD
 
 
-def test_finish(pgconn, pq):
+def test_finish(pgconn):
     assert pgconn.status == pq.ConnStatus.OK
     pgconn.finish()
     assert pgconn.status == pq.ConnStatus.BAD
@@ -74,7 +75,7 @@ def test_finish(pgconn, pq):
     assert pgconn.status == pq.ConnStatus.BAD
 
 
-def test_weakref(pq, dsn):
+def test_weakref(dsn):
     conn = pq.PGconn.connect(dsn.encode("utf8"))
     w = weakref.ref(conn)
     conn.finish()
@@ -96,7 +97,7 @@ def test_pgconn_ptr(pgconn, libpq):
     assert pgconn.pgconn_ptr is None
 
 
-def test_info(pq, dsn, pgconn):
+def test_info(dsn, pgconn):
     info = pgconn.info
     assert len(info) > 20
     dbname = [d for d in info if d.keyword == b"dbname"][0]
@@ -114,7 +115,7 @@ def test_info(pq, dsn, pgconn):
         pgconn.info
 
 
-def test_reset(pq, pgconn):
+def test_reset(pgconn):
     assert pgconn.status == pq.ConnStatus.OK
     pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
     assert pgconn.status == pq.ConnStatus.BAD
@@ -128,7 +129,7 @@ def test_reset(pq, pgconn):
     assert pgconn.status == pq.ConnStatus.BAD
 
 
-def test_reset_async(pq, pgconn):
+def test_reset_async(pgconn):
     assert pgconn.status == pq.ConnStatus.OK
     pgconn.exec_(b"select pg_terminate_backend(pg_backend_pid())")
     assert pgconn.status == pq.ConnStatus.BAD
@@ -153,7 +154,7 @@ def test_reset_async(pq, pgconn):
         pgconn.reset_poll()
 
 
-def test_ping(pq, dsn):
+def test_ping(dsn):
     rv = pq.PGconn.ping(dsn.encode("utf8"))
     assert rv == pq.Ping.OK
 
@@ -227,7 +228,7 @@ def test_tty(pgconn):
         pgconn.tty
 
 
-def test_transaction_status(pq, pgconn):
+def test_transaction_status(pgconn):
     assert pgconn.transaction_status == pq.TransactionStatus.IDLE
     pgconn.exec_(b"begin")
     assert pgconn.transaction_status == pq.TransactionStatus.INTRANS
@@ -239,7 +240,7 @@ def test_transaction_status(pq, pgconn):
     assert pgconn.transaction_status == pq.TransactionStatus.UNKNOWN
 
 
-def test_parameter_status(pq, dsn, monkeypatch):
+def test_parameter_status(dsn, monkeypatch):
     monkeypatch.setenv("PGAPPNAME", "psycopg3 tests")
     pgconn = pq.PGconn.connect(dsn.encode("utf8"))
     assert pgconn.parameter_status(b"application_name") == b"psycopg3 tests"
@@ -249,7 +250,7 @@ def test_parameter_status(pq, dsn, monkeypatch):
         pgconn.parameter_status(b"application_name")
 
 
-def test_encoding(pq, pgconn):
+def test_encoding(pgconn):
     res = pgconn.exec_(b"set client_encoding to latin1")
     assert res.status == pq.ExecStatus.COMMAND_OK
     assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
@@ -281,7 +282,7 @@ def test_server_version(pgconn):
         pgconn.server_version
 
 
-def test_error_message(pq, pgconn):
+def test_error_message(pgconn):
     assert pgconn.error_message == b""
     res = pgconn.exec_(b"wat")
     assert res.status == pq.ExecStatus.FATAL_ERROR
@@ -306,7 +307,7 @@ def test_needs_password(pgconn):
         pgconn.needs_password
 
 
-def test_used_password(pq, pgconn, dsn, monkeypatch):
+def test_used_password(pgconn, dsn, monkeypatch):
     assert isinstance(pgconn.used_password, bool)
 
     # Assume that if a password was passed then it was needed.
@@ -344,7 +345,7 @@ def test_ssl_in_use(pgconn):
         pgconn.ssl_in_use
 
 
-def test_make_empty_result(pq, pgconn):
+def test_make_empty_result(pgconn):
     pgconn.exec_(b"wat")
     res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
     assert res.status == pq.ExecStatus.FATAL_ERROR
@@ -356,7 +357,7 @@ def test_make_empty_result(pq, pgconn):
     assert res.error_message == b""
 
 
-def test_notice_nohandler(pq, pgconn):
+def test_notice_nohandler(pgconn):
     pgconn.exec_(b"set client_min_messages to notice")
     res = pgconn.exec_(
         b"do $$begin raise notice 'hello notice'; end$$ language plpgsql"
@@ -364,7 +365,7 @@ def test_notice_nohandler(pq, pgconn):
     assert res.status == pq.ExecStatus.COMMAND_OK
 
 
-def test_notice(pq, pgconn):
+def test_notice(pgconn):
     msgs = []
 
     def callback(res):
@@ -381,7 +382,7 @@ def test_notice(pq, pgconn):
     assert msgs and msgs[0] == b"hello notice"
 
 
-def test_notice_error(pq, pgconn, caplog):
+def test_notice_error(pgconn, caplog):
     caplog.set_level(logging.WARNING, logger="psycopg3")
 
     def callback(res):
index d3b70ed0bd0110e5bb050c10f14c51cdc14777f0..69b68eda09f51f3afe9665c3a7805404d0838800 100644 (file)
@@ -1,6 +1,8 @@
 import ctypes
 import pytest
 
+from psycopg3 import pq
+
 
 @pytest.mark.parametrize(
     "command, status",
@@ -11,12 +13,12 @@ import pytest
         (b"wat", "FATAL_ERROR"),
     ],
 )
-def test_status(pq, pgconn, command, status):
+def test_status(pgconn, command, status):
     res = pgconn.exec_(command)
     assert res.status == getattr(pq.ExecStatus, status)
 
 
-def test_clear(pq, pgconn):
+def test_clear(pgconn):
     res = pgconn.exec_(b"select 1")
     assert res.status == pq.ExecStatus.TUPLES_OK
     res.clear()
@@ -47,7 +49,7 @@ def test_error_message(pgconn):
     assert res.error_message == b""
 
 
-def test_error_field(pq, pgconn):
+def test_error_field(pgconn):
     res = pgconn.exec_(b"select wat")
     assert res.error_field(pq.DiagnosticField.SEVERITY) == b"ERROR"
     assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"42703"
@@ -85,7 +87,7 @@ def test_fname(pgconn):
     assert res.fname(0) is None
 
 
-def test_ftable_and_col(pq, pgconn):
+def test_ftable_and_col(pgconn):
     res = pgconn.exec_(
         b"""
         drop table if exists t1, t2;
@@ -110,7 +112,7 @@ def test_ftable_and_col(pq, pgconn):
 
 
 @pytest.mark.parametrize("fmt", (0, 1))
-def test_fformat(pq, pgconn, fmt):
+def test_fformat(pgconn, fmt):
     res = pgconn.exec_params(b"select 1", [], result_format=fmt)
     assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fformat(0) == fmt
@@ -120,7 +122,7 @@ def test_fformat(pq, pgconn, fmt):
     assert res.binary_tuples == 0
 
 
-def test_ftype(pq, pgconn):
+def test_ftype(pgconn):
     res = pgconn.exec_(b"select 1::int, 1::numeric, 1::text")
     assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.ftype(0) == 23
@@ -130,7 +132,7 @@ def test_ftype(pq, pgconn):
     assert res.ftype(0) == 0
 
 
-def test_fmod(pq, pgconn):
+def test_fmod(pgconn):
     res = pgconn.exec_(b"select 1::int, 1::numeric(10), 1::numeric(10,2)")
     assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fmod(0) == -1
@@ -140,7 +142,7 @@ def test_fmod(pq, pgconn):
     assert res.fmod(0) == 0
 
 
-def test_fsize(pq, pgconn):
+def test_fsize(pgconn):
     res = pgconn.exec_(b"select 1::int, 1::bigint, 1::text")
     assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.fsize(0) == 4
@@ -150,7 +152,7 @@ def test_fsize(pq, pgconn):
     assert res.fsize(0) == 0
 
 
-def test_get_value(pq, pgconn):
+def test_get_value(pgconn):
     res = pgconn.exec_(b"select 'a', '', NULL")
     assert res.status == pq.ExecStatus.TUPLES_OK, res.error_message
     assert res.get_value(0, 0) == b"a"
@@ -160,7 +162,7 @@ def test_get_value(pq, pgconn):
     assert res.get_value(0, 0) is None
 
 
-def test_nparams_types(pq, pgconn):
+def test_nparams_types(pgconn):
     res = pgconn.prepare(b"", b"select $1::int, $2::text")
     assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
 
@@ -176,7 +178,7 @@ def test_nparams_types(pq, pgconn):
     assert res.param_type(0) == 0
 
 
-def test_command_status(pq, pgconn):
+def test_command_status(pgconn):
     res = pgconn.exec_(b"select 1")
     assert res.command_status == b"SELECT 1"
     res = pgconn.exec_(b"set timezone to utf8")
@@ -185,7 +187,7 @@ def test_command_status(pq, pgconn):
     assert res.command_status is None
 
 
-def test_command_tuples(pq, pgconn):
+def test_command_tuples(pgconn):
     res = pgconn.exec_(b"set timezone to utf8")
     assert res.command_tuples is None
     res = pgconn.exec_(b"select * from generate_series(1, 10)")
@@ -194,7 +196,7 @@ def test_command_tuples(pq, pgconn):
     assert res.command_tuples is None
 
 
-def test_oid_value(pq, pgconn):
+def test_oid_value(pgconn):
     res = pgconn.exec_(b"select 1")
     assert res.oid_value == 0
     res.clear()
index 0978496b60d50469c14ce06fbdf81e6dafe62959..0411bf2f950a29958ae605f9b669997799fb9284 100644 (file)
@@ -1,4 +1,7 @@
-def test_version(pq):
+from psycopg3 import pq
+
+
+def test_version():
     rv = pq.version()
     assert rv > 90500
     assert rv < 200000  # you are good for a while
index ee76f7e4f359c25496ffd27e778d5a68a62b01e0..c6429cbe51cee99a8bca38cef292f04506b2101e 100644 (file)
@@ -1,4 +1,5 @@
 import pytest
+from psycopg3 import pq
 from psycopg3 import errors as e
 
 eur = "\u20ac"
@@ -15,7 +16,7 @@ def test_error_diag(conn):
     assert diag.severity == "ERROR"
 
 
-def test_diag_all_attrs(pgconn, pq):
+def test_diag_all_attrs(pgconn):
     res = pgconn.make_empty_result(pq.ExecStatus.NONFATAL_ERROR)
     diag = e.Diagnostic(res)
     for d in pq.DiagnosticField:
@@ -23,7 +24,7 @@ def test_diag_all_attrs(pgconn, pq):
         assert val is None or isinstance(val, str)
 
 
-def test_diag_right_attr(pgconn, pq, monkeypatch):
+def test_diag_right_attr(pgconn, monkeypatch):
     res = pgconn.make_empty_result(pq.ExecStatus.NONFATAL_ERROR)
     diag = e.Diagnostic(res)