affected. For instance, changing the global context will only change newly
created connections, not the ones already existing.
+ .. versionchanged:: 3.3
+
+ you can call `~AdaptersMap.register_loader()` on `!Cursor.adapters`
+ after a query has returned a result to change the loaders used.
+
.. _adapt-life-cycle:
.. versionchanged:: 3.1
added `!prepare_threshold` and `!cursor_factory` parameters.
+ .. attribute:: adapters
+ :type: ~adapt.AdaptersMap
+
+ The adapters configuration used to convert Python parameters and
+ PostgreSQL results for the queries executed on this cursor.
+
+ It affects all the cursors created by this connection afterwards.
+
.. automethod:: close
.. note::
The connection this cursor is using.
+ .. attribute:: adapters
+ :type: ~adapt.AdaptersMap
+
+ The adapters configuration used to convert Python parameters and
+ PostgreSQL results for the queries executed on this cursor.
+
+ .. versionchanged:: 3.3
+
+ reconfiguring loaders using `~adapt.AdaptersMap.register_loader()`
+ affects the results of a query already executed.
+
.. automethod:: close
.. note::
`~Cursor.execute()` with multiple statements (:tickets:`#1080, #1170`).
- Add :ref:`transaction-status` to report the status during and after a
`~Connection.transaction()` block (:ticket:`#969`).
+- Allow to change loaders using `~adapt.AdaptersMap.register_loader()` on
+ `Cursor.adapters` after a query result has been already returned
+ (:ticket:`#884`).
.. rubric:: New libpq wrapper features
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
+from collections.abc import Callable
from . import errors as e
from . import pq
# Record if a dumper or loader has an optimised version.
_optimised: dict[type, type] = {}
+ # Callable to be called when register_loader() is called.
+ _register_loader_callback: Callable[[int, type[Loader]], None] | None = None
+
def __init__(
self, template: AdaptersMap | None = None, types: TypesRegistry | None = None
):
self._own_loaders[fmt] = True
self._loaders[fmt][oid] = loader
+ if self._register_loader_callback:
+ self._register_loader_callback(oid, loader)
def get_dumper(self, cls: type, format: PyFormat) -> type[Dumper]:
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Generic, NoReturn
+from weakref import ReferenceType, ref
from functools import partial
from collections.abc import Iterable, Sequence
from . import adapt
from . import errors as e
from . import pq
-from .abc import ConnectionType, Params, PQGen, Query
+from .abc import ConnectionType, Loader, Params, PQGen, Query
from .rows import Row, RowMaker
from ._column import Column
from ._compat import Template
self._last_query: Query | None = None
self._reset()
+ # Set up a callback to allow changing loaders on already returned result.
+ self._adapters._register_loader_callback = partial(
+ self._loaders_changed, ref(self)
+ )
+
def _reset(self, reset_query: bool = True) -> None:
self._results: list[PGresult] = []
self.pgresult: PGresult | None = None
for res in results:
self._rowcount += res.command_tuples or 0
+ @classmethod
+ def _loaders_changed(
+ cls, wself: ReferenceType[BaseCursor[Any, Any]], oid: int, loader: type[Loader]
+ ) -> None:
+ """Callback called when self.adapters.set_loaders is called.
+
+ Allow to change the loaders after the results have been returned already.
+ """
+ if not (self := wself()):
+ return
+
+ # Replace the transformer with a new one, restore the current state.
+ self._tx = adapt.Transformer(self)
+ pos = self._pos
+ if self._iresult < len(self._results):
+ self._select_current_result(self._iresult)
+ self._pos = pos
+
def _send_prepare(self, name: bytes, query: PostgresQuery) -> None:
if self._conn._pipeline:
self._conn._pipeline.command_queue.append(
"""
A context describing how types are adapted.
- Example of `~AdaptContext` are `~psycopg.Connection`, `~psycopg.Cursor`,
+ Example of `!AdaptContext` are `~psycopg.Connection`, `~psycopg.Cursor`,
`~psycopg.adapt.Transformer`, `~psycopg.adapt.AdaptersMap`.
Note that this is a `~typing.Protocol`, so objects implementing
from .utils import raiseif
from .acompat import gather, spawn
from .fix_crdb import crdb_encoding
+from .test_adapt import make_loader
from ._test_cursor import my_row_factory, ph
execmany = _test_cursor.execmany # avoid F811 underneath
assert ress == [[(j + 1,) for j in range(i)] for i in range(count)]
else:
assert ress == []
+
+
+def test_change_loader_results(conn):
+ cur = conn.cursor()
+ # With no result
+ cur.adapters.register_loader("text", make_loader("1"))
+
+ cur.execute(
+ """
+ values ('foo'::text);
+ values ('bar'::text), ('baz');
+ values ('qux'::text);
+ """
+ )
+ assert cur.fetchall() == [("foo1",)]
+
+ cur.nextset()
+ assert cur.fetchone() == ("bar1",)
+ cur.adapters.register_loader("text", make_loader("2"))
+ assert cur.fetchone() == ("baz2",)
+ cur.scroll(-2)
+ assert cur.fetchall() == [("bar2",), ("baz2",)]
+
+ cur.nextset()
+ assert cur.fetchall() == [("qux2",)]
+
+ # After the final result
+ assert not cur.nextset()
+ cur.adapters.register_loader("text", make_loader("3"))
+ assert cur.fetchone() is None
+ cur.set_result(0)
+ assert cur.fetchall() == [("foo3",)]
from .utils import raiseif
from .acompat import alist, gather, spawn
from .fix_crdb import crdb_encoding
+from .test_adapt import make_loader
from ._test_cursor import my_row_factory, ph
execmany = _test_cursor.execmany # avoid F811 underneath
assert ress == [[(j + 1,) for j in range(i)] for i in range(count)]
else:
assert ress == []
+
+
+async def test_change_loader_results(aconn):
+ cur = aconn.cursor()
+ # With no result
+ cur.adapters.register_loader("text", make_loader("1"))
+
+ await cur.execute(
+ """
+ values ('foo'::text);
+ values ('bar'::text), ('baz');
+ values ('qux'::text);
+ """
+ )
+ assert (await cur.fetchall()) == [("foo1",)]
+
+ cur.nextset()
+ assert (await cur.fetchone()) == ("bar1",)
+ cur.adapters.register_loader("text", make_loader("2"))
+ assert (await cur.fetchone()) == ("baz2",)
+ await cur.scroll(-2)
+ assert (await cur.fetchall()) == [("bar2",), ("baz2",)]
+
+ cur.nextset()
+ assert (await cur.fetchall()) == [("qux2",)]
+
+ # After the final result
+ assert not cur.nextset()
+ cur.adapters.register_loader("text", make_loader("3"))
+ assert (await cur.fetchone()) is None
+ await cur.set_result(0)
+ assert (await cur.fetchall()) == [("foo3",)]