--- /dev/null
+.. change::
+ :tags: usecase, engine, orm
+ :tickets: 6990
+
+ Added new methods :meth:`_orm.Session.scalars`,
+ :meth:`_engine.Connection.scalars`, :meth:`_asyncio.AsyncSession.scalars`
+ and :meth:`_asyncio.AsyncSession.stream_scalars`, which provide a short cut
+ to the use case of receiving a row-oriented :class:`_result.Result` object
+ and converting it to a :class:`_result.ScalarResult` object via the
+ :meth:`_engine.Result.scalars` method, to return a list of values rather
+ than a list of rows. The new methods are analogous to the long existing
+ :meth:`_orm.Session.scalar` and :meth:`_engine.Connection.scalar` methods
+ used to return a single value from the first row only. Pull request
+ courtesy Miguel Grinberg.
"""Executes and returns the first column of the first row.
The underlying result/cursor is closed after execution.
+
"""
return self.execute(object_, *multiparams, **params).scalar()
+ def scalars(self, object_, *multiparams, **params):
+ """Executes and returns a scalar result set, which yields scalar values
+ from the first column of each row.
+
+ This method is equivalent to calling :meth:`_engine.Connection.execute`
+ to receive a :class:`_result.Result` object, then invoking the
+ :meth:`_result.Result.scalars` method to produce a
+ :class:`_result.ScalarResult` instance.
+
+ :return: a :class:`_result.ScalarResult`
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ return self.execute(object_, *multiparams, **params).scalars()
+
def execute(self, statement, *multiparams, **params):
r"""Executes a SQL statement construct and returns a
:class:`_engine.CursorResult`.
result = await self.execute(statement, parameters, execution_options)
return result.scalar()
+ async def scalars(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement construct and returns a scalar objects.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalars` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a :class:`_engine.ScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ """
+ result = await self.execute(statement, parameters, execution_options)
+ return result.scalars()
+
+ async def stream_scalars(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ r"""Executes a SQL statement and returns a streaming scalar result
+ object.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.AsyncResult.scalars` method after invoking the
+ :meth:`_future.Connection.stream` method. Parameters are equivalent.
+
+ :return: an :class:`_asyncio.AsyncScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ """
+ result = await self.stream(statement, parameters, execution_options)
+ return result.scalars()
+
async def run_sync(self, fn, *arg, **kw):
"""Invoke the given sync callable passing self as the first argument.
)
return result.scalar()
+ async def scalars(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return scalar results.
+
+ :return: a :class:`_result.ScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
+
+ """
+
+ result = await self.execute(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return result.scalars()
+
async def get(
self,
entity,
)
return _result.AsyncResult(result)
+ async def stream_scalars(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return a stream of scalar results.
+
+ :return: an :class:`_asyncio.AsyncScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.scalars` - non streaming version
+
+ """
+
+ result = await self.stream(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ )
+ return result.scalars()
+
async def delete(self, instance):
"""Mark an instance as deleted.
**kw
).scalar()
+ def scalars(
+ self,
+ statement,
+ params=None,
+ execution_options=util.EMPTY_DICT,
+ bind_arguments=None,
+ **kw
+ ):
+ """Execute a statement and return the results as scalars.
+
+ Usage and parameters are the same as that of
+ :meth:`_orm.Session.execute`; the return result is a
+ :class:`_result.ScalarResult` filtering object which
+ will return single elements rather than :class:`_row.Row` objects.
+
+ :return: a :class:`_result.ScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ return self.execute(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw
+ ).scalars()
+
def close(self):
"""Close out the transactional resources and ORM objects used by this
:class:`_orm.Session`.
eq_(conn.scalar(select(1)), 1)
eng.dispose()
+ def test_scalar(self, connection):
+ conn = connection
+ users = self.tables.users
+ conn.execute(
+ users.insert(),
+ [
+ {"user_id": 1, "user_name": "sandy"},
+ {"user_id": 2, "user_name": "spongebob"},
+ ],
+ )
+ res = conn.scalar(select(users.c.user_name).order_by(users.c.user_id))
+ eq_(res, "sandy")
+
+ def test_scalars(self, connection):
+ conn = connection
+ users = self.tables.users
+ conn.execute(
+ users.insert(),
+ [
+ {"user_id": 1, "user_name": "sandy"},
+ {"user_id": 2, "user_name": "spongebob"},
+ ],
+ )
+ res = conn.scalars(select(users.c.user_name).order_by(users.c.user_id))
+ eq_(res.all(), ["sandy", "spongebob"])
+
class UnicodeReturnsTest(fixtures.TestBase):
@testing.requires.python3
):
await result.one()
+ @testing.combinations(
+ ("scalars",), ("stream_scalars",), argnames="filter_"
+ )
+ @async_test
+ async def test_scalars(self, async_engine, filter_):
+ users = self.tables.users
+ async with async_engine.connect() as conn:
+ if filter_ == "scalars":
+ result = (await conn.scalars(select(users))).all()
+ elif filter_ == "stream_scalars":
+ result = await (await conn.stream_scalars(select(users))).all()
+
+ eq_(result, list(range(1, 20)))
+
class TextSyncDBAPI(fixtures.TestBase):
def test_sync_dbapi_raises(self):
result = await async_session.scalar(stmt)
eq_(result, 7)
+ @testing.combinations(
+ ("scalars",), ("stream_scalars",), argnames="filter_"
+ )
+ @async_test
+ async def test_scalars(self, async_session, filter_):
+ User = self.classes.User
+
+ stmt = (
+ select(User)
+ .options(selectinload(User.addresses))
+ .order_by(User.id)
+ )
+
+ if filter_ == "scalars":
+ result = (await async_session.scalars(stmt)).all()
+ elif filter_ == "stream_scalars":
+ result = await (await async_session.stream_scalars(stmt)).all()
+ eq_(result, self.static.user_address_result)
+
@async_test
async def test_get(self, async_session):
User = self.classes.User
stmt = select(b1).filter(b1.c.x.between("d3d1", "d5d1"))
eq_(
- sess.execute(stmt).scalars().all(),
+ sess.scalars(stmt).all(),
[("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
)
stmt = select(b1).filter(b1.c.d1.between("d3d1", "d5d1"))
eq_(
- sess.execute(stmt).scalars().all(),
+ sess.scalars(stmt).all(),
[("d3d1", "d3d2"), ("d4d1", "d4d2"), ("d5d1", "d5d2")],
)
class SessionInterface(fixtures.TestBase):
"""Bogus args to Session methods produce actionable exceptions."""
- _class_methods = set(("connection", "execute", "get_bind", "scalar"))
+ _class_methods = set(
+ ("connection", "execute", "get_bind", "scalar", "scalars")
+ )
def _public_session_methods(self):
Session = sa.orm.session.Session
"scalar", text("SELECT 1"), bind_arguments=dict(mapper=user_arg)
)
+ raises_(
+ "scalars", text("SELECT 1"), bind_arguments=dict(mapper=user_arg)
+ )
+
eq_(
watchdog,
self._class_methods,