super(MSSQLExecutionContext_pyodbc, self).pre_exec()
+ # where appropriate, issue "select scope_identity()" in the same statement
+ if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) and self.dialect.use_scope_identity:
+ self.statement += "; select scope_identity()"
+
+ def post_exec(self):
+ if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) and self.dialect.use_scope_identity:
+ # do nothing - id was fetched in dialect.do_execute()
+ self.HASIDENT = False
+ else:
+ super(MSSQLExecutionContext_pyodbc, self).post_exec()
+
class MSSQLDialect(ansisql.ANSIDialect):
colspecs = {
return [[";".join (connectors)], {}]
def is_disconnect(self, e):
- return isinstance(e, self.dbapi.Error) and '[08S01]' in e.args[1]
+ return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)
def create_execution_context(self, *args, **kwargs):
return MSSQLExecutionContext_pyodbc(self, *args, **kwargs)
+ def do_execute(self, cursor, statement, parameters, context=None, **kwargs):
+ super(MSSQLDialect_pyodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)
+ if context and context.HASIDENT and (not context.IINSERT) and context.dialect.use_scope_identity:
+ import pyodbc
+ # fetch the last inserted id from the manipulated statement (pre_exec).
+ try:
+ row = cursor.fetchone()
+ except pyodbc.Error, e:
+ # if nocount OFF fetchone throws an exception and we have to jump over
+ # the rowcount to the resultset
+ cursor.nextset()
+ row = cursor.fetchone()
+ context._last_inserted_ids = [int(row[0])]
class MSSQLDialect_adodbapi(MSSQLDialect):
def import_dbapi(cls):
parameters = [p.get_original_dict() for p in ctx.compiled_parameters]
query = self.convert_statement(query)
+ if db.engine.name == 'mssql' and statement.endswith('; select scope_identity()'):
+ statement = statement[:-25]
testdata.unittest.assert_(statement == query and (params is None or params == parameters), "Testing for query '%s' params %s, received '%s' with params %s" % (query, repr(params), statement, repr(parameters)))
testdata.sql_count += 1
self.ctx.post_exec()