from sqlalchemy.engine import default
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, class_mapper, configure_mappers, reconstructor, \
- validates, aliased, defer, deferred, synonym, attributes, \
+ aliased, deferred, synonym, attributes, \
column_property, composite, dynamic_loader, \
comparable_property, Session
from sqlalchemy.orm.persistence import _sort_states
import logging
import logging.handlers
+
class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL):
__dialect__ = 'default'
"""A backref name may not shadow an existing property name."""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(Address, addresses)
mapper(User, users,
- properties={
- 'addresses':relationship(Address, backref='email_address')
- })
+ properties={
+ 'addresses': relationship(Address, backref='email_address')
+ })
assert_raises(sa.exc.ArgumentError, sa.orm.configure_mappers)
def test_update_attr_keys(self):
- """test that update()/insert() use the correct key when given InstrumentedAttributes."""
+ """test that update()/insert() use the correct key when given
+ InstrumentedAttributes."""
User, users = self.classes.User, self.tables.users
-
mapper(User, users, properties={
- 'foobar':users.c.name
+ 'foobar': users.c.name
})
- users.insert().values({User.foobar:'name1'}).execute()
- eq_(sa.select([User.foobar]).where(User.foobar=='name1').execute().fetchall(), [('name1',)])
+ users.insert().values({User.foobar: 'name1'}).execute()
+ eq_(sa.select([User.foobar]).where(User.foobar == 'name1').
+ execute().fetchall(), [('name1',)])
- users.update().values({User.foobar:User.foobar + 'foo'}).execute()
- eq_(sa.select([User.foobar]).where(User.foobar=='name1foo').execute().fetchall(), [('name1foo',)])
+ users.update().values({User.foobar: User.foobar + 'foo'}).execute()
+ eq_(sa.select([User.foobar]).where(User.foobar == 'name1foo').
+ execute().fetchall(), [('name1foo',)])
def test_utils(self):
users = self.tables.users
class Foo(object):
x = "something"
+
@property
def y(self):
return "something else"
-
- m = mapper(Foo, users, properties={"addresses":relationship(Address)})
+ m = mapper(Foo, users, properties={"addresses": relationship(Address)})
mapper(Address, addresses)
a1 = aliased(Foo)
class Foo(object):
x = "something"
+
@property
def y(self):
return "something else"
m = mapper(Foo, users)
a1 = aliased(Foo)
- f = Foo()
-
for arg, key, ret in [
(m, "x", Foo.x),
(Foo, "x", Foo.x),
def boom():
raise Exception("it broke")
mapper(User, users, properties={
- 'addresses':relationship(boom)
+ 'addresses': relationship(boom)
})
# test that QueryableAttribute.__str__() doesn't
"""
Address, addresses, User = (self.classes.Address,
- self.tables.addresses,
- self.classes.User)
-
+ self.tables.addresses,
+ self.classes.User)
mapper(Address, addresses, properties={
- 'user':relationship(User)
+ 'user': relationship(User)
})
try:
"initialize - can't proceed with "
"initialization of other mappers. "
"Original exception was: Class "
- "'test.orm._fixtures.User' is not mapped$"
- , configure_mappers)
+ "'test.orm._fixtures.User' is not mapped$",
+ configure_mappers)
def test_column_prefix(self):
users, User = self.tables.users, self.classes.User
s = create_session()
u = s.query(User).get(7)
eq_(u._name, 'jack')
- eq_(u._id,7)
+ eq_(u._id, 7)
u2 = s.query(User).filter_by(user_name='jack').one()
assert u is u2
still triggers a check against all mappers."""
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
sa.orm.configure_mappers()
assert sa.orm.mapperlib.Mapper._new_mappers is False
m = mapper(Address, addresses, properties={
- 'user': relationship(User, backref="addresses")})
+ 'user': relationship(User, backref="addresses")})
assert m.configured is False
assert sa.orm.mapperlib.Mapper._new_mappers is True
def test_column_not_present(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
assert_raises_message(sa.exc.ArgumentError,
"not represented in the mapper's table",
- mapper, User, users, properties={'foo'
- : addresses.c.user_id})
+ mapper, User, users,
+ properties={'foo': addresses.c.user_id})
def test_constructor_exc(self):
"""TypeError is raised for illegal constructor args,
users, addresses = self.tables.users, self.tables.addresses
-
class Foo(object):
+
def __init__(self):
pass
+
class Bar(object):
pass
"""
class Foo(object):
+
def __init__(self, id):
self.id = id
m = MetaData()
foo_t = Table('foo', m,
- Column('id', String, primary_key=True)
- )
+ Column('id', String, primary_key=True)
+ )
m = mapper(Foo, foo_t)
+
class DontCompareMeToString(int):
if util.py2k:
def __lt__(self, other):
[states[4], states[3], states[0], states[1], states[2]]
)
-
def test_props(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
- m = mapper(User, users, properties = {
- 'addresses' : relationship(mapper(Address, addresses))
+ m = mapper(User, users, properties={
+ 'addresses': relationship(mapper(Address, addresses))
})
assert User.addresses.property is m.get_property('addresses')
def test_unicode_relationship_backref_names(self):
# test [ticket:2901]
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(Address, addresses)
mapper(User, users, properties={
def test_configure_on_prop_1(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
- mapper(User, users, properties = {
- 'addresses' : relationship(mapper(Address, addresses))
+ mapper(User, users, properties={
+ 'addresses': relationship(mapper(Address, addresses))
})
- User.addresses.any(Address.email_address=='foo@bar.com')
+ User.addresses.any(Address.email_address == 'foo@bar.com')
def test_configure_on_prop_2(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
- mapper(User, users, properties = {
- 'addresses' : relationship(mapper(Address, addresses))
+ mapper(User, users, properties={
+ 'addresses': relationship(mapper(Address, addresses))
})
- eq_(str(User.id == 3), str(users.c.id==3))
+ eq_(str(User.id == 3), str(users.c.id == 3))
def test_configure_on_prop_3(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
+
+ class Foo(User):
+ pass
- class Foo(User):pass
mapper(User, users)
mapper(Foo, addresses, inherits=User, properties={
- 'address_id': addresses.c.id
- })
+ 'address_id': addresses.c.id
+ })
assert getattr(Foo().__class__, 'name').impl is not None
def test_deferred_subclass_attribute_instrument(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
+
+ class Foo(User):
+ pass
- class Foo(User):pass
mapper(User, users)
configure_mappers()
mapper(Foo, addresses, inherits=User, properties={
- 'address_id': addresses.c.id
- })
+ 'address_id': addresses.c.id
+ })
assert getattr(Foo().__class__, 'name').impl is not None
def test_check_descriptor_as_method(self):
User, users = self.classes.User, self.tables.users
m = mapper(User, users)
+
class MyClass(User):
+
def foo(self):
pass
m._is_userland_descriptor(MyClass.foo)
def test_configure_on_get_props_1(self):
User, users = self.classes.User, self.tables.users
- m =mapper(User, users)
+ m = mapper(User, users)
assert not m.configured
assert list(m.iterate_properties)
assert m.configured
def test_configure_on_get_props_2(self):
User, users = self.classes.User, self.tables.users
- m= mapper(User, users)
+ m = mapper(User, users)
assert not m.configured
assert m.get_property('name')
assert m.configured
def test_configure_on_get_props_3(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
- m= mapper(User, users)
+ m = mapper(User, users)
assert not m.configured
configure_mappers()
m2 = mapper(Address, addresses, properties={
- 'user':relationship(User, backref='addresses')
- })
+ 'user': relationship(User, backref='addresses')
+ })
assert m.get_property('addresses')
def test_info(self):
users = self.tables.users
Address = self.classes.Address
+
class MyComposite(object):
pass
for constructor, args in [
# create specific tables here as we don't want
# users.c.id.info to be pre-initialized
users = Table('u', m, Column('id', Integer, primary_key=True),
- Column('name', String))
+ Column('name', String))
addresses = Table('a', m, Column('id', Integer, primary_key=True),
- Column('name', String),
- Column('user_id', Integer, ForeignKey('u.id')))
+ Column('name', String),
+ Column('user_id', Integer, ForeignKey('u.id')))
Address = self.classes.Address
User = self.classes.User
mapper(User, users, properties={
- "name_lower": column_property(func.lower(users.c.name)),
- "addresses": relationship(Address)
- })
+ "name_lower": column_property(func.lower(users.c.name)),
+ "addresses": relationship(Address)
+ })
mapper(Address, addresses)
# attr.info goes down to the original Column object
# same for relationships
is_(User.addresses.info, User.addresses.property.info)
-
def test_add_property(self):
users, addresses, Address = (self.tables.users,
- self.tables.addresses,
- self.classes.Address)
+ self.tables.addresses,
+ self.classes.Address)
assert_col = []
class User(fixtures.ComparableEntity):
+
def _get_name(self):
assert_col.append(('get', self._name))
return self._name
+
def _set_name(self, name):
assert_col.append(('set', name))
self._name = name
m.add_property('addresses', relationship(Address))
m.add_property('uc_name', sa.orm.comparable_property(UCComparator))
m.add_property('uc_name2', sa.orm.comparable_property(
- UCComparator, User.uc_name2))
+ UCComparator, User.uc_name2))
sess = create_session(autocommit=False)
assert sess.query(User).get(7)
User()
m2 = mapper(Address, addresses, properties={
- 'user':relationship(User, backref="addresses")
+ 'user': relationship(User, backref="addresses")
})
# configure mappers takes place when User is generated
User()
users, User = self.tables.users, self.classes.User
m = mapper(User, users)
- m.add_property('_name',users.c.name)
+ m.add_property('_name', users.c.name)
m.add_property('name', synonym('_name'))
sess = create_session()
addresses, Address = self.tables.addresses, self.classes.Address
m = mapper(User, users, properties={
- "addresses": relationship(Address)
- })
+ "addresses": relationship(Address)
+ })
mapper(Address, addresses)
assert_raises_message(
def test_add_column_prop_deannotate(self):
User, users = self.classes.User, self.tables.users
Address, addresses = self.classes.Address, self.tables.addresses
+
class SubUser(User):
pass
m = mapper(User, users)
m2 = mapper(SubUser, addresses, inherits=User, properties={
- 'address_id': addresses.c.id
- })
+ 'address_id': addresses.c.id
+ })
m3 = mapper(Address, addresses, properties={
- 'foo':relationship(m2)
+ 'foo': relationship(m2)
})
# add property using annotated User.name,
# needs to be deannotated
"addresses_1.email_address AS "
"addresses_1_email_address, "
"users_1.name || :name_1 AS anon_1 "
- "FROM addresses JOIN (users AS users_1 JOIN addresses AS addresses_1 ON users_1.id = "
+ "FROM addresses JOIN (users AS users_1 JOIN addresses "
+ "AS addresses_1 ON users_1.id = "
"addresses_1.user_id) ON "
"users_1.id = addresses.user_id"
)
assert User.y.property.columns[0] is not expr2
assert User.y.property.columns[0].element.\
- _raw_columns[0] is users.c.name
+ _raw_columns[0] is users.c.name
assert User.y.property.columns[0].element.\
- _raw_columns[1] is users.c.id
+ _raw_columns[1] is users.c.id
def test_synonym_replaces_backref(self):
addresses, users, User = (self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.users,
+ self.classes.User)
assert_calls = []
+
class Address(object):
+
def _get_user(self):
assert_calls.append("get")
return self._user
+
def _set_user(self, user):
assert_calls.append("set")
self._user = user
# synonym is created against nonexistent prop
mapper(Address, addresses, properties={
- 'user':synonym('_user')
+ 'user': synonym('_user')
})
sa.orm.configure_mappers()
# later, backref sets up the prop
mapper(User, users, properties={
- 'addresses':relationship(Address, backref='_user')
+ 'addresses': relationship(Address, backref='_user')
})
sess = create_session()
u1 = sess.query(User).get(7)
u2 = sess.query(User).get(8)
# comparaison ops need to work
- a1 = sess.query(Address).filter(Address.user==u1).one()
+ a1 = sess.query(Address).filter(Address.user == u1).one()
eq_(a1.id, 1)
a1.user = u2
assert a1.user is u2
def test_self_ref_synonym(self):
t = Table('nodes', MetaData(),
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('parent_id', Integer, ForeignKey('nodes.id')))
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')))
class Node(object):
pass
mapper(Node, t, properties={
- '_children':relationship(Node, backref=backref('_parent', remote_side=t.c.id)),
- 'children':synonym('_children'),
- 'parent':synonym('_parent')
+ '_children': relationship(
+ Node, backref=backref('_parent', remote_side=t.c.id)),
+ 'children': synonym('_children'),
+ 'parent': synonym('_parent')
})
n1 = Node()
def test_non_primary_identity_class(self):
User = self.classes.User
users, addresses = self.tables.users, self.tables.addresses
+
class AddressUser(User):
pass
m1 = mapper(User, users, polymorphic_identity='user')
m2 = mapper(AddressUser, addresses, inherits=User,
- polymorphic_identity='address', properties={
- 'address_id': addresses.c.id
- })
+ polymorphic_identity='address', properties={
+ 'address_id': addresses.c.id
+ })
m3 = mapper(AddressUser, addresses, non_primary=True)
assert m3._identity_class is m2._identity_class
eq_(
def test_reassign_polymorphic_identity_warns(self):
User = self.classes.User
users = self.tables.users
+
class MyUser(User):
pass
m1 = mapper(User, users, polymorphic_on=users.c.name,
MyUser, users, inherits=User, polymorphic_identity='user'
)
-
def test_illegal_non_primary(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
mapper(Address, addresses)
mapper(User, users, non_primary=True, properties={
- 'addresses':relationship(Address)
+ 'addresses': relationship(Address)
})
assert_raises_message(
sa.exc.ArgumentError,
class Base(object):
pass
+
class Sub(Base):
pass
mapper(Base, users)
assert_raises_message(sa.exc.InvalidRequestError,
- "Configure a primary mapper first",
- mapper, Sub, addresses, non_primary=True
- )
+ "Configure a primary mapper first",
+ mapper, Sub, addresses, non_primary=True
+ )
def test_prop_filters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ test_needs_autoincrement=True),
Column('type', String(128)),
Column('name', String(128)),
Column('employee_number', Integer),
Column('boss_id', Integer, ForeignKey('person.id')),
Column('vendor_id', Integer))
- class Person(object): pass
- class Vendor(Person): pass
- class Employee(Person): pass
- class Manager(Employee): pass
- class Hoho(object): pass
- class Lala(object): pass
- class Fub(object):pass
- class Frob(object):pass
+ class Person(object):
+ pass
+
+ class Vendor(Person):
+ pass
+
+ class Employee(Person):
+ pass
+
+ class Manager(Employee):
+ pass
+
+ class Hoho(object):
+ pass
+
+ class Lala(object):
+ pass
+
+ class Fub(object):
+ pass
+
+ class Frob(object):
+ pass
+
class HasDef(object):
+
def name(self):
pass
- class Empty(object):pass
- empty = mapper(Empty, t, properties={'empty_id' : t.c.id},
- include_properties=[])
+ class Empty(object):
+ pass
+
+ mapper(
+ Empty, t, properties={'empty_id': t.c.id},
+ include_properties=[])
p_m = mapper(Person, t, polymorphic_on=t.c.type,
include_properties=('id', 'type', 'name'))
e_m = mapper(Employee, inherits=p_m,
- polymorphic_identity='employee', properties={'boss'
- : relationship(Manager, backref=backref('peon'),
- remote_side=t.c.id)},
+ polymorphic_identity='employee',
+ properties={
+ 'boss': relationship(
+ Manager, backref=backref('peon'),
+ remote_side=t.c.id)},
exclude_properties=('vendor_id', ))
- m_m = mapper(Manager, inherits=e_m, polymorphic_identity='manager',
- include_properties=('id', 'type'))
+ mapper(
+ Manager, inherits=e_m, polymorphic_identity='manager',
+ include_properties=('id', 'type'))
- v_m = mapper(Vendor, inherits=p_m, polymorphic_identity='vendor',
- exclude_properties=('boss_id', 'employee_number'))
- h_m = mapper(Hoho, t, include_properties=('id', 'type', 'name'))
- l_m = mapper(Lala, t, exclude_properties=('vendor_id', 'boss_id'),
- column_prefix="p_")
+ mapper(
+ Vendor, inherits=p_m, polymorphic_identity='vendor',
+ exclude_properties=('boss_id', 'employee_number'))
+ mapper(Hoho, t, include_properties=('id', 'type', 'name'))
+ mapper(
+ Lala, t, exclude_properties=('vendor_id', 'boss_id'),
+ column_prefix="p_")
- hd_m = mapper(HasDef, t, column_prefix="h_")
+ mapper(HasDef, t, column_prefix="h_")
- fb_m = mapper(Fub, t, include_properties=(t.c.id, t.c.type))
- frb_m = mapper(Frob, t, column_prefix='f_',
- exclude_properties=(t.c.boss_id,
- 'employee_number', t.c.vendor_id))
+ mapper(Fub, t, include_properties=(t.c.id, t.c.type))
+ mapper(
+ Frob, t, column_prefix='f_',
+ exclude_properties=(
+ t.c.boss_id,
+ 'employee_number', t.c.vendor_id))
configure_mappers()
eq_(have, want)
assert_props(HasDef, ['h_boss_id', 'h_employee_number', 'h_id',
- 'name', 'h_name', 'h_vendor_id', 'h_type'])
+ 'name', 'h_name', 'h_vendor_id', 'h_type'])
assert_props(Person, ['id', 'name', 'type'])
assert_instrumented(Person, ['id', 'name', 'type'])
assert_props(Employee, ['boss', 'boss_id', 'employee_number',
'id', 'name', 'type'])
- assert_instrumented(Employee,['boss', 'boss_id', 'employee_number',
- 'id', 'name', 'type'])
+ assert_instrumented(Employee, ['boss', 'boss_id', 'employee_number',
+ 'id', 'name', 'type'])
assert_props(Manager, ['boss', 'boss_id', 'employee_number', 'peon',
'id', 'name', 'type'])
assert_props(Fub, ['id', 'type'])
assert_props(Frob, ['f_id', 'f_type', 'f_name', ])
-
# putting the discriminator column in exclude_properties,
# very weird. As of 0.7.4 this re-maps it.
class Foo(Person):
def test_prop_filters_defaults(self):
metadata = self.metadata
t = Table('t', metadata,
- Column('id', Integer(), primary_key=True, test_needs_autoincrement=True),
- Column('x', Integer(), nullable=False, server_default='0')
- )
+ Column(
+ 'id', Integer(), primary_key=True,
+ test_needs_autoincrement=True),
+ Column('x', Integer(), nullable=False, server_default='0')
+ )
t.create()
+
class A(object):
pass
mapper(A, t, include_properties=['id'])
def test_we_dont_call_bool(self):
class NoBoolAllowed(object):
+
def __bool__(self):
raise Exception("nope")
mapper(NoBoolAllowed, self.tables.users)
def test_we_dont_call_eq(self):
class NoEqAllowed(object):
+
def __eq__(self, other):
raise Exception("nope")
Address = self.classes.Address
mapper(NoEqAllowed, users, properties={
- 'addresses':relationship(Address, backref='user')
+ 'addresses': relationship(Address, backref='user')
})
mapper(Address, addresses)
"""Test implicit merging of two cols raises."""
addresses, users, User = (self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
+ self.tables.users,
+ self.classes.User)
usersaddresses = sa.join(users, addresses,
users.c.id == addresses.c.user_id)
"""Mapping to a join"""
User, addresses, users = (self.classes.User,
- self.tables.addresses,
- self.tables.users)
-
+ self.tables.addresses,
+ self.tables.users)
usersaddresses = sa.join(users, addresses, users.c.id
== addresses.c.user_id)
mapper(User, usersaddresses, primary_key=[users.c.id],
- properties={'add_id':addresses.c.id}
+ properties={'add_id': addresses.c.id}
)
l = create_session().query(User).order_by(users.c.id).all()
eq_(l, self.static.user_result[:3])
"""Mapping to a join"""
User, addresses, users = (self.classes.User,
- self.tables.addresses,
- self.tables.users)
-
+ self.tables.addresses,
+ self.tables.users)
usersaddresses = sa.join(users, addresses, users.c.id
== addresses.c.user_id)
def test_mapping_to_join_no_pk(self):
email_bounces, addresses, Address = (self.tables.email_bounces,
- self.tables.addresses,
- self.classes.Address)
+ self.tables.addresses,
+ self.classes.Address)
m = mapper(Address,
- addresses.join(email_bounces),
- properties={'id':[addresses.c.id, email_bounces.c.id]}
- )
+ addresses.join(email_bounces),
+ properties={'id': [addresses.c.id, email_bounces.c.id]}
+ )
configure_mappers()
assert addresses in m._pks_by_table
assert email_bounces not in m._pks_by_table
"""Mapping to an outer join with a nullable composite primary key."""
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
-
-
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users.outerjoin(addresses),
primary_key=[users.c.id, addresses.c.id],
"""test the allow_partial_pks=False flag."""
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
-
-
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users.outerjoin(addresses),
- allow_partial_pks=False,
+ allow_partial_pks=False,
primary_key=[users.c.id, addresses.c.id],
properties=dict(
address_id=addresses.c.id))
def test_scalar_pk_arg(self):
users, Keyword, items, Item, User, keywords = (self.tables.users,
- self.classes.Keyword,
- self.tables.items,
- self.classes.Item,
- self.classes.User,
- self.tables.keywords)
+ self.classes.Keyword,
+ self.tables.items,
+ self.classes.Item,
+ self.classes.User,
+ self.tables.keywords)
m1 = mapper(Item, items, primary_key=[items.c.id])
m2 = mapper(Keyword, keywords, primary_key=keywords.c.id)
assert m2.primary_key[0] is keywords.c.id
assert m3.primary_key[0] is users.c.id
-
def test_custom_join(self):
"""select_from totally replace the FROM parameters."""
- users, items, order_items, orders, Item, User, Order = (self.tables.users,
- self.tables.items,
- self.tables.order_items,
- self.tables.orders,
- self.classes.Item,
- self.classes.User,
- self.classes.Order)
-
+ users, items, order_items, orders, Item, User, Order = (
+ self.tables.users,
+ self.tables.items,
+ self.tables.order_items,
+ self.tables.orders,
+ self.classes.Item,
+ self.classes.User,
+ self.classes.Order)
mapper(Item, items)
mapper(User, users, order_by=users.c.name.desc())
- assert "order by users.name desc" in str(create_session().query(User).statement).lower()
- assert "order by" not in str(create_session().query(User).order_by(None).statement).lower()
- assert "order by users.name asc" in str(create_session().query(User).order_by(User.name.asc()).statement).lower()
+ assert "order by users.name desc" in \
+ str(create_session().query(User).statement).lower()
+ assert "order by" not in \
+ str(create_session().query(User).order_by(None).statement).lower()
+ assert "order by users.name asc" in \
+ str(create_session().query(User).order_by(
+ User.name.asc()).statement).lower()
eq_(
create_session().query(User).all(),
- [User(id=7, name='jack'), User(id=9, name='fred'), User(id=8, name='ed'), User(id=10, name='chuck')]
+ [User(id=7, name='jack'), User(id=9, name='fred'),
+ User(id=8, name='ed'), User(id=10, name='chuck')]
)
eq_(
create_session().query(User).order_by(User.name).all(),
- [User(id=10, name='chuck'), User(id=8, name='ed'), User(id=9, name='fred'), User(id=7, name='jack')]
+ [User(id=10, name='chuck'), User(id=8, name='ed'),
+ User(id=9, name='fred'), User(id=7, name='jack')]
)
# 'Raises a "expression evaluation not supported" error at prepare time
"""Mapping to a SELECT statement that has functions in it."""
addresses, users, User = (self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
+ self.tables.users,
+ self.classes.User)
s = sa.select([users,
(users.c.id * 2).label('concat'),
User, users = self.classes.User, self.tables.users
-
mapper(User, users)
session = create_session()
q = session.query(User)
eq_(q.count(), 4)
- eq_(q.filter(User.id.in_([8,9])).count(), 2)
- eq_(q.filter(users.c.id.in_([8,9])).count(), 2)
+ eq_(q.filter(User.id.in_([8, 9])).count(), 2)
+ eq_(q.filter(users.c.id.in_([8, 9])).count(), 2)
eq_(session.query(User.id).count(), 4)
eq_(session.query(User.id).filter(User.id.in_((8, 9))).count(), 2)
def test_many_to_many_count(self):
- keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
- self.tables.items,
- self.tables.item_keywords,
- self.classes.Keyword,
- self.classes.Item)
+ keywords, items, item_keywords, Keyword, Item = (
+ self.tables.keywords,
+ self.tables.items,
+ self.tables.item_keywords,
+ self.classes.Keyword,
+ self.classes.Item)
mapper(Keyword, keywords)
mapper(Item, items, properties=dict(
- keywords = relationship(Keyword, item_keywords, lazy='select')))
+ keywords=relationship(Keyword, item_keywords, lazy='select')))
session = create_session()
q = (session.query(Item).
"""Overriding a column raises an error."""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
def go():
mapper(User, users,
"""exclude_properties cancels the error."""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users,
exclude_properties=['name'],
"""The column being named elsewhere also cancels the error,"""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users,
properties=dict(
def test_synonym(self):
users, addresses, Address = (self.tables.users,
- self.tables.addresses,
- self.classes.Address)
-
+ self.tables.addresses,
+ self.classes.Address)
assert_col = []
+
class extendedproperty(property):
attribute = 123
class User(object):
+
def _get_name(self):
assert_col.append(('get', self.name))
return self.name
+
def _set_name(self, name):
assert_col.append(('set', name))
self.name = name
uname = extendedproperty(_get_name, _set_name)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='select'),
- uname = synonym('name'),
- adlist = synonym('addresses'),
- adname = synonym('addresses')
+ addresses=relationship(mapper(Address, addresses), lazy='select'),
+ uname=synonym('name'),
+ adlist=synonym('addresses'),
+ adname=synonym('addresses')
))
# ensure the synonym can get at the proxied comparators without
row = sess.query(User.id, User.uname).first()
assert row.uname == row[1]
- u = sess.query(User).filter(User.uname=='jack').one()
+ u = sess.query(User).filter(User.uname == 'jack').one()
fixture = self.static.user_address_result[0].addresses
eq_(u.adlist, fixture)
eq_(User.uname.attribute, 123)
def test_synonym_of_synonym(self):
- users, User = (self.tables.users,
- self.classes.User)
+ users, User = (self.tables.users,
+ self.classes.User)
mapper(User, users, properties={
- 'x':synonym('id'),
- 'y':synonym('x')
+ 'x': synonym('id'),
+ 'y': synonym('x')
})
s = Session()
- u = s.query(User).filter(User.y==8).one()
+ u = s.query(User).filter(User.y == 8).one()
eq_(u.y, 8)
-
def test_synonym_column_location(self):
users, User = self.tables.users, self.classes.User
def go():
mapper(User, users, properties={
- 'not_name':synonym('_name', map_column=True)})
+ 'not_name': synonym('_name', map_column=True)})
assert_raises_message(
sa.exc.ArgumentError,
go)
def test_column_synonyms(self):
- """Synonyms which automatically instrument properties, set up aliased column, etc."""
+ """Synonyms which automatically instrument properties,
+ set up aliased column, etc."""
addresses, users, Address = (self.tables.addresses,
- self.tables.users,
- self.classes.Address)
-
-
+ self.tables.users,
+ self.classes.Address)
assert_col = []
+
class User(object):
+
def _get_name(self):
assert_col.append(('get', self._name))
return self._name
+
def _set_name(self, name):
assert_col.append(('set', name))
self._name = name
name = property(_get_name, _set_name)
mapper(Address, addresses)
- mapper(User, users, properties = {
- 'addresses':relationship(Address, lazy='select'),
- 'name':synonym('_name', map_column=True)
+ mapper(User, users, properties={
+ 'addresses': relationship(Address, lazy='select'),
+ 'name': synonym('_name', map_column=True)
})
# test compile
return "method1"
from sqlalchemy.orm.properties import ColumnProperty
+
class UCComparator(ColumnProperty.Comparator):
__hash__ = None
def map_(with_explicit_property):
class User(object):
+
@extendedproperty
def uc_name(self):
if self.name is None:
else:
args = (UCComparator,)
mapper(User, users, properties=dict(
- uc_name = sa.orm.comparable_property(*args)))
+ uc_name=sa.orm.comparable_property(*args)))
return User
for User in (map_(True), map_(False)):
assert_raises_message(
AttributeError,
"Neither 'extendedproperty' object nor 'UCComparator' "
- "object associated with User.uc_name has an attribute 'nonexistent'",
+ "object associated with User.uc_name has an attribute "
+ "'nonexistent'",
getattr, User.uc_name, 'nonexistent')
# test compile
assert not isinstance(User.uc_name == 'jack', bool)
- u = q.filter(User.uc_name=='JACK').one()
+ u = q.filter(User.uc_name == 'JACK').one()
assert u.uc_name == "JACK"
assert u not in sess.dirty
class MyComparator(sa.orm.properties.ColumnProperty.Comparator):
__hash__ = None
+
def __eq__(self, other):
# lower case comparison
return func.lower(self.__clause_element__()
- ) == func.lower(other)
+ ) == func.lower(other)
def intersects(self, other):
# non-standard comparator
mapper(User, users, properties={
'name': sa.orm.column_property(users.c.name,
- comparator_factory=MyComparator)
+ comparator_factory=MyComparator)
})
assert_raises_message(
eq_(
str((User.name == 'ed').compile(
- dialect=sa.engine.default.DefaultDialect())),
+ dialect=sa.engine.default.DefaultDialect())),
"lower(users.name) = lower(:lower_1)")
eq_(
str((User.name.intersects('ed')).compile(
- dialect=sa.engine.default.DefaultDialect())),
+ dialect=sa.engine.default.DefaultDialect())),
"users.name &= :name_1")
-
def test_reentrant_compile(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
class MyFakeProperty(sa.orm.properties.ColumnProperty):
+
def post_instrument_class(self, mapper):
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
m1 = mapper(User, users, properties={
- 'name':MyFakeProperty(users.c.name)
+ 'name': MyFakeProperty(users.c.name)
})
m2 = mapper(Address, addresses)
configure_mappers()
sa.orm.clear_mappers()
+
class MyFakeProperty(sa.orm.properties.ColumnProperty):
+
def post_instrument_class(self, mapper):
super(MyFakeProperty, self).post_instrument_class(mapper)
configure_mappers()
m1 = mapper(User, users, properties={
- 'name':MyFakeProperty(users.c.name)
+ 'name': MyFakeProperty(users.c.name)
})
m2 = mapper(Address, addresses)
configure_mappers()
recon = []
class User(object):
+
@reconstructor
def reconstruct(self):
recon.append('go')
users = self.tables.users
recon = []
+
class A(object):
+
@reconstructor
def reconstruct(self):
assert isinstance(self, A)
recon.append('A')
class B(A):
+
@reconstructor
def reconstruct(self):
assert isinstance(self, B)
recon.append('B')
class C(A):
+
@reconstructor
def reconstruct(self):
assert isinstance(self, C)
users = self.tables.users
recon = []
+
class Base(object):
+
@reconstructor
def reconstruct(self):
recon.append('go')
def test_unmapped_error(self):
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(Address, addresses)
sa.orm.clear_mappers()
mapper(User, users, properties={
- 'addresses':relationship(Address)
+ 'addresses': relationship(Address)
})
assert_raises_message(
Address = self.classes.Address
mapper(User, users, properties={
- "addresses": relationship(Address,
- primaryjoin=lambda: users.c.id == addresses.wrong.user_id)
- })
+ "addresses": relationship(
+ Address,
+ primaryjoin=lambda: users.c.id == addresses.wrong.user_id)
+ })
mapper(Address, addresses)
assert_raises_message(
AttributeError,
Address = self.classes.Address
mapper(User, users, properties={
- "addresses": relationship(Address,
- primaryjoin=lambda: users.c.id ==
- addresses.__dict__['wrong'].user_id)
- })
+ "addresses": relationship(Address,
+ primaryjoin=lambda: users.c.id ==
+ addresses.__dict__['wrong'].user_id)
+ })
mapper(Address, addresses)
assert_raises_message(
KeyError,
class Base(object):
pass
+
class Sub(Base):
pass
# using it with an ORM operation, raises
assert_raises(sa.orm.exc.UnmappedClassError,
- create_session().add, Sub())
+ create_session().add, Sub())
def test_unmapped_subclass_error_premap(self):
users = self.tables.users
# using it with an ORM operation, raises
assert_raises(sa.orm.exc.UnmappedClassError,
- create_session().add, Sub())
+ create_session().add, Sub())
def test_oldstyle_mixin(self):
users = self.tables.users
class OldStyle:
pass
+
class NewStyle(object):
pass
mapper(B, users)
+
class DocumentTest(fixtures.TestBase):
def test_doc_propagate(self):
metadata = MetaData()
t1 = Table('t1', metadata,
- Column('col1', Integer, primary_key=True, doc="primary key column"),
- Column('col2', String, doc="data col"),
- Column('col3', String, doc="data col 2"),
- Column('col4', String, doc="data col 3"),
- Column('col5', String),
- )
+ Column('col1', Integer, primary_key=True,
+ doc="primary key column"),
+ Column('col2', String, doc="data col"),
+ Column('col3', String, doc="data col 2"),
+ Column('col4', String, doc="data col 3"),
+ Column('col5', String),
+ )
t2 = Table('t2', metadata,
- Column('col1', Integer, primary_key=True, doc="primary key column"),
- Column('col2', String, doc="data col"),
- Column('col3', Integer, ForeignKey('t1.col1'), doc="foreign key to t1.col1")
- )
+ Column('col1', Integer, primary_key=True,
+ doc="primary key column"),
+ Column('col2', String, doc="data col"),
+ Column('col3', Integer, ForeignKey('t1.col1'),
+ doc="foreign key to t1.col1")
+ )
class Foo(object):
pass
pass
mapper(Foo, t1, properties={
- 'bars':relationship(Bar,
- doc="bar relationship",
- backref=backref('foo',doc='foo relationship')
- ),
- 'foober':column_property(t1.c.col3, doc='alternate data col'),
- 'hoho':synonym("col4", doc="syn of col4")
+ 'bars': relationship(Bar,
+ doc="bar relationship",
+ backref=backref('foo', doc='foo relationship')
+ ),
+ 'foober': column_property(t1.c.col3, doc='alternate data col'),
+ 'hoho': synonym("col4", doc="syn of col4")
})
mapper(Bar, t2)
configure_mappers()
eq_(Bar.col1.__doc__, "primary key column")
eq_(Bar.foo.__doc__, "foo relationship")
+
class ORMLoggingTest(_fixtures.FixtureTest):
+
def setup(self):
self.buf = logging.handlers.BufferingHandler(100)
for log in [
for msg in self._current_messages():
assert msg.startswith('(User|%%(%d anon)s) ' % id(tb))
+
class OptionsTest(_fixtures.FixtureTest):
def test_synonym_options(self):
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='select',
- order_by=addresses.c.id),
- adlist = synonym('addresses')))
+ addresses=relationship(mapper(Address, addresses), lazy='select',
+ order_by=addresses.c.id),
+ adlist=synonym('addresses')))
def go():
sess = create_session()
"""A lazy relationship can be upgraded to an eager relationship."""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses),
- order_by=addresses.c.id)))
+ addresses=relationship(mapper(Address, addresses),
+ order_by=addresses.c.id)))
sess = create_session()
l = (sess.query(User).
def test_eager_options_with_limit(self):
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
addresses=relationship(mapper(Address, addresses), lazy='select')))
def test_lazy_options_with_limit(self):
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='joined')))
+ addresses=relationship(mapper(Address, addresses), lazy='joined')))
sess = create_session()
u = (sess.query(User).
if eager columns are not available"""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses),
- lazy='joined', order_by=addresses.c.id)))
+ addresses=relationship(mapper(Address, addresses),
+ lazy='joined', order_by=addresses.c.id)))
sess = create_session()
# first test straight eager load, 1 statement
+
def go():
l = sess.query(User).order_by(User.id).all()
eq_(l, self.static.user_address_result)
# (previous users in session fell out of scope and were removed from
# session's identity map)
r = users.select().order_by(users.c.id).execute()
+
def go():
l = list(sess.query(User).instances(r))
eq_(l, self.static.user_address_result)
self.sql_count_(4, go)
def test_eager_degrade_deep(self):
- users, Keyword, items, order_items, orders, Item, User, Address, keywords, item_keywords, Order, addresses = (self.tables.users,
- self.classes.Keyword,
- self.tables.items,
- self.tables.order_items,
- self.tables.orders,
- self.classes.Item,
- self.classes.User,
- self.classes.Address,
- self.tables.keywords,
- self.tables.item_keywords,
- self.classes.Order,
- self.tables.addresses)
+ users, Keyword, items, order_items, orders, \
+ Item, User, Address, keywords, item_keywords, Order, addresses = (
+ self.tables.users,
+ self.classes.Keyword,
+ self.tables.items,
+ self.tables.order_items,
+ self.tables.orders,
+ self.classes.Item,
+ self.classes.User,
+ self.classes.Address,
+ self.tables.keywords,
+ self.tables.item_keywords,
+ self.classes.Order,
+ self.tables.addresses)
# test with a deeper set of eager loads. when we first load the three
# users, they will have no addresses or orders. the number of lazy
mapper(Item, items, properties=dict(
keywords=relationship(Keyword, secondary=item_keywords,
- lazy='joined',
- order_by=item_keywords.c.keyword_id)))
+ lazy='joined',
+ order_by=item_keywords.c.keyword_id)))
mapper(Order, orders, properties=dict(
items=relationship(Item, secondary=order_items, lazy='joined',
- order_by=order_items.c.item_id)))
+ order_by=order_items.c.item_id)))
mapper(User, users, properties=dict(
addresses=relationship(Address, lazy='joined',
- order_by=addresses.c.id),
+ order_by=addresses.c.id),
orders=relationship(Order, lazy='joined',
- order_by=orders.c.id)))
+ order_by=orders.c.id)))
sess = create_session()
# then select just from users. run it into instances.
# then assert the data, which will launch 6 more lazy loads
r = users.select().execute()
+
def go():
l = list(sess.query(User).instances(r))
eq_(l, self.static.user_all_result)
"""An eager relationship can be upgraded to a lazy relationship."""
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='joined')
+ addresses=relationship(mapper(Address, addresses), lazy='joined')
))
sess = create_session()
self.sql_count_(4, go)
def test_option_propagate(self):
- users, items, order_items, Order, Item, User, orders = (self.tables.users,
- self.tables.items,
- self.tables.order_items,
- self.classes.Order,
- self.classes.Item,
- self.classes.User,
- self.tables.orders)
+ users, items, order_items, Order, Item, User, orders = (
+ self.tables.users,
+ self.tables.items,
+ self.tables.order_items,
+ self.classes.Order,
+ self.classes.Item,
+ self.classes.User,
+ self.tables.orders)
mapper(User, users, properties=dict(
- orders = relationship(Order)
+ orders=relationship(Order)
))
mapper(Order, orders, properties=dict(
- items = relationship(Item, secondary=order_items)
+ items=relationship(Item, secondary=order_items)
))
mapper(Item, items)
oalias = aliased(Order)
opt1 = sa.orm.joinedload(User.orders, Order.items)
opt2 = sa.orm.contains_eager(User.orders, Order.items, alias=oalias)
- u1 = sess.query(User).join(oalias, User.orders).options(opt1, opt2).first()
+ u1 = sess.query(User).join(oalias, User.orders).\
+ options(opt1, opt2).first()
ustate = attributes.instance_state(u1)
assert opt1 in ustate.load_options
assert opt2 not in ustate.load_options
class DeepOptionsTest(_fixtures.FixtureTest):
+
@classmethod
def setup_mappers(cls):
- users, Keyword, items, order_items, Order, Item, User, keywords, item_keywords, orders = (cls.tables.users,
- cls.classes.Keyword,
- cls.tables.items,
- cls.tables.order_items,
- cls.classes.Order,
- cls.classes.Item,
- cls.classes.User,
- cls.tables.keywords,
- cls.tables.item_keywords,
- cls.tables.orders)
+ users, Keyword, items, order_items, Order, Item, User, \
+ keywords, item_keywords, orders = (
+ cls.tables.users,
+ cls.classes.Keyword,
+ cls.tables.items,
+ cls.tables.order_items,
+ cls.classes.Order,
+ cls.classes.Item,
+ cls.classes.User,
+ cls.tables.keywords,
+ cls.tables.item_keywords,
+ cls.tables.orders)
mapper(Keyword, keywords)
mapper(Item, items, properties=dict(
keywords=relationship(Keyword, item_keywords,
- order_by=item_keywords.c.item_id)))
+ order_by=item_keywords.c.item_id)))
mapper(Order, orders, properties=dict(
items=relationship(Item, order_items,
- order_by=items.c.id)))
+ order_by=items.c.id)))
mapper(User, users, order_by=users.c.id, properties=dict(
orders=relationship(Order, order_by=orders.c.id)))
# joinedload nothing.
u = sess.query(User).all()
+
def go():
- x = u[0].orders[1].items[0].keywords[1]
+ u[0].orders[1].items[0].keywords[1]
self.assert_sql_count(testing.db, go, 3)
def test_deep_options_2(self):
User = self.classes.User
-
sess = create_session()
l = (sess.query(User).
- options(sa.orm.joinedload_all('orders.items.keywords'))).all()
+ options(sa.orm.joinedload_all('orders.items.keywords'))).all()
+
def go():
- x = l[0].orders[1].items[0].keywords[1]
+ l[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
sess = create_session()
l = (sess.query(User).
- options(sa.orm.subqueryload_all('orders.items.keywords'))).all()
+ options(sa.orm.subqueryload_all('orders.items.keywords'))).all()
+
def go():
- x = l[0].orders[1].items[0].keywords[1]
+ l[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
-
def test_deep_options_3(self):
User = self.classes.User
options(sa.orm.joinedload('orders.items')).
options(sa.orm.joinedload('orders.items.keywords')))
u = q2.all()
+
def go():
- x = u[0].orders[1].items[0].keywords[1]
+ u[0].orders[1].items[0].keywords[1]
self.sql_count_(0, go)
def test_deep_options_4(self):
Item, User, Order = (self.classes.Item,
- self.classes.User,
- self.classes.Order)
+ self.classes.User,
+ self.classes.Order)
sess = create_session()
# joinedload "keywords" on items. it will lazy load "orders", then
# lazy load the "items" on the order, but on "items" it will eager
# load the "keywords"
- q3 = sess.query(User).options(sa.orm.joinedload('orders.items.keywords'))
+ q3 = sess.query(User).options(
+ sa.orm.joinedload('orders.items.keywords'))
u = q3.all()
+
def go():
- x = u[0].orders[1].items[0].keywords[1]
+ u[0].orders[1].items[0].keywords[1]
self.sql_count_(2, go)
sess = create_session()
q3 = sess.query(User).options(
- sa.orm.joinedload(User.orders, Order.items, Item.keywords))
+ sa.orm.joinedload(User.orders, Order.items, Item.keywords))
u = q3.all()
+
def go():
- x = u[0].orders[1].items[0].keywords[1]
+ u[0].orders[1].items[0].keywords[1]
self.sql_count_(2, go)
+
class ComparatorFactoryTest(_fixtures.FixtureTest, AssertsCompiledSQL):
+
def test_kwarg_accepted(self):
users, Address = self.tables.users, self.classes.Address
class DummyComposite(object):
+
def __init__(self, x, y):
pass
class MyFactory(ColumnProperty.Comparator):
__hash__ = None
+
def __eq__(self, other):
- return func.foobar(self.__clause_element__()) == func.foobar(other)
- mapper(User, users, properties={'name':column_property(users.c.name, comparator_factory=MyFactory)})
- self.assert_compile(User.name == 'ed', "foobar(users.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
- self.assert_compile(aliased(User).name == 'ed', "foobar(users_1.name) = foobar(:foobar_1)", dialect=default.DefaultDialect())
+ return func.foobar(self.__clause_element__()) == \
+ func.foobar(other)
+ mapper(
+ User, users,
+ properties={
+ 'name': column_property(
+ users.c.name, comparator_factory=MyFactory)})
+ self.assert_compile(
+ User.name == 'ed',
+ "foobar(users.name) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect()
+ )
+ self.assert_compile(
+ aliased(User).name == 'ed',
+ "foobar(users_1.name) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
def test_synonym(self):
users, User = self.tables.users, self.classes.User
from sqlalchemy.orm.properties import ColumnProperty
+
class MyFactory(ColumnProperty.Comparator):
__hash__ = None
+
def __eq__(self, other):
return func.foobar(self.__clause_element__()) ==\
- func.foobar(other)
+ func.foobar(other)
mapper(User, users, properties={
- 'name':synonym('_name', map_column=True,
- comparator_factory=MyFactory)
- })
+ 'name': synonym('_name', map_column=True,
+ comparator_factory=MyFactory)
+ })
self.assert_compile(
- User.name == 'ed',
- "foobar(users.name) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
+ User.name == 'ed',
+ "foobar(users.name) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
self.assert_compile(
- aliased(User).name == 'ed',
- "foobar(users_1.name) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
+ aliased(User).name == 'ed',
+ "foobar(users_1.name) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
def test_relationship(self):
users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
from sqlalchemy.orm.properties import RelationshipProperty
# primaryjoin/secondaryjoin
class MyFactory(RelationshipProperty.Comparator):
__hash__ = None
+
def __eq__(self, other):
return func.foobar(self._source_selectable().c.user_id) == \
func.foobar(other.id)
class MyFactory2(RelationshipProperty.Comparator):
__hash__ = None
+
def __eq__(self, other):
return func.foobar(self._source_selectable().c.id) == \
func.foobar(other.user_id)
mapper(User, users)
mapper(Address, addresses, properties={
- 'user': relationship(User, comparator_factory=MyFactory,
+ 'user': relationship(
+ User, comparator_factory=MyFactory,
backref=backref("addresses", comparator_factory=MyFactory2)
)
- }
+ }
)
# these are kind of nonsensical tests.
self.assert_compile(Address.user == User(id=5),
- "foobar(addresses.user_id) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
+ "foobar(addresses.user_id) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
self.assert_compile(User.addresses == Address(id=5, user_id=7),
- "foobar(users.id) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
+ "foobar(users.id) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
self.assert_compile(
- aliased(Address).user == User(id=5),
- "foobar(addresses_1.user_id) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
+ aliased(Address).user == User(id=5),
+ "foobar(addresses_1.user_id) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
self.assert_compile(
- aliased(User).addresses == Address(id=5, user_id=7),
- "foobar(users_1.id) = foobar(:foobar_1)",
- dialect=default.DefaultDialect())
-
+ aliased(User).addresses == Address(id=5, user_id=7),
+ "foobar(users_1.id) = foobar(:foobar_1)",
+ dialect=default.DefaultDialect())
class SecondaryOptionsTest(fixtures.MappedTest):
- """test that the contains_eager() option doesn't bleed into a secondary load."""
+
+ """test that the contains_eager() option doesn't bleed
+ into a secondary load."""
run_inserts = 'once'
@classmethod
def define_tables(cls, metadata):
Table("base", metadata,
- Column('id', Integer, primary_key=True),
- Column('type', String(50), nullable=False)
- )
+ Column('id', Integer, primary_key=True),
+ Column('type', String(50), nullable=False)
+ )
Table("child1", metadata,
- Column('id', Integer, ForeignKey('base.id'), primary_key=True),
- Column('child2id', Integer, ForeignKey('child2.id'), nullable=False)
- )
+ Column('id', Integer, ForeignKey('base.id'), primary_key=True),
+ Column(
+ 'child2id', Integer, ForeignKey('child2.id'), nullable=False)
+ )
Table("child2", metadata,
- Column('id', Integer, ForeignKey('base.id'), primary_key=True),
- )
+ Column('id', Integer, ForeignKey('base.id'), primary_key=True),
+ )
Table('related', metadata,
- Column('id', Integer, ForeignKey('base.id'), primary_key=True),
- )
+ Column('id', Integer, ForeignKey('base.id'), primary_key=True),
+ )
@classmethod
def setup_mappers(cls):
child1, child2, base, related = (cls.tables.child1,
- cls.tables.child2,
- cls.tables.base,
- cls.tables.related)
+ cls.tables.child2,
+ cls.tables.base,
+ cls.tables.related)
class Base(cls.Comparable):
pass
+
class Child1(Base):
pass
+
class Child2(Base):
pass
+
class Related(cls.Comparable):
pass
mapper(Base, base, polymorphic_on=base.c.type, properties={
- 'related':relationship(Related, uselist=False)
+ 'related': relationship(Related, uselist=False)
})
mapper(Child1, child1, inherits=Base,
- polymorphic_identity='child1',
- properties={
- 'child2':relationship(Child2,
- primaryjoin=child1.c.child2id==base.c.id,
- foreign_keys=child1.c.child2id)
- })
+ polymorphic_identity='child1',
+ properties={
+ 'child2': relationship(Child2,
+ primaryjoin=child1.c.child2id == base.c.id,
+ foreign_keys=child1.c.child2id)
+ })
mapper(Child2, child2, inherits=Base, polymorphic_identity='child2')
mapper(Related, related)
@classmethod
def insert_data(cls):
child1, child2, base, related = (cls.tables.child1,
- cls.tables.child2,
- cls.tables.base,
- cls.tables.related)
+ cls.tables.child2,
+ cls.tables.base,
+ cls.tables.related)
base.insert().execute([
- {'id':1, 'type':'child1'},
- {'id':2, 'type':'child1'},
- {'id':3, 'type':'child1'},
- {'id':4, 'type':'child2'},
- {'id':5, 'type':'child2'},
- {'id':6, 'type':'child2'},
+ {'id': 1, 'type': 'child1'},
+ {'id': 2, 'type': 'child1'},
+ {'id': 3, 'type': 'child1'},
+ {'id': 4, 'type': 'child2'},
+ {'id': 5, 'type': 'child2'},
+ {'id': 6, 'type': 'child2'},
])
child2.insert().execute([
- {'id':4},
- {'id':5},
- {'id':6},
+ {'id': 4},
+ {'id': 5},
+ {'id': 6},
])
child1.insert().execute([
- {'id':1, 'child2id':4},
- {'id':2, 'child2id':5},
- {'id':3, 'child2id':6},
+ {'id': 1, 'child2id': 4},
+ {'id': 2, 'child2id': 5},
+ {'id': 3, 'child2id': 6},
])
related.insert().execute([
- {'id':1},
- {'id':2},
- {'id':3},
- {'id':4},
- {'id':5},
- {'id':6},
+ {'id': 1},
+ {'id': 2},
+ {'id': 3},
+ {'id': 4},
+ {'id': 5},
+ {'id': 6},
])
def test_contains_eager(self):
sess = create_session()
child1s = sess.query(Child1).\
- join(Child1.related).\
- options(sa.orm.contains_eager(Child1.related)).\
- order_by(Child1.id)
+ join(Child1.related).\
+ options(sa.orm.contains_eager(Child1.related)).\
+ order_by(Child1.id)
def go():
eq_(
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type "
+ "SELECT child2.id AS child2_id, base.id AS base_id, "
+ "base.type AS base_type "
"FROM base JOIN child2 ON base.id = child2.id "
"WHERE base.id = :param_1",
- {'param_1':4}
+ {'param_1': 4}
)
)
sess = create_session()
- child1s = sess.query(Child1).join(Child1.related).options(sa.orm.joinedload(Child1.related)).order_by(Child1.id)
+ child1s = sess.query(Child1).join(Child1.related).options(
+ sa.orm.joinedload(Child1.related)).order_by(Child1.id)
def go():
eq_(
child1s.all(),
- [Child1(id=1, related=Related(id=1)), Child1(id=2, related=Related(id=2)), Child1(id=3, related=Related(id=3))]
+ [Child1(id=1, related=Related(id=1)),
+ Child1(id=2, related=Related(id=2)),
+ Child1(id=3, related=Related(id=3))]
)
self.assert_sql_count(testing.db, go, 1)
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type "
- "FROM base JOIN child2 ON base.id = child2.id WHERE base.id = :param_1",
-
-# joinedload- this shouldn't happen
-# "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type, "
-# "related_1.id AS related_1_id FROM base JOIN child2 ON base.id = child2.id "
-# "LEFT OUTER JOIN related AS related_1 ON base.id = related_1.id WHERE base.id = :param_1",
- {'param_1':4}
+ "SELECT child2.id AS child2_id, base.id AS base_id, "
+ "base.type AS base_type "
+ "FROM base JOIN child2 ON base.id = child2.id "
+ "WHERE base.id = :param_1",
+
+ {'param_1': 4}
)
)
def test_joinedload_on_same(self):
Child1, Child2, Related = (self.classes.Child1,
- self.classes.Child2,
- self.classes.Related)
+ self.classes.Child2,
+ self.classes.Related)
sess = create_session()
- child1s = sess.query(Child1).join(Child1.related).options(sa.orm.joinedload(Child1.child2, Child2.related)).order_by(Child1.id)
+ child1s = sess.query(Child1).join(Child1.related).options(
+ sa.orm.joinedload(Child1.child2, Child2.related)
+ ).order_by(Child1.id)
def go():
eq_(
child1s.all(),
- [Child1(id=1, related=Related(id=1)), Child1(id=2, related=Related(id=2)), Child1(id=3, related=Related(id=3))]
+ [Child1(id=1, related=Related(id=1)),
+ Child1(id=2, related=Related(id=2)),
+ Child1(id=3, related=Related(id=3))]
)
self.assert_sql_count(testing.db, go, 4)
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type, "
- "related_1.id AS related_1_id FROM base JOIN child2 ON base.id = child2.id "
- "LEFT OUTER JOIN related AS related_1 ON base.id = related_1.id WHERE base.id = :param_1",
- {'param_1':4}
+ "SELECT child2.id AS child2_id, base.id AS base_id, "
+ "base.type AS base_type, "
+ "related_1.id AS related_1_id FROM base JOIN child2 "
+ "ON base.id = child2.id "
+ "LEFT OUTER JOIN related AS related_1 "
+ "ON base.id = related_1.id WHERE base.id = :param_1",
+ {'param_1': 4}
)
)
class DeferredPopulationTest(fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table("thing", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("name", String(20)))
+ Column(
+ "id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("name", String(20)))
Table("human", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("thing_id", Integer, ForeignKey("thing.id")),
- Column("name", String(20)))
+ Column(
+ "id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("thing_id", Integer, ForeignKey("thing.id")),
+ Column("name", String(20)))
@classmethod
def setup_mappers(cls):
thing, human = cls.tables.thing, cls.tables.human
- class Human(cls.Basic): pass
- class Thing(cls.Basic): pass
+ class Human(cls.Basic):
+ pass
+
+ class Thing(cls.Basic):
+ pass
mapper(Human, human, properties={"thing": relationship(Thing)})
mapper(Thing, thing, properties={"name": deferred(thing.c.name)})
Thing = self.classes.Thing
session = create_session()
- result = session.query(Thing).first()
+ result = session.query(Thing).first() # noqa
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing = self.classes.Thing
session = create_session()
- result = session.query(Thing).first()
+ result = session.query(Thing).first() # noqa
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- human = session.query(Human).options(sa.orm.joinedload("thing")).first()
+ human = session.query(Human).options( # noqa
+ sa.orm.joinedload("thing")).first()
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- human = session.query(Human).options(sa.orm.joinedload("thing")).first()
+ human = session.query(Human).options( # noqa
+ sa.orm.joinedload("thing")).first()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- result = session.query(Human).add_entity(Thing).join("thing").first()
+ result = session.query(Human).add_entity( # noqa
+ Thing).join("thing").first()
session.expunge_all()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
Thing, Human = self.classes.Thing, self.classes.Human
session = create_session()
- result = session.query(Human).add_entity(Thing).join("thing").first()
+ result = session.query(Human).add_entity( # noqa
+ Thing).join("thing").first()
thing = session.query(Thing).options(sa.orm.undefer("name")).first()
self._test(thing)
-
-
class NoLoadTest(_fixtures.FixtureTest):
run_inserts = 'once'
run_deletes = None
- def test_basic(self):
- """A basic one-to-many lazy load"""
+ def test_o2m_noload(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ Address, addresses, users, User = (
+ self.classes.Address,
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
m = mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='noload')
+ addresses=relationship(mapper(Address, addresses), lazy='noload')
))
q = create_session().query(m)
l = [None]
+
def go():
x = q.filter(User.id == 7).all()
x[0].addresses
l[0] = x
self.assert_sql_count(testing.db, go, 1)
- self.assert_result(l[0], User,
- {'id' : 7, 'addresses' : (Address, [])},
- )
+ self.assert_result(
+ l[0], User,
+ {'id': 7, 'addresses': (Address, [])},
+ )
- def test_options(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ def test_upgrade_o2m_noload_lazyload_option(self):
+ Address, addresses, users, User = (
+ self.classes.Address,
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
m = mapper(User, users, properties=dict(
- addresses = relationship(mapper(Address, addresses), lazy='noload')
+ addresses=relationship(mapper(Address, addresses), lazy='noload')
))
q = create_session().query(m).options(sa.orm.lazyload('addresses'))
l = [None]
+
def go():
x = q.filter(User.id == 7).all()
x[0].addresses
l[0] = x
self.sql_count_(2, go)
- self.assert_result(l[0], User,
- {'id' : 7, 'addresses' : (Address, [{'id' : 1}])},
- )
-
-
-
+ self.assert_result(
+ l[0], User,
+ {'id': 7, 'addresses': (Address, [{'id': 1}])},
+ )
class RequirementsTest(fixtures.MappedTest):
+
"""Tests the contract for user classes."""
@classmethod
def define_tables(cls, metadata):
Table('ht1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('ht1_id', Integer, ForeignKey('ht1.id')),
Column('value', String(10)))
Table('ht3', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht4', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'), primary_key=True),
- Column('ht3_id', Integer, ForeignKey('ht3.id'), primary_key=True))
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht3_id', Integer, ForeignKey('ht3.id'),
+ primary_key=True))
Table('ht5', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'), primary_key=True))
+ Column('ht1_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True))
Table('ht6', metadata,
- Column('ht1a_id', Integer, ForeignKey('ht1.id'), primary_key=True),
- Column('ht1b_id', Integer, ForeignKey('ht1.id'), primary_key=True),
+ Column('ht1a_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
+ Column('ht1b_id', Integer, ForeignKey('ht1.id'),
+ primary_key=True),
Column('value', String(10)))
if util.py2k:
pass
# TODO: is weakref support detectable without an instance?
- #self.assertRaises(sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
+ # self.assertRaises(
+ # sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
class _ValueBase(object):
+
def __init__(self, value='abc', id=None):
self.id = id
self.value = value
+
def __bool__(self):
return False
+
def __hash__(self):
return hash(self.value)
+
def __eq__(self, other):
if isinstance(other, type(self)):
return self.value == other.value
"""
ht6, ht5, ht4, ht3, ht2, ht1 = (self.tables.ht6,
- self.tables.ht5,
- self.tables.ht4,
- self.tables.ht3,
- self.tables.ht2,
- self.tables.ht1)
-
+ self.tables.ht5,
+ self.tables.ht4,
+ self.tables.ht3,
+ self.tables.ht2,
+ self.tables.ht1)
class H1(self._ValueBase):
pass
+
class H2(self._ValueBase):
pass
+
class H3(self._ValueBase):
pass
+
class H6(self._ValueBase):
pass
'h3s': relationship(H3, secondary=ht4, backref='h1s'),
'h1s': relationship(H1, secondary=ht5, backref='parent_h1'),
't6a': relationship(H6, backref='h1a',
- primaryjoin=ht1.c.id==ht6.c.ht1a_id),
+ primaryjoin=ht1.c.id == ht6.c.ht1a_id),
't6b': relationship(H6, backref='h1b',
- primaryjoin=ht1.c.id==ht6.c.ht1b_id),
- })
+ primaryjoin=ht1.c.id == ht6.c.ht1b_id),
+ })
mapper(H2, ht2)
mapper(H3, ht3)
mapper(H6, ht6)
sa.orm.joinedload_all('h3s.h1s')).all()
eq_(len(h1s), 5)
-
def test_composite_results(self):
ht2, ht1 = (self.tables.ht2,
- self.tables.ht1)
-
+ self.tables.ht1)
class H1(self._ValueBase):
+
def __init__(self, value, id, h2s):
self.value = value
self.id = id
self.h2s = h2s
+
class H2(self._ValueBase):
+
def __init__(self, value, id):
self.value = value
self.id = id
s.commit()
eq_(
[(h1.value, h1.id, h2.value, h2.id)
- for h1, h2 in
- s.query(H1, H2).join(H1.h2s).order_by(H1.id, H2.id)],
+ for h1, h2 in
+ s.query(H1, H2).join(H1.h2s).order_by(H1.id, H2.id)],
[
('abc', 1, 'abc', 1),
('abc', 1, 'def', 2),
ht1 = self.tables.ht1
class H1(object):
+
def __len__(self):
return len(self.get_value())
return self.value
class H2(object):
+
def __bool__(self):
return bool(self.get_value())
h1 = H1()
h1.value = "Asdf"
- h1.value = "asdf asdf" # ding
+ h1.value = "asdf asdf" # ding
h2 = H2()
h2.value = "Asdf"
- h2.value = "asdf asdf" # ding
+ h2.value = "asdf asdf" # ding
+
class IsUserlandTest(fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('foo', metadata,
- Column('id', Integer, primary_key=True),
- Column('someprop', Integer)
- )
+ Column('id', Integer, primary_key=True),
+ Column('someprop', Integer)
+ )
def _test(self, value, instancelevel=None):
class Foo(object):
return "hi"
self._test(property(somefunc), "hi")
+
class MagicNamesTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('cartographers', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50)),
Column('alias', String(50)),
Column('quip', String(100)))
Table('maps', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('cart_id', Integer,
ForeignKey('cartographers.id')),
Column('state', String(2)),
def test_mappish(self):
maps, Cartographer, cartographers, Map = (self.tables.maps,
- self.classes.Cartographer,
- self.tables.cartographers,
- self.classes.Map)
+ self.classes.Cartographer,
+ self.tables.cartographers,
+ self.classes.Map)
mapper(Cartographer, cartographers, properties=dict(
query=cartographers.c.quip))
c = Cartographer(name='Lenny', alias='The Dude',
query='Where be dragons?')
- m = Map(state='AK', mapper=c)
+ Map(state='AK', mapper=c)
sess = create_session()
sess.add(c)
for C, M in ((Cartographer, Map),
(sa.orm.aliased(Cartographer), sa.orm.aliased(Map))):
c1 = (sess.query(C).
- filter(C.alias=='The Dude').
- filter(C.query=='Where be dragons?')).one()
- m1 = sess.query(M).filter(M.mapper==c1).one()
+ filter(C.alias == 'The Dude').
+ filter(C.query == 'Where be dragons?')).one()
+ sess.query(M).filter(M.mapper == c1).one()
def test_direct_stateish(self):
for reserved in (sa.orm.instrumentation.ClassManager.STATE_ATTR,
sa.orm.instrumentation.ClassManager.MANAGER_ATTR):
t = Table('t', sa.MetaData(),
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column(reserved, Integer))
+
class T(object):
pass
assert_raises_message(
('requested attribute name conflicts with '
'instrumentation attribute of the same name'),
mapper, M, maps, properties={
- reserved: maps.c.state})
-
-
+ reserved: maps.c.state})