from .errors import DataError, OperationalError, IntegrityError
from .errors import InternalError, ProgrammingError, NotSupportedError
from ._column import Column
-from .conninfo import ConnectionInfo
from ._pipeline import Pipeline, AsyncPipeline
from .connection import BaseConnection, Connection, Notify
from .transaction import Rollback, Transaction, AsyncTransaction
from .cursor_async import AsyncCursor
from .server_cursor import AsyncServerCursor, ServerCursor
from .client_cursor import AsyncClientCursor, ClientCursor
+from ._connection_info import ConnectionInfo
from .connection_async import AsyncConnection
from . import dbapi20
--- /dev/null
+"""
+Objects to return information about a PostgreSQL connection.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from __future__ import annotations
+
+from pathlib import Path
+from datetime import tzinfo
+
+from . import pq
+from ._tz import get_tzinfo
+from ._encodings import pgconn_encoding
+from .conninfo import make_conninfo
+
+
+class ConnectionInfo:
+ """Allow access to information about the connection."""
+
+ __module__ = "psycopg"
+
+ def __init__(self, pgconn: pq.abc.PGconn):
+ self.pgconn = pgconn
+
+ @property
+ def vendor(self) -> str:
+ """A string representing the database vendor connected to."""
+ return "PostgreSQL"
+
+ @property
+ def host(self) -> str:
+ """The server host name of the active connection. See :pq:`PQhost()`."""
+ return self._get_pgconn_attr("host")
+
+ @property
+ def hostaddr(self) -> str:
+ """The server IP address of the connection. See :pq:`PQhostaddr()`."""
+ return self._get_pgconn_attr("hostaddr")
+
+ @property
+ def port(self) -> int:
+ """The port of the active connection. See :pq:`PQport()`."""
+ return int(self._get_pgconn_attr("port"))
+
+ @property
+ def dbname(self) -> str:
+ """The database name of the connection. See :pq:`PQdb()`."""
+ return self._get_pgconn_attr("db")
+
+ @property
+ def user(self) -> str:
+ """The user name of the connection. See :pq:`PQuser()`."""
+ return self._get_pgconn_attr("user")
+
+ @property
+ def password(self) -> str:
+ """The password of the connection. See :pq:`PQpass()`."""
+ return self._get_pgconn_attr("password")
+
+ @property
+ def options(self) -> str:
+ """
+ The command-line options passed in the connection request.
+ See :pq:`PQoptions`.
+ """
+ return self._get_pgconn_attr("options")
+
+ def get_parameters(self) -> dict[str, str]:
+ """Return the connection parameters values.
+
+ Return all the parameters set to a non-default value, which might come
+ either from the connection string and parameters passed to
+ `~Connection.connect()` or from environment variables. The password
+ is never returned (you can read it using the `password` attribute).
+ """
+ pyenc = self.encoding
+
+ # Get the known defaults to avoid reporting them
+ defaults = {
+ i.keyword: i.compiled
+ for i in pq.Conninfo.get_defaults()
+ if i.compiled is not None
+ }
+ # Not returned by the libq. Bug? Bet we're using SSH.
+ defaults.setdefault(b"channel_binding", b"prefer")
+ defaults[b"passfile"] = str(Path.home() / ".pgpass").encode()
+
+ return {
+ i.keyword.decode(pyenc): i.val.decode(pyenc)
+ for i in self.pgconn.info
+ if i.val is not None
+ and i.keyword != b"password"
+ and i.val != defaults.get(i.keyword)
+ }
+
+ @property
+ def dsn(self) -> str:
+ """Return the connection string to connect to the database.
+
+ The string contains all the parameters set to a non-default value,
+ which might come either from the connection string and parameters
+ passed to `~Connection.connect()` or from environment variables. The
+ password is never returned (you can read it using the `password`
+ attribute).
+ """
+ return make_conninfo(**self.get_parameters())
+
+ @property
+ def status(self) -> pq.ConnStatus:
+ """The status of the connection. See :pq:`PQstatus()`."""
+ return pq.ConnStatus(self.pgconn.status)
+
+ @property
+ def transaction_status(self) -> pq.TransactionStatus:
+ """
+ The current in-transaction status of the session.
+ See :pq:`PQtransactionStatus()`.
+ """
+ return pq.TransactionStatus(self.pgconn.transaction_status)
+
+ @property
+ def pipeline_status(self) -> pq.PipelineStatus:
+ """
+ The current pipeline status of the client.
+ See :pq:`PQpipelineStatus()`.
+ """
+ return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+ def parameter_status(self, param_name: str) -> str | None:
+ """
+ Return a parameter setting of the connection.
+
+ Return `None` is the parameter is unknown.
+ """
+ res = self.pgconn.parameter_status(param_name.encode(self.encoding))
+ return res.decode(self.encoding) if res is not None else None
+
+ @property
+ def server_version(self) -> int:
+ """
+ An integer representing the server version. See :pq:`PQserverVersion()`.
+ """
+ return self.pgconn.server_version
+
+ @property
+ def backend_pid(self) -> int:
+ """
+ The process ID (PID) of the backend process handling this connection.
+ See :pq:`PQbackendPID()`.
+ """
+ return self.pgconn.backend_pid
+
+ @property
+ def error_message(self) -> str:
+ """
+ The error message most recently generated by an operation on the connection.
+ See :pq:`PQerrorMessage()`.
+ """
+ return self._get_pgconn_attr("error_message")
+
+ @property
+ def timezone(self) -> tzinfo:
+ """The Python timezone info of the connection's timezone."""
+ return get_tzinfo(self.pgconn)
+
+ @property
+ def encoding(self) -> str:
+ """The Python codec name of the connection's client encoding."""
+ return pgconn_encoding(self.pgconn)
+
+ def _get_pgconn_attr(self, name: str) -> str:
+ value: bytes = getattr(self.pgconn, name)
+ return value.decode(self.encoding)
from .cursor import Cursor
from ._compat import LiteralString, Self
from .pq.misc import connection_summary
-from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from .conninfo import make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts, ConnDict, timeout_from_conninfo
from ._pipeline import BasePipeline, Pipeline
from .generators import notifies, connect, execute
from ._preparing import PrepareManager
from .transaction import Transaction
from .server_cursor import ServerCursor
+from ._connection_info import ConnectionInfo
if TYPE_CHECKING:
from .pq.abc import PGconn, PGresult
import logging
from typing import Any
from random import shuffle
-from pathlib import Path
-from datetime import tzinfo
from functools import lru_cache
from ipaddress import ip_address
from dataclasses import dataclass
from . import pq
from . import errors as e
-from ._tz import get_tzinfo
-from ._encodings import pgconn_encoding
ConnDict: TypeAlias = "dict[str, Any]"
return s
-class ConnectionInfo:
- """Allow access to information about the connection."""
-
- __module__ = "psycopg"
-
- def __init__(self, pgconn: pq.abc.PGconn):
- self.pgconn = pgconn
-
- @property
- def vendor(self) -> str:
- """A string representing the database vendor connected to."""
- return "PostgreSQL"
-
- @property
- def host(self) -> str:
- """The server host name of the active connection. See :pq:`PQhost()`."""
- return self._get_pgconn_attr("host")
-
- @property
- def hostaddr(self) -> str:
- """The server IP address of the connection. See :pq:`PQhostaddr()`."""
- return self._get_pgconn_attr("hostaddr")
-
- @property
- def port(self) -> int:
- """The port of the active connection. See :pq:`PQport()`."""
- return int(self._get_pgconn_attr("port"))
-
- @property
- def dbname(self) -> str:
- """The database name of the connection. See :pq:`PQdb()`."""
- return self._get_pgconn_attr("db")
-
- @property
- def user(self) -> str:
- """The user name of the connection. See :pq:`PQuser()`."""
- return self._get_pgconn_attr("user")
-
- @property
- def password(self) -> str:
- """The password of the connection. See :pq:`PQpass()`."""
- return self._get_pgconn_attr("password")
-
- @property
- def options(self) -> str:
- """
- The command-line options passed in the connection request.
- See :pq:`PQoptions`.
- """
- return self._get_pgconn_attr("options")
-
- def get_parameters(self) -> dict[str, str]:
- """Return the connection parameters values.
-
- Return all the parameters set to a non-default value, which might come
- either from the connection string and parameters passed to
- `~Connection.connect()` or from environment variables. The password
- is never returned (you can read it using the `password` attribute).
- """
- pyenc = self.encoding
-
- # Get the known defaults to avoid reporting them
- defaults = {
- i.keyword: i.compiled
- for i in pq.Conninfo.get_defaults()
- if i.compiled is not None
- }
- # Not returned by the libq. Bug? Bet we're using SSH.
- defaults.setdefault(b"channel_binding", b"prefer")
- defaults[b"passfile"] = str(Path.home() / ".pgpass").encode()
-
- return {
- i.keyword.decode(pyenc): i.val.decode(pyenc)
- for i in self.pgconn.info
- if i.val is not None
- and i.keyword != b"password"
- and i.val != defaults.get(i.keyword)
- }
-
- @property
- def dsn(self) -> str:
- """Return the connection string to connect to the database.
-
- The string contains all the parameters set to a non-default value,
- which might come either from the connection string and parameters
- passed to `~Connection.connect()` or from environment variables. The
- password is never returned (you can read it using the `password`
- attribute).
- """
- return make_conninfo(**self.get_parameters())
-
- @property
- def status(self) -> pq.ConnStatus:
- """The status of the connection. See :pq:`PQstatus()`."""
- return pq.ConnStatus(self.pgconn.status)
-
- @property
- def transaction_status(self) -> pq.TransactionStatus:
- """
- The current in-transaction status of the session.
- See :pq:`PQtransactionStatus()`.
- """
- return pq.TransactionStatus(self.pgconn.transaction_status)
-
- @property
- def pipeline_status(self) -> pq.PipelineStatus:
- """
- The current pipeline status of the client.
- See :pq:`PQpipelineStatus()`.
- """
- return pq.PipelineStatus(self.pgconn.pipeline_status)
-
- def parameter_status(self, param_name: str) -> str | None:
- """
- Return a parameter setting of the connection.
-
- Return `None` is the parameter is unknown.
- """
- res = self.pgconn.parameter_status(param_name.encode(self.encoding))
- return res.decode(self.encoding) if res is not None else None
-
- @property
- def server_version(self) -> int:
- """
- An integer representing the server version. See :pq:`PQserverVersion()`.
- """
- return self.pgconn.server_version
-
- @property
- def backend_pid(self) -> int:
- """
- The process ID (PID) of the backend process handling this connection.
- See :pq:`PQbackendPID()`.
- """
- return self.pgconn.backend_pid
-
- @property
- def error_message(self) -> str:
- """
- The error message most recently generated by an operation on the connection.
- See :pq:`PQerrorMessage()`.
- """
- return self._get_pgconn_attr("error_message")
-
- @property
- def timezone(self) -> tzinfo:
- """The Python timezone info of the connection's timezone."""
- return get_tzinfo(self.pgconn)
-
- @property
- def encoding(self) -> str:
- """The Python codec name of the connection's client encoding."""
- return pgconn_encoding(self.pgconn)
-
- def _get_pgconn_attr(self, name: str) -> str:
- value: bytes = getattr(self.pgconn, name)
- return value.decode(self.encoding)
-
-
def conninfo_attempts(params: ConnDict) -> list[ConnDict]:
"""Split a set of connection params on the single attempts to perform.
from ..abc import AdaptContext
from ..rows import Row, RowFactory, AsyncRowFactory, TupleRow
from .._compat import Self
-from ..conninfo import ConnectionInfo
from ..connection import Connection
from .._adapters_map import AdaptersMap
+from .._connection_info import ConnectionInfo
from ..connection_async import AsyncConnection
from ._types import adapters
--- /dev/null
+import datetime as dt
+
+import pytest
+
+import psycopg
+from psycopg.conninfo import make_conninfo, conninfo_to_dict
+from psycopg._encodings import pg2pyenc
+
+from .fix_crdb import crdb_encoding
+
+
+@pytest.mark.parametrize(
+ "attr",
+ [("dbname", "db"), "host", "hostaddr", "user", "password", "options"],
+)
+def test_attrs(conn, attr):
+ if isinstance(attr, tuple):
+ info_attr, pgconn_attr = attr
+ else:
+ info_attr = pgconn_attr = attr
+
+ if info_attr == "hostaddr" and psycopg.pq.version() < 120000:
+ pytest.skip("hostaddr not supported on libpq < 12")
+
+ info_val = getattr(conn.info, info_attr)
+ pgconn_val = getattr(conn.pgconn, pgconn_attr).decode()
+ assert info_val == pgconn_val
+
+ conn.close()
+ with pytest.raises(psycopg.OperationalError):
+ getattr(conn.info, info_attr)
+
+
+@pytest.mark.libpq("< 12")
+def test_hostaddr_not_supported(conn):
+ with pytest.raises(psycopg.NotSupportedError):
+ conn.info.hostaddr
+
+
+def test_port(conn):
+ assert conn.info.port == int(conn.pgconn.port.decode())
+ conn.close()
+ with pytest.raises(psycopg.OperationalError):
+ conn.info.port
+
+
+def test_get_params(conn, dsn):
+ info = conn.info.get_parameters()
+ for k, v in conninfo_to_dict(dsn).items():
+ if k != "password":
+ assert info.get(k) == v
+ else:
+ assert k not in info
+
+
+def test_dsn(conn, dsn):
+ dsn = conn.info.dsn
+ assert "password" not in dsn
+ for k, v in conninfo_to_dict(dsn).items():
+ if k != "password":
+ assert f"{k}=" in dsn
+
+
+def test_get_params_env(conn_cls, dsn, monkeypatch):
+ dsn = conninfo_to_dict(dsn)
+ dsn.pop("application_name", None)
+
+ monkeypatch.delenv("PGAPPNAME", raising=False)
+ with conn_cls.connect(**dsn) as conn:
+ assert "application_name" not in conn.info.get_parameters()
+
+ monkeypatch.setenv("PGAPPNAME", "hello test")
+ with conn_cls.connect(**dsn) as conn:
+ assert conn.info.get_parameters()["application_name"] == "hello test"
+
+
+def test_dsn_env(conn_cls, dsn, monkeypatch):
+ dsn = conninfo_to_dict(dsn)
+ dsn.pop("application_name", None)
+
+ monkeypatch.delenv("PGAPPNAME", raising=False)
+ with conn_cls.connect(**dsn) as conn:
+ assert "application_name=" not in conn.info.dsn
+
+ monkeypatch.setenv("PGAPPNAME", "hello test")
+ with conn_cls.connect(**dsn) as conn:
+ assert "application_name='hello test'" in conn.info.dsn
+
+
+def test_status(conn):
+ assert conn.info.status.name == "OK"
+ conn.close()
+ assert conn.info.status.name == "BAD"
+
+
+def test_transaction_status(conn):
+ assert conn.info.transaction_status.name == "IDLE"
+ conn.close()
+ assert conn.info.transaction_status.name == "UNKNOWN"
+
+
+@pytest.mark.pipeline
+def test_pipeline_status(conn):
+ assert not conn.info.pipeline_status
+ assert conn.info.pipeline_status.name == "OFF"
+ with conn.pipeline():
+ assert conn.info.pipeline_status
+ assert conn.info.pipeline_status.name == "ON"
+
+
+@pytest.mark.libpq("< 14")
+def test_pipeline_status_no_pipeline(conn):
+ assert not conn.info.pipeline_status
+ assert conn.info.pipeline_status.name == "OFF"
+
+
+def test_no_password(dsn):
+ dsn2 = make_conninfo(dsn, password="the-pass-word")
+ pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
+ info = psycopg.ConnectionInfo(pgconn)
+ assert info.password == "the-pass-word"
+ assert "password" not in info.get_parameters()
+ assert info.get_parameters()["dbname"] == info.dbname
+
+
+def test_dsn_no_password(dsn):
+ dsn2 = make_conninfo(dsn, password="the-pass-word")
+ pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
+ info = psycopg.ConnectionInfo(pgconn)
+ assert info.password == "the-pass-word"
+ assert "password" not in info.dsn
+ assert f"dbname={info.dbname}" in info.dsn
+
+
+def test_parameter_status(conn):
+ assert conn.info.parameter_status("nosuchparam") is None
+ tz = conn.info.parameter_status("TimeZone")
+ assert tz and isinstance(tz, str)
+ assert tz == conn.execute("show timezone").fetchone()[0]
+
+
+@pytest.mark.crdb("skip")
+def test_server_version(conn):
+ assert conn.info.server_version == conn.pgconn.server_version
+
+
+def test_error_message(conn):
+ assert conn.info.error_message == ""
+ with pytest.raises(psycopg.ProgrammingError) as ex:
+ conn.execute("wat")
+
+ assert conn.info.error_message
+ assert str(ex.value) in conn.info.error_message
+ assert ex.value.diag.severity in conn.info.error_message
+
+ conn.close()
+ assert "NULL" in conn.info.error_message
+
+
+@pytest.mark.crdb_skip("backend pid")
+def test_backend_pid(conn):
+ assert conn.info.backend_pid
+ assert conn.info.backend_pid == conn.pgconn.backend_pid
+ conn.close()
+ with pytest.raises(psycopg.OperationalError):
+ conn.info.backend_pid
+
+
+def test_timezone(conn):
+ conn.execute("set timezone to 'Europe/Rome'")
+ tz = conn.info.timezone
+ assert isinstance(tz, dt.tzinfo)
+ offset = tz.utcoffset(dt.datetime(2000, 1, 1))
+ assert offset and offset.total_seconds() == 3600
+ offset = tz.utcoffset(dt.datetime(2000, 7, 1))
+ assert offset and offset.total_seconds() == 7200
+
+
+@pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones")
+def test_timezone_warn(conn, caplog):
+ conn.execute("set timezone to 'FOOBAR0'")
+ assert len(caplog.records) == 0
+ tz = conn.info.timezone
+ assert tz == dt.timezone.utc
+ assert len(caplog.records) == 1
+ assert "FOOBAR0" in caplog.records[0].message
+
+ conn.info.timezone
+ assert len(caplog.records) == 1
+
+ conn.execute("set timezone to 'FOOBAAR0'")
+ assert len(caplog.records) == 1
+ conn.info.timezone
+ assert len(caplog.records) == 2
+ assert "FOOBAAR0" in caplog.records[1].message
+
+
+def test_encoding(conn):
+ enc = conn.execute("show client_encoding").fetchone()[0]
+ assert conn.info.encoding == pg2pyenc(enc.encode())
+
+
+@pytest.mark.crdb("skip", reason="encoding not normalized")
+@pytest.mark.parametrize(
+ "enc, out, codec",
+ [
+ ("utf8", "UTF8", "utf-8"),
+ ("utf-8", "UTF8", "utf-8"),
+ ("utf_8", "UTF8", "utf-8"),
+ ("eucjp", "EUC_JP", "euc_jp"),
+ ("euc-jp", "EUC_JP", "euc_jp"),
+ ("latin9", "LATIN9", "iso8859-15"),
+ ],
+)
+def test_normalize_encoding(conn, enc, out, codec):
+ conn.execute("select set_config('client_encoding', %s, false)", [enc])
+ assert conn.info.parameter_status("client_encoding") == out
+ assert conn.info.encoding == codec
+
+
+@pytest.mark.parametrize(
+ "enc, out, codec",
+ [
+ ("utf8", "UTF8", "utf-8"),
+ ("utf-8", "UTF8", "utf-8"),
+ ("utf_8", "UTF8", "utf-8"),
+ crdb_encoding("eucjp", "EUC_JP", "euc_jp"),
+ crdb_encoding("euc-jp", "EUC_JP", "euc_jp"),
+ ],
+)
+def test_encoding_env_var(conn_cls, dsn, monkeypatch, enc, out, codec):
+ monkeypatch.setenv("PGCLIENTENCODING", enc)
+ with conn_cls.connect(dsn) as conn:
+ clienc = conn.info.parameter_status("client_encoding")
+ assert clienc
+ if conn.info.vendor == "PostgreSQL":
+ assert clienc == out
+ else:
+ assert clienc.replace("-", "").replace("_", "").upper() == out
+ assert conn.info.encoding == codec
+
+
+@pytest.mark.crdb_skip("encoding")
+def test_set_encoding_unsupported(conn):
+ cur = conn.cursor()
+ cur.execute("set client_encoding to EUC_TW")
+ with pytest.raises(psycopg.NotSupportedError):
+ cur.execute("select 'x'")
+
+
+def test_vendor(conn):
+ assert conn.info.vendor
import socket
import asyncio
-import datetime as dt
import pytest
import psycopg
from psycopg import ProgrammingError
-from psycopg.conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from psycopg.conninfo import make_conninfo, conninfo_to_dict
from psycopg.conninfo import conninfo_attempts, conninfo_attempts_async
from psycopg.conninfo import timeout_from_conninfo, _DEFAULT_CONNECT_TIMEOUT
-from psycopg._encodings import pg2pyenc
-
-from .fix_crdb import crdb_encoding
snowman = "\u2603"
assert dsnin == dsnout
-class TestConnectionInfo:
- @pytest.mark.parametrize(
- "attr",
- [("dbname", "db"), "host", "hostaddr", "user", "password", "options"],
- )
- def test_attrs(self, conn, attr):
- if isinstance(attr, tuple):
- info_attr, pgconn_attr = attr
- else:
- info_attr = pgconn_attr = attr
-
- if info_attr == "hostaddr" and psycopg.pq.version() < 120000:
- pytest.skip("hostaddr not supported on libpq < 12")
-
- info_val = getattr(conn.info, info_attr)
- pgconn_val = getattr(conn.pgconn, pgconn_attr).decode()
- assert info_val == pgconn_val
-
- conn.close()
- with pytest.raises(psycopg.OperationalError):
- getattr(conn.info, info_attr)
-
- @pytest.mark.libpq("< 12")
- def test_hostaddr_not_supported(self, conn):
- with pytest.raises(psycopg.NotSupportedError):
- conn.info.hostaddr
-
- def test_port(self, conn):
- assert conn.info.port == int(conn.pgconn.port.decode())
- conn.close()
- with pytest.raises(psycopg.OperationalError):
- conn.info.port
-
- def test_get_params(self, conn, dsn):
- info = conn.info.get_parameters()
- for k, v in conninfo_to_dict(dsn).items():
- if k != "password":
- assert info.get(k) == v
- else:
- assert k not in info
-
- def test_dsn(self, conn, dsn):
- dsn = conn.info.dsn
- assert "password" not in dsn
- for k, v in conninfo_to_dict(dsn).items():
- if k != "password":
- assert f"{k}=" in dsn
-
- def test_get_params_env(self, conn_cls, dsn, monkeypatch):
- dsn = conninfo_to_dict(dsn)
- dsn.pop("application_name", None)
-
- monkeypatch.delenv("PGAPPNAME", raising=False)
- with conn_cls.connect(**dsn) as conn:
- assert "application_name" not in conn.info.get_parameters()
-
- monkeypatch.setenv("PGAPPNAME", "hello test")
- with conn_cls.connect(**dsn) as conn:
- assert conn.info.get_parameters()["application_name"] == "hello test"
-
- def test_dsn_env(self, conn_cls, dsn, monkeypatch):
- dsn = conninfo_to_dict(dsn)
- dsn.pop("application_name", None)
-
- monkeypatch.delenv("PGAPPNAME", raising=False)
- with conn_cls.connect(**dsn) as conn:
- assert "application_name=" not in conn.info.dsn
-
- monkeypatch.setenv("PGAPPNAME", "hello test")
- with conn_cls.connect(**dsn) as conn:
- assert "application_name='hello test'" in conn.info.dsn
-
- def test_status(self, conn):
- assert conn.info.status.name == "OK"
- conn.close()
- assert conn.info.status.name == "BAD"
-
- def test_transaction_status(self, conn):
- assert conn.info.transaction_status.name == "IDLE"
- conn.close()
- assert conn.info.transaction_status.name == "UNKNOWN"
-
- @pytest.mark.pipeline
- def test_pipeline_status(self, conn):
- assert not conn.info.pipeline_status
- assert conn.info.pipeline_status.name == "OFF"
- with conn.pipeline():
- assert conn.info.pipeline_status
- assert conn.info.pipeline_status.name == "ON"
-
- @pytest.mark.libpq("< 14")
- def test_pipeline_status_no_pipeline(self, conn):
- assert not conn.info.pipeline_status
- assert conn.info.pipeline_status.name == "OFF"
-
- def test_no_password(self, dsn):
- dsn2 = make_conninfo(dsn, password="the-pass-word")
- pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
- info = ConnectionInfo(pgconn)
- assert info.password == "the-pass-word"
- assert "password" not in info.get_parameters()
- assert info.get_parameters()["dbname"] == info.dbname
-
- def test_dsn_no_password(self, dsn):
- dsn2 = make_conninfo(dsn, password="the-pass-word")
- pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
- info = ConnectionInfo(pgconn)
- assert info.password == "the-pass-word"
- assert "password" not in info.dsn
- assert f"dbname={info.dbname}" in info.dsn
-
- def test_parameter_status(self, conn):
- assert conn.info.parameter_status("nosuchparam") is None
- tz = conn.info.parameter_status("TimeZone")
- assert tz and isinstance(tz, str)
- assert tz == conn.execute("show timezone").fetchone()[0]
-
- @pytest.mark.crdb("skip")
- def test_server_version(self, conn):
- assert conn.info.server_version == conn.pgconn.server_version
-
- def test_error_message(self, conn):
- assert conn.info.error_message == ""
- with pytest.raises(psycopg.ProgrammingError) as ex:
- conn.execute("wat")
-
- assert conn.info.error_message
- assert str(ex.value) in conn.info.error_message
- assert ex.value.diag.severity in conn.info.error_message
-
- conn.close()
- assert "NULL" in conn.info.error_message
-
- @pytest.mark.crdb_skip("backend pid")
- def test_backend_pid(self, conn):
- assert conn.info.backend_pid
- assert conn.info.backend_pid == conn.pgconn.backend_pid
- conn.close()
- with pytest.raises(psycopg.OperationalError):
- conn.info.backend_pid
-
- def test_timezone(self, conn):
- conn.execute("set timezone to 'Europe/Rome'")
- tz = conn.info.timezone
- assert isinstance(tz, dt.tzinfo)
- offset = tz.utcoffset(dt.datetime(2000, 1, 1))
- assert offset and offset.total_seconds() == 3600
- offset = tz.utcoffset(dt.datetime(2000, 7, 1))
- assert offset and offset.total_seconds() == 7200
-
- @pytest.mark.crdb("skip", reason="crdb doesn't allow invalid timezones")
- def test_timezone_warn(self, conn, caplog):
- conn.execute("set timezone to 'FOOBAR0'")
- assert len(caplog.records) == 0
- tz = conn.info.timezone
- assert tz == dt.timezone.utc
- assert len(caplog.records) == 1
- assert "FOOBAR0" in caplog.records[0].message
-
- conn.info.timezone
- assert len(caplog.records) == 1
-
- conn.execute("set timezone to 'FOOBAAR0'")
- assert len(caplog.records) == 1
- conn.info.timezone
- assert len(caplog.records) == 2
- assert "FOOBAAR0" in caplog.records[1].message
-
- def test_encoding(self, conn):
- enc = conn.execute("show client_encoding").fetchone()[0]
- assert conn.info.encoding == pg2pyenc(enc.encode())
-
- @pytest.mark.crdb("skip", reason="encoding not normalized")
- @pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- ("eucjp", "EUC_JP", "euc_jp"),
- ("euc-jp", "EUC_JP", "euc_jp"),
- ("latin9", "LATIN9", "iso8859-15"),
- ],
- )
- def test_normalize_encoding(self, conn, enc, out, codec):
- conn.execute("select set_config('client_encoding', %s, false)", [enc])
- assert conn.info.parameter_status("client_encoding") == out
- assert conn.info.encoding == codec
-
- @pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- crdb_encoding("eucjp", "EUC_JP", "euc_jp"),
- crdb_encoding("euc-jp", "EUC_JP", "euc_jp"),
- ],
- )
- def test_encoding_env_var(self, conn_cls, dsn, monkeypatch, enc, out, codec):
- monkeypatch.setenv("PGCLIENTENCODING", enc)
- with conn_cls.connect(dsn) as conn:
- clienc = conn.info.parameter_status("client_encoding")
- assert clienc
- if conn.info.vendor == "PostgreSQL":
- assert clienc == out
- else:
- assert clienc.replace("-", "").replace("_", "").upper() == out
- assert conn.info.encoding == codec
-
- @pytest.mark.crdb_skip("encoding")
- def test_set_encoding_unsupported(self, conn):
- cur = conn.cursor()
- cur.execute("set client_encoding to EUC_TW")
- with pytest.raises(psycopg.NotSupportedError):
- cur.execute("select 'x'")
-
- def test_vendor(self, conn):
- assert conn.info.vendor
-
-
@pytest.mark.parametrize(
"conninfo, want, env",
[