Add respective properties to read back the state.
.. automethod:: close
- .. note:: You can use :ref:`with connect(): ...<with-connection>` to
- close the connection automatically when the block is exited.
+ .. note::
+
+ You can use::
+
+ with psycopg.connect() as conn:
+ ...
+
+ to close the connection automatically when the block is exited.
+ See :ref:`with-connection`.
.. autoattribute:: closed
.. autoattribute:: broken
- .. method:: cursor(*, binary: bool = False, row_factory: Optional[RowFactory] = None) -> Cursor
- .. method:: cursor(name: str, *, binary: bool = False, row_factory: Optional[RowFactory] = None) -> ServerCursor
+ .. method:: cursor(*, binary: bool = False, \
+ row_factory: Optional[RowFactory] = None) -> Cursor
+ .. method:: cursor(name: str, *, binary: bool = False, \
+ row_factory: Optional[RowFactory] = None, \
+ scrollable: Optional[bool] = None, withhold: bool = False) -> ServerCursor
:noindex:
Return a new cursor to send commands and queries to the connection.
automatically when the block is exited, but be careful about
the async quirkness: see :ref:`async-with` for details.
- .. method:: cursor(*, binary: bool = False, row_factory: Optional[RowFactory] = None) -> AsyncCursor
- .. method:: cursor(name: str, *, binary: bool = False, row_factory: Optional[RowFactory] = None) -> AsyncServerCursor
+ .. method:: cursor(*, binary: bool = False, \
+ row_factory: Optional[RowFactory] = None) -> AsyncCursor
+ .. method:: cursor(name: str, *, binary: bool = False, \
+ row_factory: Optional[RowFactory] = None, \
+ scrollable: Optional[bool] = None, withhold: bool = False) -> AsyncServerCursor
:noindex:
.. note:: You can use ``async with conn.cursor() as cur: ...`` to
documented the differences:
.. autoattribute:: name
+ .. autoattribute:: scrollable
+
+ .. seealso:: The PostgreSQL DECLARE_ statement documetation
+ for the description of :sql:`[NO] SCROLL`.
+
+ .. autoattribute:: withhold
+
+ .. seealso:: The PostgreSQL DECLARE_ statement documetation
+ for the description of :sql:`{WITH|WITHOUT} HOLD`.
+
+ .. _DECLARE: https://www.postgresql.org/docs/current/sql-declare.html
+
.. automethod:: close
...` pattern is especially useful so that the cursor is closed at
the end of the block.
- .. automethod:: execute(query, params=None, *, scrollable=None, withhold=False) -> ServerCursor
+ .. automethod:: execute(query, params=None, *) -> ServerCursor
:param query: The query to execute.
:type query: `!str`, `!bytes`, or `sql.Composable`
:param params: The parameters to pass to the query, if any.
:type params: Sequence or Mapping
- :param scrollable: if `!True` make the cursor scrollable, if `!False`
- not. if `!None` leave the choice to the server.
- :type scrollable: `!Optional[bool]`
- :param withhold: if `!True` allow the cursor to be used after the
- transaction creating it has committed.
- :type withhold: `!bool`
Create a server cursor with given `name` and the *query* in argument.
If using :sql:`DECLARE` is not appropriate you can avoid to use
Using `!execute()` more than once will close the previous cursor and
open a new one with the same name.
- .. seealso:: The PostgreSQL DECLARE_ statement documetation describe
- in details all the parameters.
-
- .. _DECLARE: https://www.postgresql.org/docs/current/sql-declare.html
-
.. automethod:: executemany(query: Query, params_seq: Sequence[Args])
.. automethod:: fetchone
This method uses the MOVE_ SQL statement to move the current position
in the server-side cursor, which will affect following `!fetch*()`
operations. If you need to scroll backwards you should probably
- use `scrollable=True` in `execute()`.
+ call `~Connection.cursor()` using `scrollable=True`.
Note that PostgreSQL doesn't provide a reliable way to report when a
cursor moves out of bound, so the method might not raise `!IndexError`
.. note:: You can close the cursor automatically using :samp:`async
with conn.cursor({name}): ...`
- .. automethod:: execute(query, params=None, *, scrollable=None, withhold=False) -> AsyncServerCursor
+ .. automethod:: execute(query, params=None) -> AsyncServerCursor
.. automethod:: executemany(query: Query, params_seq: Sequence[Args])
.. automethod:: fetchone
.. automethod:: fetchmany
...
@overload
- def cursor(self, name: str, *, binary: bool = False) -> ServerCursor[Row]:
+ def cursor(
+ self,
+ name: str,
+ *,
+ binary: bool = False,
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
+ ) -> ServerCursor[Row]:
...
@overload
*,
binary: bool = False,
row_factory: RowFactory[CursorRow],
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
) -> ServerCursor[CursorRow]:
...
*,
binary: bool = False,
row_factory: Optional[RowFactory[Any]] = None,
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
) -> Union[Cursor[Any], ServerCursor[Any]]:
"""
Return a new cursor to send commands and queries to the connection.
cur: Union[Cursor[Any], ServerCursor[Any]]
if name:
cur = self.server_cursor_factory(
- self, name=name, row_factory=row_factory
+ self,
+ name=name,
+ row_factory=row_factory,
+ scrollable=scrollable,
+ withhold=withhold,
)
else:
cur = self.cursor_factory(self, row_factory=row_factory)
@overload
def cursor(
- self, name: str, *, binary: bool = False
+ self,
+ name: str,
+ *,
+ binary: bool = False,
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
) -> AsyncServerCursor[Row]:
...
*,
binary: bool = False,
row_factory: RowFactory[CursorRow],
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
) -> AsyncServerCursor[CursorRow]:
...
*,
binary: bool = False,
row_factory: Optional[RowFactory[Any]] = None,
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
) -> Union[AsyncCursor[Any], AsyncServerCursor[Any]]:
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
cur: Union[AsyncCursor[Any], AsyncServerCursor[Any]]
if name:
cur = self.server_cursor_factory(
- self, name=name, row_factory=row_factory
+ self,
+ name=name,
+ row_factory=row_factory,
+ scrollable=scrollable,
+ withhold=withhold,
)
else:
cur = self.cursor_factory(self, row_factory=row_factory)
class ServerCursorHelper(Generic[ConnectionType, Row]):
- __slots__ = ("name", "described")
+ __slots__ = ("name", "scrollable", "withhold", "described")
"""Helper object for common ServerCursor code.
TODO: this should be a mixin, but couldn't find a way to work it
correctly with the generic.
"""
- def __init__(self, name: str):
+ def __init__(
+ self,
+ name: str,
+ scrollable: Optional[bool],
+ withhold: bool,
+ ):
self.name = name
+ self.scrollable = scrollable
+ self.withhold = withhold
self.described = False
def _repr(self, cur: BaseCursor[ConnectionType, Row]) -> str:
self,
cur: BaseCursor[ConnectionType, Row],
query: Query,
- scrollable: Optional[bool],
- withhold: bool,
) -> sql.Composable:
if isinstance(query, bytes):
sql.SQL("declare"),
sql.Identifier(self.name),
]
- if scrollable is not None:
- parts.append(sql.SQL("scroll" if scrollable else "no scroll"))
+ if self.scrollable is not None:
+ parts.append(sql.SQL("scroll" if self.scrollable else "no scroll"))
parts.append(sql.SQL("cursor"))
- if withhold:
+ if self.withhold:
parts.append(sql.SQL("with hold"))
parts.append(sql.SQL("for"))
parts.append(query)
name: str,
*,
row_factory: RowFactory[Row],
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
):
super().__init__(connection, row_factory=row_factory)
self._helper: ServerCursorHelper["Connection[Any]", Row]
- self._helper = ServerCursorHelper(name)
+ self._helper = ServerCursorHelper(name, scrollable, withhold)
self.itersize: int = DEFAULT_ITERSIZE
def __del__(self) -> None:
"""The name of the cursor."""
return self._helper.name
+ @property
+ def scrollable(self) -> Optional[bool]:
+ """
+ Whether the cursor is scrollable or not.
+
+ If `!None` leave the choice to the server. Use `!True` if you want to
+ use `scroll()` on the cursor.
+ """
+ return self._helper.scrollable
+
+ @property
+ def withhold(self) -> bool:
+ """
+ If the cursor can be used after the creating transaction has committed.
+ """
+ return self._helper.withhold
+
def close(self) -> None:
"""
Close the current cursor and free associated resources.
self,
query: Query,
params: Optional[Params] = None,
- *,
- scrollable: Optional[bool] = None,
- withhold: bool = False,
) -> "ServerCursor[Row]":
"""
Open a cursor to execute a query to the database.
"""
- query = self._helper._make_declare_statement(
- self, query, scrollable=scrollable, withhold=withhold
- )
+ query = self._helper._make_declare_statement(self, query)
with self._conn.lock:
self._conn.wait(self._helper._declare_gen(self, query, params))
return self
name: str,
*,
row_factory: RowFactory[Row],
+ scrollable: Optional[bool] = None,
+ withhold: bool = False,
):
super().__init__(connection, row_factory=row_factory)
self._helper: ServerCursorHelper["AsyncConnection[Any]", Row]
- self._helper = ServerCursorHelper(name)
+ self._helper = ServerCursorHelper(name, scrollable, withhold)
self.itersize: int = DEFAULT_ITERSIZE
def __del__(self) -> None:
def name(self) -> str:
return self._helper.name
+ @property
+ def scrollable(self) -> Optional[bool]:
+ return self._helper.scrollable
+
+ @property
+ def withhold(self) -> bool:
+ return self._helper.withhold
+
async def close(self) -> None:
async with self._conn.lock:
await self._conn.wait(self._helper._close_gen(self))
self,
query: Query,
params: Optional[Params] = None,
- *,
- scrollable: Optional[bool] = None,
- withhold: bool = False,
) -> "AsyncServerCursor[Row]":
- query = self._helper._make_declare_statement(
- self, query, scrollable=scrollable, withhold=withhold
- )
+ query = self._helper._make_declare_statement(self, query)
async with self._conn.lock:
await self._conn.wait(
self._helper._declare_gen(self, query, params)
n += 1
return lambda values: [n] + [-v for v in values]
- cur = conn.cursor("foo", row_factory=my_row_factory)
- cur.execute("select generate_series(1, 3) as x", scrollable=True)
+ cur = conn.cursor("foo", row_factory=my_row_factory, scrollable=True)
+ cur.execute("select generate_series(1, 3) as x")
rows = cur.fetchall()
cur.scroll(0, "absolute")
while 1:
assert ("fetch forward 2") in cmd.lower()
-def test_scroll(conn):
+def test_cant_scroll_by_default(conn):
cur = conn.cursor("tmp")
+ assert cur.scrollable is None
with pytest.raises(e.ProgrammingError):
cur.scroll(0)
- cur.execute("select generate_series(0,9)", scrollable=True)
+
+def test_scroll(conn):
+ cur = conn.cursor("tmp", scrollable=True)
+ cur.execute("select generate_series(0,9)")
cur.scroll(2)
assert cur.fetchone() == (2,)
cur.scroll(2)
def test_scrollable(conn):
- curs = conn.cursor("foo")
- curs.execute("select generate_series(0, 5)", scrollable=True)
+ curs = conn.cursor("foo", scrollable=True)
+ assert curs.scrollable is True
+ curs.execute("select generate_series(0, 5)")
curs.scroll(5)
for i in range(4, -1, -1):
curs.scroll(-1)
def test_non_scrollable(conn):
- curs = conn.cursor("foo")
- curs.execute("select generate_series(0, 5)", scrollable=False)
+ curs = conn.cursor("foo", scrollable=False)
+ assert curs.scrollable is False
+ curs.execute("select generate_series(0, 5)")
curs.scroll(5)
with pytest.raises(e.OperationalError):
curs.scroll(-1)
@pytest.mark.parametrize("kwargs", [{}, {"withhold": False}])
def test_no_hold(conn, kwargs):
with pytest.raises(e.InvalidCursorName):
- with conn.cursor("foo") as curs:
- curs.execute("select generate_series(0, 2)", **kwargs)
+ with conn.cursor("foo", **kwargs) as curs:
+ assert curs.withhold is False
+ curs.execute("select generate_series(0, 2)")
assert curs.fetchone() == (0,)
conn.commit()
curs.fetchone()
def test_hold(conn):
- with conn.cursor("foo") as curs:
- curs.execute("select generate_series(0, 5)", withhold=True)
+ with conn.cursor("foo", withhold=True) as curs:
+ assert curs.withhold is True
+ curs.execute("select generate_series(0, 5)")
assert curs.fetchone() == (0,)
conn.commit()
assert curs.fetchone() == (1,)
n += 1
return lambda values: [n] + [-v for v in values]
- cur = aconn.cursor("foo", row_factory=my_row_factory)
- await cur.execute("select generate_series(1, 3) as x", scrollable=True)
+ cur = aconn.cursor("foo", row_factory=my_row_factory, scrollable=True)
+ await cur.execute("select generate_series(1, 3) as x")
rows = await cur.fetchall()
await cur.scroll(0, "absolute")
while 1:
assert ("fetch forward 2") in cmd.lower()
-async def test_scroll(aconn):
+async def test_cant_scroll_by_default(aconn):
cur = aconn.cursor("tmp")
+ assert cur.scrollable is None
with pytest.raises(e.ProgrammingError):
await cur.scroll(0)
- await cur.execute("select generate_series(0,9)", scrollable=True)
+
+async def test_scroll(aconn):
+ cur = aconn.cursor("tmp", scrollable=True)
+ await cur.execute("select generate_series(0,9)")
await cur.scroll(2)
assert await cur.fetchone() == (2,)
await cur.scroll(2)
async def test_scrollable(aconn):
- curs = aconn.cursor("foo")
- await curs.execute("select generate_series(0, 5)", scrollable=True)
+ curs = aconn.cursor("foo", scrollable=True)
+ assert curs.scrollable is True
+ await curs.execute("select generate_series(0, 5)")
await curs.scroll(5)
for i in range(4, -1, -1):
await curs.scroll(-1)
async def test_non_scrollable(aconn):
- curs = aconn.cursor("foo")
- await curs.execute("select generate_series(0, 5)", scrollable=False)
+ curs = aconn.cursor("foo", scrollable=False)
+ assert curs.scrollable is False
+ await curs.execute("select generate_series(0, 5)")
await curs.scroll(5)
with pytest.raises(e.OperationalError):
await curs.scroll(-1)
@pytest.mark.parametrize("kwargs", [{}, {"withhold": False}])
async def test_no_hold(aconn, kwargs):
with pytest.raises(e.InvalidCursorName):
- async with aconn.cursor("foo") as curs:
- await curs.execute("select generate_series(0, 2)", **kwargs)
+ async with aconn.cursor("foo", **kwargs) as curs:
+ assert curs.withhold is False
+ await curs.execute("select generate_series(0, 2)")
assert await curs.fetchone() == (0,)
await aconn.commit()
await curs.fetchone()
async def test_hold(aconn):
- async with aconn.cursor("foo") as curs:
- await curs.execute("select generate_series(0, 5)", withhold=True)
+ async with aconn.cursor("foo", withhold=True) as curs:
+ assert curs.withhold is True
+ await curs.execute("select generate_series(0, 5)")
assert await curs.fetchone() == (0,)
await aconn.commit()
assert await curs.fetchone() == (1,)