from sqlalchemy.test.testing import assert_raises, assert_raises_message
-from sqlalchemy import Integer, String, ForeignKey, Sequence, exc as sa_exc
+from sqlalchemy import Integer, String, ForeignKey, Sequence, \
+ exc as sa_exc
from sqlalchemy.test.schema import Table, Column
-from sqlalchemy.orm import mapper, relationship, create_session, sessionmaker, class_mapper, backref
+from sqlalchemy.orm import mapper, relationship, create_session, \
+ sessionmaker, class_mapper, backref
from sqlalchemy.orm import attributes, exc as orm_exc
from sqlalchemy.test import testing
from sqlalchemy.test.testing import eq_
@testing.resolve_artifact_names
def setup_mappers(cls):
mapper(Address, addresses)
- mapper(User, users, properties = dict(
- addresses = relationship(Address, cascade="all, delete-orphan", backref="user"),
- orders = relationship(
- mapper(Order, orders), cascade="all, delete-orphan", order_by=orders.c.id)
- ))
- mapper(Dingaling,dingalings, properties={
- 'address':relationship(Address)
- })
+ mapper(User, users,
+ properties=dict(addresses=relationship(Address,
+ cascade='all, delete-orphan', backref='user'),
+ orders=relationship(mapper(Order, orders),
+ cascade='all, delete-orphan', order_by=orders.c.id)))
+ mapper(Dingaling, dingalings, properties={'address'
+ : relationship(Address)})
@testing.resolve_artifact_names
def test_list_assignment(self):
@testing.resolve_artifact_names
def test_save_update_sends_pending(self):
- """test that newly added and deleted collection items are cascaded on save-update"""
-
+ """test that newly added and deleted collection items are
+ cascaded on save-update"""
+
sess = sessionmaker(expire_on_commit=False)()
- o1, o2, o3 = Order(description='o1'), Order(description='o2'), Order(description='o3')
+ o1, o2, o3 = Order(description='o1'), Order(description='o2'), \
+ Order(description='o3')
u = User(name='jack', orders=[o1, o2])
sess.add(u)
sess.commit()
sess.close()
-
u.orders.append(o3)
u.orders.remove(o1)
-
sess.add(u)
assert o1 in sess
assert o2 in sess
@testing.resolve_artifact_names
def test_delete_unloaded_collections(self):
- """Unloaded collections are still included in a delete-cascade by default."""
+ """Unloaded collections are still included in a delete-cascade
+ by default."""
sess = create_session()
u = User(name='jack',
addresses=[Address(email_address="address1"),
@testing.resolve_artifact_names
def test_cascades_onlycollection(self):
- """Cascade only reaches instances that are still part of the collection,
- not those that have been removed"""
+ """Cascade only reaches instances that are still part of the
+ collection, not those that have been removed"""
sess = create_session()
u = User(name='jack',
@testing.resolve_artifact_names
def test_cascade_nosideeffects(self):
- """test that cascade leaves the state of unloaded scalars/collections unchanged."""
+ """test that cascade leaves the state of unloaded
+ scalars/collections unchanged."""
sess = create_session()
u = User(name='jack')
@testing.resolve_artifact_names
def setup_mappers(cls):
mapper(Address, addresses)
- mapper(User, users, properties = {
- 'address':relationship(Address, backref=backref("user", single_parent=True), uselist=False)
- })
+ mapper(User, users, properties={'address'
+ : relationship(Address, backref=backref('user',
+ single_parent=True), uselist=False)})
@testing.resolve_artifact_names
def test_single_parent_raise(self):
a1 = Address(email_address='some address')
u1 = User(name='u1', address=a1)
-
- assert_raises(sa_exc.InvalidRequestError, Address, email_address='asd', user=u1)
-
+ assert_raises(sa_exc.InvalidRequestError, Address,
+ email_address='asd', user=u1)
a2 = Address(email_address='asd')
u1.address = a2
assert u1.address is not a1
@classmethod
@testing.resolve_artifact_names
def setup_mappers(cls):
- mapper(User, users, properties = dict(
- orders = relationship(
- mapper(Order, orders), cascade="all, delete-orphan", backref="user")
- ))
+ mapper(User, users,
+ properties=dict(orders=relationship(mapper(Order,
+ orders), cascade='all, delete-orphan', backref='user')))
@testing.resolve_artifact_names
def test_lazyload_bug(self):
@testing.resolve_artifact_names
def test_unidirectional_cascade_m2m(self):
- mapper(Item, items, properties={
- 'keywords':relationship(Keyword, secondary=item_keywords, cascade="none", backref="items")
- })
+ mapper(Item, items, properties={'keywords'
+ : relationship(Keyword, secondary=item_keywords,
+ cascade='none', backref='items')})
mapper(Keyword, keywords)
sess = create_session()
class M2OCascadeTest(_base.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
- Table("extra", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("prefs_id", Integer, ForeignKey("prefs.id")))
-
- Table('prefs', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(40)))
-
- Table('users', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Table('extra', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('prefs_id', Integer, ForeignKey('prefs.id')))
+ Table('prefs', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('data', String(40)))
+ Table(
+ 'users',
+ metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(40)),
Column('pref_id', Integer, ForeignKey('prefs.id')),
- Column('foo_id', Integer, ForeignKey('foo.id'))
+ Column('foo_id', Integer, ForeignKey('foo.id')),
)
+ Table('foo', metadata, Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True), Column('data',
+ String(40)))
- Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(40))
- )
@classmethod
def setup_classes(cls):
class User(_fixtures.Base):
@testing.resolve_artifact_names
def setup_mappers(cls):
mapper(Extra, extra)
- mapper(Pref, prefs, properties=dict(
- extra = relationship(Extra, cascade="all, delete")
- ))
- mapper(User, users, properties = dict(
- pref = relationship(Pref, lazy='joined', cascade="all, delete-orphan", single_parent=True ),
- foo = relationship(Foo) # straight m2o
- ))
+ mapper(Pref, prefs, properties=dict(extra=relationship(Extra,
+ cascade='all, delete')))
+ mapper(User, users, properties=dict(pref=relationship(Pref,
+ lazy='joined', cascade='all, delete-orphan',
+ single_parent=True), foo=relationship(Foo))) # straight m2o
mapper(Foo, foo)
@classmethod
@testing.resolve_artifact_names
def test_save_update_sends_pending(self):
- """test that newly added and deleted scalar items are cascaded on save-update"""
+ """test that newly added and deleted scalar items are cascaded
+ on save-update"""
sess = sessionmaker(expire_on_commit=False)()
p1, p2 = Pref(data='p1'), Pref(data='p2')
[Pref(data="pref 1"), Pref(data="pref 3"), Pref(data="newpref")])
class M2OCascadeDeleteTest(_base.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
- Table('t1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(50)),
+ Table('t1', metadata, Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(50)),
Column('t2id', Integer, ForeignKey('t2.id')))
- Table('t2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(50)),
+
+ Table('t2', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(50)),
Column('t3id', Integer, ForeignKey('t3.id')))
- Table('t3', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+
+ Table('t3', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
@classmethod
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)),
Column('t2id', Integer, ForeignKey('t2.id')))
+
Table('t2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)),
Column('t3id', Integer, ForeignKey('t3.id')))
+
Table('t3', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
@classmethod
@classmethod
@testing.resolve_artifact_names
def setup_mappers(cls):
- mapper(T1, t1, properties=dict(
- t2=relationship(T2, cascade="all, delete-orphan", single_parent=True)))
- mapper(T2, t2, properties=dict(
- t3=relationship(T3, cascade="all, delete-orphan", single_parent=True, backref=backref('t2', uselist=False))))
+ mapper(T1, t1, properties=dict(t2=relationship(T2,
+ cascade='all, delete-orphan', single_parent=True)))
+ mapper(T2, t2, properties=dict(t3=relationship(T3,
+ cascade='all, delete-orphan', single_parent=True,
+ backref=backref('t2', uselist=False))))
mapper(T3, t3)
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table('a', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30)),
test_needs_fk=True
)
Table('b', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30)),
test_needs_fk=True
)
Table('c', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30)),
Column('bid', Integer, ForeignKey('b.id')),
test_needs_fk=True
@testing.resolve_artifact_names
def test_delete_orphan(self):
- mapper(A, a, properties={
- # if no backref here, delete-orphan failed until [ticket:427] was
- # fixed
- 'bs': relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
- })
+
+ # if no backref here, delete-orphan failed until [ticket:427]
+ # was fixed
+
+ mapper(A, a, properties={'bs': relationship(B, secondary=atob,
+ cascade='all, delete-orphan', single_parent=True)})
mapper(B, b)
sess = create_session()
@testing.resolve_artifact_names
def test_delete_orphan_dynamic(self):
- mapper(A, a, properties={
- # if no backref here, delete-orphan failed until [ticket:427] was
- # fixed
- 'bs': relationship(B, secondary=atob,
- cascade="all, delete-orphan", single_parent=True,lazy="dynamic")
- })
+ mapper(A, a, properties={'bs': relationship(B, secondary=atob,
+ cascade='all, delete-orphan', single_parent=True,
+ lazy='dynamic')}) # if no backref here, delete-orphan
+ # failed until [ticket:427] was fixed
mapper(B, b)
sess = create_session()
mapper(A, a, properties={
# if no backref here, delete-orphan failed until [ticket:427] was
# fixed
- 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
+ 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan",
+ single_parent=True)
})
- mapper(B, b, properties={'cs':relationship(C, cascade="all, delete-orphan")})
+ mapper(B, b, properties={'cs':
+ relationship(C, cascade="all, delete-orphan")})
mapper(C, c)
sess = create_session()
@testing.resolve_artifact_names
def test_cascade_delete(self):
mapper(A, a, properties={
- 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
+ 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan",
+ single_parent=True)
})
mapper(B, b)
@testing.resolve_artifact_names
def test_single_parent_raise(self):
mapper(A, a, properties={
- 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan", single_parent=True)
+ 'bs':relationship(B, secondary=atob, cascade="all, delete-orphan",
+ single_parent=True)
})
mapper(B, b)
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('user_id', Integer,primary_key=True, test_needs_autoincrement=True),
+ Column('user_id', Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(40)))
Table('addresses', metadata,
- Column('address_id', Integer,primary_key=True, test_needs_autoincrement=True),
+ Column('address_id', Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('users.user_id')),
Column('email_address', String(40)))
@testing.resolve_artifact_names
def test_pending_standalone_orphan(self):
- """An entity that never had a parent on a delete-orphan cascade can't be saved."""
+ """An entity that never had a parent on a delete-orphan cascade
+ can't be saved."""
mapper(Address, addresses)
mapper(User, users, properties=dict(
- addresses=relationship(Address, cascade="all,delete-orphan", backref="user")
+ addresses=relationship(Address, cascade="all,delete-orphan",
+ backref="user")
))
s = create_session()
a = Address()
@testing.resolve_artifact_names
def test_pending_collection_expunge(self):
- """Removing a pending item from a collection expunges it from the session."""
+ """Removing a pending item from a collection expunges it from
+ the session."""
mapper(Address, addresses)
mapper(User, users, properties=dict(
- addresses=relationship(Address, cascade="all,delete-orphan", backref="user")
+ addresses=relationship(Address, cascade="all,delete-orphan",
+ backref="user")
))
s = create_session()
def test_nonorphans_ok(self):
mapper(Address, addresses)
mapper(User, users, properties=dict(
- addresses=relationship(Address, cascade="all,delete", backref="user")
+ addresses=relationship(Address, cascade="all,delete",
+ backref="user")
))
s = create_session()
u = User(name='u1', addresses=[Address(email_address='ad1')])
@classmethod
def define_tables(cls, meta):
Table('orders', meta,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)))
Table('items', meta,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('order_id', Integer, ForeignKey('orders.id'),
nullable=False),
Column('name', String(50)))
Table('attributes', meta,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('item_id', Integer, ForeignKey('items.id'),
nullable=False),
Column('name', String(50)))
mapper(Attribute, attributes)
mapper(Item, items, properties=dict(
- attributes=relationship(Attribute, cascade="all,delete-orphan", backref="item")
+ attributes=relationship(Attribute, cascade="all,delete-orphan",
+ backref="item")
))
mapper(Order, orders, properties=dict(
- items=relationship(Item, cascade="all,delete-orphan", backref="order")
+ items=relationship(Item, cascade="all,delete-orphan",
+ backref="order")
))
s = create_session()
@classmethod
def define_tables(cls, meta):
Table('sales_reps', meta,
- Column('sales_rep_id', Integer,primary_key=True, test_needs_autoincrement=True),
+ Column('sales_rep_id', Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)))
Table('accounts', meta,
- Column('account_id', Integer,primary_key=True, test_needs_autoincrement=True),
+ Column('account_id', Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column('balance', Integer))
Table('customers', meta,
- Column('customer_id', Integer,primary_key=True, test_needs_autoincrement=True),
+ Column('customer_id', Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)),
Column('sales_rep_id', Integer,
ForeignKey('sales_reps.sales_rep_id')),
@testing.resolve_artifact_names
def test_double_parent_expunge_o2m(self):
- """test the delete-orphan uow event for multiple delete-orphan parent relationships."""
+ """test the delete-orphan uow event for multiple delete-orphan
+ parent relationships."""
class Customer(_fixtures.Base):
pass
assert c in s, "Should not expunge customer yet, still has one parent"
sr.customers.remove(c)
- assert c not in s, "Should expunge customer when both parents are gone"
+ assert c not in s, \
+ 'Should expunge customer when both parents are gone'
@testing.resolve_artifact_names
def test_double_parent_expunge_o2o(self):
- """test the delete-orphan uow event for multiple delete-orphan parent relationships."""
+ """test the delete-orphan uow event for multiple delete-orphan
+ parent relationships."""
class Customer(_fixtures.Base):
pass
assert c in s, "Should not expunge customer yet, still has one parent"
sr.customer = None
- assert c not in s, "Should expunge customer when both parents are gone"
+ assert c not in s, \
+ 'Should expunge customer when both parents are gone'
class DoubleParentOrphanTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('addresses', metadata,
- Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('address_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('street', String(30)),
)
Table('homes', metadata,
- Column('home_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
+ Column('home_id', Integer, primary_key=True, key="id",
+ test_needs_autoincrement=True),
Column('description', String(30)),
Column('address_id', Integer, ForeignKey('addresses.address_id'),
nullable=False),
)
Table('businesses', metadata,
- Column('business_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
+ Column('business_id', Integer, primary_key=True, key="id",
+ test_needs_autoincrement=True),
Column('description', String(30), key="description"),
Column('address_id', Integer, ForeignKey('addresses.address_id'),
nullable=False),
@testing.resolve_artifact_names
def test_non_orphan(self):
- """test that an entity can have two parent delete-orphan cascades, and persists normally."""
+ """test that an entity can have two parent delete-orphan
+ cascades, and persists normally."""
class Address(_fixtures.Base):
pass
pass
mapper(Address, addresses)
- mapper(Home, homes, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)})
- mapper(Business, businesses, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)})
+ mapper(Home, homes, properties={'address'
+ : relationship(Address, cascade='all,delete-orphan',
+ single_parent=True)})
+ mapper(Business, businesses, properties={'address'
+ : relationship(Address, cascade='all,delete-orphan',
+ single_parent=True)})
session = create_session()
h1 = Home(description='home1', address=Address(street='address1'))
- b1 = Business(description='business1', address=Address(street='address2'))
+ b1 = Business(description='business1',
+ address=Address(street='address2'))
session.add_all((h1,b1))
session.flush()
session.expunge_all()
- eq_(session.query(Home).get(h1.id), Home(description='home1', address=Address(street='address1')))
- eq_(session.query(Business).get(b1.id), Business(description='business1', address=Address(street='address2')))
+ eq_(session.query(Home).get(h1.id), Home(description='home1',
+ address=Address(street='address1')))
+ eq_(session.query(Business).get(b1.id),
+ Business(description='business1',
+ address=Address(street='address2')))
@testing.resolve_artifact_names
def test_orphan(self):
- """test that an entity can have two parent delete-orphan cascades, and is detected as an orphan
- when saved without a parent."""
+ """test that an entity can have two parent delete-orphan
+ cascades, and is detected as an orphan when saved without a
+ parent."""
class Address(_fixtures.Base):
pass
+
class Home(_fixtures.Base):
pass
+
class Business(_fixtures.Base):
pass
mapper(Address, addresses)
- mapper(Home, homes, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)})
- mapper(Business, businesses, properties={'address':relationship(Address, cascade="all,delete-orphan", single_parent=True)})
-
+ mapper(Home, homes, properties={'address'
+ : relationship(Address, cascade='all,delete-orphan',
+ single_parent=True)})
+ mapper(Business, businesses, properties={'address'
+ : relationship(Address, cascade='all,delete-orphan',
+ single_parent=True)})
session = create_session()
a1 = Address()
session.add(a1)
class CollectionAssignmentOrphanTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
- Table('table_a', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Table('table_a', metadata,
+ Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
Column('name', String(30)))
- Table('table_b', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(30)),
+ Table('table_b', metadata,
+ Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('name', String(30)),
Column('a_id', Integer, ForeignKey('table_a.id')))
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table("parent", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True)
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True)
)
Table("child", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False)
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('parent.id'),
+ nullable=False)
)
@classmethod
@testing.resolve_artifact_names
def test_o2o_delorphan_delete_old(self):
mapper(Parent, parent, properties={
- 'child':relationship(Child, uselist=False, cascade="all, delete, delete-orphan")
+ 'child':relationship(Child, uselist=False,
+ cascade="all, delete, delete-orphan")
})
mapper(Child, child)
self._do_delete_old_test()
@classmethod
def define_tables(cls, metadata):
Table("base", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("descr", String(50))
)
Table("noninh_child", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('base_id', Integer, ForeignKey('base.id'))
)
-from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import eq_, assert_raises, \
+ assert_raises_message
from sqlalchemy.test.util import gc_collect
import inspect
import pickle
-from sqlalchemy.orm import create_session, sessionmaker, attributes, make_transient
+from sqlalchemy.orm import create_session, sessionmaker, attributes, \
+ make_transient
from sqlalchemy.orm.attributes import instance_state
import sqlalchemy as sa
from sqlalchemy.test import engines, testing, config
[User(id=1, name='ed')])
# test expression binding
- sess.execute(users_unbound.insert(), params=dict(id=2, name='jack'))
- eq_(sess.execute(users_unbound.select(users_unbound.c.id == 2)).fetchall(),
- [(2, 'jack')])
+
+ sess.execute(users_unbound.insert(), params=dict(id=2,
+ name='jack'))
+ eq_(sess.execute(users_unbound.select(users_unbound.c.id
+ == 2)).fetchall(), [(2, 'jack')])
eq_(sess.execute(users_unbound.select(User.id == 2)).fetchall(),
[(2, 'jack')])
[User(id=1, name='ed')])
sess.execute(users_unbound.insert(), params=dict(id=2, name='jack'))
- eq_(sess.execute(users_unbound.select(users_unbound.c.id == 2)).fetchall(),
- [(2, 'jack')])
+
+ eq_(sess.execute(users_unbound.select(users_unbound.c.id
+ == 2)).fetchall(), [(2, 'jack')])
eq_(sess.execute(users_unbound.select(User.id == 2)).fetchall(),
[(2, 'jack')])
assert conn2.execute("select count(1) from users").scalar() == 0
sess.commit()
assert conn1.execute("select count(1) from users").scalar() == 1
- assert testing.db.connect().execute("select count(1) from users").scalar() == 1
+
+ assert testing.db.connect().execute('select count(1) from users'
+ ).scalar() == 1
sess.close()
@testing.requires.independent_connections
eq_(q.one(), Address(email_address='foo'))
+
@testing.requires.independent_connections
@engines.close_open_connections
@testing.resolve_artifact_names
def test_autoflush_unbound(self):
mapper(User, users)
-
try:
sess = create_session(autocommit=False, autoflush=True)
u = User()
- u.name='ed'
+ u.name = 'ed'
sess.add(u)
u2 = sess.query(User).filter_by(name='ed').one()
assert u2 is u
- assert sess.execute("select count(1) from users", mapper=User).scalar() == 1
- assert testing.db.connect().execute("select count(1) from users").scalar() == 0
+ assert sess.execute('select count(1) from users',
+ mapper=User).scalar() == 1
+ assert testing.db.connect().execute('select count(1) from '
+ 'users').scalar() == 0
sess.commit()
- assert sess.execute("select count(1) from users", mapper=User).scalar() == 1
- assert testing.db.connect().execute("select count(1) from users").scalar() == 1
+ assert sess.execute('select count(1) from users',
+ mapper=User).scalar() == 1
+ assert testing.db.connect().execute('select count(1) from '
+ 'users').scalar() == 1
sess.close()
except:
sess.rollback()
mapper(User, users)
conn1 = testing.db.connect()
conn2 = testing.db.connect()
-
- sess = create_session(bind=conn1, autocommit=False, autoflush=True)
+ sess = create_session(bind=conn1, autocommit=False,
+ autoflush=True)
u = User()
- u.name='ed'
+ u.name = 'ed'
sess.add(u)
sess.commit()
- assert conn1.execute("select count(1) from users").scalar() == 1
- assert testing.db.connect().execute("select count(1) from users").scalar() == 1
+ assert conn1.execute('select count(1) from users').scalar() == 1
+ assert testing.db.connect().execute('select count(1) from users'
+ ).scalar() == 1
sess.commit()
@testing.resolve_artifact_names
@testing.resolve_artifact_names
def test_heavy_nesting(self):
session = create_session(bind=testing.db)
-
session.begin()
- session.connection().execute(users.insert().values(name='user1'))
-
+ session.connection().execute(users.insert().values(name='user1'
+ ))
session.begin(subtransactions=True)
-
session.begin_nested()
-
- session.connection().execute(users.insert().values(name='user2'))
- assert session.connection().execute("select count(1) from users").scalar() == 2
-
+ session.connection().execute(users.insert().values(name='user2'
+ ))
+ assert session.connection().execute('select count(1) from users'
+ ).scalar() == 2
session.rollback()
- assert session.connection().execute("select count(1) from users").scalar() == 1
- session.connection().execute(users.insert().values(name='user3'))
-
+ assert session.connection().execute('select count(1) from users'
+ ).scalar() == 1
+ session.connection().execute(users.insert().values(name='user3'
+ ))
session.commit()
- assert session.connection().execute("select count(1) from users").scalar() == 2
+ assert session.connection().execute('select count(1) from users'
+ ).scalar() == 2
@testing.fails_on('sqlite', 'FIXME: unknown')
@testing.resolve_artifact_names
@testing.resolve_artifact_names
def test_error_on_using_inactive_session(self):
mapper(User, users)
-
sess = create_session(autocommit=True)
-
sess.begin()
sess.begin(subtransactions=True)
-
sess.add(User(name='u1'))
sess.flush()
-
sess.rollback()
- assert_raises_message(sa.exc.InvalidRequestError, "inactive due to a rollback in a subtransaction", sess.begin, subtransactions=True)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'inactive due to a rollback in a '
+ 'subtransaction', sess.begin,
+ subtransactions=True)
sess.close()
@testing.resolve_artifact_names
def test_no_autocommit_with_explicit_commit(self):
mapper(User, users)
session = create_session(autocommit=False)
-
session.add(User(name='ed'))
session.transaction.commit()
- assert session.transaction is not None, "autocommit=False should start a new transaction"
+ assert session.transaction is not None, \
+ 'autocommit=False should start a new transaction'
@engines.close_open_connections
@testing.resolve_artifact_names
u = User(name='u1')
sess.add(u)
sess.flush()
- assert transaction._connection_for_bind(testing.db) is transaction._connection_for_bind(c) is c
-
- assert_raises_message(sa.exc.InvalidRequestError, "Session already has a Connection associated", transaction._connection_for_bind, testing.db.connect())
-
+ assert transaction._connection_for_bind(testing.db) \
+ is transaction._connection_for_bind(c) is c
+
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'Session already has a Connection '
+ 'associated',
+ transaction._connection_for_bind,
+ testing.db.connect())
transaction.rollback()
assert len(sess.query(User).all()) == 0
sess.close()
user = User(name='u1')
- assert_raises_message(sa.exc.InvalidRequestError, "is not persisted", s.delete, user)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'is not persisted', s.delete, user)
s.add(user)
s.flush()
assert user not in s.dirty
s2 = create_session()
- assert_raises_message(sa.exc.InvalidRequestError, "is already attached to session", s2.delete, user)
-
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'is already attached to session',
+ s2.delete, user)
u2 = s2.query(User).get(user.id)
- assert_raises_message(sa.exc.InvalidRequestError, "another instance with key", s.delete, u2)
-
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'another instance with key', s.delete, u2)
s.expire(user)
s.expunge(user)
assert user not in s
@testing.resolve_artifact_names
def test_weak_ref(self):
- """test the weak-referencing identity map, which strongly-references modified items."""
+ """test the weak-referencing identity map, which strongly-
+ references modified items."""
s = create_session()
mapper(User, users)
s.expunge(u2)
s.identity_map.add(sa.orm.attributes.instance_state(u1))
- assert_raises(AssertionError, s.identity_map.add, sa.orm.attributes.instance_state(u2))
+ assert_raises(AssertionError, s.identity_map.add,
+ sa.orm.attributes.instance_state(u2))
@testing.resolve_artifact_names
log.append('after_begin')
def after_attach(self, session, instance):
log.append('after_attach')
- def after_bulk_update(self, session, query, query_context, result):
+ def after_bulk_update(
+ self,
+ session,
+ query,
+ query_context,
+ result,
+ ):
log.append('after_bulk_update')
- def after_bulk_delete(self, session, query, query_context, result):
+
+ def after_bulk_delete(
+ self,
+ session,
+ query,
+ query_context,
+ result,
+ ):
log.append('after_bulk_delete')
sess = create_session(extension = MyExt())
u = User(name='u1')
sess.add(u)
sess.flush()
- assert log == ['after_attach', 'before_flush', 'after_begin', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec']
-
+ assert log == [
+ 'after_attach',
+ 'before_flush',
+ 'after_begin',
+ 'after_flush',
+ 'before_commit',
+ 'after_commit',
+ 'after_flush_postexec',
+ ]
log = []
sess = create_session(autocommit=False, extension=MyExt())
u = User(name='u1')
sess.add(u)
sess.flush()
- assert log == ['after_attach', 'before_flush', 'after_begin', 'after_flush', 'after_flush_postexec']
-
+ assert log == ['after_attach', 'before_flush', 'after_begin',
+ 'after_flush', 'after_flush_postexec']
log = []
u.name = 'ed'
sess.commit()
- assert log == ['before_commit', 'before_flush', 'after_flush', 'after_flush_postexec', 'after_commit']
-
+ assert log == ['before_commit', 'before_flush', 'after_flush',
+ 'after_flush_postexec', 'after_commit']
log = []
sess.commit()
assert log == ['before_commit', 'after_commit']
-
log = []
sess.query(User).delete()
assert log == ['after_begin', 'after_bulk_delete']
-
log = []
sess.query(User).update({'name': 'foo'})
assert log == ['after_bulk_update']
-
log = []
- sess = create_session(autocommit=False, extension=MyExt(), bind=testing.db)
+ sess = create_session(autocommit=False, extension=MyExt(),
+ bind=testing.db)
conn = sess.connection()
assert log == ['after_begin']
session.add(User(name='another %s' % obj.name))
for obj in list(session.deleted):
if isinstance(obj, User):
- x = session.query(User).filter(User.name=='another %s' % obj.name).one()
+ x = session.query(User).filter(User.name
+ == 'another %s' % obj.name).one()
session.delete(x)
sess = create_session(extension = MyExt(), autoflush=True)
sess = create_session(extension=MyExt())
sess.add(User(name='foo'))
- assert_raises_message(sa.exc.InvalidRequestError, "already flushing", sess.flush)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'already flushing', sess.flush)
@testing.resolve_artifact_names
def test_pickled_update(self):
mapper(User, users)
sess1 = create_session()
sess2 = create_session()
-
u1 = User(name='u1')
sess1.add(u1)
-
- assert_raises_message(sa.exc.InvalidRequestError, "already attached to session", sess2.add, u1)
-
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'already attached to session', sess2.add,
+ u1)
u2 = pickle.loads(pickle.dumps(u1))
-
sess2.add(u2)
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
global t1
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(50))
- )
+ t1 = Table('t1', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('data', String(50)))
@classmethod
def setup_mappers(cls):
super(DisposedStates, self).teardown()
def _set_imap_in_disposal(self, sess, *objs):
- """remove selected objects from the given session, as though they
- were dereferenced and removed from WeakIdentityMap.
+ """remove selected objects from the given session, as though
+ they were dereferenced and removed from WeakIdentityMap.
- Hardcodes the identity map's "all_states()" method to return the full list
- of states. This simulates the all_states() method returning results, afterwhich
- some of the states get garbage collected (this normally only happens during
- asynchronous gc). The Session now has one or more
- InstanceState's which have been removed from the identity map and disposed.
+ Hardcodes the identity map's "all_states()" method to return the
+ full list of states. This simulates the all_states() method
+ returning results, afterwhich some of the states get garbage
+ collected (this normally only happens during asynchronous gc).
+ The Session now has one or more InstanceState's which have been
+ removed from the identity map and disposed.
Will the Session not trip over this ??? Stay tuned.
"""
+
all_states = sess.identity_map.all_states()
- sess.identity_map.all_states = lambda: all_states
+ sess.identity_map.all_states = lambda : all_states
for obj in objs:
state = attributes.instance_state(obj)
sess.identity_map.remove(state)
global sess
sess = create_session(**kwargs)
- data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4'), T('t5')]
+ data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4'
+ ), T('t5')]
sess.add_all(data)
return ok
def _map_it(self, cls):
- return mapper(cls, Table('t', sa.MetaData(),
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True)))
+ return mapper(cls, Table('t', sa.MetaData(), Column('id',
+ Integer, primary_key=True,
+ test_needs_autoincrement=True)))
@testing.uses_deprecated()
def _test_instance_guards(self, user_arg):
raises_('refresh', user_arg)
- instance_methods = self._public_session_methods() - self._class_methods
+ instance_methods = self._public_session_methods() \
+ - self._class_methods
eq_(watchdog, instance_methods,
watchdog.symmetric_difference(instance_methods))
@classmethod
def define_tables(cls, metadata):
- Table('users', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('name', String(20)),
- test_needs_acid=True)
+ Table('users', metadata, Column('id', Integer,
+ primary_key=True, test_needs_autoincrement=True),
+ Column('name', String(20)), test_needs_acid=True)
@classmethod
def setup_classes(cls):
"""Test various algorithmic properties of selectables."""
-from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import eq_, assert_raises, \
+ assert_raises_message
from sqlalchemy import *
from sqlalchemy.test import *
from sqlalchemy.sql import util as sql_util, visitors
Column('coly', Integer),
)
+
class SelectableTest(TestBase, AssertsExecutionResults):
+
def test_distance_on_labels(self):
+
# same column three times
- s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')])
- # didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
- #assert s.corresponding_column(table1.c.col1) is s.c.col1
+ s = select([table1.c.col1.label('c2'), table1.c.col1,
+ table1.c.col1.label('c1')])
+
+ # didnt do this yet...col.label().make_proxy() has same
+ # "distance" as col.make_proxy() so far assert
+ # s.corresponding_column(table1.c.col1) is s.c.col1
+
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
def test_distance_on_aliases(self):
a1 = table1.alias('a1')
-
- for s in (
- select([a1, table1], use_labels=True),
- select([table1, a1], use_labels=True)
- ):
- assert s.corresponding_column(table1.c.col1) is s.c.table1_col1
+ for s in (select([a1, table1], use_labels=True),
+ select([table1, a1], use_labels=True)):
+ assert s.corresponding_column(table1.c.col1) \
+ is s.c.table1_col1
assert s.corresponding_column(a1.c.col1) is s.c.a1_col1
-
def test_join_against_self(self):
jj = select([table1.c.col1.label('bar_col1')])
- jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
+ jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
# test column directly agaisnt itself
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+ assert jjj.corresponding_column(jjj.c.table1_col1) \
+ is jjj.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
# test alias of the join
+
j2 = jjj.alias('foo')
- assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
+ assert j2.corresponding_column(table1.c.col1) \
+ is j2.c.table1_col1
def test_against_cloned_non_table(self):
# test that corresponding column digs across
def test_select_on_table(self):
sel = select([table1, table2], use_labels=True)
- assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
- assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1
- assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1
- assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
- def test_join_against_join(self):
- j = outerjoin(table1, table2, table1.c.col1==table2.c.col2)
- jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
- jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
- assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1) \
+ is sel.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1,
+ require_embedded=True) is sel.c.table1_col1
+ assert table1.corresponding_column(sel.c.table1_col1) \
+ is table1.c.col1
+ assert table1.corresponding_column(sel.c.table1_col1,
+ require_embedded=True) is None
- j2 = jjj.alias('foo')
- assert j2.corresponding_column(jjj.c.table1_col1) is j2.c.table1_col1
+ def test_join_against_join(self):
+ j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2)
+ jj = select([table1.c.col1.label('bar_col1')],
+ from_obj=[j]).alias('foo')
+ jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
+ assert jjj.corresponding_column(jjj.c.table1_col1) \
+ is jjj.c.table1_col1
+ j2 = jjj.alias('foo')
+ assert j2.corresponding_column(jjj.c.table1_col1) \
+ is j2.c.table1_col1
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
def test_table_alias(self):
criterion = a.c.col1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
+
def test_union(self):
- # tests that we can correspond a column in a Select statement with a certain Table, against
- # a column in a Union where one of its underlying Selects matches to that same Table
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- )
+
+ # tests that we can correspond a column in a Select statement
+ # with a certain Table, against a column in a Union where one of
+ # its underlying Selects matches to that same Table
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly]))
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
c = u.corresponding_column(s1.c.table1_col2)
assert u1.corresponding_column(table1.c.colx) is u1.c.col2
assert u1.corresponding_column(table1.c.col3) is u1.c.col1
- def test_singular_union(self):
- u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
+ def test_singular_union(self):
+ u = union(select([table1.c.col1, table1.c.col2,
+ table1.c.col3]), select([table1.c.col1,
+ table1.c.col2, table1.c.col3]))
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1 is not None
assert u.c.col2 is not None
assert u.c.col3 is not None
-
+
def test_alias_union(self):
+
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert s2.corresponding_column(u.c.coly) is s2.c.table2_coly
def test_select_union(self):
+
# like testaliasunion, but off a Select off the union.
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
s = select([u])
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
def test_union_against_join(self):
+
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
- select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
- ).alias('analias')
+
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3,
+ table1.c.colx, null().label('coly'
+ )]).union(select([table2.c.col1, table2.c.col2,
+ table2.c.col3, null().label('colx'),
+ table2.c.coly])).alias('analias')
j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
criterion = a.c.table1_col1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
+
def test_column_labels(self):
- a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')])
+ a = select([table1.c.col1.label('acol1'),
+ table1.c.col2.label('acol2'),
+ table1.c.col3.label('acol3')])
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
assert_raises(exc.NoReferencedTableError, s.join, t1)
+
def test_join_condition(self):
m = MetaData()
t1 = Table('t1', m, Column('id', Integer))
- t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id')))
- t3 = Table('t3', m, Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')),
- Column('t2id', ForeignKey('t2.id')))
- t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id')))
-
+ t2 = Table('t2', m, Column('id', Integer), Column('t1id',
+ ForeignKey('t1.id')))
+ t3 = Table('t3', m, Column('id', Integer), Column('t1id',
+ ForeignKey('t1.id')), Column('t2id',
+ ForeignKey('t2.id')))
+ t4 = Table('t4', m, Column('id', Integer), Column('t2id',
+ ForeignKey('t2.id')))
t1t2 = t1.join(t2)
t2t3 = t2.join(t3)
-
- for left, right, a_subset, expected in [
- (t1, t2, None, t1.c.id==t2.c.t1id),
- (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id),
- (t2t3, t1, t3, t1.c.id==t3.c.t1id),
- (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id),
- (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id),
- (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id),
- (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id),
- ]:
- assert expected.compare(
- sql_util.join_condition(left, right, a_subset=a_subset)
- )
+ for (left, right, a_subset, expected) in [
+ (t1, t2, None, t1.c.id == t2.c.t1id),
+ (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id),
+ (t2t3, t1, t3, t1.c.id == t3.c.t1id),
+ (t2t3, t4, None, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3, t4, t3, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id),
+ (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id),
+ (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id),
+ ]:
+ assert expected.compare(sql_util.join_condition(left,
+ right, a_subset=a_subset))
# these are ambiguous, or have no joins
for left, right, a_subset in [
left.join(right).onclause
)
- # TODO: this raises due to right side being "grouped",
- # and no longer has FKs. Did we want to make
- # _FromGrouping friendlier ?
- assert_raises_message(
- exc.ArgumentError,
- r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
- t1t2.join, t2t3
- )
- assert_raises_message(
- exc.ArgumentError,
- r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
- t1t2.join, t2t3.select(use_labels=True)
- )
+
+ # TODO: this raises due to right side being "grouped", and no
+ # longer has FKs. Did we want to make _FromGrouping friendlier
+ # ?
+
+ assert_raises_message(exc.ArgumentError,
+ "Perhaps you meant to convert the right "
+ "side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3)
+ assert_raises_message(exc.ArgumentError,
+ "Perhaps you meant to convert the right "
+ "side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3.select(use_labels=True))
+
class PrimaryKeyTest(TestBase, AssertsExecutionResults):
+
def test_join_pk_collapse_implicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
- which is the root column along a chain of foreign key relationships."""
+ """test that redundant columns in a join get 'collapsed' into a
+ minimal primary key, which is the root column along a chain of
+ foreign key relationships."""
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True))
-
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
+ primary_key=True))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
+ primary_key=True))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
+ primary_key=True))
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
-
assert list(a.join(b).primary_key) == [a.c.id]
assert list(b.join(c).primary_key) == [b.c.id]
assert list(a.join(b).join(c).primary_key) == [a.c.id]
assert list(d.join(c).join(b).primary_key) == [b.c.id]
assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
+
def test_join_pk_collapse_explicit(self):
- """test that redundant columns in a join get 'collapsed' into a minimal primary key,
- which is the root column along a chain of explicit join conditions."""
+ """test that redundant columns in a join get 'collapsed' into a
+ minimal primary key, which is the root column along a chain of
+ explicit join conditions."""
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
- c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), primary_key=True), Column('x', Integer))
- d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), primary_key=True), Column('x', Integer))
-
- print list(a.join(b, a.c.x==b.c.id).primary_key)
- assert list(a.join(b, a.c.x==b.c.id).primary_key) == [a.c.id]
- assert list(b.join(c, b.c.x==c.c.id).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id==b.c.x).primary_key) == [a.c.id]
- assert list(b.join(c, c.c.x==b.c.id).join(d).primary_key) == [b.c.id]
- assert list(b.join(c, c.c.id==b.c.x).join(d).primary_key) == [b.c.id]
- assert list(d.join(b, d.c.id==b.c.id).join(c, b.c.id==c.c.x).primary_key) == [b.c.id]
- assert list(a.join(b).join(c, c.c.id==b.c.x).join(d).primary_key) == [a.c.id]
-
- assert list(a.join(b, and_(a.c.id==b.c.id, a.c.x==b.c.id)).primary_key) == [a.c.id]
+ a = Table('a', meta, Column('id', Integer, primary_key=True),
+ Column('x', Integer))
+ b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
+ primary_key=True), Column('x', Integer))
+ c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
+ primary_key=True), Column('x', Integer))
+ d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
+ primary_key=True), Column('x', Integer))
+ print list(a.join(b, a.c.x == b.c.id).primary_key)
+ assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id]
+ assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id == b.c.x).primary_key) \
+ == [a.c.id]
+ assert list(b.join(c, c.c.x == b.c.id).join(d).primary_key) \
+ == [b.c.id]
+ assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \
+ == [b.c.id]
+ assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id
+ == c.c.x).primary_key) == [b.c.id]
+ assert list(a.join(b).join(c, c.c.id
+ == b.c.x).join(d).primary_key) == [a.c.id]
+ assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x
+ == b.c.id)).primary_key) == [a.c.id]
def test_init_doesnt_blowitaway(self):
meta = MetaData()
Column('id', Integer, primary_key= True),
)
- engineer = Table( 'Engineer', metadata,
- Column('id', Integer, ForeignKey( 'Employee.id', ), primary_key=True),
- )
+ engineer = Table('Engineer', metadata,
+ Column('id', Integer,
+ ForeignKey('Employee.id'), primary_key=True))
- eq_(
- util.column_set(employee.join(engineer, employee.c.id==engineer.c.id).primary_key),
- util.column_set([employee.c.id])
- )
- eq_(
- util.column_set(employee.join(engineer, engineer.c.id==employee.c.id).primary_key),
- util.column_set([employee.c.id])
- )
+ eq_(util.column_set(employee.join(engineer, employee.c.id
+ == engineer.c.id).primary_key),
+ util.column_set([employee.c.id]))
+ eq_(util.column_set(employee.join(engineer, engineer.c.id
+ == employee.c.id).primary_key),
+ util.column_set([employee.c.id]))
class ReduceTest(TestBase, AssertsExecutionResults):
Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
Column('t3data', String(30)))
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- t1.c.t1id, t1.c.t1data, t2.c.t2id,
- t2.c.t2data, t3.c.t3id, t3.c.t3data])),
- util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data])
- )
+ eq_(util.column_set(sql_util.reduce_columns([
+ t1.c.t1id,
+ t1.c.t1data,
+ t2.c.t2id,
+ t2.c.t2data,
+ t3.c.t3id,
+ t3.c.t3data,
+ ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
+ t3.c.t3data]))
+
def test_reduce_selectable(self):
- metadata = MetaData()
-
- engineers = Table('engineers', metadata,
- Column('engineer_id', Integer, primary_key=True),
- Column('engineer_name', String(50)),
- )
-
- managers = Table('managers', metadata,
- Column('manager_id', Integer, primary_key=True),
- Column('manager_name', String(50))
- )
-
- s = select([engineers, managers]).where(engineers.c.engineer_name==managers.c.manager_name)
-
- eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
- util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id])
- )
+ metadata = MetaData()
+ engineers = Table('engineers', metadata, Column('engineer_id',
+ Integer, primary_key=True),
+ Column('engineer_name', String(50)))
+ managers = Table('managers', metadata, Column('manager_id',
+ Integer, primary_key=True),
+ Column('manager_name', String(50)))
+ s = select([engineers,
+ managers]).where(engineers.c.engineer_name
+ == managers.c.manager_name)
+ eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
+ util.column_set([s.c.engineer_id, s.c.engineer_name,
+ s.c.manager_id]))
+
def test_reduce_aliased_join(self):
metadata = MetaData()
- people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
- Column('name', String(50)),
- Column('type', String(30)))
-
- engineers = Table('engineers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
- Column('status', String(30)),
- Column('engineer_name', String(50)),
- Column('primary_language', String(50)),
- )
-
- managers = Table('managers', metadata,
- Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
- Column('status', String(30)),
- Column('manager_name', String(50))
- )
-
- pjoin = people.outerjoin(engineers).\
- outerjoin(managers).select(use_labels=True).\
- alias('pjoin')
- eq_(
- util.column_set(sql_util.reduce_columns([
- pjoin.c.people_person_id, pjoin.c.engineers_person_id,
- pjoin.c.managers_person_id])),
- util.column_set([pjoin.c.people_person_id])
- )
+ people = Table('people', metadata, Column('person_id', Integer,
+ Sequence('person_id_seq', optional=True),
+ primary_key=True), Column('name', String(50)),
+ Column('type', String(30)))
+ engineers = Table(
+ 'engineers',
+ metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'
+ ), primary_key=True),
+ Column('status', String(30)),
+ Column('engineer_name', String(50)),
+ Column('primary_language', String(50)),
+ )
+ managers = Table('managers', metadata, Column('person_id',
+ Integer, ForeignKey('people.person_id'),
+ primary_key=True), Column('status',
+ String(30)), Column('manager_name',
+ String(50)))
+ pjoin = \
+ people.outerjoin(engineers).outerjoin(managers).\
+ select(use_labels=True).alias('pjoin'
+ )
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id,
+ pjoin.c.engineers_person_id, pjoin.c.managers_person_id])),
+ util.column_set([pjoin.c.people_person_id]))
def test_reduce_aliased_union(self):
metadata = MetaData()
- item_table = Table(
- 'item', metadata,
- Column('id', Integer, ForeignKey('base_item.id'), primary_key=True),
- Column('dummy', Integer, default=0))
- base_item_table = Table(
- 'base_item', metadata,
- Column('id', Integer, primary_key=True),
- Column('child_name', String(255), default=None))
-
+ item_table = Table('item', metadata, Column('id', Integer,
+ ForeignKey('base_item.id'),
+ primary_key=True), Column('dummy', Integer,
+ default=0))
+ base_item_table = Table('base_item', metadata, Column('id',
+ Integer, primary_key=True),
+ Column('child_name', String(255),
+ default=None))
from sqlalchemy.orm.util import polymorphic_union
-
- item_join = polymorphic_union( {
- 'BaseItem':base_item_table.select(base_item_table.c.child_name=='BaseItem'),
- 'Item':base_item_table.join(item_table),
- }, None, 'item_join')
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- item_join.c.id, item_join.c.dummy, item_join.c.child_name
- ])),
- util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name])
- )
+ item_join = polymorphic_union({
+ 'BaseItem':
+ base_item_table.select(
+ base_item_table.c.child_name
+ == 'BaseItem'),
+ 'Item': base_item_table.join(item_table)},
+ None, 'item_join')
+ eq_(util.column_set(sql_util.reduce_columns([item_join.c.id,
+ item_join.c.dummy, item_join.c.child_name])),
+ util.column_set([item_join.c.id, item_join.c.dummy,
+ item_join.c.child_name]))
+
def test_reduce_aliased_union_2(self):
metadata = MetaData()
-
- page_table = Table('page', metadata,
- Column('id', Integer, primary_key=True),
- )
+ page_table = Table('page', metadata, Column('id', Integer,
+ primary_key=True))
magazine_page_table = Table('magazine_page', metadata,
- Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
- )
+ Column('page_id', Integer,
+ ForeignKey('page.id'),
+ primary_key=True))
classified_page_table = Table('classified_page', metadata,
- Column('magazine_page_id', Integer,
- ForeignKey('magazine_page.page_id'), primary_key=True),
- )
-
- # this is essentially the union formed by the ORM's polymorphic_union function.
- # we define two versions with different ordering of selects.
+ Column('magazine_page_id', Integer,
+ ForeignKey('magazine_page.page_id'), primary_key=True))
+
+ # this is essentially the union formed by the ORM's
+ # polymorphic_union function. we define two versions with
+ # different ordering of selects.
+ #
+ # the first selectable has the "real" column
+ # classified_page.magazine_page_id
- # the first selectable has the "real" column classified_page.magazine_page_id
pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).select_from(page_table.join(magazine_page_table).join(classified_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).select_from(page_table.join(magazine_page_table)),
- ).alias('pjoin')
-
- eq_(
- util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id])
- )
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(
+ page_table.join(magazine_page_table).
+ join(classified_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
+ pjoin.c.page_id, pjoin.c.magazine_page_id])),
+ util.column_set([pjoin.c.id]))
# the first selectable has a CAST, which is a placeholder for
- # classified_page.magazine_page_id in the second selectable. reduce_columns
- # needs to take into account all foreign keys derived from pjoin.c.magazine_page_id.
- # the UNION construct currently makes the external column look like that of the first
- # selectable only.
- pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).select_from(page_table.join(magazine_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).select_from(page_table.join(magazine_page_table).join(classified_page_table))
- ).alias('pjoin')
-
- eq_(
- util.column_set(sql_util.reduce_columns([
- pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id])
- )
+ # classified_page.magazine_page_id in the second selectable.
+ # reduce_columns needs to take into account all foreign keys
+ # derived from pjoin.c.magazine_page_id. the UNION construct
+ # currently makes the external column look like that of the
+ # first selectable only.
+
+ pjoin = union(select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(page_table.join(magazine_page_table).
+ join(classified_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
+ pjoin.c.page_id, pjoin.c.magazine_page_id])),
+ util.column_set([pjoin.c.id]))
class DerivedTest(TestBase, AssertsExecutionResults):
def test_table(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
+
def test_alias(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.alias().is_derived_from(t1)
assert not t2.alias().is_derived_from(t1)
def test_select(self):
meta = MetaData()
- t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
- t2 = Table('t2', meta, Column('c1', Integer, primary_key=True), Column('c2', String(30)))
+
+ t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
+ t2 = Table('t2', meta, Column('c1', Integer, primary_key=True),
+ Column('c2', String(30)))
assert t1.select().is_derived_from(t1)
assert not t2.select().is_derived_from(t1)
t1 = s1._annotate({})
t2 = s1
- # t1 needs to share the same _make_proxy() columns as t2, even though it's
- # annotated. otherwise paths will diverge once they are corresponded against "inner" below.
+ # t1 needs to share the same _make_proxy() columns as t2, even
+ # though it's annotated. otherwise paths will diverge once they
+ # are corresponded against "inner" below.
+
assert t1.c is t2.c
assert t1.c.col1 is t2.c.col1
inner = select([s1])
- assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1
- assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1
+
+ assert inner.corresponding_column(t2.c.col1,
+ require_embedded=False) \
+ is inner.corresponding_column(t2.c.col1,
+ require_embedded=True) is inner.c.col1
+ assert inner.corresponding_column(t1.c.col1,
+ require_embedded=False) \
+ is inner.corresponding_column(t1.c.col1,
+ require_embedded=True) is inner.c.col1
def test_annotated_visit(self):
table1 = table('table1', column("col1"), column("col2"))
b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
- b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary})
- assert str(b3) == "table1.col1 = table1.col2"
+
+ b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary'
+ : visit_binary})
+ assert str(b3) == 'table1.col1 = table1.col2'
def visit_binary(b):
b.left = bindparam('bar')
b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
assert str(b5) == ":bar = table1.col2"
+
def test_annotate_expressions(self):
- table1 = table('table1', column("col1"), column("col2"))
-
- for expr, expected in [
- (table1.c.col1, "table1.col1"),
- (table1.c.col1 == 5, "table1.col1 = :col1_1"),
- (table1.c.col1.in_([2,3,4]), "table1.col1 IN (:col1_1, :col1_2, :col1_3)")
- ]:
+ table1 = table('table1', column('col1'), column('col2'))
+ for expr, expected in [(table1.c.col1, 'table1.col1'),
+ (table1.c.col1 == 5,
+ 'table1.col1 = :col1_1'),
+ (table1.c.col1.in_([2, 3, 4]),
+ 'table1.col1 IN (:col1_1, :col1_2, '
+ ':col1_3)')]:
eq_(str(expr), expected)
eq_(str(expr._annotate({})), expected)
eq_(str(sql_util._deep_annotate(expr, {})), expected)
- eq_(str(sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])), expected)
-
+ eq_(str(sql_util._deep_annotate(expr, {},
+ exclude=[table1.c.col1])), expected)
def test_deannotate(self):
table1 = table('table1', column("col1"), column("col2"))
for elem in (b2._annotations, b2.left._annotations):
assert '_orm_adapt' in elem
- for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations):
+ for elem in b3._annotations, b3.left._annotations, \
+ b4._annotations, b4.left._annotations:
assert elem == {}
-
+
assert b2.left is not bin.left
assert b3.left is not b2.left is not bin.left
assert b4.left is bin.left # since column is immutable