for large unsized string fields. Use the new "text_as_varchar" to
turn it on. [ticket:509]
+ - ORDER BY clauses without a LIMIT are now stripped in subqueries, as
+ MS-SQL forbids this usage
+
- cleanup of module importing code; specifiable DB-API module; more
explicit ordering of module preferences. [ticket:480]
0.1.0
initial release
+
self.HASIDENT = bool(tbl.has_sequence)
if engine.dialect.auto_identity_insert and self.HASIDENT:
if isinstance(parameters, list):
- self.IINSERT = parameters[0].has_key(tbl.has_sequence.name)
+ self.IINSERT = parameters[0].has_key(tbl.has_sequence.key)
else:
- self.IINSERT = parameters.has_key(tbl.has_sequence.name)
+ self.IINSERT = parameters.has_key(tbl.has_sequence.key)
else:
self.IINSERT = False
R.c.constraint_name == RR.c.unique_constraint_name,
C.c.ordinal_position == R.c.ordinal_position
),
- order_by = [RR.c.constraint_name])
+ order_by = [RR.c.constraint_name, R.c.ordinal_position])
rows = connection.execute(s).fetchall()
# group rows by constraint ID, to handle multi-column FKs
# "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
return ''
+ def order_by_clause(self, select):
+ order_by = self.get_str(select.order_by_clause)
+
+ # MSSQL only allows ORDER BY in subqueries if there is a LIMIT
+ if order_by and (not select.is_subquery or select.limit):
+ return " ORDER BY " + order_by
+ else:
+ return ""
+
+
class MSSQLSchemaGenerator(ansisql.ANSISchemaGenerator):
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column) + " " + column.type.engine_impl(self.engine).get_col_spec()
try:
objectstore.context.current = s1
objectstore.flush()
- assert False
+ # Only dialects with a sane rowcount can detect the ConcurrentModificationError
+ if testbase.db.dialect.supports_sane_rowcount():
+ assert False
except exceptions.ConcurrentModificationError:
pass
version_table.delete().execute()
UnitOfWorkTest.tearDown(self)
- @testbase.unsupported('mssql')
def testbasic(self):
s = create_session()
class Foo(object):pass
except exceptions.ConcurrentModificationError, e:
#print e
success = True
- assert success
+
+ # Only dialects with a sane rowcount can detect the ConcurrentModificationError
+ if testbase.db.dialect.supports_sane_rowcount():
+ assert success
s.clear()
f1 = s.query(Foo).get(f1.id)
except exceptions.ConcurrentModificationError, e:
#print e
success = True
- assert success
+ if testbase.db.dialect.supports_sane_rowcount():
+ assert success
+
def testversioncheck(self):
"""test that query.with_lockmode performs a 'version check' on an already loaded instance"""
s1 = create_session()
class PKTest(UnitOfWorkTest):
- @testbase.unsupported('mssql')
def setUpAll(self):
UnitOfWorkTest.setUpAll(self)
global table
table.create()
table2.create()
table3.create()
- @testbase.unsupported('mssql')
+
def tearDownAll(self):
table.drop()
table2.drop()
# not support on sqlite since sqlite's auto-pk generation only works with
# single column primary keys
- @testbase.unsupported('sqlite', 'mssql')
+ @testbase.unsupported('sqlite')
def testprimarykey(self):
class Entry(object):
pass
self.assert_(e is not e2 and e._instance_key == e2._instance_key)
# this one works with sqlite since we are manually setting up pk values
- @testbase.unsupported('mssql')
def testmanualpk(self):
class Entry(object):
pass
e.data = 'im the data'
ctx.current.flush()
- @testbase.unsupported('mssql')
def testkeypks(self):
import datetime
class Entity(object):