from . import pq
from . import errors as e
-from .pq import ConnStatus
from .abc import PipelineCommand, PQGen
from ._compat import Deque, TypeAlias
from ._cmodule import _psycopg
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
+BAD = pq.ConnStatus.BAD
logger = logging.getLogger("psycopg")
def _exit(self) -> None:
self.level -= 1
- if self.level == 0 and self.pgconn.status != ConnStatus.BAD:
+ if self.level == 0 and self.pgconn.status != BAD:
self.pgconn.exit_pipeline_mode()
def _sync_gen(self) -> PQGen[None]:
from . import errors as e
from . import waiting
from . import postgres
-from .pq import ConnStatus
from .abc import AdaptContext, ConnectionType, Params, Query, RV
from .abc import PQGen, PQGenConn
from .sql import Composable, SQL
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
+OK = pq.ConnStatus.OK
+BAD = pq.ConnStatus.BAD
+
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
@property
def closed(self) -> bool:
"""`!True` if the connection is closed."""
- return self.pgconn.status == ConnStatus.BAD
+ return self.pgconn.status == BAD
@property
def broken(self) -> bool:
A broken connection is always `closed`, but wasn't closed in a clean
way, such as using `close()` or a ``with`` block.
"""
- return self.pgconn.status == ConnStatus.BAD and not self._closed
+ return self.pgconn.status == BAD and not self._closed
@property
def autocommit(self) -> bool:
return result
def _check_connection_ok(self) -> None:
- if self.pgconn.status == ConnStatus.OK:
+ if self.pgconn.status == OK:
return
- if self.pgconn.status == ConnStatus.BAD:
+ if self.pgconn.status == BAD:
raise e.OperationalError("the connection is closed")
raise e.InterfaceError(
"cannot execute operations: the connection is"
from . import pq
from . import errors as e
-from .pq import ConnStatus, PollingStatus
+from .pq import PollingStatus
from .abc import PipelineCommand, PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
from ._compat import Deque
from ._encodings import pgconn_encoding, conninfo_encoding
+OK = pq.ConnStatus.OK
+BAD = pq.ConnStatus.BAD
+
COMMAND_OK = pq.ExecStatus.COMMAND_OK
COPY_OUT = pq.ExecStatus.COPY_OUT
COPY_IN = pq.ExecStatus.COPY_IN
"""
conn = pq.PGconn.connect_start(conninfo.encode())
while True:
- if conn.status == ConnStatus.BAD:
+ if conn.status == BAD:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
f"connection is bad: {pq.error_message(conn, encoding=encoding)}",
from ._enums import ConnStatus, TransactionStatus, PipelineStatus
from .._encodings import pgconn_encoding
+OK = ConnStatus.OK
+
class PGnotify(NamedTuple):
relname: bytes
elif hasattr(obj, "error_message"):
# obj is a PGconn
obj = cast(PGconn, obj)
- if obj.status == ConnStatus.OK:
+ if obj.status == OK:
encoding = pgconn_encoding(obj)
bmsg = obj.error_message
Useful for __repr__
"""
parts = []
- if pgconn.status == ConnStatus.OK:
+ if pgconn.status == OK:
# Put together the [STATUS]
status = TransactionStatus(pgconn.transaction_status).name
if pgconn.pipeline_status:
from . import pq
from . import sql
from . import errors as e
-from .pq import ConnStatus
from .abc import ConnectionType, PQGen
if TYPE_CHECKING:
IDLE = pq.TransactionStatus.IDLE
+OK = pq.ConnStatus.OK
+
logger = logging.getLogger(__name__)
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
- if self.pgconn.status == ConnStatus.OK:
+ if self.pgconn.status == OK:
with self._conn.lock:
return self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
else:
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
- if self.pgconn.status == ConnStatus.OK:
+ if self.pgconn.status == OK:
async with self._conn.lock:
return await self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
else: