return ""
def bindparam_string(self, name):
+ # TODO: its not clear how much of bind parameter quoting is "Oracle"
+ # and how much is "cx_Oracle".
if self.preparer._bindparam_requires_quotes(name):
quoted_name = '"%s"' % name
self._quoted_bind_names[name] = quoted_name
class OracleIdentifierPreparer(compiler.IdentifierPreparer):
reserved_words = set([x.lower() for x in RESERVED_WORDS])
+ illegal_initial_characters = re.compile(r'[0-9_$]')
def _bindparam_requires_quotes(self, value):
"""Return True if the given identifier requires quoting."""
del param[fromname]
if self.dialect.auto_setinputsizes:
- self.set_input_sizes(quoted_bind_names)
+ self.set_input_sizes(quoted_bind_names, exclude_types=(self.dialect.dbapi.STRING,))
if len(self.compiled_parameters) == 1:
for key in self.compiled.binds:
if hasattr(self.dbapi, 'version'):
cx_oracle_ver = vers(self.dbapi.version)
self.supports_unicode_binds = cx_oracle_ver >= (5, 0)
-
+
if self.dbapi is None or not self.auto_convert_lobs or not 'CLOB' in self.dbapi.__dict__:
self.dbapi_type_map = {}
self.ORACLE_BINARY_TYPES = []
def lastrow_has_defaults(self):
return hasattr(self, 'postfetch_cols') and len(self.postfetch_cols)
- def set_input_sizes(self, translate=None):
+ def set_input_sizes(self, translate=None, exclude_types=None):
"""Given a cursor and ClauseParameters, call the appropriate
style of ``setinputsizes()`` on the cursor, using DB-API types
from the bind parameter's ``TypeEngine`` objects.
for key in self.compiled.positiontup:
typeengine = types[key]
dbtype = typeengine.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
- if dbtype is not None:
+ if dbtype is not None and (not exclude_types or dbtype not in exclude_types):
inputsizes.append(dbtype)
try:
self.cursor.setinputsizes(*inputsizes)
for key in self.compiled.bind_names.values():
typeengine = types[key]
dbtype = typeengine.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
- if dbtype is not None:
+ if dbtype is not None and (not exclude_types or dbtype not in exclude_types):
if translate:
key = translate.get(key, key)
inputsizes[key.encode(self.dialect.encoding)] = dbtype
__visit_name__ = "drop_table"
+ def __init__(self, element, cascade=False, **kw):
+ self.cascade = cascade
+ super(DropTable, self).__init__(element, **kw)
+
class CreateSequence(_CreateDropBase):
"""Represent a CREATE SEQUENCE statement."""
return text
def visit_drop_table(self, drop):
- return "\nDROP TABLE " + self.preparer.format_table(drop.element)
+ ret = "\nDROP TABLE " + self.preparer.format_table(drop.element)
+ if drop.cascade:
+ ret += " CASCADE CONSTRAINTS"
+ return ret
def visit_create_index(self, create):
index = create.element
if fk.onupdate is None:
fk.onupdate = 'CASCADE'
- if testing.against('firebird', 'oracle'):
- pk_seqs = [col for col in args
- if (isinstance(col, schema.Column)
- and col.primary_key
- and getattr(col, '_needs_autoincrement', False))]
- for c in pk_seqs:
- c._init_items(schema.Sequence(args[0] + '_' + c.name + '_seq', optional=True))
return schema.Table(*args, **kw)
test_opts = dict([(k,kw.pop(k)) for k in kw.keys()
if k.startswith('test_')])
- c = schema.Column(*args, **kw)
- if testing.against('firebird', 'oracle'):
- if 'test_needs_autoincrement' in test_opts:
- c._needs_autoincrement = True
- return c
+ col = schema.Column(*args, **kw)
+ if 'test_needs_autoincrement' in test_opts and \
+ kw.get('primary_key', False) and \
+ testing.against('firebird', 'oracle'):
+ def add_seq(tbl):
+ col._init_items(schema.Sequence(tbl.name + '_' + col.name + '_seq', optional=True))
+ col._on_table_attach(add_seq)
+ return col
from sqlalchemy import *
from sqlalchemy import types as sqltypes
from sqlalchemy.sql import table, column
-from sqlalchemy.databases import oracle
from sqlalchemy.test import *
from sqlalchemy.test.testing import eq_
from sqlalchemy.test.engines import testing_engine
-from sqlalchemy.dialect.oracle import cx_oracle
+from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
import os
b = bindparam("foo", u"hello world!")
assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
- def test_timestamp_adapt(self):
- dialect = oracle.OracleDialect()
- t1 = types.DateTime
- t3 = types.TIMESTAMP
-
-
- assert isinstance(dialect.type_descriptor(t1), cx_oracle.OracleTimestamp)
- assert isinstance(dialect.type_descriptor(t3), cx_oracle.OracleTimestamp)
-
- def test_string_adapt(self):
- oracle_dialect = oracle.OracleDialect()
-
- for dialect, start, test in [
- (oracle_dialect, String(), String),
- (oracle_dialect, VARCHAR(), VARCHAR),
- (oracle_dialect, String(50), String),
- (oracle_dialect, Unicode(), Unicode),
- (oracle_dialect, UnicodeText(), cx_oracle.OracleUnicodeText),
- (oracle_dialect, NCHAR(), NCHAR),
- (oracle_dialect, oracle.OracleRaw(50), oracle.OracleRaw),
+ def test_type_adapt(self):
+ dialect = cx_oracle.dialect()
+
+ for start, test in [
+ (DateTime(), cx_oracle.OracleDateTime),
+ (TIMESTAMP(), cx_oracle.OracleTimestamp),
+ (oracle.OracleRaw(), cx_oracle.cxOracleRaw),
+ (String(), String),
+ (VARCHAR(), VARCHAR),
+ (String(50), String),
+ (Unicode(), Unicode),
+ (Text(), cx_oracle.OracleText),
+ (UnicodeText(), cx_oracle.OracleUnicodeText),
+ (NCHAR(), NCHAR),
+ (oracle.OracleRaw(50), oracle.OracleRaw),
]:
assert isinstance(start.dialect_impl(dialect), test), "wanted %r got %r" % (test, start.dialect_impl(dialect))
Column('data', Binary)
)
meta.create_all()
- stream = os.path.join(os.path.dirname(testenv.__file__), 'binary_data_one.dat')
+ stream = os.path.join(os.path.dirname(__file__), "..", 'binary_data_one.dat')
stream = file(stream).read(12000)
for i in range(1, 11):
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import MetaData, Integer, String, ForeignKey, ForeignKeyConstraint, asc, Index
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import relation, create_session, class_mapper, eagerload, compile_mappers, backref, clear_mappers, polymorphic_union, deferred
from sqlalchemy.test.testing import eq_
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
email = Column(String(50), key='_email')
user_id = Column('user_id', Integer, ForeignKey('users.id'),
key='_user_id')
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = Column(String(50))
addresses = relation("Address", order_by="desc(Address.email)",
primaryjoin="User.id==Address.user_id", foreign_keys="[Address.user_id]",
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer) # note no foreign key
def test_uncompiled_attributes_in_relation(self):
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = Column(String(50))
addresses = relation("Address", order_by=Address.email,
foreign_keys=Address.user_id,
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
User.name = Column('name', String(50))
User.addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
Address.email = Column(String(50), key='_email')
Address.user_id = Column('user_id', Integer, ForeignKey('users.id'),
key='_user_id')
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", order_by=Address.email)
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", order_by=(Address.email, Address.id))
class User(ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", backref="user")
class Address(ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
User.a = Column('a', String(10))
def test_column_properties(self):
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
adr_count = sa.orm.column_property(
sa.select([sa.func.count(Address.id)], Address.user_id == id).
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = sa.orm.deferred(Column(String(50)))
Base.metadata.create_all()
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
_name = Column('name', String(50))
def _set_name(self, name):
self._name = "SOMENAME " + name
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
_name = Column('name', String(50))
name = sa.orm.synonym('_name', comparator_factory=CustomCompare)
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
_name = Column('name', String(50))
def _set_name(self, name):
self._name = "SOMENAME " + name
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey(User.id))
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
email = Column('email', String(50))
user_id = Column('user_id', Integer, ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
addresses = relation("Address", backref="user",
primaryjoin=id == Address.user_id)
def test_with_explicit_autoloaded(self):
meta = MetaData(testing.db)
t1 = Table('t1', meta,
- Column('id', String(50), primary_key=True),
+ Column('id', String(50), primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
meta.create_all()
try:
finally:
meta.drop_all()
+ def test_synonym_for(self):
+ class User(Base, ComparableEntity):
+ __tablename__ = 'users'
+
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ name = Column('name', String(50))
+
+ @decl.synonym_for('name')
+ @property
+ def namesyn(self):
+ return self.name
+
+ Base.metadata.create_all()
+
+ sess = create_session()
+ u1 = User(name='someuser')
+ eq_(u1.name, "someuser")
+ eq_(u1.namesyn, 'someuser')
+ sess.add(u1)
+ sess.flush()
+
+ rt = sess.query(User).filter(User.namesyn == 'someuser').one()
+ eq_(rt, u1)
+
+ def test_comparable_using(self):
+ class NameComparator(sa.orm.PropComparator):
+ @property
+ def upperself(self):
+ cls = self.prop.parent.class_
+ col = getattr(cls, 'name')
+ return sa.func.upper(col)
+
+ def operate(self, op, other, **kw):
+ return op(self.upperself, other, **kw)
+
+ class User(Base, ComparableEntity):
+ __tablename__ = 'users'
+
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+ name = Column('name', String(50))
+
+ @decl.comparable_using(NameComparator)
+ @property
+ def uc_name(self):
+ return self.name is not None and self.name.upper() or None
+
+ Base.metadata.create_all()
+
+ sess = create_session()
+ u1 = User(name='someuser')
+ eq_(u1.name, "someuser", u1.name)
+ eq_(u1.uc_name, 'SOMEUSER', u1.uc_name)
+ sess.add(u1)
+ sess.flush()
+ sess.expunge_all()
+
+ rt = sess.query(User).filter(User.uc_name == 'SOMEUSER').one()
+ eq_(rt, u1)
+ sess.expunge_all()
+
+ rt = sess.query(User).filter(User.uc_name.startswith('SOMEUSE')).one()
+ eq_(rt, u1)
+
+
class DeclarativeInheritanceTest(DeclarativeTestBase):
def test_custom_join_condition(self):
class Foo(Base):
def test_joined(self):
class Company(Base, ComparableEntity):
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
employees = relation("Person")
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
company_id = Column('company_id', Integer,
ForeignKey('companies.id'))
name = Column('name', String(50))
class Company(Base, ComparableEntity):
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
employees = relation("Person")
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
company_id = Column('company_id', Integer,
ForeignKey('companies.id'))
name = Column('name', String(50))
class Company(Base, ComparableEntity):
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
employees = relation("Person")
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
company_id = Column(Integer,
ForeignKey('companies.id'))
name = Column(String(50))
def test_joined_from_single(self):
class Company(Base, ComparableEntity):
__tablename__ = 'companies'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
name = Column('name', String(50))
employees = relation("Person")
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
company_id = Column(Integer, ForeignKey('companies.id'))
name = Column(String(50))
discriminator = Column('type', String(50))
def test_add_deferred(self):
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column('id', Integer, primary_key=True)
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
Person.name = deferred(Column(String(10)))
class Person(Base, ComparableEntity):
__tablename__ = 'people'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = Column(String(50))
discriminator = Column('type', String(50))
__mapper_args__ = {'polymorphic_on':discriminator}
class Language(Base, ComparableEntity):
__tablename__ = 'languages'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = Column(String(50))
assert not hasattr(Person, 'primary_language_id')
def test_concrete(self):
engineers = Table('engineers', Base.metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('primary_language', String(50))
)
managers = Table('managers', Base.metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('golf_swing', String(50))
)
class User(Base, ComparableEntity):
__tablename__ = 'users'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
name = Column(String(50))
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
email = Column(String(50))
user_id = Column(Integer, ForeignKey('users.id'))
if inline:
reflection_metadata = MetaData(testing.db)
Table('users', reflection_metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
test_needs_fk=True)
Table('addresses', reflection_metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('email', String(50)),
Column('user_id', Integer, ForeignKey('users.id')),
test_needs_fk=True)
Table('imhandles', reflection_metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer),
Column('network', String(50)),
Column('handle', String(50)),
class User(Base, ComparableEntity):
__tablename__ = 'users'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
+
u1 = User(name='u1', addresses=[
Address(email='one'),
Address(email='two'),
class User(Base, ComparableEntity):
__tablename__ = 'users'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
nom = Column('name', String(50), key='nom')
addresses = relation("Address", backref="user")
class Address(Base, ComparableEntity):
__tablename__ = 'addresses'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
u1 = User(nom='u1', addresses=[
Address(email='one'),
class IMHandle(Base, ComparableEntity):
__tablename__ = 'imhandles'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
user_id = Column('user_id', Integer,
ForeignKey('users.id'))
class User(Base, ComparableEntity):
__tablename__ = 'users'
__autoload__ = True
+ if testing.against('oracle', 'firebird'):
+ id = Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
handles = relation("IMHandle", backref="user")
u1 = User(name='u1', handles=[
eq_(a1, IMHandle(network='lol', handle='zomg'))
eq_(a1.user, User(name='u1'))
- def test_synonym_for(self):
- class User(Base, ComparableEntity):
- __tablename__ = 'users'
-
- id = Column('id', Integer, primary_key=True)
- name = Column('name', String(50))
-
- @decl.synonym_for('name')
- @property
- def namesyn(self):
- return self.name
-
- Base.metadata.create_all()
-
- sess = create_session()
- u1 = User(name='someuser')
- eq_(u1.name, "someuser")
- eq_(u1.namesyn, 'someuser')
- sess.add(u1)
- sess.flush()
-
- rt = sess.query(User).filter(User.namesyn == 'someuser').one()
- eq_(rt, u1)
-
- def test_comparable_using(self):
- class NameComparator(sa.orm.PropComparator):
- @property
- def upperself(self):
- cls = self.prop.parent.class_
- col = getattr(cls, 'name')
- return sa.func.upper(col)
-
- def operate(self, op, other, **kw):
- return op(self.upperself, other, **kw)
-
- class User(Base, ComparableEntity):
- __tablename__ = 'users'
-
- id = Column('id', Integer, primary_key=True)
- name = Column('name', String(50))
-
- @decl.comparable_using(NameComparator)
- @property
- def uc_name(self):
- return self.name is not None and self.name.upper() or None
-
- Base.metadata.create_all()
-
- sess = create_session()
- u1 = User(name='someuser')
- eq_(u1.name, "someuser", u1.name)
- eq_(u1.uc_name, 'SOMEUSER', u1.uc_name)
- sess.add(u1)
- sess.flush()
- sess.expunge_all()
-
- rt = sess.query(User).filter(User.uc_name == 'SOMEUSER').one()
- eq_(rt, u1)
- sess.expunge_all()
-
- rt = sess.query(User).filter(User.uc_name.startswith('SOMEUSE')).one()
- eq_(rt, u1)
-
-
-if __name__ == '__main__':
- testing.main()
orders = fixture_table(
Table('orders', fixture_metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('address_id', None, ForeignKey('addresses.id')),
Column('description', String(30)),
dingalings = fixture_table(
Table("dingalings", fixture_metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('address_id', None, ForeignKey('addresses.id')),
Column('data', String(30)),
test_needs_acid=True,
from sqlalchemy.orm.interfaces import ONETOMANY, MANYTOONE
from sqlalchemy.test import testing
+from sqlalchemy.test.schema import Table, Column
from test.orm import _base
def define_tables(cls, metadata):
global ta, tb, tc
ta = ["a", metadata]
- ta.append(Column('id', Integer, primary_key=True)),
+ ta.append(Column('id', Integer, primary_key=True, test_needs_autoincrement=True)),
ta.append(Column('a_data', String(30)))
if "a"== parent and direction == MANYTOONE:
ta.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
from sqlalchemy.util import function_named
from test.orm import _base, _fixtures
+from sqlalchemy.test.schema import Table, Column
class ABCTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
global a, b, c
a = Table('a', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('adata', String(30)),
Column('type', String(30)),
)
C(cdata='c1', bdata='c1', adata='c1'),
C(cdata='c2', bdata='c2', adata='c2'),
C(cdata='c2', bdata='c2', adata='c2'),
- ] == sess.query(A).all()
+ ] == sess.query(A).order_by(A.id).all()
assert [
B(bdata='b1', adata='b1'),
from sqlalchemy.test import testing, engines
from sqlalchemy.util import function_named
from test.orm import _base, _fixtures
+from sqlalchemy.test.schema import Table, Column
class O2MTest(_base.MappedTest):
"""deals with inheritance and one-to-many relationships"""
def define_tables(cls, metadata):
global foo, bar, blub
foo = Table('foo', metadata,
- Column('id', Integer, Sequence('foo_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(20)))
bar = Table('bar', metadata,
@classmethod
def define_tables(cls, metadata):
global t1
- t1 = Table('t1', metadata, Column('id', Integer, primary_key=True), Column('type', Integer, nullable=False))
+ t1 = Table('t1', metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('type', Integer, nullable=False))
def test_false_discriminator(self):
class Foo(object):pass
def define_tables(cls, metadata):
global t1, t2
t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(10), nullable=False),
Column('info', String(255)))
t2 = Table('t2', metadata,
def define_tables(cls, metadata):
global t1, t2, t3, t4
t1= Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30))
)
t2 = Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('t1id', Integer, ForeignKey('t1.id')),
Column('type', String(30)),
Column('data', String(30))
Column('moredata', String(30)))
t4 = Table('t4', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('t3id', Integer, ForeignKey('t3.id')),
Column('data', String(30)))
def define_tables(cls, metadata):
global foo, bar, blub
foo = Table('foo', metadata,
- Column('id', Integer, Sequence('foo_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(30)),
Column('data', String(20)))
Column('data', String(20)))
blub = Table('blub', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('foo_id', Integer, ForeignKey('foo.id')),
Column('bar_id', Integer, ForeignKey('bar.id')),
Column('data', String(20)))
def define_tables(cls, metadata):
global foo, bar, bar_foo
foo = Table('foo', metadata,
- Column('id', Integer, Sequence('foo_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)))
bar = Table('bar', metadata,
Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
def define_tables(cls, metadata):
global users, roles, user_roles, admins
users = Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('email', String(128)),
Column('password', String(16)),
)
roles = Table('role', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('description', String(32))
)
)
admins = Table('admin', metadata,
- Column('admin_id', Integer, primary_key=True),
+ Column('admin_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('users.id'))
)
def define_tables(cls, metadata):
global base, subtable, stuff
base = Table('base', metadata,
- Column('id', Integer, Sequence('version_test_seq', optional=True), primary_key=True ),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('version_id', Integer, nullable=False),
Column('value', String(40)),
Column('discriminator', Integer, nullable=False)
Column('subdata', String(50))
)
stuff = Table('stuff', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent', Integer, ForeignKey('base.id'))
)
global person_table, employee_table, Person, Employee
person_table = Table("persons", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("name", String(80)),
)
employee_table = Table("employees", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("salary", Integer),
Column("person_id", Integer, ForeignKey("persons.id")),
)
global _a_table, _b_table, _c_table
_a_table = Table('a', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data1', String(128))
)
global base, subtable
base = Table('base', metadata,
- Column('base_id', Integer, primary_key=True),
+ Column('base_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(255)),
Column('sqlite_fixer', String(10))
)
def define_tables(cls, metadata):
global base, sub
base = Table('base', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('type', String(50))
)
@classmethod
def define_tables(cls, metadata):
parents = Table('parents', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(60)))
children = Table('children', metadata,
def define_tables(cls, metadata):
global single, parent
single = Table('single', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(50), nullable=False),
Column('data', String(50)),
Column('parent_id', Integer, ForeignKey('parent.id'), nullable=False),
)
parent = Table('parent', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50))
)
from test.orm import _base
from sqlalchemy.orm import attributes
from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.schema import Table, Column
class Employee(object):
def __init__(self, name):
global managers_table, engineers_table, hackers_table, companies, employees_table
companies = Table('companies', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)))
employees_table = Table('employees', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('company_id', Integer, ForeignKey('companies.id'))
)
managers_table = Table('managers', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('manager_data', String(50)),
Column('company_id', Integer, ForeignKey('companies.id'))
)
engineers_table = Table('engineers', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('engineer_info', String(50)),
Column('company_id', Integer, ForeignKey('companies.id'))
)
hackers_table = Table('hackers', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('engineer_info', String(50)),
Column('company_id', Integer, ForeignKey('companies.id')),
@classmethod
def define_tables(cls, metadata):
Table('a_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('some_c_id', Integer, ForeignKey('c_table.id')),
Column('aname', String(50)),
)
Table('b_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('some_c_id', Integer, ForeignKey('c_table.id')),
Column('bname', String(50)),
)
Table('c_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('cname', String(50)),
)
def define_tables(cls, metadata):
global offices_table, refugees_table
refugees_table = Table('refugee', metadata,
- Column('refugee_fid', Integer, primary_key=True),
+ Column('refugee_fid', Integer, primary_key=True, test_needs_autoincrement=True),
Column('refugee_name', Unicode(30), key='name'))
offices_table = Table('office', metadata,
- Column('office_fid', Integer, primary_key=True),
+ Column('office_fid', Integer, primary_key=True, test_needs_autoincrement=True),
Column('office_name', Unicode(30), key='name'))
@classmethod
from sqlalchemy.test import testing
from sqlalchemy.util import function_named
from test.orm import _base
+from sqlalchemy.test.schema import Table, Column
class BaseObject(object):
def __init__(self, *args, **kwargs):
global publication_table, issue_table, location_table, location_name_table, magazine_table, \
page_table, magazine_page_table, classified_page_table, page_size_table
- zerodefault = {} #{'default':0}
publication_table = Table('publication', metadata,
- Column('id', Integer, primary_key=True, default=None),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(45), default=''),
)
issue_table = Table('issue', metadata,
- Column('id', Integer, primary_key=True, default=None),
- Column('publication_id', Integer, ForeignKey('publication.id'), **zerodefault),
- Column('issue', Integer, **zerodefault),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('publication_id', Integer, ForeignKey('publication.id')),
+ Column('issue', Integer),
)
location_table = Table('location', metadata,
- Column('id', Integer, primary_key=True, default=None),
- Column('issue_id', Integer, ForeignKey('issue.id'), **zerodefault),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('issue_id', Integer, ForeignKey('issue.id')),
Column('ref', CHAR(3), default=''),
- Column('location_name_id', Integer, ForeignKey('location_name.id'), **zerodefault),
+ Column('location_name_id', Integer, ForeignKey('location_name.id')),
)
location_name_table = Table('location_name', metadata,
- Column('id', Integer, primary_key=True, default=None),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(45), default=''),
)
magazine_table = Table('magazine', metadata,
- Column('id', Integer, primary_key=True, default=None),
- Column('location_id', Integer, ForeignKey('location.id'), **zerodefault),
- Column('page_size_id', Integer, ForeignKey('page_size.id'), **zerodefault),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('location_id', Integer, ForeignKey('location.id')),
+ Column('page_size_id', Integer, ForeignKey('page_size.id')),
)
page_table = Table('page', metadata,
- Column('id', Integer, primary_key=True, default=None),
- Column('page_no', Integer, **zerodefault),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('page_no', Integer),
Column('type', CHAR(1), default='p'),
)
magazine_page_table = Table('magazine_page', metadata,
- Column('page_id', Integer, ForeignKey('page.id'), primary_key=True, **zerodefault),
- Column('magazine_id', Integer, ForeignKey('magazine.id'), **zerodefault),
- Column('orders', TEXT, default=''),
+ Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
+ Column('magazine_id', Integer, ForeignKey('magazine.id')),
+ Column('orders', Text, default=''),
)
classified_page_table = Table('classified_page', metadata,
- Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True, **zerodefault),
+ Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True),
Column('titles', String(45), default=''),
)
page_size_table = Table('page_size', metadata,
- Column('id', Integer, primary_key=True, default=None),
- Column('width', Integer, **zerodefault),
- Column('height', Integer, **zerodefault),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('width', Integer),
+ Column('height', Integer),
Column('name', String(45), default=''),
)
'magazine': relation(Magazine, backref=backref('pages', order_by=page_table.c.page_no))
})
- classified_page_mapper = mapper(ClassifiedPage, classified_page_table, inherits=magazine_page_mapper, polymorphic_identity='c', primary_key=[page_table.c.id])
- #compile_mappers()
- #print [str(s) for s in classified_page_mapper.primary_key]
- #print classified_page_mapper.columntoproperty[page_table.c.id]
+ classified_page_mapper = mapper(ClassifiedPage,
+ classified_page_table,
+ inherits=magazine_page_mapper,
+ polymorphic_identity='c',
+ primary_key=[page_table.c.id])
session = create_session()
from test.orm import _base
from sqlalchemy.test import testing
+from sqlalchemy.test.schema import Table, Column
class PolymorphicCircularTest(_base.MappedTest):
def define_tables(cls, metadata):
global Table1, Table1B, Table2, Table3, Data
table1 = Table('table1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('related_id', Integer, ForeignKey('table1.id'), nullable=True),
Column('type', String(30)),
Column('name', String(30))
)
data = Table('data', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('node_id', Integer, ForeignKey('table1.id')),
Column('data', String(30))
)
from sqlalchemy.util import function_named
from test.orm import _base, _fixtures
from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.schema import Table, Column
class AttrSettable(object):
def __init__(self, **kwargs):
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(30)))
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('colleague_id', Integer, ForeignKey('people.person_id')),
Column('name', String(50)),
Column('type', String(30)))
def define_tables(cls, metadata):
global people, engineers, managers, cars
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)))
engineers = Table('engineers', metadata,
Column('longer_status', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True),
+ Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('owner', Integer, ForeignKey('people.person_id')))
def testmanytoonepolymorphic(self):
def define_tables(cls, metadata):
global people, engineers, managers, cars
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(50)))
Column('longer_status', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True),
+ Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('owner', Integer, ForeignKey('people.person_id')))
def testeagerempty(self):
def define_tables(cls, metadata):
global people, managers, data
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
)
def define_tables(cls, metadata):
global people, engineers, managers, cars, offroad_cars
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True),
+ Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(30)))
offroad_cars = Table('offroad_cars', metadata,
Column('car_id',Integer, ForeignKey('cars.car_id'),nullable=False,primary_key=True))
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('car_id', Integer, ForeignKey('cars.car_id'), nullable=False),
Column('name', String(50)))
def define_tables(cls, metadata):
global taggable, users
taggable = Table('taggable', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(30)),
Column('owner_id', Integer, ForeignKey('taggable.id')),
)
metadata = MetaData(testing.db)
# table definitions
status = Table('status', metadata,
- Column('status_id', Integer, primary_key=True),
+ Column('status_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(20)))
people = Table('people', metadata,
- Column('person_id', Integer, primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('name', String(50)))
Column('category', String(70)))
cars = Table('cars', metadata,
- Column('car_id', Integer, primary_key=True),
+ Column('car_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('owner', Integer, ForeignKey('people.person_id'), nullable=False))
global table_Employee, table_Engineer, table_Manager
table_Employee = Table( 'Employee', metadata,
Column( 'name', type_= String(100), ),
- Column( 'id', primary_key= True, type_= Integer, ),
+ Column( 'id', primary_key= True, type_= Integer, test_needs_autoincrement=True),
Column( 'atype', type_= String(100), ),
)
global base_item_table, item_table, base_item_collection_table, collection_table
base_item_table = Table(
'base_item', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('child_name', String(255), default=None))
item_table = Table(
collection_table = Table(
'collection', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', Unicode(255)))
def test_pjoin_compile(self):
def define_tables(cls, metadata):
global t1, t2
t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(30), nullable=False),
Column('data', String(30)))
# note that the primary key column in t2 is named differently
global people, employees, tags, peopleTags
people = Table('people', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('_type', String(30), nullable=False),
)
)
tags = Table('tags', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('label', String(50), nullable=False),
)
def define_tables(cls, metadata):
global tablea, tableb, tablec, tabled
tablea = Table('tablea', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('adata', String(50)),
)
tableb = Table('tableb', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('aid', Integer, ForeignKey('tablea.id')),
Column('data', String(50)),
)
from sqlalchemy.test import AssertsCompiledSQL, testing
from test.orm import _base, _fixtures
from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.schema import Table, Column
class Company(_fixtures.Base):
pass
global companies, people, engineers, managers, boss, paperwork, machines
companies = Table('companies', metadata,
- Column('company_id', Integer, Sequence('company_id_seq', optional=True), primary_key=True),
+ Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)))
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('company_id', Integer, ForeignKey('companies.company_id')),
Column('name', String(50)),
Column('type', String(30)))
)
machines = Table('machines', metadata,
- Column('machine_id', Integer, primary_key=True),
+ Column('machine_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('engineer_id', Integer, ForeignKey('engineers.person_id')))
)
paperwork = Table('paperwork', metadata,
- Column('paperwork_id', Integer, primary_key=True),
+ Column('paperwork_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('description', String(50)),
Column('person_id', Integer, ForeignKey('people.person_id')))
def define_tables(cls, metadata):
global people, engineers
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(30)))
def define_tables(cls, metadata):
global people, engineers, managers
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(30)))
global people, engineers, organizations, engineers_to_org
organizations = Table('organizations', metadata,
- Column('id', Integer, Sequence('org_id_seq', optional=True), primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
)
engineers_to_org = Table('engineers_org', metadata,
)
people = Table('people', metadata,
- Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(30)))
class Parent(Base):
__tablename__ = 'parent'
- id = Column(Integer, primary_key=True)
+ id = Column(Integer, primary_key=True, test_needs_autoincrement=True)
cls = Column(String(50))
__mapper_args__ = dict(polymorphic_on = cls )
from sqlalchemy.test import testing
from test.orm import _fixtures
from test.orm._base import MappedTest, ComparableEntity
+from sqlalchemy.test.schema import Table, Column
class SingleInheritanceTest(MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('employees', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('manager_data', String(50)),
Column('engineer_info', String(50)),
Column('type', String(20)))
Table('reports', metadata,
- Column('report_id', Integer, primary_key=True),
+ Column('report_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('employee_id', ForeignKey('employees.employee_id')),
Column('name', String(50)),
)
@classmethod
def define_tables(cls, metadata):
Table('employees', metadata,
- Column('employee_id', Integer, primary_key=True),
+ Column('employee_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('manager_data', String(50)),
Column('engineer_info', String(50)),
)
Table('companies', metadata,
- Column('company_id', Integer, primary_key=True),
+ Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
)
global persons_table, employees_table
persons_table = Table('persons', metadata,
- Column('person_id', Integer, primary_key=True),
+ Column('person_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('type', String(20), nullable=False)
)
from sqlalchemy.test.testing import assert_raises, assert_raises_message
from sqlalchemy import Integer, String, ForeignKey, Sequence, exc as sa_exc
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, create_session, class_mapper, backref
from sqlalchemy.orm import attributes, exc as orm_exc
from sqlalchemy.test import testing
mapper(User, users, properties = dict(
addresses = relation(Address, cascade="all, delete-orphan", backref="user"),
orders = relation(
- mapper(Order, orders), cascade="all, delete-orphan")
+ mapper(Order, orders), cascade="all, delete-orphan", order_by=orders.c.id)
))
mapper(Dingaling,dingalings, properties={
'address':relation(Address)
orders=[Order(description="order 3"),
Order(description="order 4")]))
- eq_(sess.query(Order).all(),
+ eq_(sess.query(Order).order_by(Order.id).all(),
[Order(description="order 3"), Order(description="order 4")])
o5 = Order(description="order 5")
sess.add(o5)
- try:
- sess.flush()
- assert False
- except orm_exc.FlushError, e:
- assert "is an orphan" in str(e)
+ assert_raises_message(orm_exc.FlushError, "is an orphan", sess.flush)
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table("extra", metadata,
- Column("id", Integer, Sequence("extra_id_seq", optional=True),
- primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("prefs_id", Integer, ForeignKey("prefs.id")))
Table('prefs', metadata,
- Column('id', Integer, Sequence('prefs_id_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)))
Table('users', metadata,
- Column('id', Integer, Sequence('user_id_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(40)),
Column('pref_id', Integer, ForeignKey('prefs.id')))
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t2id', Integer, ForeignKey('t2.id')))
Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t3id', Integer, ForeignKey('t3.id')))
Table('t3', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
@classmethod
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t2id', Integer, ForeignKey('t2.id')))
Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t3id', Integer, ForeignKey('t3.id')))
Table('t3', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
@classmethod
@classmethod
def define_tables(cls, metadata):
Table('a', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
test_needs_fk=True
)
Table('b', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
test_needs_fk=True
)
Table('c', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
Column('bid', Integer, ForeignKey('b.id')),
test_needs_fk=True
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('user_id', Integer,
- Sequence('user_id_seq', optional=True),
- primary_key=True),
+ Column('user_id', Integer,primary_key=True, test_needs_autoincrement=True),
Column('name', String(40)))
Table('addresses', metadata,
- Column('address_id', Integer,
- Sequence('address_id_seq', optional=True),
- primary_key=True),
+ Column('address_id', Integer,primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('users.user_id')),
Column('email_address', String(40)))
@classmethod
def define_tables(cls, meta):
Table('orders', meta,
- Column('id', Integer, Sequence('order_id_seq'),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)))
Table('items', meta,
- Column('id', Integer, Sequence('item_id_seq'),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('order_id', Integer, ForeignKey('orders.id'),
nullable=False),
Column('name', String(50)))
Table('attributes', meta,
- Column('id', Integer, Sequence('attribute_id_seq'),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('item_id', Integer, ForeignKey('items.id'),
nullable=False),
Column('name', String(50)))
@classmethod
def define_tables(cls, meta):
Table('sales_reps', meta,
- Column('sales_rep_id', Integer,
- Sequence('sales_rep_id_seq'),
- primary_key=True),
+ Column('sales_rep_id', Integer,primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)))
Table('accounts', meta,
- Column('account_id', Integer,
- Sequence('account_id_seq'),
- primary_key=True),
+ Column('account_id', Integer,primary_key=True, test_needs_autoincrement=True),
Column('balance', Integer))
Table('customers', meta,
- Column('customer_id', Integer,
- Sequence('customer_id_seq'),
- primary_key=True),
+ Column('customer_id', Integer,primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('sales_rep_id', Integer,
ForeignKey('sales_reps.sales_rep_id')),
@classmethod
def define_tables(cls, metadata):
Table('addresses', metadata,
- Column('address_id', Integer, primary_key=True),
+ Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('street', String(30)),
)
Table('homes', metadata,
- Column('home_id', Integer, primary_key=True, key="id"),
+ Column('home_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
Column('description', String(30)),
Column('address_id', Integer, ForeignKey('addresses.address_id'),
nullable=False),
)
Table('businesses', metadata,
- Column('business_id', Integer, primary_key=True, key="id"),
+ Column('business_id', Integer, primary_key=True, key="id", test_needs_autoincrement=True),
Column('description', String(30), key="description"),
Column('address_id', Integer, ForeignKey('addresses.address_id'),
nullable=False),
@classmethod
def define_tables(cls, metadata):
Table('table_a', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(30)))
Table('table_b', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(30)),
Column('a_id', Integer, ForeignKey('table_a.id')))
@classmethod
def define_tables(cls, metadata):
Table("base", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("descr", String(50))
)
Table("noninh_child", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('base_id', Integer, ForeignKey('base.id'))
)
from sqlalchemy.orm import dynamic_loader, backref
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey, desc, select, func
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, create_session, Query, attributes
from sqlalchemy.orm.dynamic import AppenderMixin
from sqlalchemy.test.testing import eq_
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(40)),
Column('fullname', String(100)),
Column('password', String(15)))
Table('addresses', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('email_address', String(100), nullable=False),
Column('user_id', Integer, ForeignKey('users.id')))
metadata = MetaData(engines.utf8_engine())
table = Table('unicode_data', metadata,
- Column('id', Unicode(40), primary_key=True),
+ Column('id', Unicode(40), primary_key=True, test_needs_autoincrement=True),
Column('data', Unicode(40)))
try:
metadata.create_all()
sess = create_session()
assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all()
- assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).all()
+ assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] == \
+ sess.query(Address).filter(Address.user.has(User.name.like('%ed%'))).order_by(Address.id).all()
- assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == \
+ sess.query(Address).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
# test has() doesn't overcorrelate
- assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == \
+ sess.query(Address).join("user").filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
# test has() doesnt' get subquery contents adapted by aliased join
- assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == \
+ sess.query(Address).join("user", aliased=True).filter(Address.user.has(User.name.like('%ed%'), id=8)).order_by(Address.id).all()
dingaling = sess.query(Dingaling).get(2)
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all()
assert [Address(id=5)] == sess.query(Address).filter(Address.dingaling==dingaling).all()
# m2m
- eq_(sess.query(Item).filter(Item.keywords==None).all(), [Item(id=4), Item(id=5)])
- eq_(sess.query(Item).filter(Item.keywords!=None).all(), [Item(id=1),Item(id=2), Item(id=3)])
+ eq_(sess.query(Item).filter(Item.keywords==None).order_by(Item.id).all(), [Item(id=4), Item(id=5)])
+ eq_(sess.query(Item).filter(Item.keywords!=None).order_by(Item.id).all(), [Item(id=1),Item(id=2), Item(id=3)])
def test_filter_by(self):
sess = create_session()
sess = create_session()
# o2o
- eq_([Address(id=1), Address(id=3), Address(id=4)], sess.query(Address).filter(Address.dingaling==None).all())
- eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != None).all())
+ eq_([Address(id=1), Address(id=3), Address(id=4)],
+ sess.query(Address).filter(Address.dingaling==None).order_by(Address.id).all())
+ eq_([Address(id=2), Address(id=5)], sess.query(Address).filter(Address.dingaling != None).order_by(Address.id).all())
# m2o
eq_([Order(id=5)], sess.query(Order).filter(Order.address==None).all())
s = create_session()
+ oracle_as = "AS " if not testing.against('oracle') else ""
+
self.assert_compile(
s.query(User).options(eagerload(User.addresses)).from_self().statement,
"SELECT anon_1.users_id, anon_1.users_name, addresses_1.id, addresses_1.user_id, "\
- "addresses_1.email_address FROM (SELECT users.id AS users_id, users.name AS users_name FROM users) AS anon_1 "\
- "LEFT OUTER JOIN addresses AS addresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id"
+ "addresses_1.email_address FROM (SELECT users.id AS users_id, users.name AS users_name FROM users) %(oracle_as)sanon_1 "\
+ "LEFT OUTER JOIN addresses %(oracle_as)saddresses_1 ON anon_1.users_id = addresses_1.user_id ORDER BY addresses_1.id" % {
+ 'oracle_as':oracle_as
+ }
)
def test_aliases(self):
def define_tables(cls, metadata):
global t1, t2, t1t2_1, t1t2_2
t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30))
)
t2 = Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30))
)
eq_(list(q2), [(u'jack',), (u'ed',)])
q = sess.query(User)
- q2 = q.order_by(User.id).values(User.name, User.name + " " + cast(User.id, String))
+ q2 = q.order_by(User.id).values(User.name, User.name + " " + cast(User.id, String(50)))
eq_(list(q2), [(u'jack', u'jack 7'), (u'ed', u'ed 8'), (u'fred', u'fred 9'), (u'chuck', u'chuck 10')])
q2 = q.join('addresses').filter(User.name.like('%e%')).order_by(User.id, Address.id).values(User.name, Address.email_address)
eq_(list(q2), [(u'jack', u'jack', u'jack'), (u'jack', u'jack', u'ed'), (u'jack', u'jack', u'fred'), (u'jack', u'jack', u'chuck'), (u'ed', u'ed', u'jack'), (u'ed', u'ed', u'ed'), (u'ed', u'ed', u'fred'), (u'ed', u'ed', u'chuck')])
@testing.fails_on('mssql', 'FIXME: unknown')
+ @testing.fails_on('oracle', "Oracle doesn't support boolean expressions as columns")
@testing.fails_on('postgres+pg8000', "pg8000 parses the SQL itself before passing on to PG, doesn't parse this")
def test_values_with_boolean_selects(self):
"""Tests a values clause that works with select boolean evaluations"""
for x in range(2):
sess.expunge_all()
def go():
- eq_(sess.query(Address).options(eagerload('user')).all(), address_result)
+ eq_(sess.query(Address).options(eagerload('user')).order_by(Address.id).all(), address_result)
self.assert_sql_count(testing.db, go, 1)
ualias = aliased(User)
)
ua = aliased(User)
- eq_(sess.query(Address, ua.concat, ua.count).select_from(join(Address, ua, 'user')).options(eagerload(Address.user)).all(),
+ eq_(sess.query(Address, ua.concat, ua.count).
+ select_from(join(Address, ua, 'user')).
+ options(eagerload(Address.user)).order_by(Address.id).all(),
[
(Address(id=1, user=User(id=7, concat=14, count=1)), 14, 1),
(Address(id=2, user=User(id=8, concat=16, count=3)), 16, 3),
def define_tables(cls, metadata):
global base, sub1, sub2
base = Table('base', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50))
)
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(32)),
Column('age', Integer))
Table('documents', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
Column('title', String(32)))
def setup_mappers(cls):
mapper(User, users, properties={
'addresses':relation(Address, backref='user',
- cascade="all, delete-orphan"),
+ cascade="all, delete-orphan", order_by=addresses.c.id),
})
mapper(Address, addresses)
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
+ def test_round_trip_executemany(self):
+ # cx_oracle was producing different behavior for cursor.executemany()
+ # vs. cursor.execute()
+
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ unicode_table.insert().execute(
+ dict(unicode_varchar=unicodedata,unicode_text=unicodedata),
+ dict(unicode_varchar=unicodedata,unicode_text=unicodedata)
+ )
+
+ x = unicode_table.select().execute().fetchone()
+ self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
+ self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
+
def test_union(self):
"""ensure compiler processing works for UNIONs"""
"""application table metadata objects are described here."""
from sqlalchemy import *
-
+from sqlalchemy.test.schema import Table, Column
metadata = MetaData()
users = Table('users', metadata,
- Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
+ Column('user_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_name', String(30), nullable=False),
Column('fullname', String(100), nullable=False),
Column('password', String(40), nullable=False),
)
blogs = Table('blogs', metadata,
- Column('blog_id', Integer, Sequence('blog_id_seq', optional=True), primary_key=True),
+ Column('blog_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('name', String(100), nullable=False),
Column('description', String(500))
)
posts = Table('posts', metadata,
- Column('post_id', Integer, Sequence('post_id_seq', optional=True), primary_key=True),
+ Column('post_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
)
topics = Table('topics', metadata,
- Column('topic_id', Integer, primary_key=True),
+ Column('topic_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('keyword', String(50), nullable=False),
Column('description', String(500))
)
)
comments = Table('comments', metadata,
- Column('comment_id', Integer, primary_key=True),
+ Column('comment_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
Column('datetime', DateTime, nullable=False),