from . import errors as e
from . import waiting
from . import postgres
-from .pq import ConnStatus, TransactionStatus
+from .pq import ConnStatus
from .abc import AdaptContext, ConnectionType, Params, Query, RV
from .abc import PQGen, PQGenConn
from .sql import Composable, SQL
TUPLES_OK = pq.ExecStatus.TUPLES_OK
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
+IDLE = pq.TransactionStatus.IDLE
+INTRANS = pq.TransactionStatus.INTRANS
+
+
logger = logging.getLogger("psycopg")
def _check_intrans_gen(self, attribute: str) -> PQGen[None]:
# Raise an exception if we are in a transaction
status = self.pgconn.transaction_status
- if status == TransactionStatus.IDLE and self._pipeline:
+ if status == IDLE and self._pipeline:
yield from self._pipeline._sync_gen()
status = self.pgconn.transaction_status
- if status != TransactionStatus.IDLE:
+ if status != IDLE:
if self._num_transactions:
raise e.ProgrammingError(
f"can't change {attribute!r} now: "
raise e.ProgrammingError(
f"can't change {attribute!r} now: "
"connection in transaction status "
- f"{TransactionStatus(status).name}"
+ f"{pq.TransactionStatus(status).name}"
)
@property
if self._autocommit:
return
- if self.pgconn.transaction_status != TransactionStatus.IDLE:
+ if self.pgconn.transaction_status != IDLE:
return
yield from self._exec_command(self._get_tx_start_command())
raise e.ProgrammingError(
"commit() cannot be used during a two-phase transaction"
)
- if self.pgconn.transaction_status == TransactionStatus.IDLE:
+ if self.pgconn.transaction_status == IDLE:
return
yield from self._exec_command(b"COMMIT")
if self._pipeline and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED:
yield from self._pipeline._sync_gen()
- if self.pgconn.transaction_status == TransactionStatus.IDLE:
+ if self.pgconn.transaction_status == IDLE:
return
yield from self._exec_command(b"ROLLBACK")
if not isinstance(xid, Xid):
xid = Xid.from_string(xid)
- if self.pgconn.transaction_status != TransactionStatus.IDLE:
+ if self.pgconn.transaction_status != IDLE:
raise e.ProgrammingError(
"can't start two-phase transaction: connection in status"
- f" {TransactionStatus(self.pgconn.transaction_status).name}"
+ f" {pq.TransactionStatus(self.pgconn.transaction_status).name}"
)
if self._autocommit:
cur.execute(Xid._get_recover_query())
res = cur.fetchall()
- if (
- status == TransactionStatus.IDLE
- and self.info.transaction_status == TransactionStatus.INTRANS
- ):
+ if status == IDLE and self.info.transaction_status == INTRANS:
self.rollback()
return res
from . import pq
from . import errors as e
from . import waiting
-from .pq import TransactionStatus
from .abc import AdaptContext, Params, PQGen, PQGenConn, Query, RV
from ._tpc import Xid
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
+IDLE = pq.TransactionStatus.IDLE
+INTRANS = pq.TransactionStatus.INTRANS
+
logger = logging.getLogger("psycopg")
await cur.execute(Xid._get_recover_query())
res = await cur.fetchall()
- if (
- status == TransactionStatus.IDLE
- and self.info.transaction_status == TransactionStatus.INTRANS
- ):
+ if status == IDLE and self.info.transaction_status == INTRANS:
await self.rollback()
return res
COMMAND_OK = pq.ExecStatus.COMMAND_OK
+IDLE = pq.TransactionStatus.IDLE
+INTRANS = pq.TransactionStatus.INTRANS
+
class ServerCursorMixin(BaseCursor[ConnectionType, Row]):
"""Mixin to add ServerCursor behaviour and implementation a BaseCursor."""
ts = self._conn.pgconn.transaction_status
# if the connection is not in a sane state, don't even try
- if ts not in (pq.TransactionStatus.IDLE, pq.TransactionStatus.INTRANS):
+ if ts != IDLE and ts != INTRANS:
return
# If we are IDLE, a WITHOUT HOLD cursor will surely have gone already.
- if not self._withhold and ts == pq.TransactionStatus.IDLE:
+ if not self._withhold and ts == IDLE:
return
# if we didn't declare the cursor ourselves we still have to close it
from . import pq
from . import sql
from . import errors as e
-from .pq import TransactionStatus, ConnStatus
+from .pq import ConnStatus
from .abc import ConnectionType, PQGen
if TYPE_CHECKING:
from .connection import Connection
from .connection_async import AsyncConnection
+IDLE = pq.TransactionStatus.IDLE
+
logger = logging.getLogger(__name__)
Also set the internal state of the object and verify consistency.
"""
- self._outer_transaction = (
- self.pgconn.transaction_status == TransactionStatus.IDLE
- )
+ self._outer_transaction = self.pgconn.transaction_status == IDLE
if self._outer_transaction:
# outer transaction: if no name it's only a begin, else
# there will be an additional savepoint