From: Daniele Varrazzo Date: Sat, 6 Jan 2024 14:50:14 +0000 (+0100) Subject: refactor: separate ConnectionInfo object from conninfo module X-Git-Tag: 3.1.17~3^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3e02cbdf666a59e034c1128f20b85cdc1271e77c;p=thirdparty%2Fpsycopg.git refactor: separate ConnectionInfo object from conninfo module It was there just because the name is similar, but its purpose is entirely different. --- diff --git a/psycopg/psycopg/__init__.py b/psycopg/psycopg/__init__.py index baadf30c3..25d639964 100644 --- a/psycopg/psycopg/__init__.py +++ b/psycopg/psycopg/__init__.py @@ -17,13 +17,13 @@ from .errors import Warning, Error, InterfaceError, DatabaseError from .errors import DataError, OperationalError, IntegrityError from .errors import InternalError, ProgrammingError, NotSupportedError from ._column import Column -from .conninfo import ConnectionInfo from ._pipeline import Pipeline, AsyncPipeline from .connection import BaseConnection, Connection, Notify from .transaction import Rollback, Transaction, AsyncTransaction from .cursor_async import AsyncCursor from .server_cursor import AsyncServerCursor, ServerCursor from .client_cursor import AsyncClientCursor, ClientCursor +from ._connection_info import ConnectionInfo from .connection_async import AsyncConnection from . import dbapi20 diff --git a/psycopg/psycopg/_connection_info.py b/psycopg/psycopg/_connection_info.py new file mode 100644 index 000000000..79a781cb0 --- /dev/null +++ b/psycopg/psycopg/_connection_info.py @@ -0,0 +1,174 @@ +""" +Objects to return information about a PostgreSQL connection. +""" + +# Copyright (C) 2020 The Psycopg Team + +from __future__ import annotations + +from pathlib import Path +from datetime import tzinfo + +from . import pq +from ._tz import get_tzinfo +from ._encodings import pgconn_encoding +from .conninfo import make_conninfo + + +class ConnectionInfo: + """Allow access to information about the connection.""" + + __module__ = "psycopg" + + def __init__(self, pgconn: pq.abc.PGconn): + self.pgconn = pgconn + + @property + def vendor(self) -> str: + """A string representing the database vendor connected to.""" + return "PostgreSQL" + + @property + def host(self) -> str: + """The server host name of the active connection. See :pq:`PQhost()`.""" + return self._get_pgconn_attr("host") + + @property + def hostaddr(self) -> str: + """The server IP address of the connection. See :pq:`PQhostaddr()`.""" + return self._get_pgconn_attr("hostaddr") + + @property + def port(self) -> int: + """The port of the active connection. See :pq:`PQport()`.""" + return int(self._get_pgconn_attr("port")) + + @property + def dbname(self) -> str: + """The database name of the connection. See :pq:`PQdb()`.""" + return self._get_pgconn_attr("db") + + @property + def user(self) -> str: + """The user name of the connection. See :pq:`PQuser()`.""" + return self._get_pgconn_attr("user") + + @property + def password(self) -> str: + """The password of the connection. See :pq:`PQpass()`.""" + return self._get_pgconn_attr("password") + + @property + def options(self) -> str: + """ + The command-line options passed in the connection request. + See :pq:`PQoptions`. + """ + return self._get_pgconn_attr("options") + + def get_parameters(self) -> dict[str, str]: + """Return the connection parameters values. + + Return all the parameters set to a non-default value, which might come + either from the connection string and parameters passed to + `~Connection.connect()` or from environment variables. The password + is never returned (you can read it using the `password` attribute). + """ + pyenc = self.encoding + + # Get the known defaults to avoid reporting them + defaults = { + i.keyword: i.compiled + for i in pq.Conninfo.get_defaults() + if i.compiled is not None + } + # Not returned by the libq. Bug? Bet we're using SSH. + defaults.setdefault(b"channel_binding", b"prefer") + defaults[b"passfile"] = str(Path.home() / ".pgpass").encode() + + return { + i.keyword.decode(pyenc): i.val.decode(pyenc) + for i in self.pgconn.info + if i.val is not None + and i.keyword != b"password" + and i.val != defaults.get(i.keyword) + } + + @property + def dsn(self) -> str: + """Return the connection string to connect to the database. + + The string contains all the parameters set to a non-default value, + which might come either from the connection string and parameters + passed to `~Connection.connect()` or from environment variables. The + password is never returned (you can read it using the `password` + attribute). + """ + return make_conninfo(**self.get_parameters()) + + @property + def status(self) -> pq.ConnStatus: + """The status of the connection. See :pq:`PQstatus()`.""" + return pq.ConnStatus(self.pgconn.status) + + @property + def transaction_status(self) -> pq.TransactionStatus: + """ + The current in-transaction status of the session. + See :pq:`PQtransactionStatus()`. + """ + return pq.TransactionStatus(self.pgconn.transaction_status) + + @property + def pipeline_status(self) -> pq.PipelineStatus: + """ + The current pipeline status of the client. + See :pq:`PQpipelineStatus()`. + """ + return pq.PipelineStatus(self.pgconn.pipeline_status) + + def parameter_status(self, param_name: str) -> str | None: + """ + Return a parameter setting of the connection. + + Return `None` is the parameter is unknown. + """ + res = self.pgconn.parameter_status(param_name.encode(self.encoding)) + return res.decode(self.encoding) if res is not None else None + + @property + def server_version(self) -> int: + """ + An integer representing the server version. See :pq:`PQserverVersion()`. + """ + return self.pgconn.server_version + + @property + def backend_pid(self) -> int: + """ + The process ID (PID) of the backend process handling this connection. + See :pq:`PQbackendPID()`. + """ + return self.pgconn.backend_pid + + @property + def error_message(self) -> str: + """ + The error message most recently generated by an operation on the connection. + See :pq:`PQerrorMessage()`. + """ + return self._get_pgconn_attr("error_message") + + @property + def timezone(self) -> tzinfo: + """The Python timezone info of the connection's timezone.""" + return get_tzinfo(self.pgconn) + + @property + def encoding(self) -> str: + """The Python codec name of the connection's client encoding.""" + return pgconn_encoding(self.pgconn) + + def _get_pgconn_attr(self, name: str) -> str: + value: bytes = getattr(self.pgconn, name) + return value.decode(self.encoding) diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index ef49d0e88..560945b00 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -30,7 +30,7 @@ from ._enums import IsolationLevel from .cursor import Cursor from ._compat import LiteralString, Self from .pq.misc import connection_summary -from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo +from .conninfo import make_conninfo, conninfo_to_dict from .conninfo import conninfo_attempts, ConnDict, timeout_from_conninfo from ._pipeline import BasePipeline, Pipeline from .generators import notifies, connect, execute @@ -38,6 +38,7 @@ from ._encodings import pgconn_encoding from ._preparing import PrepareManager from .transaction import Transaction from .server_cursor import ServerCursor +from ._connection_info import ConnectionInfo if TYPE_CHECKING: from .pq.abc import PGconn, PGresult diff --git a/psycopg/psycopg/conninfo.py b/psycopg/psycopg/conninfo.py index c57a6d9c0..9044550cb 100644 --- a/psycopg/psycopg/conninfo.py +++ b/psycopg/psycopg/conninfo.py @@ -13,8 +13,6 @@ import asyncio import logging from typing import Any from random import shuffle -from pathlib import Path -from datetime import tzinfo from functools import lru_cache from ipaddress import ip_address from dataclasses import dataclass @@ -22,8 +20,6 @@ from typing_extensions import TypeAlias from . import pq from . import errors as e -from ._tz import get_tzinfo -from ._encodings import pgconn_encoding ConnDict: TypeAlias = "dict[str, Any]" @@ -131,165 +127,6 @@ def _param_escape(s: str) -> str: return s -class ConnectionInfo: - """Allow access to information about the connection.""" - - __module__ = "psycopg" - - def __init__(self, pgconn: pq.abc.PGconn): - self.pgconn = pgconn - - @property - def vendor(self) -> str: - """A string representing the database vendor connected to.""" - return "PostgreSQL" - - @property - def host(self) -> str: - """The server host name of the active connection. See :pq:`PQhost()`.""" - return self._get_pgconn_attr("host") - - @property - def hostaddr(self) -> str: - """The server IP address of the connection. See :pq:`PQhostaddr()`.""" - return self._get_pgconn_attr("hostaddr") - - @property - def port(self) -> int: - """The port of the active connection. See :pq:`PQport()`.""" - return int(self._get_pgconn_attr("port")) - - @property - def dbname(self) -> str: - """The database name of the connection. See :pq:`PQdb()`.""" - return self._get_pgconn_attr("db") - - @property - def user(self) -> str: - """The user name of the connection. See :pq:`PQuser()`.""" - return self._get_pgconn_attr("user") - - @property - def password(self) -> str: - """The password of the connection. See :pq:`PQpass()`.""" - return self._get_pgconn_attr("password") - - @property - def options(self) -> str: - """ - The command-line options passed in the connection request. - See :pq:`PQoptions`. - """ - return self._get_pgconn_attr("options") - - def get_parameters(self) -> dict[str, str]: - """Return the connection parameters values. - - Return all the parameters set to a non-default value, which might come - either from the connection string and parameters passed to - `~Connection.connect()` or from environment variables. The password - is never returned (you can read it using the `password` attribute). - """ - pyenc = self.encoding - - # Get the known defaults to avoid reporting them - defaults = { - i.keyword: i.compiled - for i in pq.Conninfo.get_defaults() - if i.compiled is not None - } - # Not returned by the libq. Bug? Bet we're using SSH. - defaults.setdefault(b"channel_binding", b"prefer") - defaults[b"passfile"] = str(Path.home() / ".pgpass").encode() - - return { - i.keyword.decode(pyenc): i.val.decode(pyenc) - for i in self.pgconn.info - if i.val is not None - and i.keyword != b"password" - and i.val != defaults.get(i.keyword) - } - - @property - def dsn(self) -> str: - """Return the connection string to connect to the database. - - The string contains all the parameters set to a non-default value, - which might come either from the connection string and parameters - passed to `~Connection.connect()` or from environment variables. The - password is never returned (you can read it using the `password` - attribute). - """ - return make_conninfo(**self.get_parameters()) - - @property - def status(self) -> pq.ConnStatus: - """The status of the connection. See :pq:`PQstatus()`.""" - return pq.ConnStatus(self.pgconn.status) - - @property - def transaction_status(self) -> pq.TransactionStatus: - """ - The current in-transaction status of the session. - See :pq:`PQtransactionStatus()`. - """ - return pq.TransactionStatus(self.pgconn.transaction_status) - - @property - def pipeline_status(self) -> pq.PipelineStatus: - """ - The current pipeline status of the client. - See :pq:`PQpipelineStatus()`. - """ - return pq.PipelineStatus(self.pgconn.pipeline_status) - - def parameter_status(self, param_name: str) -> str | None: - """ - Return a parameter setting of the connection. - - Return `None` is the parameter is unknown. - """ - res = self.pgconn.parameter_status(param_name.encode(self.encoding)) - return res.decode(self.encoding) if res is not None else None - - @property - def server_version(self) -> int: - """ - An integer representing the server version. See :pq:`PQserverVersion()`. - """ - return self.pgconn.server_version - - @property - def backend_pid(self) -> int: - """ - The process ID (PID) of the backend process handling this connection. - See :pq:`PQbackendPID()`. - """ - return self.pgconn.backend_pid - - @property - def error_message(self) -> str: - """ - The error message most recently generated by an operation on the connection. - See :pq:`PQerrorMessage()`. - """ - return self._get_pgconn_attr("error_message") - - @property - def timezone(self) -> tzinfo: - """The Python timezone info of the connection's timezone.""" - return get_tzinfo(self.pgconn) - - @property - def encoding(self) -> str: - """The Python codec name of the connection's client encoding.""" - return pgconn_encoding(self.pgconn) - - def _get_pgconn_attr(self, name: str) -> str: - value: bytes = getattr(self.pgconn, name) - return value.decode(self.encoding) - - def conninfo_attempts(params: ConnDict) -> list[ConnDict]: """Split a set of connection params on the single attempts to perform. diff --git a/psycopg/psycopg/crdb/connection.py b/psycopg/psycopg/crdb/connection.py index 49b7d5ffa..07c3837dc 100644 --- a/psycopg/psycopg/crdb/connection.py +++ b/psycopg/psycopg/crdb/connection.py @@ -11,9 +11,9 @@ from .. import errors as e from ..abc import AdaptContext from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow from .._compat import Self -from ..conninfo import ConnectionInfo from ..connection import Connection from .._adapters_map import AdaptersMap +from .._connection_info import ConnectionInfo from ..connection_async import AsyncConnection from ._types import adapters diff --git a/tests/test_connection_info.py b/tests/test_connection_info.py new file mode 100644 index 000000000..956a3c4fa --- /dev/null +++ b/tests/test_connection_info.py @@ -0,0 +1,252 @@ +import datetime as dt + +import pytest + +import psycopg +from psycopg.conninfo import make_conninfo, conninfo_to_dict +from psycopg._encodings import pg2pyenc + +from .fix_crdb import crdb_encoding + + +@pytest.mark.parametrize( + "attr", + [("dbname", "db"), "host", "hostaddr", "user", "password", "options"], +) +def test_attrs(conn, attr): + if isinstance(attr, tuple): + info_attr, pgconn_attr = attr + else: + info_attr = pgconn_attr = attr + + if info_attr == "hostaddr" and psycopg.pq.version() < 120000: + pytest.skip("hostaddr not supported on libpq < 12") + + info_val = getattr(conn.info, info_attr) + pgconn_val = getattr(conn.pgconn, pgconn_attr).decode() + assert info_val == pgconn_val + + conn.close() + with pytest.raises(psycopg.OperationalError): + getattr(conn.info, info_attr) + + +@pytest.mark.libpq("< 12") +def test_hostaddr_not_supported(conn): + with pytest.raises(psycopg.NotSupportedError): + conn.info.hostaddr + + +def test_port(conn): + assert conn.info.port == int(conn.pgconn.port.decode()) + conn.close() + with pytest.raises(psycopg.OperationalError): + conn.info.port + + +def test_get_params(conn, dsn): + info = conn.info.get_parameters() + for k, v in conninfo_to_dict(dsn).items(): + if k != "password": + assert info.get(k) == v + else: + assert k not in info + + +def test_dsn(conn, dsn): + dsn = conn.info.dsn + assert "password" not in dsn + for k, v in conninfo_to_dict(dsn).items(): + if k != "password": + assert f"{k}=" in dsn + + +def test_get_params_env(conn_cls, dsn, monkeypatch): + dsn = conninfo_to_dict(dsn) + dsn.pop("application_name", None) + + monkeypatch.delenv("PGAPPNAME", raising=False) + with conn_cls.connect(**dsn) as conn: + assert "application_name" not in conn.info.get_parameters() + + monkeypatch.setenv("PGAPPNAME", "hello test") + with conn_cls.connect(**dsn) as conn: + assert conn.info.get_parameters()["application_name"] == "hello test" + + +def test_dsn_env(conn_cls, dsn, monkeypatch): + dsn = conninfo_to_dict(dsn) + dsn.pop("application_name", None) + + monkeypatch.delenv("PGAPPNAME", raising=False) + with conn_cls.connect(**dsn) as conn: + assert "application_name=" not in conn.info.dsn + + monkeypatch.setenv("PGAPPNAME", "hello test") + with conn_cls.connect(**dsn) as conn: + assert "application_name='hello test'" in conn.info.dsn + + +def test_status(conn): + assert conn.info.status.name == "OK" + conn.close() + assert conn.info.status.name == "BAD" + + +def test_transaction_status(conn): + assert conn.info.transaction_status.name == "IDLE" + conn.close() + assert conn.info.transaction_status.name == "UNKNOWN" + + +@pytest.mark.pipeline +def test_pipeline_status(conn): + assert not conn.info.pipeline_status + assert conn.info.pipeline_status.name == "OFF" + with conn.pipeline(): + assert conn.info.pipeline_status + assert conn.info.pipeline_status.name == "ON" + + +@pytest.mark.libpq("< 14") +def test_pipeline_status_no_pipeline(conn): + assert not conn.info.pipeline_status + assert conn.info.pipeline_status.name == "OFF" + + +def test_no_password(dsn): + dsn2 = make_conninfo(dsn, password="the-pass-word") + pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode()) + info = psycopg.ConnectionInfo(pgconn) + assert info.password == "the-pass-word" + assert "password" not in info.get_parameters() + assert info.get_parameters()["dbname"] == info.dbname + + +def test_dsn_no_password(dsn): + dsn2 = make_conninfo(dsn, password="the-pass-word") + pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode()) + info = psycopg.ConnectionInfo(pgconn) + assert info.password == "the-pass-word" + assert "password" not in info.dsn + assert f"dbname={info.dbname}" in info.dsn + + +def test_parameter_status(conn): + assert conn.info.parameter_status("nosuchparam") is None + tz = conn.info.parameter_status("TimeZone") + assert tz and isinstance(tz, str) + assert tz == conn.execute("show timezone").fetchone()[0] + + +@pytest.mark.crdb("skip") +def test_server_version(conn): + assert conn.info.server_version == conn.pgconn.server_version + + +def test_error_message(conn): + assert conn.info.error_message == "" + with pytest.raises(psycopg.ProgrammingError) as ex: + conn.execute("wat") + + assert conn.info.error_message + assert str(ex.value) in conn.info.error_message + assert ex.value.diag.severity in conn.info.error_message + + conn.close() + assert "NULL" in conn.info.error_message + + +@pytest.mark.crdb_skip("backend pid") +def test_backend_pid(conn): + assert conn.info.backend_pid + assert conn.info.backend_pid == conn.pgconn.backend_pid + conn.close() + with pytest.raises(psycopg.OperationalError): + conn.info.backend_pid + + +def test_timezone(conn): + conn.execute("set timezone to 'Europe/Rome'") + tz = conn.info.timezone + assert isinstance(tz, dt.tzinfo) + offset = tz.utcoffset(dt.datetime(2000, 1, 1)) + assert offset and offset.total_seconds() == 3600 + offset = tz.utcoffset(dt.datetime(2000, 7, 1)) + assert offset and offset.total_seconds() == 7200 + + +@pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones") +def test_timezone_warn(conn, caplog): + conn.execute("set timezone to 'FOOBAR0'") + assert len(caplog.records) == 0 + tz = conn.info.timezone + assert tz == dt.timezone.utc + assert len(caplog.records) == 1 + assert "FOOBAR0" in caplog.records[0].message + + conn.info.timezone + assert len(caplog.records) == 1 + + conn.execute("set timezone to 'FOOBAAR0'") + assert len(caplog.records) == 1 + conn.info.timezone + assert len(caplog.records) == 2 + assert "FOOBAAR0" in caplog.records[1].message + + +def test_encoding(conn): + enc = conn.execute("show client_encoding").fetchone()[0] + assert conn.info.encoding == pg2pyenc(enc.encode()) + + +@pytest.mark.crdb("skip", reason="encoding not normalized") +@pytest.mark.parametrize( + "enc, out, codec", + [ + ("utf8", "UTF8", "utf-8"), + ("utf-8", "UTF8", "utf-8"), + ("utf_8", "UTF8", "utf-8"), + ("eucjp", "EUC_JP", "euc_jp"), + ("euc-jp", "EUC_JP", "euc_jp"), + ("latin9", "LATIN9", "iso8859-15"), + ], +) +def test_normalize_encoding(conn, enc, out, codec): + conn.execute("select set_config('client_encoding', %s, false)", [enc]) + assert conn.info.parameter_status("client_encoding") == out + assert conn.info.encoding == codec + + +@pytest.mark.parametrize( + "enc, out, codec", + [ + ("utf8", "UTF8", "utf-8"), + ("utf-8", "UTF8", "utf-8"), + ("utf_8", "UTF8", "utf-8"), + crdb_encoding("eucjp", "EUC_JP", "euc_jp"), + crdb_encoding("euc-jp", "EUC_JP", "euc_jp"), + ], +) +def test_encoding_env_var(conn_cls, dsn, monkeypatch, enc, out, codec): + monkeypatch.setenv("PGCLIENTENCODING", enc) + with conn_cls.connect(dsn) as conn: + clienc = conn.info.parameter_status("client_encoding") + assert clienc + if conn.info.vendor == "PostgreSQL": + assert clienc == out + else: + assert clienc.replace("-", "").replace("_", "").upper() == out + assert conn.info.encoding == codec + + +@pytest.mark.crdb_skip("encoding") +def test_set_encoding_unsupported(conn): + cur = conn.cursor() + cur.execute("set client_encoding to EUC_TW") + with pytest.raises(psycopg.NotSupportedError): + cur.execute("select 'x'") + + +def test_vendor(conn): + assert conn.info.vendor diff --git a/tests/test_conninfo.py b/tests/test_conninfo.py index d9888d3bd..ae892ed3c 100644 --- a/tests/test_conninfo.py +++ b/tests/test_conninfo.py @@ -1,17 +1,13 @@ import socket import asyncio -import datetime as dt import pytest import psycopg from psycopg import ProgrammingError -from psycopg.conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo +from psycopg.conninfo import make_conninfo, conninfo_to_dict from psycopg.conninfo import conninfo_attempts, conninfo_attempts_async from psycopg.conninfo import timeout_from_conninfo, _DEFAULT_CONNECT_TIMEOUT -from psycopg._encodings import pg2pyenc - -from .fix_crdb import crdb_encoding snowman = "\u2603" @@ -96,227 +92,6 @@ def test_no_munging(): assert dsnin == dsnout -class TestConnectionInfo: - @pytest.mark.parametrize( - "attr", - [("dbname", "db"), "host", "hostaddr", "user", "password", "options"], - ) - def test_attrs(self, conn, attr): - if isinstance(attr, tuple): - info_attr, pgconn_attr = attr - else: - info_attr = pgconn_attr = attr - - if info_attr == "hostaddr" and psycopg.pq.version() < 120000: - pytest.skip("hostaddr not supported on libpq < 12") - - info_val = getattr(conn.info, info_attr) - pgconn_val = getattr(conn.pgconn, pgconn_attr).decode() - assert info_val == pgconn_val - - conn.close() - with pytest.raises(psycopg.OperationalError): - getattr(conn.info, info_attr) - - @pytest.mark.libpq("< 12") - def test_hostaddr_not_supported(self, conn): - with pytest.raises(psycopg.NotSupportedError): - conn.info.hostaddr - - def test_port(self, conn): - assert conn.info.port == int(conn.pgconn.port.decode()) - conn.close() - with pytest.raises(psycopg.OperationalError): - conn.info.port - - def test_get_params(self, conn, dsn): - info = conn.info.get_parameters() - for k, v in conninfo_to_dict(dsn).items(): - if k != "password": - assert info.get(k) == v - else: - assert k not in info - - def test_dsn(self, conn, dsn): - dsn = conn.info.dsn - assert "password" not in dsn - for k, v in conninfo_to_dict(dsn).items(): - if k != "password": - assert f"{k}=" in dsn - - def test_get_params_env(self, conn_cls, dsn, monkeypatch): - dsn = conninfo_to_dict(dsn) - dsn.pop("application_name", None) - - monkeypatch.delenv("PGAPPNAME", raising=False) - with conn_cls.connect(**dsn) as conn: - assert "application_name" not in conn.info.get_parameters() - - monkeypatch.setenv("PGAPPNAME", "hello test") - with conn_cls.connect(**dsn) as conn: - assert conn.info.get_parameters()["application_name"] == "hello test" - - def test_dsn_env(self, conn_cls, dsn, monkeypatch): - dsn = conninfo_to_dict(dsn) - dsn.pop("application_name", None) - - monkeypatch.delenv("PGAPPNAME", raising=False) - with conn_cls.connect(**dsn) as conn: - assert "application_name=" not in conn.info.dsn - - monkeypatch.setenv("PGAPPNAME", "hello test") - with conn_cls.connect(**dsn) as conn: - assert "application_name='hello test'" in conn.info.dsn - - def test_status(self, conn): - assert conn.info.status.name == "OK" - conn.close() - assert conn.info.status.name == "BAD" - - def test_transaction_status(self, conn): - assert conn.info.transaction_status.name == "IDLE" - conn.close() - assert conn.info.transaction_status.name == "UNKNOWN" - - @pytest.mark.pipeline - def test_pipeline_status(self, conn): - assert not conn.info.pipeline_status - assert conn.info.pipeline_status.name == "OFF" - with conn.pipeline(): - assert conn.info.pipeline_status - assert conn.info.pipeline_status.name == "ON" - - @pytest.mark.libpq("< 14") - def test_pipeline_status_no_pipeline(self, conn): - assert not conn.info.pipeline_status - assert conn.info.pipeline_status.name == "OFF" - - def test_no_password(self, dsn): - dsn2 = make_conninfo(dsn, password="the-pass-word") - pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode()) - info = ConnectionInfo(pgconn) - assert info.password == "the-pass-word" - assert "password" not in info.get_parameters() - assert info.get_parameters()["dbname"] == info.dbname - - def test_dsn_no_password(self, dsn): - dsn2 = make_conninfo(dsn, password="the-pass-word") - pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode()) - info = ConnectionInfo(pgconn) - assert info.password == "the-pass-word" - assert "password" not in info.dsn - assert f"dbname={info.dbname}" in info.dsn - - def test_parameter_status(self, conn): - assert conn.info.parameter_status("nosuchparam") is None - tz = conn.info.parameter_status("TimeZone") - assert tz and isinstance(tz, str) - assert tz == conn.execute("show timezone").fetchone()[0] - - @pytest.mark.crdb("skip") - def test_server_version(self, conn): - assert conn.info.server_version == conn.pgconn.server_version - - def test_error_message(self, conn): - assert conn.info.error_message == "" - with pytest.raises(psycopg.ProgrammingError) as ex: - conn.execute("wat") - - assert conn.info.error_message - assert str(ex.value) in conn.info.error_message - assert ex.value.diag.severity in conn.info.error_message - - conn.close() - assert "NULL" in conn.info.error_message - - @pytest.mark.crdb_skip("backend pid") - def test_backend_pid(self, conn): - assert conn.info.backend_pid - assert conn.info.backend_pid == conn.pgconn.backend_pid - conn.close() - with pytest.raises(psycopg.OperationalError): - conn.info.backend_pid - - def test_timezone(self, conn): - conn.execute("set timezone to 'Europe/Rome'") - tz = conn.info.timezone - assert isinstance(tz, dt.tzinfo) - offset = tz.utcoffset(dt.datetime(2000, 1, 1)) - assert offset and offset.total_seconds() == 3600 - offset = tz.utcoffset(dt.datetime(2000, 7, 1)) - assert offset and offset.total_seconds() == 7200 - - @pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones") - def test_timezone_warn(self, conn, caplog): - conn.execute("set timezone to 'FOOBAR0'") - assert len(caplog.records) == 0 - tz = conn.info.timezone - assert tz == dt.timezone.utc - assert len(caplog.records) == 1 - assert "FOOBAR0" in caplog.records[0].message - - conn.info.timezone - assert len(caplog.records) == 1 - - conn.execute("set timezone to 'FOOBAAR0'") - assert len(caplog.records) == 1 - conn.info.timezone - assert len(caplog.records) == 2 - assert "FOOBAAR0" in caplog.records[1].message - - def test_encoding(self, conn): - enc = conn.execute("show client_encoding").fetchone()[0] - assert conn.info.encoding == pg2pyenc(enc.encode()) - - @pytest.mark.crdb("skip", reason="encoding not normalized") - @pytest.mark.parametrize( - "enc, out, codec", - [ - ("utf8", "UTF8", "utf-8"), - ("utf-8", "UTF8", "utf-8"), - ("utf_8", "UTF8", "utf-8"), - ("eucjp", "EUC_JP", "euc_jp"), - ("euc-jp", "EUC_JP", "euc_jp"), - ("latin9", "LATIN9", "iso8859-15"), - ], - ) - def test_normalize_encoding(self, conn, enc, out, codec): - conn.execute("select set_config('client_encoding', %s, false)", [enc]) - assert conn.info.parameter_status("client_encoding") == out - assert conn.info.encoding == codec - - @pytest.mark.parametrize( - "enc, out, codec", - [ - ("utf8", "UTF8", "utf-8"), - ("utf-8", "UTF8", "utf-8"), - ("utf_8", "UTF8", "utf-8"), - crdb_encoding("eucjp", "EUC_JP", "euc_jp"), - crdb_encoding("euc-jp", "EUC_JP", "euc_jp"), - ], - ) - def test_encoding_env_var(self, conn_cls, dsn, monkeypatch, enc, out, codec): - monkeypatch.setenv("PGCLIENTENCODING", enc) - with conn_cls.connect(dsn) as conn: - clienc = conn.info.parameter_status("client_encoding") - assert clienc - if conn.info.vendor == "PostgreSQL": - assert clienc == out - else: - assert clienc.replace("-", "").replace("_", "").upper() == out - assert conn.info.encoding == codec - - @pytest.mark.crdb_skip("encoding") - def test_set_encoding_unsupported(self, conn): - cur = conn.cursor() - cur.execute("set client_encoding to EUC_TW") - with pytest.raises(psycopg.NotSupportedError): - cur.execute("select 'x'") - - def test_vendor(self, conn): - assert conn.info.vendor - - @pytest.mark.parametrize( "conninfo, want, env", [