(a version number check is performed). This occurs if no end-user
returning() was specified.
+ - Databases which rely upon postfetch of "last inserted id" to get at a
+ generated sequence value (i.e. MySQL, MS-SQL) now work correctly
+ when there is a composite primary key where the "autoincrement" column
+ is not the first primary key column in the table.
- engines
- transaction isolation level may be specified with
def visit_SQL_VARIANT(self, type_):
return 'SQL_VARIANT'
-def _has_implicit_sequence(column):
- return column.primary_key and \
- column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and \
- not column.foreign_keys and \
- (
- column.default is None or
- (
- isinstance(column.default, sa_schema.Sequence) and
- column.default.optional)
- )
-
-def _table_sequence_column(tbl):
- if not hasattr(tbl, '_ms_has_sequence'):
- tbl._ms_has_sequence = None
- for column in tbl.c:
- if getattr(column, 'sequence', False) or _has_implicit_sequence(column):
- tbl._ms_has_sequence = column
- break
- return tbl._ms_has_sequence
-
class MSExecutionContext(default.DefaultExecutionContext):
_enable_identity_insert = False
_select_lastrowid = False
_result_proxy = None
+ _lastrowid = None
def pre_exec(self):
"""Activate IDENTITY_INSERT if needed."""
if self.isinsert:
tbl = self.compiled.statement.table
- seq_column = _table_sequence_column(tbl)
- insert_has_sequence = bool(seq_column)
+ seq_column = tbl._autoincrement_column
+ insert_has_sequence = seq_column is not None
if insert_has_sequence:
- self._enable_identity_insert = tbl._ms_has_sequence.key in self.compiled_parameters[0]
+ self._enable_identity_insert = seq_column.key in self.compiled_parameters[0]
else:
self._enable_identity_insert = False
if not column.table:
raise exc.InvalidRequestError("mssql requires Table-bound columns in order to generate DDL")
- seq_col = _table_sequence_column(column.table)
+ seq_col = column.table._autoincrement_column
# install a IDENTITY Sequence if we have an implicit IDENTITY column
if seq_col is column:
preexecute_pk_sequences = True
supports_unicode_binds = True
-
+ postfetch_lastrowid = True
+
server_version_info = ()
statement_compiler = MSSQLCompiler
class DefaultExecutionContext(base.ExecutionContext):
- _lastrowid = None
def __init__(self, dialect, connection, compiled_sql=None, compiled_ddl=None, statement=None, parameters=None):
self.dialect = dialect
def post_insert(self):
if self.dialect.postfetch_lastrowid and \
- self._lastrowid is None and \
(not len(self._last_inserted_ids) or \
- self._last_inserted_ids[0] is None):
+ None in self._last_inserted_ids):
+
+ table = self.compiled.statement.table
+ lastrowid = self.get_lastrowid()
+ self._last_inserted_ids = [c is table._autoincrement_column and lastrowid or v
+ for c, v in zip(table.primary_key, self._last_inserted_ids)
+ ]
- self._lastrowid = self.get_lastrowid()
-
def last_inserted_ids(self, resultproxy):
if not self.isinsert:
raise exc.InvalidRequestError("Statement is not an insert() expression construct.")
if self.dialect.implicit_returning and \
not self.compiled.statement._returning and \
not resultproxy.closed:
-
+
+ table = self.compiled.statement.table
row = resultproxy.first()
self._last_inserted_ids = [v is not None and v or row[c]
- for c, v in zip(self.compiled.statement.table.primary_key, self._last_inserted_ids)
+ for c, v in zip(table.primary_key, self._last_inserted_ids)
]
return self._last_inserted_ids
- elif self._lastrowid is not None:
- return [self._lastrowid] + self._last_inserted_ids[1:]
else:
return self._last_inserted_ids
compiled_parameters[c.key] = val
if self.isinsert:
- self._last_inserted_ids = [compiled_parameters.get(c.key, None) for c in self.compiled.statement.table.primary_key]
+ self._last_inserted_ids = [compiled_parameters.get(c.key, None)
+ for c in self.compiled.statement.table.primary_key]
self._last_inserted_params = compiled_parameters
else:
self._last_updated_params = compiled_parameters
for c in pk.columns:
c.primary_key = True
+ @util.memoized_property
+ def _autoincrement_column(self):
+ for col in self.primary_key:
+ if col.autoincrement and \
+ isinstance(col.type, types.Integer) and \
+ not col.foreign_keys:
+
+ return col
+
@property
def key(self):
return _get_table_key(self.name, self.schema)
ret[c.key] = row[c]
return ret
- if testing.against('firebird', 'postgres', 'oracle', 'mssql'):
+ if testing.against('firebird', 'postgres', 'oracle'): #, 'mssql'):
test_engines = [
engines.testing_engine(options={'implicit_returning':False}),
engines.testing_engine(options={'implicit_returning':True}),
finally:
table.drop(bind=engine)
+ @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
+ def test_misordered_lastrow(self):
+ related = Table('related', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+ t6 = Table("t6", metadata,
+ Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
+ Column('auto_id', Integer, primary_key=True),
+ )
+
+ metadata.create_all()
+ r = related.insert().values(id=12).execute()
+ id = r.last_inserted_ids()[0]
+ assert id==12
+
+ r = t6.insert().values(manual_id=id).execute()
+ eq_(r.last_inserted_ids(), [12, 1])
+
+
def test_row_iteration(self):
users.insert().execute(
{'user_id':7, 'user_name':'jack'},