- support for cx_Oracle's "native unicode" mode, which does not require NLS_LANG
  to be set. Use cx_Oracle 5.0.2 or later.
- an NCLOB type is added to the base types.
-
+ - func.char_length is a generic function for LENGTH (see the sketch below)
+ - ForeignKey() which includes onupdate=<value> will emit a warning;
+   ON UPDATE CASCADE is unsupported by Oracle and is not rendered
+
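+   A minimal usage sketch of the func.char_length item above (the "users"
+   table and its "name" column here are hypothetical):
+
+       from sqlalchemy import select, func
+       # 'users' is an assumed, pre-existing Table object
+       select([func.char_length(users.c.name)])
+
+   which the Oracle dialect renders using LENGTH().
+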
- new dialects
- pg8000
- pyodbc+mysql
this was stepping into the realm of optimization that is better left to the DBA, but the
prefix can still be added by enabling the optimize_limits=True flag on create_engine().
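+
+For example, a minimal sketch (the connection URL here is illustrative)::
+
+    from sqlalchemy import create_engine
+
+    engine = create_engine('oracle://scott:tiger@dsn', optimize_limits=True)
+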
+ON UPDATE CASCADE
+-----------------
+
+Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger-based solution
+is available at http://asktom.oracle.com/tkyte/update_cascade/index.html .
+
+When using the SQLAlchemy ORM, the ORM has a limited ability to emulate cascading
+updates by issuing the dependent UPDATE statements itself - specify ForeignKey objects
+using the "deferrable=True, initially='deferred'" keyword arguments, so that the
+constraint is enforced at transaction commit rather than per statement, and specify
+"passive_updates=False" on each relation().
+
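+For example, a minimal sketch along the lines of the test fixtures in this
+patch (the table, class and mapper configuration here are illustrative)::
+
+    from sqlalchemy import MetaData, Table, Column, String, ForeignKey
+    from sqlalchemy.orm import mapper, relation
+
+    metadata = MetaData()
+    users = Table('users', metadata,
+        Column('username', String(50), primary_key=True))
+    addresses = Table('addresses', metadata,
+        Column('email', String(50), primary_key=True),
+        Column('username', String(50),
+               ForeignKey('users.username',
+                          deferrable=True, initially='deferred')))
+
+    # User and Address are assumed, plain mapped classes.
+    # passive_updates=False has the ORM issue the dependent UPDATEs itself.
+    mapper(User, users, properties={
+        'addresses': relation(Address, passive_updates=False)
+    })
+    mapper(Address, addresses)
+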
Oracle 8 Compatibility
----------------------
return self.visit_DATE(type_)
def visit_float(self, type_):
- #return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': type_.precision, 'scale' : 2}
- return self.visit_NUMERIC(type_)
+ if type_.precision is None:
+ return "NUMERIC"
+ else:
+ return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': type_.precision, 'scale' : 2}
def visit_unicode(self, type_):
return self.visit_NVARCHAR(type_)
def visit_now_func(self, fn, **kw):
return "CURRENT_TIMESTAMP"
+
+ def visit_char_length_func(self, fn, **kw):
+ return "LENGTH" + self.function_argspec(fn, **kw)
def visit_match_op(self, binary, **kw):
return "CONTAINS (%s, %s)" % (self.process(binary.left), self.process(binary.right))
def visit_drop_sequence(self, drop):
return "DROP SEQUENCE %s" % self.preparer.format_sequence(drop.element)
+ def define_constraint_cascades(self, constraint):
+ text = ""
+ if constraint.ondelete is not None:
+ text += " ON DELETE %s" % constraint.ondelete
+
+ # Oracle has no ON UPDATE CASCADE - it's only available via triggers:
+ # http://asktom.oracle.com/tkyte/update_cascade/index.html
+ if constraint.onupdate is not None:
+ util.warn(
+ "Oracle does not contain native UPDATE CASCADE "
+ "functionality - onupdates will not be rendered for foreign keys. "
+ "Consider using deferrable=True, initially='deferred' or triggers.")
+
+ return text
+
class OracleDefaultRunner(base.DefaultRunner):
def visit_sequence(self, seq):
return self.execute_string("SELECT " + self.dialect.identifier_preparer.format_sequence(seq) + ".nextval FROM DUAL", {})
def _check_ddl_on(self, on):
if (on is not None and
- (not isinstance(on, basestring) and not util.callable(on))):
+ (not isinstance(on, (basestring, tuple, list, set)) and not util.callable(on))):
raise exc.ArgumentError(
- "Expected the name of a database dialect or a callable for "
+ "Expected the name of a database dialect, a tuple of names, or a callable for "
"'on' criteria, got type '%s'." % type(on).__name__)
def _should_execute(self, event, schema_item, bind, **kw):
return True
elif isinstance(self.on, basestring):
return self.on == bind.engine.name
+ elif isinstance(self.on, (tuple, list, set)):
+ return bind.engine.name in self.on
else:
return self.on(event, schema_item, bind, **kw)
SQL bind parameters are not available in DDL statements.
on
- Optional filtering criteria. May be a string or a callable
+ Optional filtering criteria. May be a string, a tuple of strings, or a callable
predicate. If a string, it will be compared to the name of the
executing database dialect::
DDL('something', on='postgres')
-
+
+ If a tuple, specifies multiple dialect names::
+
+ DDL('something', on=('postgres', 'mysql'))
+
If a callable, it will be invoked with three positional arguments
as well as optional keyword arguments:
', '.join(preparer.quote(f.column.name, f.column.quote)
for f in constraint._elements.values())
)
- if constraint.ondelete is not None:
- text += " ON DELETE %s" % constraint.ondelete
- if constraint.onupdate is not None:
- text += " ON UPDATE %s" % constraint.onupdate
+ text += self.define_constraint_cascades(constraint)
text += self.define_constraint_deferrability(constraint)
return text
text += self.define_constraint_deferrability(constraint)
return text
+ def define_constraint_cascades(self, constraint):
+ text = ""
+ if constraint.ondelete is not None:
+ text += " ON DELETE %s" % constraint.ondelete
+ if constraint.onupdate is not None:
+ text += " ON UPDATE %s" % constraint.onupdate
+ return text
+
def define_constraint_deferrability(self, constraint):
text = ""
if constraint.deferrable is not None:
# no access to same table
no_support('mysql', 'requires SUPER priv'),
exclude('mysql', '<', (5, 0, 10), 'not supported by database'),
- no_support('postgres', 'not supported by database: no statements'),
+
+ # TODO: implement triggers for PG tests, then remove this
+ no_support('postgres', 'PG triggers need to be implemented for tests'),
)
+def correlated_outer_joins(fn):
+ """Target must support an outer join to a subquery which correlates to the parent."""
+
+ return _chain_decorators_on(
+ fn,
+ no_support('oracle', 'Raises "ORA-01799: a column may not be outer-joined to a subquery"')
+ )
+
def savepoints(fn):
"""Target database must support savepoints."""
return _chain_decorators_on(
"UPDATE dt SET col2='ins', col4='ins' "
"WHERE dt.id IN (SELECT id FROM inserted);",
on='mssql'),
- ):
- if testing.against(ins.on):
- break
- else:
- ins = sa.DDL("CREATE TRIGGER dt_ins BEFORE INSERT ON dt "
+ sa.DDL("CREATE TRIGGER dt_ins BEFORE INSERT "
+ "ON dt "
+ "FOR EACH ROW "
+ "BEGIN "
+ ":NEW.col2 := 'ins'; :NEW.col4 := 'ins'; END;",
+ on='oracle'),
+ sa.DDL("CREATE TRIGGER dt_ins BEFORE INSERT ON dt "
"FOR EACH ROW BEGIN "
- "SET NEW.col2='ins'; SET NEW.col4='ins'; END")
- ins.execute_at('after-create', dt)
+ "SET NEW.col2='ins'; SET NEW.col4='ins'; END",
+ on=lambda event, schema_item, bind, **kw:
+ bind.engine.name not in ('oracle', 'mssql', 'sqlite')
+ ),
+ ):
+ ins.execute_at('after-create', dt)
+
sa.DDL("DROP TRIGGER dt_ins").execute_at('before-drop', dt)
-
for up in (
sa.DDL("CREATE TRIGGER dt_up AFTER UPDATE ON dt "
"FOR EACH ROW BEGIN "
"UPDATE dt SET col3='up', col4='up' "
"WHERE dt.id IN (SELECT id FROM deleted);",
on='mssql'),
- ):
- if testing.against(up.on):
- break
- else:
- up = sa.DDL("CREATE TRIGGER dt_up BEFORE UPDATE ON dt "
+ sa.DDL("CREATE TRIGGER dt_up BEFORE UPDATE ON dt "
+ "FOR EACH ROW BEGIN "
+ ":NEW.col3 := 'up'; :NEW.col4 := 'up'; END;",
+ on='oracle'),
+ sa.DDL("CREATE TRIGGER dt_up BEFORE UPDATE ON dt "
"FOR EACH ROW BEGIN "
- "SET NEW.col3='up'; SET NEW.col4='up'; END")
- up.execute_at('after-create', dt)
+ "SET NEW.col3='up'; SET NEW.col4='up'; END",
+ on=lambda event, schema_item, bind, **kw:
+ bind.engine.name not in ('oracle', 'mssql', 'sqlite')
+ ),
+ ):
+ up.execute_at('after-create', dt)
+
sa.DDL("DROP TRIGGER dt_up").execute_at('before-drop', dt)
from sqlalchemy.test import testing
from sqlalchemy.orm import eagerload, deferred, undefer
from sqlalchemy import Integer, String, Date, ForeignKey, and_, select, func
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, create_session, lazyload, aliased
from sqlalchemy.test.testing import eq_
from sqlalchemy.test.assertsql import CompiledSQL
})
mapper(User, users, properties={
'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.id),
- 'orders':relation(Order, lazy=True)
+ 'orders':relation(Order, lazy=True, order_by=orders.c.id)
})
sess = create_session()
q = sess.query(User)
- if testing.against('mysql'):
- l = q.limit(2).all()
- assert self.static.user_all_result[:2] == l
- else:
- l = q.order_by(User.id).limit(2).offset(1).all()
- print self.static.user_all_result[1:3]
- print l
- assert self.static.user_all_result[1:3] == l
+ l = q.order_by(User.id).limit(2).offset(1).all()
+ eq_(self.static.user_all_result[1:3], l)
@testing.resolve_artifact_names
def test_distinct(self):
s = sa.union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
mapper(User, users, properties={
- 'addresses':relation(mapper(Address, addresses), lazy=False),
+ 'addresses':relation(mapper(Address, addresses), lazy=False, order_by=addresses.c.id),
})
sess = create_session()
q = sess.query(User)
def go():
- l = q.filter(s.c.u2_id==User.id).distinct().all()
- assert self.static.user_address_result == l
+ l = q.filter(s.c.u2_id==User.id).distinct().order_by(User.id).all()
+ eq_(self.static.user_address_result, l)
self.assert_sql_count(testing.db, go, 1)
@testing.fails_on('maxdb', 'FIXME: unknown')
mapper(Order, orders)
mapper(User, users, properties={
- 'orders':relation(Order, backref='user', lazy=False),
- 'max_order':relation(mapper(Order, max_orders, non_primary=True), lazy=False, uselist=False)
+ 'orders':relation(Order, backref='user', lazy=False, order_by=orders.c.id),
+ 'max_order':relation(
+ mapper(Order, max_orders, non_primary=True),
+ lazy=False, uselist=False)
})
q = create_session().query(User)
max_order=Order(id=4)
),
User(id=10),
- ] == q.all()
+ ] == q.order_by(User.id).all()
self.assert_sql_count(testing.db, go, 1)
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table('m2m', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('aid', Integer, ForeignKey('a.id')),
Column('bid', Integer, ForeignKey('b.id')))
Table('a', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
Table('b', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
@classmethod
@classmethod
def define_tables(cls, metadata):
Table('nodes', metadata,
- Column('id', Integer, sa.Sequence('node_id_seq', optional=True),
- primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
@classmethod
def define_tables(cls, metadata):
Table('a_table', metadata,
- Column('id', Integer, primary_key=True)
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True)
)
Table('b_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent_b1_id', Integer, ForeignKey('b_table.id')),
Column('parent_a_id', Integer, ForeignKey('a_table.id')),
Column('parent_b2_id', Integer, ForeignKey('b_table.id')))
@classmethod
def define_tables(cls, metadata):
Table('widget', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', sa.Unicode(40), nullable=False, unique=True),
)
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('c1', Integer, primary_key=True),
+ Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('c2', String(30)),
Column('type', String(30))
)
Table('t2', metadata,
- Column('c1', Integer, primary_key=True),
+ Column('c1', Integer, primary_key=True, test_needs_autoincrement=True),
Column('c2', String(30)),
Column('type', String(30)),
Column('t1.id', Integer, ForeignKey('t1.c1')))
@classmethod
def define_tables(cls, metadata):
Table('users_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(16))
)
Table('tags_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey("users_table.id")),
Column('score1', sa.Float),
Column('score2', sa.Float),
Exercises a variety of ways to configure this.
"""
+
+ # another argument for eagerload learning about inner joins
+
+ __requires__ = ('correlated_outer_joins', )
@classmethod
def define_tables(cls, metadata):
users = Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50))
)
stuff = Table('stuff', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('date', Date),
Column('user_id', Integer, ForeignKey('users.id')))
if ondate:
# the more 'relational' way to do this, join on the max date
- stuff_view = select([func.max(salias.c.date).label('max_date')]).where(salias.c.user_id==users.c.id).correlate(users)
+ stuff_view = select([func.max(salias.c.date).label('max_date')]).\
+ where(salias.c.user_id==users.c.id).correlate(users)
else:
# a common method with the MySQL crowd, which actually might perform better in some
# cases - subquery does a limit with order by DESC, join on the id
- stuff_view = select([salias.c.id]).where(salias.c.user_id==users.c.id).correlate(users).order_by(salias.c.date.desc()).limit(1)
+ stuff_view = select([salias.c.id]).where(salias.c.user_id==users.c.id).\
+ correlate(users).order_by(salias.c.date.desc()).limit(1)
if labeled == 'label':
stuff_view = stuff_view.label('foo')
# use a union all to get a lot of rows to join against
u2 = users.alias('u2')
s = sa.union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
- print [key for key in s.c.keys()]
- l = q.filter(s.c.u2_id==User.id).distinct().all()
- assert self.static.user_all_result == l
+ l = q.filter(s.c.u2_id==User.id).order_by(User.id).distinct().all()
+ eq_(self.static.user_all_result, l)
@testing.resolve_artifact_names
def test_one_to_many_scalar(self):
import sqlalchemy as sa
from sqlalchemy.test import testing, pickleable
from sqlalchemy import MetaData, Integer, String, ForeignKey, func, util
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.engine import default
from sqlalchemy.orm import mapper, relation, backref, create_session, class_mapper, compile_mappers, reconstructor, validates, aliased
from sqlalchemy.orm import defer, deferred, synonym, attributes, column_property, composite, relation, dynamic_loader, comparable_property
@testing.resolve_artifact_names
def test_self_ref_synonym(self):
t = Table('nodes', MetaData(),
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')))
class Node(object):
@testing.resolve_artifact_names
def test_prop_filters(self):
t = Table('person', MetaData(),
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('type', String(128)),
Column('name', String(128)),
Column('employee_number', Integer),
@classmethod
def define_tables(cls, metadata):
Table("thing", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("name", String(20)))
Table("human", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("thing_id", Integer, ForeignKey("thing.id")),
Column("name", String(20)))
@classmethod
def define_tables(cls, metadata):
Table('graphs', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('version_id', Integer, primary_key=True, nullable=True),
Column('name', String(30)))
Table('edges', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('graph_id', Integer, nullable=False),
Column('graph_version_id', Integer, nullable=False),
Column('x1', Integer),
['graphs.id', 'graphs.version_id']))
Table('foobars', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('x1', Integer, default=2),
Column('x2', Integer),
Column('x3', Integer, default=15),
@classmethod
def define_tables(cls, metadata):
Table('ht1', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht2', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('ht1_id', Integer, ForeignKey('ht1.id')),
Column('value', String(10)))
Table('ht3', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('value', String(10)))
Table('ht4', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- Column('ht3_id', Integer, ForeignKey('ht3.id'),
- primary_key=True))
+ Column('ht1_id', Integer, ForeignKey('ht1.id'), primary_key=True),
+ Column('ht3_id', Integer, ForeignKey('ht3.id'), primary_key=True))
Table('ht5', metadata,
- Column('ht1_id', Integer, ForeignKey('ht1.id'),
- primary_key=True))
+ Column('ht1_id', Integer, ForeignKey('ht1.id'), primary_key=True))
Table('ht6', metadata,
- Column('ht1a_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
- Column('ht1b_id', Integer, ForeignKey('ht1.id'),
- primary_key=True),
+ Column('ht1a_id', Integer, ForeignKey('ht1.id'), primary_key=True),
+ Column('ht1b_id', Integer, ForeignKey('ht1.id'), primary_key=True),
Column('value', String(10)))
# Py2K
@classmethod
def define_tables(cls, metadata):
Table('cartographers', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50)),
Column('alias', String(50)),
Column('quip', String(100)))
Table('maps', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('cart_id', Integer,
ForeignKey('cartographers.id')),
Column('state', String(2)),
for reserved in (sa.orm.attributes.ClassManager.STATE_ATTR,
sa.orm.attributes.ClassManager.MANAGER_ATTR):
t = Table('t', sa.MetaData(),
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column(reserved, Integer))
class T(object):
pass
from sqlalchemy.test.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
-from sqlalchemy import Table, Column, Integer, PickleType
+from sqlalchemy import Integer, PickleType
import operator
from sqlalchemy.test import testing
from sqlalchemy.util import OrderedSet
from sqlalchemy.orm import mapper, relation, create_session, PropComparator, synonym, comparable_property, sessionmaker
from sqlalchemy.test.testing import eq_, ne_
from test.orm import _base, _fixtures
-
+from sqlalchemy.test.schema import Table, Column
class MergeTest(_fixtures.FixtureTest):
"""Session.merge() functionality"""
'addresses':relation(Address,
backref='user',
collection_class=OrderedSet,
+ order_by=addresses.c.id,
cascade="all, delete-orphan")
})
mapper(Address, addresses)
mapper(User, users, properties={
'addresses':relation(Address,
backref='user',
+ order_by=addresses.c.id,
collection_class=OrderedSet)})
mapper(Address, addresses)
on_load = self.on_load_tracker(User)
@classmethod
def define_tables(cls, metadata):
Table("data", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', PickleType(comparator=operator.eq))
)
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, create_session
from sqlalchemy.test.testing import eq_
from test.orm import _base
@classmethod
def define_tables(cls, metadata):
+ if testing.against('oracle'):
+ fk_args = dict(deferrable=True, initially='deferred')
+ else:
+ fk_args = dict(onupdate='cascade')
+
users = Table('users', metadata,
Column('username', String(50), primary_key=True),
Column('fullname', String(100)),
addresses = Table('addresses', metadata,
Column('email', String(50), primary_key=True),
- Column('username', String(50), ForeignKey('users.username', onupdate="cascade")),
+ Column('username', String(50), ForeignKey('users.username', **fk_args)),
test_needs_fk=True)
items = Table('items', metadata,
test_needs_fk=True)
users_to_items = Table('users_to_items', metadata,
- Column('username', String(50), ForeignKey('users.username', onupdate='cascade'), primary_key=True),
- Column('itemname', String(50), ForeignKey('items.itemname', onupdate='cascade'), primary_key=True),
+ Column('username', String(50), ForeignKey('users.username', **fk_args), primary_key=True),
+ Column('itemname', String(50), ForeignKey('items.itemname', **fk_args), primary_key=True),
test_needs_fk=True)
@classmethod
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_onetomany_passive(self):
self._test_onetomany(True)
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_manytoone_passive(self):
self._test_manytoone(True)
eq_([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_onetoone_passive(self):
self._test_onetoone(True)
eq_([Address(username='ed')], sess.query(Address).all())
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_bidirectional_passive(self):
self._test_bidirectional(True)
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_manytomany_passive(self):
self._test_manytomany(True)
@classmethod
def define_tables(cls, metadata):
+ if testing.against('oracle'):
+ fk_args = dict(deferrable=True, initially='deferred')
+ else:
+ fk_args = dict(onupdate='cascade')
+
Table('nodes', metadata,
Column('name', String(50), primary_key=True),
Column('parent', String(50),
- ForeignKey('nodes.name', onupdate='cascade')))
+ ForeignKey('nodes.name', **fk_args)))
@classmethod
def setup_classes(cls):
class NonPKCascadeTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
+ if testing.against('oracle'):
+ fk_args = dict(deferrable=True, initially='deferred')
+ else:
+ fk_args = dict(onupdate='cascade')
+
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('username', String(50), unique=True),
Column('fullname', String(100)),
test_needs_fk=True)
Table('addresses', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('email', String(50)),
Column('username', String(50),
- ForeignKey('users.username', onupdate="cascade")),
+ ForeignKey('users.username', **fk_args)),
test_needs_fk=True
)
pass
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
+ @testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
def test_onetomany_passive(self):
self._test_onetomany(True)
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, create_session, attributes
from test.orm import _base, _fixtures
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(30)),
Column('type', String(30)))
Table('email_users', metadata,
class DistinctTest(QueryTest):
def test_basic(self):
- assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).distinct().all()
- assert [User(id=7), User(id=9), User(id=8),User(id=10)] == create_session().query(User).distinct().order_by(desc(User.name)).all()
+ eq_(
+ [User(id=7), User(id=8), User(id=9),User(id=10)],
+ create_session().query(User).order_by(User.id).distinct().all()
+ )
+ eq_(
+ [User(id=7), User(id=9), User(id=8),User(id=10)],
+ create_session().query(User).distinct().order_by(desc(User.name)).all()
+ )
def test_joined(self):
"""test that orderbys from a joined table get placed into the columns clause when DISTINCT is used"""
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey, MetaData, and_
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, backref, create_session, compile_mappers, clear_mappers, sessionmaker
from sqlalchemy.test.testing import eq_, startswith_
from test.orm import _base, _fixtures
@classmethod
def define_tables(cls, metadata):
Table("tbl_a", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("name", String(128)))
Table("tbl_b", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("name", String(128)))
Table("tbl_c", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False),
Column("name", String(128)))
Table("tbl_d", metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False),
Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")),
Column("name", String(128)))
@classmethod
def define_tables(cls, metadata):
Table('company_t', metadata,
- Column('company_id', Integer, primary_key=True),
+ Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', sa.Unicode(30)))
Table('employee_t', metadata,
@classmethod
def define_tables(cls, metadata):
Table("tableA", metadata,
- Column("id",Integer,primary_key=True),
+ Column("id",Integer,primary_key=True, test_needs_autoincrement=True),
Column("foo",Integer,),
test_needs_fk=True)
Table("tableB",metadata,
@classmethod
def define_tables(cls, metadata):
- Table('tags', metadata, Column("id", Integer, primary_key=True),
+ Table('tags', metadata, Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column("data", String(50)),
)
Table('tag_foo', metadata,
- Column("id", Integer, primary_key=True),
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
Column('tagid', Integer),
Column("data", String(50)),
)
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(50))
)
Table('addresses', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('user_id', Integer),
Column('email', String(50))
)
@classmethod
def define_tables(cls, metadata):
subscriber_table = Table('subscriber', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('dummy', String(10)) # to appease older sqlite version
)
@classmethod
def define_tables(cls, metadata):
Table("a", metadata,
- Column('aid', Integer, primary_key=True),
+ Column('aid', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)))
Table("b", metadata,
- Column('bid', Integer, primary_key=True),
+ Column('bid', Integer, primary_key=True, test_needs_autoincrement=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
Table("c", metadata,
- Column('cid', Integer, primary_key=True),
+ Column('cid', Integer, primary_key=True, test_needs_autoincrement=True),
Column("b_id", Integer, ForeignKey("b.bid")),
Column('data', String(30)))
Table("d", metadata,
- Column('did', Integer, primary_key=True),
+ Column('did', Integer, primary_key=True, test_needs_autoincrement=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)))
Table("t2", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)),
Column('t1id', Integer, ForeignKey('t1.id')))
Table("t3", metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)),
Column('t2id', Integer, ForeignKey('t2.id')))
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata,
- Column('t1id', Integer, primary_key=True),
+ Column('t1id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)))
Table("t2", metadata,
- Column('t2id', Integer, primary_key=True),
+ Column('t2id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)),
Column('t1id_ref', Integer, ForeignKey('t1.t1id')))
Table("t3", metadata,
- Column('t3id', Integer, primary_key=True),
+ Column('t3id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(40)),
Column('t2id_ref', Integer, ForeignKey('t2.t2id')))
@classmethod
def define_tables(cls, metadata):
Table('foos', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('bid1', Integer,ForeignKey('bars.id')),
Column('bid2', Integer,ForeignKey('bars.id')))
Table('bars', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table('foos', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
- Table('bars', metadata, Column('id', Integer, primary_key=True),
+ Table('bars', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('fid1', Integer, ForeignKey('foos.id')),
Column('fid2', Integer, ForeignKey('foos.id')),
Column('data', String(50)))
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t1id', Integer, ForeignKey('t1.id')))
Table('t3', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2tot3', metadata,
Column('t2id', Integer, ForeignKey('t2.id')),
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', String(50), primary_key=True),
+ Column('id', String(50), primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)),
Column('t1id', String(50)))
from sqlalchemy.test import testing
from sqlalchemy.orm import scoped_session
from sqlalchemy import Integer, String, ForeignKey
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, query
from sqlalchemy.test.testing import eq_
from test.orm import _base
+
class _ScopedTest(_base.MappedTest):
"""Adds another lookup bucket to emulate Session globals."""
@classmethod
def define_tables(cls, metadata):
Table('table1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)))
Table('table2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('someid', None, ForeignKey('table1.id')))
@testing.resolve_artifact_names
@classmethod
def define_tables(cls, metadata):
Table('table1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)))
Table('table2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('someid', None, ForeignKey('table1.id')))
@classmethod
@classmethod
def define_tables(cls, metadata):
Table('table1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(30)),
Column('type', String(30)))
Table('table2', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('someid', None, ForeignKey('table1.id')),
Column('somedata', String(30)))
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import String, Integer, select
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.test.testing import eq_
from test.orm import _base
@classmethod
def define_tables(cls, metadata):
Table('common', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', Integer),
Column('extra', String(45)))
import sqlalchemy as sa
from sqlalchemy.test import engines, testing, config
from sqlalchemy import Integer, String, Sequence
-from sqlalchemy.test.schema import Table
-from sqlalchemy.test.schema import Column
+from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relation, backref, eagerload
from sqlalchemy.test.testing import eq_
from test.engine import _base as engine_base
session = create_session(bind=testing.db)
session.begin()
- session.connection().execute("insert into users (name) values ('user1')")
+ session.connection().execute(users.insert().values(name='user1'))
session.begin(subtransactions=True)
session.begin_nested()
- session.connection().execute("insert into users (name) values ('user2')")
+ session.connection().execute(users.insert().values(name='user2'))
assert session.connection().execute("select count(1) from users").scalar() == 2
session.rollback()
assert session.connection().execute("select count(1) from users").scalar() == 1
- session.connection().execute("insert into users (name) values ('user3')")
+ session.connection().execute(users.insert().values(name='user3'))
session.commit()
assert session.connection().execute("select count(1) from users").scalar() == 2
def define_tables(cls, metadata):
global t1
t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50))
)
def _map_it(self, cls):
return mapper(cls, Table('t', sa.MetaData(),
- Column('id', Integer, primary_key=True)))
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True)))
@testing.uses_deprecated()
def _test_instance_guards(self, user_arg):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('name', String(20)),
test_needs_acid=True)
Column('foober', String(30), default="im foober", onupdate="im the update"))
st = Table('secondary_table', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', String(50)))
if testing.against('postgres', 'oracle'):
@classmethod
def define_tables(cls, metadata):
Table('data', metadata,
- Column('id', Integer, primary_key=True),
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('a', String(50)),
Column('b', String(50))
)
sess.add(o5)
sess.flush()
- assert list(sess.execute(t5.select(), mapper=T5)) == [(1, 'some t5')]
- assert list(sess.execute(t6.select(), mapper=T5)) == [(1, 'some t6', 1), (2, 'some other t6', 1)]
+ eq_(
+ list(sess.execute(t5.select(), mapper=T5)),
+ [(1, 'some t5')]
+ )
+ eq_(
+ list(sess.execute(t6.select().order_by(t6.c.id), mapper=T5)),
+ [(1, 'some t6', 1), (2, 'some other t6', 1)]
+ )
o6 = T5(data='some other t5', id=o5.id, t6s=[
T6(data='third t6', id=3),
sess.add(o6)
sess.flush()
- assert list(sess.execute(t5.select(), mapper=T5)) == [(1, 'some other t5')]
- assert list(sess.execute(t6.select(), mapper=T5)) == [(3, 'third t6', 1), (4, 'fourth t6', 1)]
+ eq_(
+ list(sess.execute(t5.select(), mapper=T5)),
+ [(1, 'some other t5')]
+ )
+ eq_(
+ list(sess.execute(t6.select().order_by(t6.c.id), mapper=T5)),
+ [(3, 'third t6', 1), (4, 'fourth t6', 1)]
+ )
@testing.resolve_artifact_names
def test_manytomany(self):