--- /dev/null
+.. change::
+ :tags: orm, usecase
+ :tickets: 12960
+
+ Added :class:`_orm.DictBundle` as a subclass of :class:`_orm.Bundle`
+ that returns ``dict`` objects.
.. autoclass:: sqlalchemy.orm.Bundle
:members:
+.. autoclass:: sqlalchemy.orm.DictBundle
+
.. autofunction:: sqlalchemy.orm.with_loader_criteria
.. autofunction:: sqlalchemy.orm.join
and custom column groupings. :class:`_orm.Bundle` may also be subclassed in
order to return alternate data structures; see
:meth:`_orm.Bundle.create_row_processor` for an example.
+A dict-returning subclass :class:`_orm.DictBundle` is provided for convenience.
.. seealso::
:class:`_orm.Bundle`
+ :class:`_orm.DictBundle`
+
:meth:`_orm.Bundle.create_row_processor`
from .unitofwork import UOWTransaction as UOWTransaction
from .util import Bundle as Bundle
from .util import CascadeOptions as CascadeOptions
+from .util import DictBundle as DictBundle
from .util import LoaderCriteriaOption as LoaderCriteriaOption
from .util import object_mapper as object_mapper
from .util import polymorphic_union as polymorphic_union
import weakref
from . import attributes # noqa
-from . import exc
from . import exc as orm_exc
from ._typing import _O
from ._typing import insp_is_aliased_class
if class_manager is None or not class_manager.is_mapped:
return None
mapper = class_manager.mapper
- except exc.NO_STATE:
+ except orm_exc.NO_STATE:
return None
else:
return mapper
:ref:`bundles`
+ :class:`.DictBundle`
"""
def __init__(
self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
- ):
+ ) -> None:
r"""Construct a new :class:`.Bundle`.
e.g.::
for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
print(row.mybundle["data1"], row.mybundle["data2"])
+ The above example is available natively using :class:`.DictBundle`
+
+ .. seealso::
+
+ :class:`.DictBundle`
+
""" # noqa: E501
keyed_tuple = result_tuple(labels, [() for l in labels])
return proc
+class DictBundle(Bundle[_T]):
+ """Like :class:`.Bundle` but returns ``dict`` instances instead of
+ named tuple like objects::
+
+ bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
+ for row in session.execute(select(bn)).where(bn.c.data1 == "d1"):
+ print(row.mybundle["data1"], row.mybundle["data2"])
+
+ Differently from :class:`.Bundle`, multiple columns with the same name are
+ not supported.
+
+ .. versionadded:: 2.1
+
+ .. seealso::
+
+ :ref:`bundles`
+
+ :class:`.Bundle`
+ """
+
+ def __init__(
+ self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
+ ) -> None:
+ super().__init__(name, *exprs, **kw)
+ if len(set(self.c.keys())) != len(self.c):
+ raise sa_exc.ArgumentError(
+ "DictBundle does not support duplicate column names"
+ )
+
+ def create_row_processor(
+ self,
+ query: Select[Unpack[TupleAny]],
+ procs: Sequence[Callable[[Row[Unpack[TupleAny]]], Any]],
+ labels: Sequence[str],
+ ) -> Callable[[Row[Unpack[TupleAny]]], dict[str, Any]]:
+ def proc(row: Row[Unpack[TupleAny]]) -> dict[str, Any]:
+ return dict(zip(labels, (proc(row) for proc in procs)))
+
+ return proc
+
+
def _orm_full_deannotate(element: _SA) -> _SA:
return sql_util._deep_deannotate(element)
from sqlalchemy.orm import Bundle
from sqlalchemy.orm import column_property
from sqlalchemy.orm import DeclarativeBase
+from sqlalchemy.orm import DictBundle
from sqlalchemy.orm import immediateload
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import lazyload
"insert_type",
["bulk", ("values", testing.requires.multivalues_inserts), "single"],
)
- def test_insert_returning_bundle(self, decl_base, insert_type):
+ @testing.combinations(Bundle, DictBundle, argnames="kind")
+ def test_insert_returning_bundle(self, decl_base, insert_type, kind):
"""test #10776"""
class User(decl_base):
decl_base.metadata.create_all(testing.db)
insert_stmt = insert(User).returning(
- User.name, Bundle("mybundle", User.id, User.x, User.y)
+ User.name, kind("mybundle", User.id, User.x, User.y)
)
s = fixture_session()
insert_type.fail()
if insert_type.single:
- eq_(result.all(), [("some name 1", (1, 1, 2))])
+ exp = {Bundle: (1, 1, 2), DictBundle: {"id": 1, "x": 1, "y": 2}}
+ eq_(result.all(), [("some name 1", exp[kind])])
else:
- eq_(
- result.all(),
- [
+ if kind is Bundle:
+ exp = [
("some name 1", (1, 1, 2)),
("some name 2", (2, 2, 3)),
("some name 3", (3, 3, 4)),
- ],
- )
+ ]
+ else:
+ exp = [
+ ("some name 1", {"id": 1, "x": 1, "y": 2}),
+ ("some name 2", {"id": 2, "x": 2, "y": 3}),
+ ("some name 3", {"id": 3, "x": 3, "y": 4}),
+ ]
+
+ eq_(result.all(), exp)
@testing.variation(
"use_returning", [(True, testing.requires.insert_returning), False]
from sqlalchemy import tuple_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import Bundle
+from sqlalchemy.orm import DictBundle
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.sql.elements import ClauseList
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
self.assert_compile(ClauseList(Data.id, Other.id), "data.id, other.id")
self.assert_compile(bundle.__clause_element__(), "data.id, other.id")
+ def test_dict_same_named_col_clauselist(self):
+ Data, Other = self.classes("Data", "Other")
+ with expect_raises_message(
+ exc.ArgumentError,
+ "DictBundle does not support duplicate column names",
+ ):
+ DictBundle("pk", Data.id, Other.id)
+
def test_same_named_col_in_orderby(self):
Data, Other = self.classes("Data", "Other")
bundle = Bundle("pk", Data.id, Other.id)
[((1, 1),), ((2, 2),)],
)
- def test_c_attr(self):
+ @testing.combinations(Bundle, DictBundle, argnames="kind")
+ def test_c_attr(self, kind):
Data = self.classes.Data
- b1 = Bundle("b1", Data.d1, Data.d2)
+ b1 = kind("b1", Data.d1, Data.d2)
self.assert_compile(
select(b1.c.d1, b1.c.d2), "SELECT data.d1, data.d2 FROM data"
Data = self.classes.Data
sess = fixture_session()
- class DictBundle(Bundle):
- def create_row_processor(self, query, procs, labels):
- def proc(row):
- return dict(zip(labels, (proc(row) for proc in procs)))
-
- return proc
-
if col_type.core:
data_table = self.tables.data
class MyBundle(Bundle):
def create_row_processor(self, query, procs, labels):
def proc(row):
- return dict(zip(labels, (proc(row) for proc in procs)))
+ return list(zip(labels, (proc(row) for proc in procs)))
return proc
eq_(
sess.query(b1).filter(b1.c.d1.between("d3d1", "d5d1")).all(),
[
- ({"d2": "d3d2", "d1": "d3d1"},),
- ({"d2": "d4d2", "d1": "d4d1"},),
- ({"d2": "d5d2", "d1": "d5d1"},),
+ ([("d1", "d3d1"), ("d2", "d3d2")],),
+ ([("d1", "d4d1"), ("d2", "d4d2")],),
+ ([("d1", "d5d1"), ("d2", "d5d2")],),
],
)