]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: separate ConnectionInfo object from conninfo module
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Jan 2024 14:50:14 +0000 (15:50 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Jan 2024 19:21:43 +0000 (20:21 +0100)
It was there just because the name is similar, but its purpose is
entirely different.

psycopg/psycopg/__init__.py
psycopg/psycopg/_connection_info.py [new file with mode: 0644]
psycopg/psycopg/connection.py
psycopg/psycopg/conninfo.py
psycopg/psycopg/crdb/connection.py
tests/test_connection_info.py [new file with mode: 0644]
tests/test_conninfo.py

index baadf30c3885888a12bab4905bd03ebbc5ecf61b..25d63996431410c9393413ae34bb1128e09ed6d8 100644 (file)
@@ -17,13 +17,13 @@ from .errors import Warning, Error, InterfaceError, DatabaseError
 from .errors import DataError, OperationalError, IntegrityError
 from .errors import InternalError, ProgrammingError, NotSupportedError
 from ._column import Column
-from .conninfo import ConnectionInfo
 from ._pipeline import Pipeline, AsyncPipeline
 from .connection import BaseConnection, Connection, Notify
 from .transaction import Rollback, Transaction, AsyncTransaction
 from .cursor_async import AsyncCursor
 from .server_cursor import AsyncServerCursor, ServerCursor
 from .client_cursor import AsyncClientCursor, ClientCursor
+from ._connection_info import ConnectionInfo
 from .connection_async import AsyncConnection
 
 from . import dbapi20
diff --git a/psycopg/psycopg/_connection_info.py b/psycopg/psycopg/_connection_info.py
new file mode 100644 (file)
index 0000000..79a781c
--- /dev/null
@@ -0,0 +1,174 @@
+"""
+Objects to return information about a PostgreSQL connection.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from __future__ import annotations
+
+from pathlib import Path
+from datetime import tzinfo
+
+from . import pq
+from ._tz import get_tzinfo
+from ._encodings import pgconn_encoding
+from .conninfo import make_conninfo
+
+
+class ConnectionInfo:
+    """Allow access to information about the connection."""
+
+    __module__ = "psycopg"
+
+    def __init__(self, pgconn: pq.abc.PGconn):
+        self.pgconn = pgconn
+
+    @property
+    def vendor(self) -> str:
+        """A string representing the database vendor connected to."""
+        return "PostgreSQL"
+
+    @property
+    def host(self) -> str:
+        """The server host name of the active connection. See :pq:`PQhost()`."""
+        return self._get_pgconn_attr("host")
+
+    @property
+    def hostaddr(self) -> str:
+        """The server IP address of the connection. See :pq:`PQhostaddr()`."""
+        return self._get_pgconn_attr("hostaddr")
+
+    @property
+    def port(self) -> int:
+        """The port of the active connection. See :pq:`PQport()`."""
+        return int(self._get_pgconn_attr("port"))
+
+    @property
+    def dbname(self) -> str:
+        """The database name of the connection. See :pq:`PQdb()`."""
+        return self._get_pgconn_attr("db")
+
+    @property
+    def user(self) -> str:
+        """The user name of the connection. See :pq:`PQuser()`."""
+        return self._get_pgconn_attr("user")
+
+    @property
+    def password(self) -> str:
+        """The password of the connection. See :pq:`PQpass()`."""
+        return self._get_pgconn_attr("password")
+
+    @property
+    def options(self) -> str:
+        """
+        The command-line options passed in the connection request.
+        See :pq:`PQoptions`.
+        """
+        return self._get_pgconn_attr("options")
+
+    def get_parameters(self) -> dict[str, str]:
+        """Return the connection parameters values.
+
+        Return all the parameters set to a non-default value, which might come
+        either from the connection string and parameters passed to
+        `~Connection.connect()` or from environment variables. The password
+        is never returned (you can read it using the `password` attribute).
+        """
+        pyenc = self.encoding
+
+        # Get the known defaults to avoid reporting them
+        defaults = {
+            i.keyword: i.compiled
+            for i in pq.Conninfo.get_defaults()
+            if i.compiled is not None
+        }
+        # Not returned by the libq. Bug? Bet we're using SSH.
+        defaults.setdefault(b"channel_binding", b"prefer")
+        defaults[b"passfile"] = str(Path.home() / ".pgpass").encode()
+
+        return {
+            i.keyword.decode(pyenc): i.val.decode(pyenc)
+            for i in self.pgconn.info
+            if i.val is not None
+            and i.keyword != b"password"
+            and i.val != defaults.get(i.keyword)
+        }
+
+    @property
+    def dsn(self) -> str:
+        """Return the connection string to connect to the database.
+
+        The string contains all the parameters set to a non-default value,
+        which might come either from the connection string and parameters
+        passed to `~Connection.connect()` or from environment variables. The
+        password is never returned (you can read it using the `password`
+        attribute).
+        """
+        return make_conninfo(**self.get_parameters())
+
+    @property
+    def status(self) -> pq.ConnStatus:
+        """The status of the connection. See :pq:`PQstatus()`."""
+        return pq.ConnStatus(self.pgconn.status)
+
+    @property
+    def transaction_status(self) -> pq.TransactionStatus:
+        """
+        The current in-transaction status of the session.
+        See :pq:`PQtransactionStatus()`.
+        """
+        return pq.TransactionStatus(self.pgconn.transaction_status)
+
+    @property
+    def pipeline_status(self) -> pq.PipelineStatus:
+        """
+        The current pipeline status of the client.
+        See :pq:`PQpipelineStatus()`.
+        """
+        return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+    def parameter_status(self, param_name: str) -> str | None:
+        """
+        Return a parameter setting of the connection.
+
+        Return `None` is the parameter is unknown.
+        """
+        res = self.pgconn.parameter_status(param_name.encode(self.encoding))
+        return res.decode(self.encoding) if res is not None else None
+
+    @property
+    def server_version(self) -> int:
+        """
+        An integer representing the server version. See :pq:`PQserverVersion()`.
+        """
+        return self.pgconn.server_version
+
+    @property
+    def backend_pid(self) -> int:
+        """
+        The process ID (PID) of the backend process handling this connection.
+        See :pq:`PQbackendPID()`.
+        """
+        return self.pgconn.backend_pid
+
+    @property
+    def error_message(self) -> str:
+        """
+        The error message most recently generated by an operation on the connection.
+        See :pq:`PQerrorMessage()`.
+        """
+        return self._get_pgconn_attr("error_message")
+
+    @property
+    def timezone(self) -> tzinfo:
+        """The Python timezone info of the connection's timezone."""
+        return get_tzinfo(self.pgconn)
+
+    @property
+    def encoding(self) -> str:
+        """The Python codec name of the connection's client encoding."""
+        return pgconn_encoding(self.pgconn)
+
+    def _get_pgconn_attr(self, name: str) -> str:
+        value: bytes = getattr(self.pgconn, name)
+        return value.decode(self.encoding)
index ef49d0e88b5875c748377ee2efefdf48a4d3ac40..560945b002e287901dce8a771292cacd8ed45a41 100644 (file)
@@ -30,7 +30,7 @@ from ._enums import IsolationLevel
 from .cursor import Cursor
 from ._compat import LiteralString, Self
 from .pq.misc import connection_summary
-from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from .conninfo import make_conninfo, conninfo_to_dict
 from .conninfo import conninfo_attempts, ConnDict, timeout_from_conninfo
 from ._pipeline import BasePipeline, Pipeline
 from .generators import notifies, connect, execute
@@ -38,6 +38,7 @@ from ._encodings import pgconn_encoding
 from ._preparing import PrepareManager
 from .transaction import Transaction
 from .server_cursor import ServerCursor
+from ._connection_info import ConnectionInfo
 
 if TYPE_CHECKING:
     from .pq.abc import PGconn, PGresult
index c57a6d9c095ff4d70fcd0fb528b36280788d306e..9044550cb555fc8e851fd0723a3071dcfd071842 100644 (file)
@@ -13,8 +13,6 @@ import asyncio
 import logging
 from typing import Any
 from random import shuffle
-from pathlib import Path
-from datetime import tzinfo
 from functools import lru_cache
 from ipaddress import ip_address
 from dataclasses import dataclass
@@ -22,8 +20,6 @@ from typing_extensions import TypeAlias
 
 from . import pq
 from . import errors as e
-from ._tz import get_tzinfo
-from ._encodings import pgconn_encoding
 
 ConnDict: TypeAlias = "dict[str, Any]"
 
@@ -131,165 +127,6 @@ def _param_escape(s: str) -> str:
     return s
 
 
-class ConnectionInfo:
-    """Allow access to information about the connection."""
-
-    __module__ = "psycopg"
-
-    def __init__(self, pgconn: pq.abc.PGconn):
-        self.pgconn = pgconn
-
-    @property
-    def vendor(self) -> str:
-        """A string representing the database vendor connected to."""
-        return "PostgreSQL"
-
-    @property
-    def host(self) -> str:
-        """The server host name of the active connection. See :pq:`PQhost()`."""
-        return self._get_pgconn_attr("host")
-
-    @property
-    def hostaddr(self) -> str:
-        """The server IP address of the connection. See :pq:`PQhostaddr()`."""
-        return self._get_pgconn_attr("hostaddr")
-
-    @property
-    def port(self) -> int:
-        """The port of the active connection. See :pq:`PQport()`."""
-        return int(self._get_pgconn_attr("port"))
-
-    @property
-    def dbname(self) -> str:
-        """The database name of the connection. See :pq:`PQdb()`."""
-        return self._get_pgconn_attr("db")
-
-    @property
-    def user(self) -> str:
-        """The user name of the connection. See :pq:`PQuser()`."""
-        return self._get_pgconn_attr("user")
-
-    @property
-    def password(self) -> str:
-        """The password of the connection. See :pq:`PQpass()`."""
-        return self._get_pgconn_attr("password")
-
-    @property
-    def options(self) -> str:
-        """
-        The command-line options passed in the connection request.
-        See :pq:`PQoptions`.
-        """
-        return self._get_pgconn_attr("options")
-
-    def get_parameters(self) -> dict[str, str]:
-        """Return the connection parameters values.
-
-        Return all the parameters set to a non-default value, which might come
-        either from the connection string and parameters passed to
-        `~Connection.connect()` or from environment variables. The password
-        is never returned (you can read it using the `password` attribute).
-        """
-        pyenc = self.encoding
-
-        # Get the known defaults to avoid reporting them
-        defaults = {
-            i.keyword: i.compiled
-            for i in pq.Conninfo.get_defaults()
-            if i.compiled is not None
-        }
-        # Not returned by the libq. Bug? Bet we're using SSH.
-        defaults.setdefault(b"channel_binding", b"prefer")
-        defaults[b"passfile"] = str(Path.home() / ".pgpass").encode()
-
-        return {
-            i.keyword.decode(pyenc): i.val.decode(pyenc)
-            for i in self.pgconn.info
-            if i.val is not None
-            and i.keyword != b"password"
-            and i.val != defaults.get(i.keyword)
-        }
-
-    @property
-    def dsn(self) -> str:
-        """Return the connection string to connect to the database.
-
-        The string contains all the parameters set to a non-default value,
-        which might come either from the connection string and parameters
-        passed to `~Connection.connect()` or from environment variables. The
-        password is never returned (you can read it using the `password`
-        attribute).
-        """
-        return make_conninfo(**self.get_parameters())
-
-    @property
-    def status(self) -> pq.ConnStatus:
-        """The status of the connection. See :pq:`PQstatus()`."""
-        return pq.ConnStatus(self.pgconn.status)
-
-    @property
-    def transaction_status(self) -> pq.TransactionStatus:
-        """
-        The current in-transaction status of the session.
-        See :pq:`PQtransactionStatus()`.
-        """
-        return pq.TransactionStatus(self.pgconn.transaction_status)
-
-    @property
-    def pipeline_status(self) -> pq.PipelineStatus:
-        """
-        The current pipeline status of the client.
-        See :pq:`PQpipelineStatus()`.
-        """
-        return pq.PipelineStatus(self.pgconn.pipeline_status)
-
-    def parameter_status(self, param_name: str) -> str | None:
-        """
-        Return a parameter setting of the connection.
-
-        Return `None` is the parameter is unknown.
-        """
-        res = self.pgconn.parameter_status(param_name.encode(self.encoding))
-        return res.decode(self.encoding) if res is not None else None
-
-    @property
-    def server_version(self) -> int:
-        """
-        An integer representing the server version. See :pq:`PQserverVersion()`.
-        """
-        return self.pgconn.server_version
-
-    @property
-    def backend_pid(self) -> int:
-        """
-        The process ID (PID) of the backend process handling this connection.
-        See :pq:`PQbackendPID()`.
-        """
-        return self.pgconn.backend_pid
-
-    @property
-    def error_message(self) -> str:
-        """
-        The error message most recently generated by an operation on the connection.
-        See :pq:`PQerrorMessage()`.
-        """
-        return self._get_pgconn_attr("error_message")
-
-    @property
-    def timezone(self) -> tzinfo:
-        """The Python timezone info of the connection's timezone."""
-        return get_tzinfo(self.pgconn)
-
-    @property
-    def encoding(self) -> str:
-        """The Python codec name of the connection's client encoding."""
-        return pgconn_encoding(self.pgconn)
-
-    def _get_pgconn_attr(self, name: str) -> str:
-        value: bytes = getattr(self.pgconn, name)
-        return value.decode(self.encoding)
-
-
 def conninfo_attempts(params: ConnDict) -> list[ConnDict]:
     """Split a set of connection params on the single attempts to perform.
 
index 49b7d5ffa3a465feecac4f31f12efebcac9e009c..07c3837dc0c3b36161ba46d4e750ff8647474ca3 100644 (file)
@@ -11,9 +11,9 @@ from .. import errors as e
 from ..abc import AdaptContext
 from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
 from .._compat import Self
-from ..conninfo import ConnectionInfo
 from ..connection import Connection
 from .._adapters_map import AdaptersMap
+from .._connection_info import ConnectionInfo
 from ..connection_async import AsyncConnection
 from ._types import adapters
 
diff --git a/tests/test_connection_info.py b/tests/test_connection_info.py
new file mode 100644 (file)
index 0000000..956a3c4
--- /dev/null
@@ -0,0 +1,252 @@
+import datetime as dt
+
+import pytest
+
+import psycopg
+from psycopg.conninfo import make_conninfo, conninfo_to_dict
+from psycopg._encodings import pg2pyenc
+
+from .fix_crdb import crdb_encoding
+
+
+@pytest.mark.parametrize(
+    "attr",
+    [("dbname", "db"), "host", "hostaddr", "user", "password", "options"],
+)
+def test_attrs(conn, attr):
+    if isinstance(attr, tuple):
+        info_attr, pgconn_attr = attr
+    else:
+        info_attr = pgconn_attr = attr
+
+    if info_attr == "hostaddr" and psycopg.pq.version() < 120000:
+        pytest.skip("hostaddr not supported on libpq < 12")
+
+    info_val = getattr(conn.info, info_attr)
+    pgconn_val = getattr(conn.pgconn, pgconn_attr).decode()
+    assert info_val == pgconn_val
+
+    conn.close()
+    with pytest.raises(psycopg.OperationalError):
+        getattr(conn.info, info_attr)
+
+
+@pytest.mark.libpq("< 12")
+def test_hostaddr_not_supported(conn):
+    with pytest.raises(psycopg.NotSupportedError):
+        conn.info.hostaddr
+
+
+def test_port(conn):
+    assert conn.info.port == int(conn.pgconn.port.decode())
+    conn.close()
+    with pytest.raises(psycopg.OperationalError):
+        conn.info.port
+
+
+def test_get_params(conn, dsn):
+    info = conn.info.get_parameters()
+    for k, v in conninfo_to_dict(dsn).items():
+        if k != "password":
+            assert info.get(k) == v
+        else:
+            assert k not in info
+
+
+def test_dsn(conn, dsn):
+    dsn = conn.info.dsn
+    assert "password" not in dsn
+    for k, v in conninfo_to_dict(dsn).items():
+        if k != "password":
+            assert f"{k}=" in dsn
+
+
+def test_get_params_env(conn_cls, dsn, monkeypatch):
+    dsn = conninfo_to_dict(dsn)
+    dsn.pop("application_name", None)
+
+    monkeypatch.delenv("PGAPPNAME", raising=False)
+    with conn_cls.connect(**dsn) as conn:
+        assert "application_name" not in conn.info.get_parameters()
+
+    monkeypatch.setenv("PGAPPNAME", "hello test")
+    with conn_cls.connect(**dsn) as conn:
+        assert conn.info.get_parameters()["application_name"] == "hello test"
+
+
+def test_dsn_env(conn_cls, dsn, monkeypatch):
+    dsn = conninfo_to_dict(dsn)
+    dsn.pop("application_name", None)
+
+    monkeypatch.delenv("PGAPPNAME", raising=False)
+    with conn_cls.connect(**dsn) as conn:
+        assert "application_name=" not in conn.info.dsn
+
+    monkeypatch.setenv("PGAPPNAME", "hello test")
+    with conn_cls.connect(**dsn) as conn:
+        assert "application_name='hello test'" in conn.info.dsn
+
+
+def test_status(conn):
+    assert conn.info.status.name == "OK"
+    conn.close()
+    assert conn.info.status.name == "BAD"
+
+
+def test_transaction_status(conn):
+    assert conn.info.transaction_status.name == "IDLE"
+    conn.close()
+    assert conn.info.transaction_status.name == "UNKNOWN"
+
+
+@pytest.mark.pipeline
+def test_pipeline_status(conn):
+    assert not conn.info.pipeline_status
+    assert conn.info.pipeline_status.name == "OFF"
+    with conn.pipeline():
+        assert conn.info.pipeline_status
+        assert conn.info.pipeline_status.name == "ON"
+
+
+@pytest.mark.libpq("< 14")
+def test_pipeline_status_no_pipeline(conn):
+    assert not conn.info.pipeline_status
+    assert conn.info.pipeline_status.name == "OFF"
+
+
+def test_no_password(dsn):
+    dsn2 = make_conninfo(dsn, password="the-pass-word")
+    pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
+    info = psycopg.ConnectionInfo(pgconn)
+    assert info.password == "the-pass-word"
+    assert "password" not in info.get_parameters()
+    assert info.get_parameters()["dbname"] == info.dbname
+
+
+def test_dsn_no_password(dsn):
+    dsn2 = make_conninfo(dsn, password="the-pass-word")
+    pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
+    info = psycopg.ConnectionInfo(pgconn)
+    assert info.password == "the-pass-word"
+    assert "password" not in info.dsn
+    assert f"dbname={info.dbname}" in info.dsn
+
+
+def test_parameter_status(conn):
+    assert conn.info.parameter_status("nosuchparam") is None
+    tz = conn.info.parameter_status("TimeZone")
+    assert tz and isinstance(tz, str)
+    assert tz == conn.execute("show timezone").fetchone()[0]
+
+
+@pytest.mark.crdb("skip")
+def test_server_version(conn):
+    assert conn.info.server_version == conn.pgconn.server_version
+
+
+def test_error_message(conn):
+    assert conn.info.error_message == ""
+    with pytest.raises(psycopg.ProgrammingError) as ex:
+        conn.execute("wat")
+
+    assert conn.info.error_message
+    assert str(ex.value) in conn.info.error_message
+    assert ex.value.diag.severity in conn.info.error_message
+
+    conn.close()
+    assert "NULL" in conn.info.error_message
+
+
+@pytest.mark.crdb_skip("backend pid")
+def test_backend_pid(conn):
+    assert conn.info.backend_pid
+    assert conn.info.backend_pid == conn.pgconn.backend_pid
+    conn.close()
+    with pytest.raises(psycopg.OperationalError):
+        conn.info.backend_pid
+
+
+def test_timezone(conn):
+    conn.execute("set timezone to 'Europe/Rome'")
+    tz = conn.info.timezone
+    assert isinstance(tz, dt.tzinfo)
+    offset = tz.utcoffset(dt.datetime(2000, 1, 1))
+    assert offset and offset.total_seconds() == 3600
+    offset = tz.utcoffset(dt.datetime(2000, 7, 1))
+    assert offset and offset.total_seconds() == 7200
+
+
+@pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones")
+def test_timezone_warn(conn, caplog):
+    conn.execute("set timezone to 'FOOBAR0'")
+    assert len(caplog.records) == 0
+    tz = conn.info.timezone
+    assert tz == dt.timezone.utc
+    assert len(caplog.records) == 1
+    assert "FOOBAR0" in caplog.records[0].message
+
+    conn.info.timezone
+    assert len(caplog.records) == 1
+
+    conn.execute("set timezone to 'FOOBAAR0'")
+    assert len(caplog.records) == 1
+    conn.info.timezone
+    assert len(caplog.records) == 2
+    assert "FOOBAAR0" in caplog.records[1].message
+
+
+def test_encoding(conn):
+    enc = conn.execute("show client_encoding").fetchone()[0]
+    assert conn.info.encoding == pg2pyenc(enc.encode())
+
+
+@pytest.mark.crdb("skip", reason="encoding not normalized")
+@pytest.mark.parametrize(
+    "enc, out, codec",
+    [
+        ("utf8", "UTF8", "utf-8"),
+        ("utf-8", "UTF8", "utf-8"),
+        ("utf_8", "UTF8", "utf-8"),
+        ("eucjp", "EUC_JP", "euc_jp"),
+        ("euc-jp", "EUC_JP", "euc_jp"),
+        ("latin9", "LATIN9", "iso8859-15"),
+    ],
+)
+def test_normalize_encoding(conn, enc, out, codec):
+    conn.execute("select set_config('client_encoding', %s, false)", [enc])
+    assert conn.info.parameter_status("client_encoding") == out
+    assert conn.info.encoding == codec
+
+
+@pytest.mark.parametrize(
+    "enc, out, codec",
+    [
+        ("utf8", "UTF8", "utf-8"),
+        ("utf-8", "UTF8", "utf-8"),
+        ("utf_8", "UTF8", "utf-8"),
+        crdb_encoding("eucjp", "EUC_JP", "euc_jp"),
+        crdb_encoding("euc-jp", "EUC_JP", "euc_jp"),
+    ],
+)
+def test_encoding_env_var(conn_cls, dsn, monkeypatch, enc, out, codec):
+    monkeypatch.setenv("PGCLIENTENCODING", enc)
+    with conn_cls.connect(dsn) as conn:
+        clienc = conn.info.parameter_status("client_encoding")
+        assert clienc
+        if conn.info.vendor == "PostgreSQL":
+            assert clienc == out
+        else:
+            assert clienc.replace("-", "").replace("_", "").upper() == out
+        assert conn.info.encoding == codec
+
+
+@pytest.mark.crdb_skip("encoding")
+def test_set_encoding_unsupported(conn):
+    cur = conn.cursor()
+    cur.execute("set client_encoding to EUC_TW")
+    with pytest.raises(psycopg.NotSupportedError):
+        cur.execute("select 'x'")
+
+
+def test_vendor(conn):
+    assert conn.info.vendor
index d9888d3bdd44fc11139aa1d0cac224b44efb71c9..ae892ed3c19d5dc1c5d55e930f1c7a0ebe84f1bf 100644 (file)
@@ -1,17 +1,13 @@
 import socket
 import asyncio
-import datetime as dt
 
 import pytest
 
 import psycopg
 from psycopg import ProgrammingError
-from psycopg.conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from psycopg.conninfo import make_conninfo, conninfo_to_dict
 from psycopg.conninfo import conninfo_attempts, conninfo_attempts_async
 from psycopg.conninfo import timeout_from_conninfo, _DEFAULT_CONNECT_TIMEOUT
-from psycopg._encodings import pg2pyenc
-
-from .fix_crdb import crdb_encoding
 
 snowman = "\u2603"
 
@@ -96,227 +92,6 @@ def test_no_munging():
     assert dsnin == dsnout
 
 
-class TestConnectionInfo:
-    @pytest.mark.parametrize(
-        "attr",
-        [("dbname", "db"), "host", "hostaddr", "user", "password", "options"],
-    )
-    def test_attrs(self, conn, attr):
-        if isinstance(attr, tuple):
-            info_attr, pgconn_attr = attr
-        else:
-            info_attr = pgconn_attr = attr
-
-        if info_attr == "hostaddr" and psycopg.pq.version() < 120000:
-            pytest.skip("hostaddr not supported on libpq < 12")
-
-        info_val = getattr(conn.info, info_attr)
-        pgconn_val = getattr(conn.pgconn, pgconn_attr).decode()
-        assert info_val == pgconn_val
-
-        conn.close()
-        with pytest.raises(psycopg.OperationalError):
-            getattr(conn.info, info_attr)
-
-    @pytest.mark.libpq("< 12")
-    def test_hostaddr_not_supported(self, conn):
-        with pytest.raises(psycopg.NotSupportedError):
-            conn.info.hostaddr
-
-    def test_port(self, conn):
-        assert conn.info.port == int(conn.pgconn.port.decode())
-        conn.close()
-        with pytest.raises(psycopg.OperationalError):
-            conn.info.port
-
-    def test_get_params(self, conn, dsn):
-        info = conn.info.get_parameters()
-        for k, v in conninfo_to_dict(dsn).items():
-            if k != "password":
-                assert info.get(k) == v
-            else:
-                assert k not in info
-
-    def test_dsn(self, conn, dsn):
-        dsn = conn.info.dsn
-        assert "password" not in dsn
-        for k, v in conninfo_to_dict(dsn).items():
-            if k != "password":
-                assert f"{k}=" in dsn
-
-    def test_get_params_env(self, conn_cls, dsn, monkeypatch):
-        dsn = conninfo_to_dict(dsn)
-        dsn.pop("application_name", None)
-
-        monkeypatch.delenv("PGAPPNAME", raising=False)
-        with conn_cls.connect(**dsn) as conn:
-            assert "application_name" not in conn.info.get_parameters()
-
-        monkeypatch.setenv("PGAPPNAME", "hello test")
-        with conn_cls.connect(**dsn) as conn:
-            assert conn.info.get_parameters()["application_name"] == "hello test"
-
-    def test_dsn_env(self, conn_cls, dsn, monkeypatch):
-        dsn = conninfo_to_dict(dsn)
-        dsn.pop("application_name", None)
-
-        monkeypatch.delenv("PGAPPNAME", raising=False)
-        with conn_cls.connect(**dsn) as conn:
-            assert "application_name=" not in conn.info.dsn
-
-        monkeypatch.setenv("PGAPPNAME", "hello test")
-        with conn_cls.connect(**dsn) as conn:
-            assert "application_name='hello test'" in conn.info.dsn
-
-    def test_status(self, conn):
-        assert conn.info.status.name == "OK"
-        conn.close()
-        assert conn.info.status.name == "BAD"
-
-    def test_transaction_status(self, conn):
-        assert conn.info.transaction_status.name == "IDLE"
-        conn.close()
-        assert conn.info.transaction_status.name == "UNKNOWN"
-
-    @pytest.mark.pipeline
-    def test_pipeline_status(self, conn):
-        assert not conn.info.pipeline_status
-        assert conn.info.pipeline_status.name == "OFF"
-        with conn.pipeline():
-            assert conn.info.pipeline_status
-            assert conn.info.pipeline_status.name == "ON"
-
-    @pytest.mark.libpq("< 14")
-    def test_pipeline_status_no_pipeline(self, conn):
-        assert not conn.info.pipeline_status
-        assert conn.info.pipeline_status.name == "OFF"
-
-    def test_no_password(self, dsn):
-        dsn2 = make_conninfo(dsn, password="the-pass-word")
-        pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
-        info = ConnectionInfo(pgconn)
-        assert info.password == "the-pass-word"
-        assert "password" not in info.get_parameters()
-        assert info.get_parameters()["dbname"] == info.dbname
-
-    def test_dsn_no_password(self, dsn):
-        dsn2 = make_conninfo(dsn, password="the-pass-word")
-        pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
-        info = ConnectionInfo(pgconn)
-        assert info.password == "the-pass-word"
-        assert "password" not in info.dsn
-        assert f"dbname={info.dbname}" in info.dsn
-
-    def test_parameter_status(self, conn):
-        assert conn.info.parameter_status("nosuchparam") is None
-        tz = conn.info.parameter_status("TimeZone")
-        assert tz and isinstance(tz, str)
-        assert tz == conn.execute("show timezone").fetchone()[0]
-
-    @pytest.mark.crdb("skip")
-    def test_server_version(self, conn):
-        assert conn.info.server_version == conn.pgconn.server_version
-
-    def test_error_message(self, conn):
-        assert conn.info.error_message == ""
-        with pytest.raises(psycopg.ProgrammingError) as ex:
-            conn.execute("wat")
-
-        assert conn.info.error_message
-        assert str(ex.value) in conn.info.error_message
-        assert ex.value.diag.severity in conn.info.error_message
-
-        conn.close()
-        assert "NULL" in conn.info.error_message
-
-    @pytest.mark.crdb_skip("backend pid")
-    def test_backend_pid(self, conn):
-        assert conn.info.backend_pid
-        assert conn.info.backend_pid == conn.pgconn.backend_pid
-        conn.close()
-        with pytest.raises(psycopg.OperationalError):
-            conn.info.backend_pid
-
-    def test_timezone(self, conn):
-        conn.execute("set timezone to 'Europe/Rome'")
-        tz = conn.info.timezone
-        assert isinstance(tz, dt.tzinfo)
-        offset = tz.utcoffset(dt.datetime(2000, 1, 1))
-        assert offset and offset.total_seconds() == 3600
-        offset = tz.utcoffset(dt.datetime(2000, 7, 1))
-        assert offset and offset.total_seconds() == 7200
-
-    @pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones")
-    def test_timezone_warn(self, conn, caplog):
-        conn.execute("set timezone to 'FOOBAR0'")
-        assert len(caplog.records) == 0
-        tz = conn.info.timezone
-        assert tz == dt.timezone.utc
-        assert len(caplog.records) == 1
-        assert "FOOBAR0" in caplog.records[0].message
-
-        conn.info.timezone
-        assert len(caplog.records) == 1
-
-        conn.execute("set timezone to 'FOOBAAR0'")
-        assert len(caplog.records) == 1
-        conn.info.timezone
-        assert len(caplog.records) == 2
-        assert "FOOBAAR0" in caplog.records[1].message
-
-    def test_encoding(self, conn):
-        enc = conn.execute("show client_encoding").fetchone()[0]
-        assert conn.info.encoding == pg2pyenc(enc.encode())
-
-    @pytest.mark.crdb("skip", reason="encoding not normalized")
-    @pytest.mark.parametrize(
-        "enc, out, codec",
-        [
-            ("utf8", "UTF8", "utf-8"),
-            ("utf-8", "UTF8", "utf-8"),
-            ("utf_8", "UTF8", "utf-8"),
-            ("eucjp", "EUC_JP", "euc_jp"),
-            ("euc-jp", "EUC_JP", "euc_jp"),
-            ("latin9", "LATIN9", "iso8859-15"),
-        ],
-    )
-    def test_normalize_encoding(self, conn, enc, out, codec):
-        conn.execute("select set_config('client_encoding', %s, false)", [enc])
-        assert conn.info.parameter_status("client_encoding") == out
-        assert conn.info.encoding == codec
-
-    @pytest.mark.parametrize(
-        "enc, out, codec",
-        [
-            ("utf8", "UTF8", "utf-8"),
-            ("utf-8", "UTF8", "utf-8"),
-            ("utf_8", "UTF8", "utf-8"),
-            crdb_encoding("eucjp", "EUC_JP", "euc_jp"),
-            crdb_encoding("euc-jp", "EUC_JP", "euc_jp"),
-        ],
-    )
-    def test_encoding_env_var(self, conn_cls, dsn, monkeypatch, enc, out, codec):
-        monkeypatch.setenv("PGCLIENTENCODING", enc)
-        with conn_cls.connect(dsn) as conn:
-            clienc = conn.info.parameter_status("client_encoding")
-            assert clienc
-            if conn.info.vendor == "PostgreSQL":
-                assert clienc == out
-            else:
-                assert clienc.replace("-", "").replace("_", "").upper() == out
-            assert conn.info.encoding == codec
-
-    @pytest.mark.crdb_skip("encoding")
-    def test_set_encoding_unsupported(self, conn):
-        cur = conn.cursor()
-        cur.execute("set client_encoding to EUC_TW")
-        with pytest.raises(psycopg.NotSupportedError):
-            cur.execute("select 'x'")
-
-    def test_vendor(self, conn):
-        assert conn.info.vendor
-
-
 @pytest.mark.parametrize(
     "conninfo, want, env",
     [