Show the underlying connection basic identification and status.
from . import errors as e
from . import waiting
from . import encodings
-from .pq import TransactionStatus, ExecStatus, Format
+from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
from .sql import Composable
from .proto import PQGen, PQGenConn, RV, Query, Params, AdaptContext
from .proto import ConnectionType
if status == TransactionStatus.UNKNOWN:
return
- elif status == TransactionStatus.INTRANS:
- msg = (
- f"connection {self} was deleted with an open transaction,"
- " changes discarded by the server"
- )
- else:
- status = TransactionStatus(status) # in case we got an int
- msg = f"connection {self} was deleted open in status {status.name}"
+ status = TransactionStatus(status) # in case we got an int
+ warnings.warn(
+ f"connection {self} was deleted while still open."
+ f" Please use 'with' or '.close()' to close the connection",
+ ResourceWarning,
+ )
- warnings.warn(msg, ResourceWarning)
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = pq.misc.connection_summary(self.pgconn)
+ return f"<{cls} {info} at 0x{id(self):x}>"
@property
def closed(self) -> bool:
"""`True` if the connection is closed."""
- return self.pgconn.status == self.ConnStatus.BAD
+ return self.pgconn.status == ConnStatus.BAD
@property
def autocommit(self) -> bool:
Only used to implement internal commands such as commit, returning
no result. The cursor can do more complex stuff.
"""
- if self.pgconn.status != self.ConnStatus.OK:
- if self.pgconn.status == self.ConnStatus.BAD:
+ if self.pgconn.status != ConnStatus.OK:
+ if self.pgconn.status == ConnStatus.BAD:
raise e.OperationalError("the connection is closed")
raise e.InterfaceError(
f"cannot execute operations: the connection is"
from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
from types import TracebackType
+from . import pq
from .pq import Format, ExecStatus
from .proto import ConnectionType
from .generators import copy_from, copy_to, copy_end
else:
self._format_copy_row = self._format_row_binary
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = pq.misc.connection_summary(self.connection.pgconn)
+ return f"<{cls} {info} at 0x{id(self):x}>"
+
def _format_row(self, row: Sequence[Any]) -> bytes:
"""Convert a Python sequence to the data to send for copy"""
out: List[Optional[bytes]] = []
self._query: Optional[bytes] = None
self._params: Optional[List[Optional[bytes]]] = None
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = pq.misc.connection_summary(self._conn.pgconn)
+ status = " (closed)" if self._closed else ""
+ return f"<{cls}{status} {info} at 0x{id(self):x}>"
+
@property
def connection(self) -> ConnectionType:
"""The connection this cursor is using."""
from typing import cast, NamedTuple, Optional, Union
from ..errors import OperationalError
-from ._enums import DiagnosticField, ConnStatus
+from ._enums import DiagnosticField, ConnStatus, TransactionStatus
from .proto import PGconn, PGresult
msg = "no details available"
return msg
+
+
+def connection_summary(pgconn: PGconn) -> str:
+ """
+ Return summary information on a connection.
+
+ Useful for __repr__
+ """
+ parts = []
+ if pgconn.status == ConnStatus.OK:
+
+ status = TransactionStatus(pgconn.transaction_status).name
+ if not pgconn.host.startswith(b"/"):
+ parts.append(("host", pgconn.host.decode("utf-8")))
+ if pgconn.port != b"5432":
+ parts.append(("port", pgconn.port.decode("utf-8")))
+ if pgconn.user != pgconn.db:
+ parts.append(("user", pgconn.user.decode("utf-8")))
+ parts.append(("database", pgconn.db.decode("utf-8")))
+ else:
+ status = ConnStatus(pgconn.status).name
+
+ sparts = " ".join("%s=%s" % part for part in parts)
+ if sparts:
+ sparts = f" ({sparts})"
+ return f"[{status}]{sparts}"
from . import _pq_ctypes as impl
from .misc import PGnotify, ConninfoOption, PQerror, PGresAttDesc
-from .misc import error_message
+from .misc import error_message, connection_summary
from ._enums import ConnStatus, DiagnosticField, ExecStatus, Format
from ._enums import Ping, PollingStatus, TransactionStatus
if os.getpid() == self._procpid:
self.finish()
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = connection_summary(self)
+ return f"<{cls} {info} at 0x{id(self):x}>"
+
@classmethod
def connect(cls, conninfo: bytes) -> "PGconn":
if not isinstance(conninfo, bytes):
from types import TracebackType
from typing import Generic, Optional, Type, Union, TYPE_CHECKING
+from . import pq
from . import sql
from .pq import TransactionStatus
from .proto import ConnectionType, PQGen
self._conn = connection
self._savepoint_name = savepoint_name or ""
self.force_rollback = force_rollback
- self._yolo = True
+ self._entered = self._exited = False
@property
def connection(self) -> ConnectionType:
return self._savepoint_name
def __repr__(self) -> str:
- args = [f"connection={self.connection}"]
- if not self.savepoint_name:
- args.append(f"savepoint_name={self.savepoint_name!r}")
- if self.force_rollback:
- args.append("force_rollback=True")
- return f"{self.__class__.__qualname__}({', '.join(args)})"
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = pq.misc.connection_summary(self._conn.pgconn)
+ if not self._entered:
+ status = "inactive"
+ elif not self._exited:
+ status = "active"
+ else:
+ status = "terminated"
+
+ sp = f"{self.savepoint_name!r} " if self.savepoint_name else ""
+ return f"<{cls} {sp}({status}) {info} at 0x{id(self):x}>"
def _enter_gen(self) -> PQGen[None]:
- if not self._yolo:
+ if self._entered:
raise TypeError("transaction blocks can be used only once")
- else:
- self._yolo = False
+ self._entered = True
self._outer_transaction = (
self._conn.pgconn.transaction_status == TransactionStatus.IDLE
def _commit_gen(self) -> PQGen[None]:
assert self._conn._savepoints[-1] == self._savepoint_name
self._conn._savepoints.pop()
+ self._exited = True
commands = []
if self._savepoint_name and not self._outer_transaction:
import logging
from psycopg3_c.pq.libpq cimport Oid
-from psycopg3.pq.misc import PGnotify
+from psycopg3.pq.misc import PGnotify, connection_summary
logger = logging.getLogger('psycopg3')
if self._procpid == getpid():
self.finish()
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = connection_summary(self)
+ return f"<{cls} {info} at 0x{id(self):x}>"
+
@classmethod
def connect(cls, const char *conninfo) -> PGconn:
cdef libpq.PGconn* pgconn = libpq.PQconnectdb(conninfo)
rec = caplog.records[0]
assert rec.levelno == logging.ERROR
assert "hello error" in rec.message
+
+
+def test_str(pgconn, dsn):
+ assert "[IDLE]" in str(pgconn)
+ pgconn.finish()
+ assert "[BAD]" in str(pgconn)
+
+ pgconn2 = pq.PGconn.connect_start(dsn.encode("utf8"))
+ assert "[" in str(pgconn2)
+ assert "[IDLE]" not in str(pgconn2)
conn = Connection.connect(dsn)
conn.execute("select 1")
del conn
- assert "discarded" in str(recwarn.pop(ResourceWarning).message)
+ assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
conn = Connection.connect(dsn)
try:
cur = conn.execute("select 12, 22")
assert cur.fetchone() == (12, 22)
+
+
+def test_str(conn):
+ assert "[IDLE]" in str(conn)
+ conn.close()
+ assert "[BAD]" in str(conn)
conn = await AsyncConnection.connect(dsn)
await conn.execute("select 1")
del conn
- assert "discarded" in str(recwarn.pop(ResourceWarning).message)
+ assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
conn = await AsyncConnection.connect(dsn)
try:
cur = await aconn.execute("select 12, 22")
assert await cur.fetchone() == (12, 22)
+
+
+async def test_str(aconn):
+ assert "[IDLE]" in str(aconn)
+ await aconn.close()
+ assert "[BAD]" in str(aconn)
list(copy)
+def test_str(conn):
+ cur = conn.cursor()
+ with cur.copy("copy (select 1) to stdout") as copy:
+ assert "[ACTIVE]" in str(copy)
+ list(copy)
+
+ assert "[INTRANS]" in str(copy)
+
+
def ensure_table(cur, tabledef, name="copy_in"):
cur.execute(f"drop table if exists {name}")
cur.execute(f"create table {name} ({tabledef})")
pass
+async def test_str(aconn):
+ cur = await aconn.cursor()
+ async with cur.copy("copy (select 1) to stdout") as copy:
+ assert "[ACTIVE]" in str(copy)
+ async for record in copy:
+ pass
+
+ assert "[INTRANS]" in str(copy)
+
+
async def ensure_table(cur, tabledef, name="copy_in"):
await cur.execute(f"drop table if exists {name}")
await cur.execute(f"create table {name} ({tabledef})")
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
unpickled = pickle.loads(pickled)
assert [tuple(d) for d in description] == [tuple(d) for d in unpickled]
+
+
+def test_str(conn):
+ cur = conn.cursor()
+ assert "[IDLE]" in str(cur)
+ assert "(closed)" not in str(cur)
+ cur.close()
+ assert "(closed)" in str(cur)
+ assert "[IDLE]" in str(cur)
assert (await cur.fetchone()) == (3,)
async for rec in cur:
assert False
+
+
+async def test_str(aconn):
+ cur = await aconn.cursor()
+ assert "[IDLE]" in str(cur)
+ assert "(closed)" not in str(cur)
+ await cur.close()
+ assert "(closed)" in str(cur)
+ assert "[IDLE]" in str(cur)
assert not inserted(svcconn) # Not yet committed
# Changes committed
assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+def test_str(conn):
+ with conn.transaction() as tx:
+ assert "[INTRANS]" in str(tx)
+ assert "(active)" in str(tx)
+ assert "'" not in str(tx)
+ with conn.transaction("wat") as tx2:
+ assert "[INTRANS]" in str(tx2)
+ assert "'wat'" in str(tx2)
+
+ assert "[IDLE]" in str(tx)
+ assert "(terminated)" in str(tx)
assert not inserted(svcconn) # Not yet committed
# Changes committed
assert inserted(svcconn) == {"outer-before", "outer-after"}
+
+
+async def test_str(aconn):
+ async with aconn.transaction() as tx:
+ assert "[INTRANS]" in str(tx)
+ assert "(active)" in str(tx)
+ assert "'" not in str(tx)
+ async with aconn.transaction("wat") as tx2:
+ assert "[INTRANS]" in str(tx2)
+ assert "'wat'" in str(tx2)
+
+ assert "[IDLE]" in str(tx)
+ assert "(terminated)" in str(tx)