.. changelog::
:version: 1.0.14
+ .. change::
+ :tags: bug, sql
+ :tickets: 3724
+
+ :meth:`.FromClause.count` is pending deprecation for 1.1. This function
+ makes use of an arbitrary column in the table and is not reliable;
+ for Core use, ``func.count()`` should be preferred.
+
.. change::
:tags: bug, sql
:tickets: 3722
.. changelog::
:version: 1.1.0b1
+ .. change::
+ :tags: bug, sql
+ :tickets: 3724
+
+ :meth:`.FromClause.count` is deprecated. This function makes use of
+ an arbitrary column in the table and is not reliable; for Core use,
+ ``func.count()`` should be preferred.
+
.. change::
:tags: feature, postgresql
:pullreq: bitbucket:84
_memoized_property = util.group_expirable_memoized_property(["_columns"])
+ @util.deprecated(
+ '1.1',
+ message="``FromClause.count()`` is deprecated. Counting "
+ "rows requires that the correct column expression and "
+ "accommodations for joins, DISTINCT, etc. must be made, "
+ "otherwise results may not be what's expected. "
+ "Please use an appropriate ``func.count()`` expression "
+ "directly.")
@util.dependencies("sqlalchemy.sql.functions")
def count(self, functions, whereclause=None, **params):
"""return a SELECT COUNT generated against this
- :class:`.FromClause`."""
+ :class:`.FromClause`.
+
+ The function generates COUNT against the
+ first column in the primary key of the table, or against
+ the first column in the table overall. Explicit use of
+ ``func.count()`` should be preferred::
+
+ row_count = conn.scalar(
+ select([func.count('*')]).select_from(table)
+ )
+
+
+ .. seealso::
+
+ :data:`.func`
+
+ """
if self.primary_key:
col = list(self.primary_key)[0]
else:
return []
- @util.dependencies("sqlalchemy.sql.functions")
- def count(self, functions, whereclause=None, **params):
- """return a SELECT COUNT generated against this
- :class:`.TableClause`."""
-
- if self.primary_key:
- col = list(self.primary_key)[0]
- else:
- col = list(self.columns)[0]
- return Select(
- [functions.func.count(col).label('tbl_row_count')],
- whereclause,
- from_obj=[self],
- **params)
-
@util.dependencies("sqlalchemy.sql.dml")
def insert(self, dml, values=None, inline=False, **kwargs):
"""Generate an :func:`.insert` construct against this
from sqlalchemy import Integer, String, Table, Column, select, MetaData,\
update, delete, insert, extract, union, func, PrimaryKeyConstraint, \
UniqueConstraint, Index, Sequence, literal
-
+from sqlalchemy import testing
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = mssql.dialect()
'(SELECT sometable.somecolumn FROM '
'sometable)')
+ @testing.uses_deprecated
def test_count(self):
t = table('sometable', column('somecolumn'))
self.assert_compile(t.count(),
result = engine.execute(s)
assert result.cursor.name
+ @testing.provide_metadata
def test_roundtrip(self):
+ md = self.metadata
+
engine = self._fixture(True)
- test_table = Table('test_table', MetaData(engine),
+ test_table = Table('test_table', md,
Column('id', Integer, primary_key=True),
Column('data', String(50)))
test_table.create(checkfirst=True)
- try:
- test_table.insert().execute(data='data1')
- nextid = engine.execute(Sequence('test_table_id_seq'))
- test_table.insert().execute(id=nextid, data='data2')
- eq_(test_table.select().execute().fetchall(), [(1, 'data1'
- ), (2, 'data2')])
- test_table.update().where(
- test_table.c.id == 2).values(
- data=test_table.c.data +
- ' updated').execute()
- eq_(test_table.select().execute().fetchall(),
- [(1, 'data1'), (2, 'data2 updated')])
- test_table.delete().execute()
- eq_(test_table.count().scalar(), 0)
- finally:
- test_table.drop(checkfirst=True)
+ test_table.insert().execute(data='data1')
+ nextid = engine.execute(Sequence('test_table_id_seq'))
+ test_table.insert().execute(id=nextid, data='data2')
+ eq_(test_table.select().execute().fetchall(), [(1, 'data1'
+ ), (2, 'data2')])
+ test_table.update().where(
+ test_table.c.id == 2).values(
+ data=test_table.c.data +
+ ' updated').execute()
+ eq_(test_table.select().execute().fetchall(),
+ [(1, 'data1'), (2, 'data2 updated')])
+ test_table.delete().execute()
+ eq_(select([func.count('*')]).select_from(test_table).scalar(), 0)
class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
def _assert_no_data(self):
eq_(
- testing.db.scalar(self.table.count()), 0
+ testing.db.scalar(
+ select([func.count('*')]).select_from(self.table)), 0
)
def _assert_fn(self, x, value=None):
sess.add(a)
sess.flush()
- assert user_roles.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(user_roles).scalar(), 1)
def test_two(self):
admins, users, roles, user_roles = (self.tables.admins,
a.password = 'sadmin'
sess.flush()
- assert user_roles.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(user_roles).scalar(), 1)
class PassiveDeletesTest(fixtures.MappedTest):
session.delete(c)
session.flush()
- eq_(people.count().scalar(), 0)
+ eq_(select([func.count('*')]).select_from(people).scalar(), 0)
test_roundtrip = function_named(
test_roundtrip, "test_%s%s%s_%s" % (
-from sqlalchemy import func, desc
+from sqlalchemy import func, desc, select
from sqlalchemy.orm import interfaces, create_session, joinedload, joinedload_all, \
subqueryload, subqueryload_all, aliased,\
class_mapper, with_polymorphic
all_employees[1:3])
self.assert_sql_count(testing.db, go, 3)
- eq_(sess.query(Person).with_polymorphic('*')
+ eq_(
+ select([func.count('*')]).select_from(
+ sess.query(Person).with_polymorphic('*')
.options(joinedload(Engineer.machines))
- .limit(2).offset(1).with_labels()
- .subquery().count().scalar(),
- 2)
+ .limit(2).offset(1).with_labels().subquery()
+ ).scalar(),
+ 2)
def test_get_one(self):
"""
contains_eager, joinedload, subqueryload, subqueryload_all,\
Session, aliased, with_polymorphic, joinedload_all
-from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy import Integer, String, ForeignKey, select, func
from sqlalchemy.engine import default
from sqlalchemy.testing import AssertsCompiledSQL, fixtures
)
# another way to check
- assert q.limit(1).with_labels().subquery().count().scalar() == 1
+ eq_(
+ select([func.count('*')]).select_from(
+ q.limit(1).with_labels().subquery()
+ ).scalar(),
+ 1
+ )
assert q.first() is c1
def test_subquery_load(self):
from sqlalchemy import testing
-from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy import Integer, String, ForeignKey, func, select
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session
from sqlalchemy.testing import fixtures
item2.keywords.append(KeywordAssociation(Keyword('green'), 'green_assoc'))
sess.add_all((item1, item2))
sess.flush()
- eq_(item_keywords.count().scalar(), 3)
+ eq_(select([func.count('*')]).select_from(item_keywords).scalar(), 3)
sess.delete(item1)
sess.delete(item2)
sess.flush()
- eq_(item_keywords.count().scalar(), 0)
+ eq_(select([func.count('*')]).select_from(item_keywords).scalar(), 0)
import copy
from sqlalchemy.testing import assert_raises, assert_raises_message
-from sqlalchemy import Integer, String, ForeignKey, Sequence, \
- exc as sa_exc, util
+from sqlalchemy import Integer, String, ForeignKey, \
+ exc as sa_exc, util, select, func
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session, \
sessionmaker, class_mapper, backref, Session, util as orm_util,\
sess.delete(u)
sess.flush()
- assert users.count().scalar() == 0
- assert orders.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(users).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 0)
def test_delete_unloaded_collections(self):
"""Unloaded collections are still included in a delete-cascade
sess.add(u)
sess.flush()
sess.expunge_all()
- assert addresses.count().scalar() == 2
- assert users.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(addresses).scalar(), 2)
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
u = sess.query(User).get(u.id)
assert 'addresses' not in u.__dict__
sess.delete(u)
sess.flush()
- assert addresses.count().scalar() == 0
- assert users.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(addresses).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(users).scalar(), 0)
def test_cascades_onlycollection(self):
"""Cascade only reaches instances that are still part of the
sess.add(u2)
sess.flush()
sess.expunge_all()
- assert users.count().scalar() == 1
- assert orders.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 1)
eq_(sess.query(User).all(),
[User(name='newuser',
orders=[Order(description='someorder')])])
Order(description='someotherorder')])
sess.add(u)
sess.flush()
- assert users.count().scalar() == 1
- assert orders.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 2)
del u.orders[0]
sess.delete(u)
sess.flush()
- assert users.count().scalar() == 0
- assert orders.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(users).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 0)
def test_collection_orphans(self):
User, users, orders, Order = (self.classes.User,
sess.add(u)
sess.flush()
- assert users.count().scalar() == 1
- assert orders.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 2)
u.orders[:] = []
sess.flush()
- assert users.count().scalar() == 1
- assert orders.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 0)
class O2MCascadeTest(fixtures.MappedTest):
run_inserts = None
Order(description='someotherorder')])
sess.add(u)
sess.flush()
- assert users.count().scalar() == 1
- assert orders.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 2)
del u.orders[0]
sess.delete(u)
sess.flush()
- assert users.count().scalar() == 0
- assert orders.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(users).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(orders).scalar(), 1)
class O2OSingleParentTest(_fixtures.FixtureTest):
run_inserts = None
self.tables.extra)
sess = create_session()
- assert prefs.count().scalar() == 3
- assert extra.count().scalar() == 3
+ eq_(select([func.count('*')]).select_from(prefs).scalar(), 3)
+ eq_(select([func.count('*')]).select_from(extra).scalar(), 3)
jack = sess.query(User).filter_by(name="jack").one()
jack.pref = None
sess.flush()
- assert prefs.count().scalar() == 2
- assert extra.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(prefs).scalar(), 2)
+ eq_(select([func.count('*')]).select_from(extra).scalar(), 2)
def test_cascade_on_deleted(self):
"""test a bug introduced by r6711"""
assert p in sess
assert e in sess
sess.flush()
- assert prefs.count().scalar() == 2
- assert extra.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(prefs).scalar(), 2)
+ eq_(select([func.count('*')]).select_from(extra).scalar(), 2)
def test_pending_expunge(self):
Pref, User = self.classes.Pref, self.classes.User
a1.bs.remove(b1)
sess.flush()
- assert atob.count().scalar() ==0
- assert b.count().scalar() == 0
- assert a.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(atob).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(b).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(a).scalar(), 1)
def test_delete_orphan_dynamic(self):
a, A, B, b, atob = (self.tables.a,
a1.bs.remove(b1)
sess.flush()
- assert atob.count().scalar() == 0
- assert b.count().scalar() == 0
- assert a.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(atob).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(b).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(a).scalar(), 1)
def test_delete_orphan_cascades(self):
a, A, c, b, C, B, atob = (self.tables.a,
a1.bs.remove(b1)
sess.flush()
- assert atob.count().scalar() ==0
- assert b.count().scalar() == 0
- assert a.count().scalar() == 1
- assert c.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(atob).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(b).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(a).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(c).scalar(), 0)
def test_cascade_delete(self):
a, A, B, b, atob = (self.tables.a,
sess.delete(a1)
sess.flush()
- assert atob.count().scalar() ==0
- assert b.count().scalar() == 0
- assert a.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(atob).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(b).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(a).scalar(), 0)
def test_single_parent_error(self):
a, A, B, b, atob = (self.tables.a,
)
sess.add(u)
sess.commit()
- eq_(testing.db.scalar(addresses.count(addresses.c.user_id == None)), 0)
- eq_(testing.db.scalar(addresses.count(addresses.c.user_id != None)), 6)
+ eq_(
+ testing.db.scalar(
+ select([func.count('*')]).where(addresses.c.user_id == None)),
+ 0)
+ eq_(
+ testing.db.scalar(
+ select([func.count('*')]).where(addresses.c.user_id != None)),
+ 6)
sess.delete(u)
if expected:
eq_(
testing.db.scalar(
- addresses.count(addresses.c.user_id == None)), 6)
+ select([func.count('*')]).where(
+ addresses.c.user_id == None
+ )
+ ),
+ 6
+ )
eq_(
testing.db.scalar(
- addresses.count(addresses.c.user_id != None)), 0)
+ select([func.count('*')]).where(
+ addresses.c.user_id != None
+ )
+ ),
+ 0
+ )
else:
- eq_(testing.db.scalar(addresses.count()), 0)
+ eq_(
+ testing.db.scalar(
+ select([func.count('*')]).select_from(addresses)
+ ),
+ 0)
def test_delete_nocascade(self):
self._test_delete_cascade(True)
from sqlalchemy.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
from sqlalchemy import testing
-from sqlalchemy import MetaData, Integer, String, ForeignKey, func, util
+from sqlalchemy import MetaData, Integer, String, \
+ ForeignKey, func, util, select
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.engine import default
from sqlalchemy.orm import mapper, relationship, backref, \
sess.add(a)
sess.flush()
- eq_(addresses.count().scalar(), 6)
- eq_(email_bounces.count().scalar(), 5)
+ eq_(
+ select([func.count('*')]).select_from(addresses).scalar(), 6)
+ eq_(
+ select([func.count('*')]).select_from(email_bounces).scalar(), 5)
def test_mapping_to_outerjoin(self):
"""Mapping to an outer join with a nullable composite primary key."""
h1.h1s.append(H1())
s.flush()
- eq_(ht1.count().scalar(), 4)
+ eq_(
+ select([func.count('*')]).select_from(ht1)
+ .scalar(), 4)
h6 = H6()
h6.h1a = h1
import datetime
import sqlalchemy as sa
from sqlalchemy import testing
-from sqlalchemy import Integer, String, ForeignKey, MetaData, and_
+from sqlalchemy import Integer, String, ForeignKey, MetaData, and_, \
+ select, func
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, relation, \
backref, create_session, configure_mappers, \
sess.add(a)
sess.flush()
- assert t3.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(t3).scalar(), 2)
a.t2s.remove(c)
sess.flush()
- assert t3.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(t3).scalar(), 1)
class CustomOperatorTest(fixtures.MappedTest, AssertsCompiledSQL):
sess.commit()
sess.close()
engine2.dispose()
- assert users.count().scalar() == 1
- assert addresses.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(users).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(addresses).scalar(), 1)
@testing.requires.independent_connections
def test_invalidate(self):
# because the identity map would switch it
u1 = s.query(User).filter_by(name='ed').one()
assert u1_state not in s.identity_map.all_states()
- assert s.scalar(users.count()) == 1
+
+ eq_(s.scalar(select([func.count('*')]).select_from(users)), 1)
s.delete(u1)
s.flush()
- assert s.scalar(users.count()) == 0
+ eq_(s.scalar(select([func.count('*')]).select_from(users)), 0)
s.commit()
def test_trans_deleted_cleared_on_rollback(self):
import sqlalchemy as sa
from sqlalchemy.util import u, ue, b
from sqlalchemy import Integer, String, ForeignKey, \
- literal_column, event, Boolean
-from sqlalchemy.testing import engines
+ literal_column, event, Boolean, select, func
from sqlalchemy import testing
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.schema import Column
session.add(p)
session.flush()
- p_count = people.count(people.c.person=='im the key').scalar()
+ p_count = select([func.count('*')]).where(
+ people.c.person=='im the key').scalar()
eq_(p_count, 1)
- eq_(peoplesites.count(peoplesites.c.person=='im the key').scalar(), 1)
+ eq_(select([func.count('*')]).where(peoplesites.c.person=='im the key').scalar(), 1)
class ClauseAttributesTest(fixtures.MappedTest):
session.flush()
session.expunge_all()
- assert myothertable.count().scalar() == 4
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 4)
mc = session.query(MyClass).get(mc.id)
session.delete(mc)
session.flush()
- assert mytable.count().scalar() == 0
- assert myothertable.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(mytable).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 0)
@testing.emits_warning(r".*'passive_deletes' is normally configured on one-to-many")
def test_backwards_pd(self):
session.add(mco)
session.flush()
- assert mytable.count().scalar() == 1
- assert myothertable.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(mytable).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 1)
session.expire(mco, ['myclass'])
session.delete(mco)
session.flush()
# mytable wasn't deleted, is the point.
- assert mytable.count().scalar() == 1
- assert myothertable.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(mytable).scalar(), 1)
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 0)
def test_aaa_m2o_emits_warning(self):
myothertable, MyClass, MyOtherClass, mytable = (self.tables.myothertable,
session.flush()
session.expunge_all()
- assert myothertable.count().scalar() == 4
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 4)
mc = session.query(MyClass).get(mc.id)
session.delete(mc)
assert_raises(sa.exc.DBAPIError, session.flush)
session.flush()
session.expunge_all()
- assert myothertable.count().scalar() == 1
+ eq_(select([func.count('*')]).select_from(myothertable).scalar(), 1)
mc = session.query(MyClass).get(mc.id)
session.delete(mc)
u = session.query(User).get(u.id)
session.delete(u)
session.flush()
- assert users.count().scalar() == 0
- assert addresses.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(users).scalar(), 0)
+ eq_(select([func.count('*')]).select_from(addresses).scalar(), 0)
def test_batch_mode(self):
"""The 'batch=False' flag on mapper()"""
session.add(i)
session.flush()
- assert item_keywords.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(item_keywords).scalar(), 2)
i.keywords = []
session.flush()
- assert item_keywords.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(item_keywords).scalar(), 0)
def test_scalar(self):
"""sa.dependency won't delete an m2m relationship referencing None."""
session.add(i)
session.flush()
- assert assoc.count().scalar() == 2
+ eq_(select([func.count('*')]).select_from(assoc).scalar(), 2)
i.keywords = []
session.flush()
- assert assoc.count().scalar() == 0
+ eq_(select([func.count('*')]).select_from(assoc).scalar(), 0)
class BooleanColTest(fixtures.MappedTest):
from sqlalchemy import exc, util
from sqlalchemy.testing import fixtures, config
from sqlalchemy import Integer, String, ForeignKey, func, \
- literal, FetchedValue, text
+ literal, FetchedValue, text, select
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, unitofwork, attributes,\
Session, exc as orm_exc
eq_(p1.id, 5)
sess.flush()
- eq_(sess.scalar(self.tables.parent.count()), 0)
+ eq_(
+ select([func.count('*')]).select_from(self.tables.parent).scalar(),
+ 0)
class RowswitchM2OTest(fixtures.MappedTest):
testing.db.execute(dataset_no_autoinc.insert())
eq_(
- testing.db.scalar(dataset_no_autoinc.count()), 1
+ testing.db.scalar(
+ select([func.count('*')]).select_from(dataset_no_autoinc)), 1
)
class MetaDataTest(fixtures.TestBase, ComparesTables):
- def test_metadata_connect(self):
- metadata = MetaData()
- t1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
- metadata.bind = testing.db
- metadata.create_all()
- try:
- assert t1.count().scalar() == 0
- finally:
- metadata.drop_all()
-
def test_metadata_contains(self):
metadata = MetaData()
t1 = Table('t1', metadata, Column('x', Integer))
data = os.urandom(32)
binary_table.insert().execute(data=data)
eq_(
- binary_table.select().where(binary_table.c.data == data).alias().
- count().scalar(), 1)
+ select([func.count('*')]).select_from(binary_table).
+ where(binary_table.c.data == data).scalar(), 1)
@testing.requires.binary_literals
def test_literal_roundtrip(self):