* Support for ``SET IDENTITY_INSERT ON`` mode (automagic on / off for
``INSERT`` s)
-* Support for auto-fetching of ``@@IDENTITY`` on ``INSERT``
+* Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on ``INSERT``
* ``select.limit`` implemented as ``SELECT TOP n``
self.cursor.execute("SET IDENTITY_INSERT %s OFF" % self.compiled.statement.table.name)
self.IINSERT = False
elif self.HASIDENT:
- self.cursor.execute("SELECT @@IDENTITY AS lastrowid")
+ if self.dialect.use_scope_identity:
+ self.cursor.execute("SELECT scope_identity() AS lastrowid")
+ else:
+ self.cursor.execute("SELECT @@identity AS lastrowid")
row = self.cursor.fetchone()
self._last_inserted_ids = [int(row[0])]
# print "LAST ROW ID", self._last_inserted_ids
super(MSSQLExecutionContext, self).post_exec()
+class MSSQLExecutionContext_pyodbc (MSSQLExecutionContext):
+ def pre_exec(self):
+ """execute "set nocount on" on all connections, as a partial
+ workaround for multiple result set issues."""
+
+ if not getattr(self.connection, 'pyodbc_done_nocount', False):
+ self.connection.execute('SET nocount ON')
+ self.connection.pyodbc_done_nocount = True
+
+ super(MSSQLExecutionContext_pyodbc, self).pre_exec()
+
+
class MSSQLDialect(ansisql.ANSIDialect):
colspecs = {
sqltypes.Unicode : MSNVarchar,
super(MSSQLDialect, self).__init__(**params)
self.auto_identity_insert = auto_identity_insert
self.text_as_varchar = False
+ self.use_scope_identity = True
self.set_default_schema_name("dbo")
def dbapi(cls, module_name=None):
except KeyError:
raise exceptions.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name)
else:
- for dialect_cls in [MSSQLDialect_adodbapi, MSSQLDialect_pymssql, MSSQLDialect_pyodbc]:
+ for dialect_cls in [MSSQLDialect_pyodbc, MSSQLDialect_pymssql, MSSQLDialect_adodbapi]:
try:
return dialect_cls.import_dbapi()
except ImportError, e:
pass
else:
- raise ImportError('No DBAPI module detected for MSSQL - please install adodbapi, pymssql or pyodbc')
+ raise ImportError('No DBAPI module detected for MSSQL - please install pyodbc, pymssql, or adodbapi')
dbapi = classmethod(dbapi)
def create_connect_args(self, url):
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
opts.update(url.query)
if opts.has_key('auto_identity_insert'):
- self.auto_identity_insert = bool(opts.pop('auto_identity_insert'))
+ self.auto_identity_insert = bool(int(opts.pop('auto_identity_insert')))
if opts.has_key('query_timeout'):
self.query_timeout = int(opts.pop('query_timeout'))
if opts.has_key('text_as_varchar'):
- self.text_as_varchar = bool(opts.pop('text_as_varchar'))
+ self.text_as_varchar = bool(int(opts.pop('text_as_varchar')))
+ if opts.has_key('use_scope_identity'):
+ self.use_scope_identity = bool(int(opts.pop('use_scope_identity')))
return self.make_connect_string(opts)
def create_execution_context(self, *args, **kwargs):
def is_disconnect(self, e):
return isinstance(e, self.dbapi.Error) and '[08S01]' in e.args[1]
+ def create_execution_context(self, *args, **kwargs):
+ return MSSQLExecutionContext_pyodbc(self, *args, **kwargs)
+
class MSSQLDialect_adodbapi(MSSQLDialect):
def import_dbapi(cls):
dialect = MSSQLDialect
+
r.close()
finally:
shadowed.drop(checkfirst=True)
+
+ @testbase.supported('mssql')
+ def test_fetchid_trigger(self):
+ meta = BoundMetaData(testbase.db)
+ t1 = Table('t1', meta,
+ Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
+ Column('descr', String(200)))
+ t2 = Table('t2', meta,
+ Column('id', Integer, Sequence('fred', 200, 1), primary_key=True),
+ Column('descr', String(200)))
+ meta.create_all()
+ con = testbase.db.connect()
+ con.execute("""create trigger paj on t1 for insert as
+ insert into t2 (descr) select descr from inserted""")
+
+ try:
+ tr = con.begin()
+ r = con.execute(t2.insert(), descr='hello')
+ self.assert_(r.last_inserted_ids() == [200])
+ r = con.execute(t1.insert(), descr='hello')
+ self.assert_(r.last_inserted_ids() == [100])
+
+ finally:
+ tr.commit()
+ con.execute("""drop trigger paj""")
+ meta.drop_all()
class CompoundTest(PersistTest):