psycopg/psycopg/errors.py: E125, E128, E302
# Allow concatenated string literals from async_to_sync
- psycopg/psycopg/_server_cursor.py: E501
psycopg_pool/psycopg_pool/pool.py: E501
# Pytest's importorskip() getting in the way
import sys
import logging
-from typing import TYPE_CHECKING, Generic, NamedTuple, TypeAlias
+from typing import TYPE_CHECKING, Any, Generic, NamedTuple, TypeAlias
from weakref import ReferenceType, ref
from warnings import warn
from functools import partial
self._deferrable: bool | None = None
self._begin_statement = b""
- def __del__(self) -> None:
+ def __del__(self, __warn: Any = warn) -> None:
# If fails on connection we might not have this attribute yet
if not hasattr(self, "pgconn"):
return
if hasattr(self, "_pool"):
return
- warn(
- f"connection {self} was deleted while still open."
- " Please use 'with' or '.close()' to close the connection",
+ __warn(
+ f"{object.__repr__(self)} was deleted while still open."
+ " Please use 'with' or '.close()' to close the connection properly",
ResourceWarning,
)
from __future__ import annotations
from typing import TYPE_CHECKING, Any, overload
-from warnings import warn
from collections.abc import Iterable
from . import errors as e
)
ServerCursorMixin.__init__(self, name, scrollable, withhold)
- def __del__(self) -> None:
- if not self.closed:
- warn(
- f"the server-side cursor {self} was deleted while still open. Please use 'with' or '.close()' to close the cursor properly",
- ResourceWarning,
- )
-
def close(self) -> None:
"""
Close the current cursor and free associated resources.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, overload
-from warnings import warn
from collections.abc import Iterable
from . import errors as e
)
ServerCursorMixin.__init__(self, name, scrollable, withhold)
- def __del__(self) -> None:
- if not self.closed:
- warn(
- f"the server-side cursor {self} was deleted while still open."
- " Please use 'with' or '.close()' to close the cursor properly",
- ResourceWarning,
- )
-
async def close(self) -> None:
"""
Close the current cursor and free associated resources.
from __future__ import annotations
+from typing import Any
+from warnings import warn
+
from . import errors as e
from . import pq, sql
from .abc import ConnectionType, Params, PQGen, Query
self._iter_rows: list[Row] | None = None
self._page_pos = 0
+ def __del__(self, __warn: Any = warn) -> None:
+ if self.closed:
+ return
+
+ __warn(
+ f"{object.__repr__(self)} was deleted while still open."
+ " Please use 'with' or '.close()' to close the cursor properly",
+ ResourceWarning,
+ )
+
def __repr__(self) -> str:
# Insert the name as the second word
parts = super().__repr__().split(None, 1)
from .misc import ConninfoOption, PGnotify, PGresAttDesc, _clean_error_message
from .misc import connection_summary
from ._enums import ConnStatus, ExecStatus, Format, Trace
-
-# Imported locally to call them from __del__ methods
-from ._pq_ctypes import PQcancelFinish, PQclear, PQfinish, PQfreeCancel, PQstatus
from .._encodings import pg2pyenc
if TYPE_CHECKING:
OK = ConnStatus.OK
+# note_on_del: functions called on __del__ are imported as local values to
+# avoid warnings on interpreter shutdown in case the module is gc'd before the
+# object is destroyed.
+
def version() -> int:
"""Return the version number of the libpq currently loaded.
self._procpid = getpid()
- def __del__(self) -> None:
+ def __del__(self, __getpid: Callable[[], int] = getpid) -> None:
# Close the connection only if it was created in this process,
# not if this object is being GC'd after fork.
- if getpid() == self._procpid:
+ if __getpid() == self._procpid: # see note_on_del
self.finish()
def __repr__(self) -> str:
def connect_poll(self) -> int:
return self._call_int(impl.PQconnectPoll)
- def finish(self) -> None:
+ def finish(self, __PQfinish: Any = impl.PQfinish) -> None:
self._pgconn_ptr, p = None, self._pgconn_ptr
if p:
- PQfinish(p)
+ __PQfinish(p)
@property
def pgconn_ptr(self) -> int | None:
return self._call_bytes(impl.PQoptions)
@property
- def status(self) -> int:
- return PQstatus(self._pgconn_ptr)
+ def status(self, __PQstatus: Any = impl.PQstatus) -> int:
+ return __PQstatus(self._pgconn_ptr)
@property
def transaction_status(self) -> int:
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
- status = ExecStatus(self.status)
- return f"<{cls} [{status.name}] at 0x{id(self):x}>"
+ try:
+ status = ExecStatus(self.status).name
+ except ValueError:
+ status = f"{self.status} (status unknown)"
+ return f"<{cls} [{status}] at 0x{id(self):x}>"
- def clear(self) -> None:
+ def clear(self, __PQclear: Any = impl.PQclear) -> None:
self._pgresult_ptr, p = None, self._pgresult_ptr
if p:
- PQclear(p)
+ __PQclear(p) # see note_on_del
@property
def pgresult_ptr(self) -> int | None:
self._ensure_pgcancelconn()
impl.PQcancelReset(self.pgcancelconn_ptr)
- def finish(self) -> None:
+ def finish(self, __PQcancelFinish: Any = impl.PQcancelFinish) -> None:
"""
Free the data structure created by `PQcancelCreate()`.
"""
self.pgcancelconn_ptr, p = None, self.pgcancelconn_ptr
if p:
- PQcancelFinish(p)
+ __PQcancelFinish(p) # see note_on_del
def _ensure_pgcancelconn(self) -> None:
if not self.pgcancelconn_ptr:
def __del__(self) -> None:
self.free()
- def free(self) -> None:
+ def free(self, __PQfreeCancel: Any = impl.PQfreeCancel) -> None:
"""
Free the data structure created by :pq:`PQgetCancel()`.
"""
self.pgcancel_ptr, p = None, self.pgcancel_ptr
if p:
- PQfreeCancel(p)
+ __PQfreeCancel(p) # see note_on_del
def cancel(self) -> None:
"""Requests that the server abandon processing of the current command.
conn = conn_cls.connect(dsn)
del conn
gc_collect()
- assert "IDLE" in str(recwarn.pop(ResourceWarning).message)
+ assert conn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
conn = conn_cls.connect(dsn)
conn.execute("select 1")
del conn
gc_collect()
- assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
+ assert conn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
conn = conn_cls.connect(dsn)
try:
pass
del conn
gc_collect()
- assert "INERROR" in str(recwarn.pop(ResourceWarning).message)
+ assert conn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
with conn_cls.connect(dsn) as conn:
pass
conn = await aconn_cls.connect(dsn)
del conn
gc_collect()
- assert "IDLE" in str(recwarn.pop(ResourceWarning).message)
+ assert aconn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
conn = await aconn_cls.connect(dsn)
await conn.execute("select 1")
del conn
gc_collect()
- assert "INTRANS" in str(recwarn.pop(ResourceWarning).message)
+ assert aconn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
conn = await aconn_cls.connect(dsn)
try:
pass
del conn
gc_collect()
- assert "INERROR" in str(recwarn.pop(ResourceWarning).message)
+ assert aconn_cls.__name__ in str(recwarn.pop(ResourceWarning).message)
async with await aconn_cls.connect(dsn) as conn:
pass
cur.execute("select generate_series(1, 10) as bar")
del cur
gc_collect()
- assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+ msg = str(recwarn.pop(ResourceWarning).message)
+ assert conn.server_cursor_factory.__name__ in msg
+ assert ".close()" in msg
def test_execute_reuse(conn):
await cur.execute("select generate_series(1, 10) as bar")
del cur
gc_collect()
- assert ".close()" in str(recwarn.pop(ResourceWarning).message)
+ msg = str(recwarn.pop(ResourceWarning).message)
+ assert aconn.server_cursor_factory.__name__ in msg
+ assert ".close()" in msg
async def test_execute_reuse(aconn):