- all executemany()-style calls now render sequences and SQL defaults inline in a single SQL statement, with no pre-execution
- regular Insert and Update objects can take inline=True, forcing the same inline behavior for single execute() calls
- last_inserted_ids() and lastrow_has_defaults() are not available with inline execution
- calculation of pre/post-execute defaults pushed into the compiler; DefaultExecutionContext greatly simplified
- fixed postgres reflection of primary key columns with no sequence/default generator; sets autoincrement=False
- fixed postgres executemany() behavior regarding sequences present, not present, passive defaults, etc.
- all tests pass for sqlite, mysql, postgres; oracle tests pass at their previous level, including all insert/update/default functionality
- Fixed OrderedProperties pickling [ticket:762]
+- Defaults and sequences now execute "inline" for all executemany()
+ calls, using no prefetch whatsoever. inline=True flag on any
+ insert/update statement also forces the same behavior with a single
+ execute().
+
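+  For illustration, a minimal sketch against a hypothetical ``users``
+  table; note that last_inserted_ids() and lastrow_has_defaults()
+  are not available when defaults are compiled inline:
+
+  {python}
+  # executemany(): defaults and sequences render inline, no prefetch
+  users.insert().execute({'name': 'jack'}, {'name': 'ed'})
+  # a single execute() with inline=True gets the same compilation
+  users.insert(inline=True).execute({'name': 'wendy'})
+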
+- fixed PG executemany() behavior [ticket:759]
+
+- postgres reflection sets autoincrement=False on primary key
+  columns that have no sequence/default generator.
+
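+  A sketch of the reflected result, assuming a hypothetical table
+  ``t`` whose integer primary key has no default generator
+  (``engine`` is a hypothetical bound Engine):
+
+  {python}
+  t = Table('t', MetaData(engine), autoload=True)
+  assert t.c.id.autoincrement is False
+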
+- postgres no longer wraps executemany() in individual execute()
+  calls, favoring performance instead.  "rowcount"/"concurrency"
+  checks for deleted items (which use executemany) are disabled with
+  PG, since psycopg2 does not report a proper rowcount for
+  executemany().
+
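+  A sketch of guarding a rowcount check accordingly, again with a
+  hypothetical ``users`` table:
+
+  {python}
+  stmt = users.delete(users.c.id == bindparam('id'))
+  r = stmt.execute({'id': 1}, {'id': 2})
+  if r.supports_sane_multi_rowcount():
+      assert r.rowcount == 2
+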
- Tickets fixed:
- [ticket:742]
{python}
>>> users.c.id==addresses.c.user_id #doctest: +ELLIPSIS
- <sqlalchemy.sql._BinaryExpression object at 0x...>
-
+ <sqlalchemy.sql.expression._BinaryExpression object at 0x...>
+
Wow, surprise! This is neither a `True` nor a `False`. Well, what is it?
{python}
c.execute(statement, parameters)
self.context.rowcount = c.rowcount
except Exception, e:
- raise exceptions.SQLError(statement, parameters, e)
+ raise exceptions.DBAPIError.instance(statement, parameters, e)
def has_table(self, connection, tablename, schema=None):
        # This approach seems to be more reliable than using DAO
self.context.rowcount = c.rowcount
c.DBPROP_COMMITPRESERVE = "Y"
except Exception, e:
- raise exceptions.SQLError(statement, parameters, e)
+ raise exceptions.DBAPIError.instance(statement, parameters, e)
def table_names(self, connection, schema):
from sqlalchemy.databases import information_schema as ischema
supports_unicode_statements = False
max_identifier_length = 30
supports_sane_rowcount = True
+ supports_sane_multi_rowcount = False
def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, **kwargs):
default.DefaultDialect.__init__(self, default_paramstyle='named', **kwargs)
# locate the actual name of the table, the real owner, and any dblink clause needed.
actual_name, owner, dblink = self._resolve_table_owner(connection, self._denormalize_name(table.name), table)
- print "ACTUALNAME:", actual_name
-
c = connection.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from ALL_TAB_COLUMNS%(dblink)s where TABLE_NAME = :table_name and OWNER = :owner" % {'dblink':dblink}, {'table_name':actual_name, 'owner':owner})
for name, value in fks.iteritems():
table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], name=name))
- def do_executemany(self, c, statement, parameters, context=None):
- rowcount = 0
- for param in parameters:
- c.execute(statement, param)
- rowcount += c.rowcount
- if context is not None:
- context._rowcount = rowcount
-
OracleDialect.logger = logging.class_logger(OracleDialect)
def uses_sequences_for_inserts(self):
return True
+ def visit_sequence(self, seq):
+ return self.dialect.identifier_preparer.format_sequence(seq) + ".nextval"
+
def visit_alias(self, alias, asfrom=False, **kwargs):
"""Oracle doesn't like ``FROM table AS alias``. Is the AS standard SQL??"""
else:
return self.process(alias.original, **kwargs)
- def visit_insert(self, insert):
- """``INSERT`` s are required to have the primary keys be explicitly present.
-
- Mapper will by default not put them in the insert statement
- to comply with autoincrement fields that require they not be
- present. so, put them all in for all primary key columns.
- """
-
- for c in insert.table.primary_key:
- if c.key not in self.parameters:
- self.parameters[c.key] = None
- return compiler.DefaultCompiler.visit_insert(self, insert)
-
def _TODO_visit_compound_select(self, select):
"""Need to determine how to get ``LIMIT``/``OFFSET`` into a ``UNION`` for Oracle."""
pass
supports_unicode_statements = False
max_identifier_length = 63
supports_sane_rowcount = True
+ supports_sane_multi_rowcount = False
def __init__(self, use_oids=False, server_side_cursors=False, **kwargs):
default.DefaultDialect.__init__(self, default_paramstyle='pyformat', **kwargs)
else:
return None
- def do_executemany(self, c, statement, parameters, context=None):
- """We need accurate rowcounts for updates, inserts and deletes.
-
- ``psycopg2`` is not nice enough to produce this correctly for
- an executemany, so we do our own executemany here.
- """
- rowcount = 0
- for param in parameters:
- c.execute(statement, param)
- rowcount += c.rowcount
- if context is not None:
- context._rowcount = rowcount
-
def has_table(self, connection, table_name, schema=None):
# seems like case gets folded in pg_class...
if schema is None:
c = connection.execute(t, table=table_oid)
for row in c.fetchall():
pk = row[0]
- table.primary_key.add(table.c[pk])
+ col = table.c[pk]
+ table.primary_key.add(col)
+ if col.default is None:
+                col.autoincrement = False
# Foreign keys
FK_SQL = """
def uses_sequences_for_inserts(self):
return True
+ def visit_sequence(self, seq):
+ if seq.optional:
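+            # an optional Sequence defers to the column's SERIAL
+            # default: render nothing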
+ return None
+ else:
+ return "nextval('%s')" % self.preparer.format_sequence(seq)
+
def limit_clause(self, select):
text = ""
if select._limit is not None:
supports_sane_rowcount
Indicate whether the dialect properly implements rowcount for ``UPDATE`` and ``DELETE`` statements.
+ supports_sane_multi_rowcount
+ Indicate whether the dialect properly implements rowcount for ``UPDATE`` and ``DELETE`` statements
+      when executed via executemany().
+
"""
def create_connect_args(self, url):
try:
self.__engine.dialect.do_begin(self.connection)
except Exception, e:
- raise exceptions.SQLError(None, None, e)
+ raise exceptions.DBAPIError.instance(None, None, e)
def _rollback_impl(self):
if self.__connection.is_valid:
try:
self.__engine.dialect.do_rollback(self.connection)
except Exception, e:
- raise exceptions.SQLError(None, None, e)
+ raise exceptions.DBAPIError.instance(None, None, e)
self.__transaction = None
def _commit_impl(self):
try:
self.__engine.dialect.do_commit(self.connection)
except Exception, e:
- raise exceptions.SQLError(None, None, e)
+ raise exceptions.DBAPIError.instance(None, None, e)
self.__transaction = None
def _savepoint_impl(self, name=None):
return self._execute_clauseelement(func.select(), multiparams, params)
def _execute_clauseelement(self, elem, multiparams=None, params=None):
- executemany = multiparams is not None and len(multiparams) > 0
- if executemany:
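+        # compile against the first parameter set; more than one
+        # set means executemany(), which compiles defaults inline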
+ if multiparams:
param = multiparams[0]
+ executemany = len(multiparams) > 1
else:
param = params
- return self._execute_compiled(elem.compile(dialect=self.dialect, parameters=param), multiparams, params)
+ executemany = False
+ return self._execute_compiled(elem.compile(dialect=self.dialect, parameters=param, inline=executemany), multiparams, params)
def _execute_compiled(self, compiled, multiparams=None, params=None):
"""Execute a sql.Compiled object."""
self._autorollback()
if self.__close_with_result:
self.close()
- raise exceptions.SQLError(context.statement, context.parameters, e)
+ raise exceptions.DBAPIError.instance(context.statement, context.parameters, e)
def __executemany(self, context):
try:
self._autorollback()
if self.__close_with_result:
self.close()
- raise exceptions.SQLError(context.statement, context.parameters, e)
+ raise exceptions.DBAPIError.instance(context.statement, context.parameters, e)
# poor man's multimethod/generic function thingy
executors = {
return self.context.lastrow_has_defaults()
def supports_sane_rowcount(self):
- """Return ``supports_sane_rowcount()`` from the underlying ExecutionContext.
+ """Return ``supports_sane_rowcount`` from the dialect.
- See ExecutionContext for details.
+ """
+ return self.dialect.supports_sane_rowcount
+
+ def supports_sane_multi_rowcount(self):
+ """Return ``supports_sane_multi_rowcount`` from the dialect.
"""
- return self.context.supports_sane_rowcount()
+ return self.dialect.supports_sane_multi_rowcount
def _get_col(self, row, key):
rec = self._key_cache[key]
supports_unicode_statements = False
max_identifier_length = 9999
supports_sane_rowcount = True
+ supports_sane_multi_rowcount = True
def __init__(self, convert_unicode=False, encoding='utf-8', default_paramstyle='named', paramstyle=None, dbapi=None, **kwargs):
self.convert_unicode = convert_unicode
def supports_sane_rowcount(self):
return self.dialect.supports_sane_rowcount
+ def supports_sane_multi_rowcount(self):
+ return self.dialect.supports_sane_multi_rowcount
+
def last_inserted_ids(self):
return self._last_inserted_ids
"""generate default values for compiled insert/update statements,
and generate last_inserted_ids() collection."""
- if self.isinsert:
- drunner = self.dialect.defaultrunner(self)
- if self.executemany:
- # executemany doesn't populate last_inserted_ids()
- firstparam = self.compiled_parameters[0]
- processors = firstparam.get_processors()
- for c in self.compiled.statement.table.c:
- if c.default is not None:
- params = self.compiled_parameters
- for param in params:
- if not c.key in param or param.get_original(c.key) is None:
- self.compiled_parameters = param
- newid = drunner.get_column_default(c)
- if newid is not None:
- param.set_value(c.key, newid)
- self.compiled_parameters = params
- else:
- param = self.compiled_parameters
- processors = param.get_processors()
- last_inserted_ids = []
- for c in self.compiled.statement.table.c:
- if c in self.compiled.inline_params:
- self._postfetch_cols.add(c)
- if c.primary_key:
- last_inserted_ids.append(None)
- elif not c.key in param or param.get_original(c.key) is None:
- if isinstance(c.default, schema.PassiveDefault):
- self._postfetch_cols.add(c)
- newid = drunner.get_column_default(c)
- if newid is not None:
- param.set_value(c.key, newid)
- if c.primary_key:
- last_inserted_ids.append(param.get_processed(c.key, processors))
- elif c.primary_key:
- last_inserted_ids.append(None)
- elif c.primary_key:
- last_inserted_ids.append(param.get_processed(c.key, processors))
- self._last_inserted_ids = last_inserted_ids
- self._last_inserted_params = param
-
-
- elif self.isupdate:
- drunner = self.dialect.defaultrunner(self)
+ if self.isinsert or self.isupdate:
if self.executemany:
- for c in self.compiled.statement.table.c:
- if c.onupdate is not None:
- params = self.compiled_parameters
- for param in params:
- if not c.key in param or param.get_original(c.key) is None:
- self.compiled_parameters = param
- value = drunner.get_column_onupdate(c)
- if value is not None:
- param.set_value(c.key, value)
- self.compiled_parameters = params
+ if len(self.compiled.prefetch):
+ drunner = self.dialect.defaultrunner(self)
+ params = self.compiled_parameters
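+                # pre-execute each prefetched default once per
+                # parameter set, so every row gets its own value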
+ for param in params:
+ self.compiled_parameters = param
+ for c in self.compiled.prefetch:
+ if self.isinsert:
+ val = drunner.get_column_default(c)
+ else:
+ val = drunner.get_column_onupdate(c)
+ if val is not None:
+ param.set_value(c.key, val)
+ self.compiled_parameters = params
+
else:
- param = self.compiled_parameters
- for c in self.compiled.statement.table.c:
- if c in self.compiled.inline_params:
- self._postfetch_cols.add(c)
- elif c.onupdate is not None and (not c.key in param or param.get_original(c.key) is None):
- value = drunner.get_column_onupdate(c)
- if value is not None:
- param.set_value(c.key, value)
- self._last_updated_params = param
+ drunner = self.dialect.defaultrunner(self)
+ if self.isinsert:
+ self._last_inserted_ids = []
+ for c in self.compiled.prefetch:
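+                # resolve the pre-executed default (INSERT) or
+                # onupdate (UPDATE) value for this column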
+ if self.isinsert:
+ val = drunner.get_column_default(c)
+ else:
+ val = drunner.get_column_onupdate(c)
+ if val is not None:
+ self.compiled_parameters.set_value(c.key, val)
+
+ if self.isinsert:
+ processors = self.compiled_parameters.get_processors()
+ for c in self.compiled.statement.table.primary_key:
+ if c.key in self.compiled_parameters:
+ self._last_inserted_ids.append(self.compiled_parameters.get_processed(c.key, processors))
+ else:
+ self._last_inserted_ids.append(None)
+
+ self._postfetch_cols = self.compiled.postfetch
+ if self.isinsert:
+ self._last_inserted_params = self.compiled_parameters
+ else:
+ self._last_updated_params = self.compiled_parameters
try:
return dbapi.connect(*cargs, **cparams)
except Exception, e:
- raise exceptions.DBAPIError(None, None, e)
+ raise exceptions.DBAPIError.instance(None, None, e)
creator = kwargs.pop('creator', connect)
poolclass = (kwargs.pop('poolclass', None) or
Its type and properties are DB-API implementation specific.
"""
- def __new__(cls, statement, params, orig, *args, **kw):
+ def instance(cls, statement, params, orig):
# Don't ever wrap these, just return them directly as if
# DBAPIError didn't exist.
if isinstance(orig, (KeyboardInterrupt, SystemExit)):
return orig
if orig is not None:
- name, glob = type(orig).__name__, globals()
+ name, glob = orig.__class__.__name__, globals()
if name in glob and issubclass(glob[name], DBAPIError):
cls = glob[name]
- return SQLAlchemyError.__new__(cls, statement, params, orig,
- *args, **kw)
-
+ return cls(statement, params, orig)
+ instance = classmethod(instance)
+
def __init__(self, statement, params, orig):
SQLAlchemyError.__init__(self, "(%s) %s" %
(orig.__class__.__name__, str(orig)))
# TODO: precompile the delete/insert queries?
statement = self.secondary.delete(sql.and_(*[c == sql.bindparam(c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow]))
result = connection.execute(statement, secondary_delete)
- if result.supports_sane_rowcount() and result.rowcount != len(secondary_delete):
+ if result.supports_sane_multi_rowcount() and result.rowcount != len(secondary_delete):
raise exceptions.ConcurrentModificationError("Deleted rowcount %d does not match number of objects deleted %d" % (result.rowcount, len(secondary_delete)))
if secondary_insert:
if value is NO_ATTRIBUTE:
continue
if col.default is None or value is not None:
+ # TODO: clauseelments as bind params should
+ # be handled by Insert/Update expression upon execute()
if isinstance(value, sql.ClauseElement):
value_params[col] = value
else:
(obj, params, mapper, connection, value_params) = rec
c = connection.execute(statement.values(value_params), params)
primary_key = c.last_inserted_ids()
+
if primary_key is not None:
i = 0
for col in mapper.pks_by_table[table]:
clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type, unique=True))
statement = table.delete(clause)
c = connection.execute(statement, del_objects)
- if c.supports_sane_rowcount() and c.rowcount != len(del_objects):
- raise exceptions.ConcurrentModificationError("Updated rowcount %d does not match number of objects updated %d" % (c.rowcount, len(delete)))
+ if c.supports_sane_multi_rowcount() and c.rowcount != len(del_objects):
+ raise exceptions.ConcurrentModificationError("Deleted rowcount %d does not match number of objects deleted %d" % (c.rowcount, len(del_objects)))
for obj, connection in deleted_objects:
for mapper in object_mapper(obj).iterate_to_root():
operators = OPERATORS
- def __init__(self, dialect, statement, parameters=None, **kwargs):
+ def __init__(self, dialect, statement, parameters=None, inline=False, **kwargs):
"""Construct a new ``DefaultCompiler`` object.
dialect
# if we are insert/update. set to true when we visit an INSERT or UPDATE
self.isinsert = self.isupdate = False
+ # compile INSERT/UPDATE defaults/sequences inlined (no pre-execute)
+ self.inline = inline or getattr(statement, 'inline', False)
+
# a dictionary of bind parameter keys to _BindParamClause instances.
self.binds = {}
# an IdentifierPreparer that formats the quoting of identifiers
self.preparer = self.dialect.identifier_preparer
- # for UPDATE and INSERT statements, a set of columns whos values are being set
- # from a SQL expression (i.e., not one of the bind parameter values). if present,
- # default-value logic in the Dialect knows not to fire off column defaults
- # and also knows postfetching will be needed to get the values represented by these
- # parameters.
- self.inline_params = None
def after_compile(self):
# this re will search for params like :param
def uses_sequences_for_inserts(self):
return False
-
- def visit_insert(self, insert_stmt):
- # search for columns who will be required to have an explicit bound value.
- # for inserts, this includes Python-side defaults, columns with sequences for dialects
- # that support sequences, and primary key columns for dialects that explicitly insert
- # pre-generated primary key values
- required_cols = [
- c for c in insert_stmt.table.c
- if \
- isinstance(c, schema.SchemaItem) and \
- (self.parameters is None or self.parameters.get(c.key, None) is None) and \
- (
- ((c.primary_key or isinstance(c.default, schema.Sequence)) and self.uses_sequences_for_inserts()) or
- isinstance(c.default, schema.ColumnDefault)
- )
- ]
+ def visit_sequence(self, seq):
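+        """Return the dialect-specific SQL for firing ``seq``
+        inline, or None to render nothing."""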
+ raise NotImplementedError()
+
+ def visit_insert(self, insert_stmt):
self.isinsert = True
- colparams = self._get_colparams(insert_stmt, required_cols)
+ colparams = self._get_colparams(insert_stmt)
return ("INSERT INTO " + self.preparer.format_table(insert_stmt.table) + " (" + string.join([self.preparer.format_column(c[0]) for c in colparams], ', ') + ")" +
" VALUES (" + string.join([c[1] for c in colparams], ', ') + ")")
def visit_update(self, update_stmt):
self.stack.append({'from':util.Set([update_stmt.table])})
- # search for columns who will be required to have an explicit bound value.
- # for updates, this includes Python-side "onupdate" defaults.
- required_cols = [c for c in update_stmt.table.c
- if
- isinstance(c, schema.SchemaItem) and \
- (self.parameters is None or self.parameters.get(c.key, None) is None) and
- isinstance(c.onupdate, schema.ColumnDefault)
- ]
-
self.isupdate = True
- colparams = self._get_colparams(update_stmt, required_cols)
+ colparams = self._get_colparams(update_stmt)
text = "UPDATE " + self.preparer.format_table(update_stmt.table) + " SET " + string.join(["%s=%s" % (self.preparer.format_column(c[0]), c[1]) for c in colparams], ', ')
return text
- def _get_colparams(self, stmt, required_cols):
+ def _get_colparams(self, stmt):
"""create a set of tuples representing column/string pairs for use
in an INSERT or UPDATE statement.
- This method may generate new bind params within this compiled
- based on the given set of "required columns", which are required
- to have a value set in the statement.
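+        Also populates self.prefetch (columns whose defaults must be
+        pre-executed) and self.postfetch (columns whose values must
+        be fetched from the row after execution).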
"""
def create_bind_param(col, value):
self.binds[col.key] = bindparam
return self.bindparam_string(self._truncate_bindparam(bindparam))
- self.inline_params = util.Set()
-
+ self.postfetch = util.Set()
+ self.prefetch = util.Set()
+
def to_col(key):
if not isinstance(key, sql._ColumnClause):
return stmt.table.columns.get(unicode(key), key)
for k, v in stmt.parameters.iteritems():
parameters.setdefault(getattr(k, 'key', k), v)
- for col in required_cols:
- parameters.setdefault(col.key, None)
-
# create a list of column assignment clauses as tuples
values = []
for c in stmt.table.columns:
if c.key in parameters:
value = parameters[c.key]
- else:
- continue
- if sql._is_literal(value):
- value = create_bind_param(c, value)
- else:
- self.inline_params.add(c)
- value = self.process(value)
- values.append((c, value))
-
+ if sql._is_literal(value):
+ value = create_bind_param(c, value)
+ else:
+ self.postfetch.add(c)
+ value = self.process(value.self_group())
+ values.append((c, value))
+ elif isinstance(c, schema.Column):
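+                # column absent from parameters: decide whether its
+                # default is pre-executed (prefetch), rendered into
+                # the statement (inline), or left to the server
+                # (postfetch)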
+ if self.isinsert:
+ if isinstance(c.default, schema.ColumnDefault):
+ if self.inline and isinstance(c.default.arg, sql.ClauseElement):
+ values.append((c, self.process(c.default.arg)))
+ self.postfetch.add(c)
+ else:
+ values.append((c, create_bind_param(c, None)))
+ self.prefetch.add(c)
+ elif isinstance(c.default, schema.PassiveDefault):
+ if c.primary_key and self.uses_sequences_for_inserts() and not self.inline:
+ values.append((c, create_bind_param(c, None)))
+ self.prefetch.add(c)
+ else:
+ self.postfetch.add(c)
+ elif (c.primary_key or isinstance(c.default, schema.Sequence)) and self.uses_sequences_for_inserts():
+ if self.inline:
+ if c.default is not None:
+ proc = self.process(c.default)
+ if proc is not None:
+ values.append((c, proc))
+ self.postfetch.add(c)
+ else:
+ values.append((c, create_bind_param(c, None)))
+ self.prefetch.add(c)
+ elif self.isupdate:
+ if isinstance(c.onupdate, schema.ColumnDefault):
+ if self.inline and isinstance(c.onupdate.arg, sql.ClauseElement):
+ values.append((c, self.process(c.onupdate.arg)))
+ self.postfetch.add(c)
+ else:
+ values.append((c, create_bind_param(c, None)))
+ self.prefetch.add(c)
+ elif isinstance(c.onupdate, schema.PassiveDefault):
+ self.postfetch.add(c)
return values
def visit_delete(self, delete_stmt):
return Select(*args, **kwargs).alias(alias)
-def insert(table, values = None, **kwargs):
+def insert(table, values=None, inline=False):
"""Return an [sqlalchemy.sql#Insert] clause element.
Similar functionality is available via the ``insert()`` method on
bind parameters also are None during the compile phase, then the
column specifications will be generated from the full list of
table columns.
+
+ inline
+ if True, SQL defaults will be compiled 'inline' into the statement
+ and not pre-executed.
If both `values` and compile-time bind parameters are present, the
compile-time bind parameters override the information specified
against the ``INSERT`` statement.
"""
- return Insert(table, values, **kwargs)
+ return Insert(table, values, inline=inline)
-def update(table, whereclause = None, values = None, **kwargs):
+def update(table, whereclause=None, values=None, inline=False):
"""Return an [sqlalchemy.sql#Update] clause element.
Similar functionality is available via the ``update()`` method on
``SET`` conditions will be generated from the full list of table
columns.
+ inline
+ if True, SQL defaults will be compiled 'inline' into the statement
+ and not pre-executed.
+
If both `values` and compile-time bind parameters are present, the
compile-time bind parameters override the information specified
within `values` on a per-key basis.
against the ``UPDATE`` statement.
"""
- return Update(table, whereclause, values, **kwargs)
+ return Update(table, whereclause=whereclause, values=values, inline=inline)
def delete(table, whereclause = None, **kwargs):
"""Return a [sqlalchemy.sql#Delete] clause element.
compile_params = multiparams[0]
else:
compile_params = params
- return self.compile(bind=self.bind, parameters=compile_params).execute(*multiparams, **params)
+ return self.compile(bind=self.bind, parameters=compile_params, inline=(len(multiparams) > 1)).execute(*multiparams, **params)
def scalar(self, *multiparams, **params):
"""Compile and execute this ``ClauseElement``, returning the result's scalar representation."""
return self.execute(*multiparams, **params).scalar()
- def compile(self, bind=None, parameters=None, compiler=None, dialect=None):
+ def compile(self, bind=None, parameters=None, compiler=None, dialect=None, inline=False):
"""Compile this SQL expression.
Uses the given ``Compiler``, or the given ``AbstractDialect``
if compiler is None:
if dialect is not None:
- compiler = dialect.statement_compiler(dialect, self, parameters)
+ compiler = dialect.statement_compiler(dialect, self, parameters, inline=inline)
elif bind is not None:
- compiler = bind.statement_compiler(self, parameters)
+ compiler = bind.statement_compiler(self, parameters, inline=inline)
elif self.bind is not None:
- compiler = self.bind.statement_compiler(self, parameters)
+ compiler = self.bind.statement_compiler(self, parameters, inline=inline)
if compiler is None:
from sqlalchemy.engine.default import DefaultDialect
dialect = DefaultDialect()
- compiler = dialect.statement_compiler(dialect, self, parameters=parameters)
+ compiler = dialect.statement_compiler(dialect, self, parameters=parameters, inline=inline)
compiler.compile()
return compiler
def select(self, whereclause = None, **params):
return select([self], whereclause, **params)
- def insert(self, values = None):
- return insert(self, values=values)
+ def insert(self, values=None, inline=False):
+ return insert(self, values=values, inline=inline)
- def update(self, whereclause = None, values = None):
- return update(self, whereclause, values)
+ def update(self, whereclause=None, values=None, inline=False):
+ return update(self, whereclause=whereclause, values=values, inline=inline)
- def delete(self, whereclause = None):
+ def delete(self, whereclause=None):
return delete(self, whereclause)
def _get_from_objects(self, **modifiers):
return iter([self.table])
def _process_colparams(self, parameters):
- """Receive the *values* of an ``INSERT`` or ``UPDATE`` statement and construct appropriate bind parameters."""
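+        """Convert positional ``values`` into a dictionary keyed on
+        column names; dictionaries pass through unchanged."""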
if parameters is None:
return None
if isinstance(parameters, (list, tuple)):
pp = {}
- i = 0
- for c in self.table.c:
+ for i, c in enumerate(self.table.c):
pp[c.key] = parameters[i]
- i +=1
- parameters = pp
-
- for key in parameters.keys():
- value = parameters[key]
- if isinstance(value, ClauseElement):
- parameters[key] = value.self_group()
- elif _is_literal(value):
- if _is_literal(key):
- col = self.table.c[key]
- else:
- col = key
- try:
- parameters[key] = bindparam(col, value, unique=True)
- except KeyError:
- del parameters[key]
- return parameters
-
+ return pp
+ else:
+ return parameters
+
def _find_engine(self):
return self.table.bind
class Insert(_UpdateBase):
- def __init__(self, table, values=None):
+ def __init__(self, table, values=None, inline=False):
self.table = table
self.select = None
+        self.inline = inline
self.parameters = self._process_colparams(values)
def get_children(self, **kwargs):
return u
class Update(_UpdateBase):
- def __init__(self, table, whereclause, values=None):
+ def __init__(self, table, whereclause, values=None, inline=False):
self.table = table
self._whereclause = whereclause
+ self.inline = inline
self.parameters = self._process_colparams(values)
def get_children(self, **kwargs):
import testbase
import datetime
from sqlalchemy import *
+from sqlalchemy import exceptions
from sqlalchemy.databases import postgres
from testlib import *
+class InsertTest(AssertMixin):
+ @testing.supported('postgres')
+ def setUpAll(self):
+ global metadata
+ metadata = MetaData(testbase.db)
+
+ @testing.supported('postgres')
+ def tearDown(self):
+ metadata.drop_all()
+ metadata.tables.clear()
+
+ @testing.supported('postgres')
+ def test_compiled_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)))
+
+ metadata.create_all()
+ ins = table.insert(values={'data':bindparam('x')}).compile()
+ ins.execute({'x':"five"}, {'x':"seven"})
+ assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
+
+ @testing.supported('postgres')
+ def test_sequence_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, Sequence('my_seq'), primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_with_sequence(table, "my_seq")
+
+ @testing.supported('postgres')
+ def test_opt_sequence_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_autoincrement(table)
+
+ @testing.supported('postgres')
+ def test_autoincrement_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_autoincrement(table)
+
+ @testing.supported('postgres')
+ def test_noautoincrement_insert(self):
+ table = Table('testtable', metadata,
+ Column('id', Integer, primary_key=True, autoincrement=False),
+ Column('data', String(30)))
+ metadata.create_all()
+ self._assert_data_noautoincrement(table)
+
+ def _assert_data_autoincrement(self, table):
+ def go():
+ # execute with explicit id
+ r = table.insert().execute({'id':30, 'data':'d1'})
+ assert r.last_inserted_ids() == [30]
+
+ # execute with prefetch id
+ r = table.insert().execute({'data':'d2'})
+ assert r.last_inserted_ids() == [1]
+
+ # executemany with explicit ids
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+
+ # executemany, uses SERIAL
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+
+ # single execute, explicit id, inline
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+
+ # single execute, inline, uses SERIAL
+ table.insert(inline=True).execute({'data':'d8'})
+
+        # note that the test framework doesn't capture the "pre-execute" of a
+        # sequence or default; we just see it in the bind params.
+
+ self.assert_sql(testbase.db, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':1, 'data':'d2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ table.delete().execute()
+
+ # test the same series of events using a reflected
+ # version of the table
+ m2 = MetaData(testbase.db)
+ table = Table(table.name, m2, autoload=True)
+
+ def go():
+ table.insert().execute({'id':30, 'data':'d1'})
+ r = table.insert().execute({'data':'d2'})
+ assert r.last_inserted_ids() == [5]
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+ table.insert(inline=True).execute({'data':'d8'})
+
+ self.assert_sql(testbase.db, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':5, 'data':'d2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (data) VALUES (:data)",
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ table.delete().execute()
+
+ def _assert_data_with_sequence(self, table, seqname):
+ def go():
+ table.insert().execute({'id':30, 'data':'d1'})
+ table.insert().execute({'data':'d2'})
+ table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
+ table.insert().execute({'data':'d5'}, {'data':'d6'})
+ table.insert(inline=True).execute({'id':33, 'data':'d7'})
+ table.insert(inline=True).execute({'data':'d8'})
+
+ self.assert_sql(testbase.db, go, [], with_sequences=[
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':30, 'data':'d1'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ {'id':1, 'data':'d2'}
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
+ [{'data':'d5'}, {'data':'d6'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (:id, :data)",
+ [{'id':33, 'data':'d7'}]
+ ),
+ (
+ "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
+ [{'data':'d8'}]
+ ),
+ ])
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+
+    # can't test reflection here since the Sequence must be
+    # explicitly specified
+
+ def _assert_data_noautoincrement(self, table):
+ table.insert().execute({'id':30, 'data':'d1'})
+ try:
+ table.insert().execute({'data':'d2'})
+ assert False
+ except exceptions.IntegrityError, e:
+ assert "violates not-null constraint" in str(e)
+ try:
+ table.insert().execute({'data':'d2'}, {'data':'d3'})
+ assert False
+ except exceptions.IntegrityError, e:
+ assert "violates not-null constraint" in str(e)
+
+ table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
+ table.insert(inline=True).execute({'id':33, 'data':'d4'})
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (31, 'd2'),
+ (32, 'd3'),
+ (33, 'd4'),
+ ]
+ table.delete().execute()
+
+ # test the same series of events using a reflected
+ # version of the table
+ m2 = MetaData(testbase.db)
+ table = Table(table.name, m2, autoload=True)
+ table.insert().execute({'id':30, 'data':'d1'})
+ try:
+ table.insert().execute({'data':'d2'})
+ assert False
+ except exceptions.IntegrityError, e:
+ assert "violates not-null constraint" in str(e)
+ try:
+ table.insert().execute({'data':'d2'}, {'data':'d3'})
+ assert False
+ except exceptions.IntegrityError, e:
+ assert "violates not-null constraint" in str(e)
+
+ table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
+ table.insert(inline=True).execute({'id':33, 'data':'d4'})
+
+ assert table.select().execute().fetchall() == [
+ (30, 'd1'),
+ (31, 'd2'),
+ (32, 'd3'),
+ (33, 'd4'),
+ ]
+
class DomainReflectionTest(AssertMixin):
"Test PostgreSQL domains"
except exceptions.ConcurrentModificationError, e:
#print e
success = True
- if testbase.db.dialect.supports_sane_rowcount:
+ if testbase.db.dialect.supports_sane_multi_rowcount:
assert success
@engines.close_open_connections
Column('c1', Integer, primary_key=True),
Column('c2', String(30)))
- @profiling.profiled('ctest_insert', call_range=(50, 60), always=True)
+ @profiling.profiled('ctest_insert', call_range=(40, 50), always=True)
def test_insert(self):
t1.insert().compile()
- @profiling.profiled('ctest_update', call_range=(50, 60), always=True)
+ @profiling.profiled('ctest_update', call_range=(40, 50), always=True)
def test_update(self):
t1.update().compile()
db = testbase.db
metadata = MetaData(db)
default_generator = {'x':50}
+
def mydefault():
default_generator['x'] += 1
return default_generator['x']
def myupdate_with_ctx(ctx):
- return len(ctx.compiled_parameters['col2'])
+ conn = ctx.connection
+ return conn.execute(select([text('13')])).scalar()
def mydefault_using_connection(ctx):
conn = ctx.connection
try:
- if db.engine.name == 'oracle':
- return conn.execute("select 12 from dual").scalar()
- else:
- return conn.execute("select 12").scalar()
+ return conn.execute(select([text('12')])).scalar()
finally:
# ensure a "close()" on this connection does nothing,
# since its a "branched" connection
currenttime = func.current_date(type_=Date, bind=db)
if is_oracle:
ts = db.func.trunc(func.sysdate(), literal_column("'DAY'")).scalar()
- f = select([func.count(1) + 5], bind=db).scalar()
- f2 = select([func.count(1) + 14], bind=db).scalar()
+ f = select([func.length('abcdef')], bind=db).scalar()
+ f2 = select([func.length('abcdefghijk')], bind=db).scalar()
    # TODO: engine propagation across nested functions not working
currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db)
def1 = currenttime
def2 = func.trunc(text("sysdate"), literal_column("'DAY'"))
deftype = Date
elif use_function_defaults:
- f = select([func.count(1) + 5], bind=db).scalar()
- f2 = select([func.count(1) + 14], bind=db).scalar()
+ f = select([func.length('abcdef')], bind=db).scalar()
+ f2 = select([func.length('abcdefghijk')], bind=db).scalar()
def1 = currenttime
def2 = text("current_date")
deftype = Date
ts = db.func.current_date().scalar()
else:
- f = select([func.count(1) + 5], bind=db).scalar()
- f2 = select([func.count(1) + 14], bind=db).scalar()
+ f = select([func.length('abcdef')], bind=db).scalar()
+ f2 = select([func.length('abcdefghijk')], bind=db).scalar()
def1 = def2 = "3"
ts = 3
deftype = Integer
Column('col2', String(20), default="imthedefault", onupdate="im the update"),
# preexecute expression
- Column('col3', Integer, default=func.count(1) + 5, onupdate=func.count(1) + 14),
+ Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')),
# SQL-side default from sql expression
Column('col4', deftype, PassiveDefault(def1)),
self.assert_(50 <= x <= 57)
self.assert_(y == 'imthedefault')
self.assert_(z == f)
- # mysql/other db's return 0 or 1 for count(1)
- self.assert_(5 <= z <= 6)
+ self.assert_(f2==11)
def testinsert(self):
r = t.insert().execute()
- self.assert_(r.lastrow_has_defaults())
+ assert r.lastrow_has_defaults()
+ assert util.Set(r.context.postfetch_cols()) == util.Set([t.c.col5, t.c.col4])
+
+ r = t.insert(inline=True).execute()
+ assert r.lastrow_has_defaults()
+ assert util.Set(r.context.postfetch_cols()) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])
+
t.insert().execute()
t.insert().execute()
ctexec = currenttime.scalar()
l = t.select().execute()
today = datetime.date.today()
- self.assert_(l.fetchall() == [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today)])
+ self.assert_(l.fetchall() == [
+ (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ (54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ ])
def testinsertmany(self):
r = t.insert().execute({}, {}, {})
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
- print "Currenttime "+ repr(ctexec)
l = t.select(t.c.col1==pk).execute()
l = l.fetchone()
self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today()))
- # mysql/other db's return 0 or 1 for count(1)
- self.assert_(14 <= f2 <= 15)
+ self.assert_(f2==11)
def testupdatevalues(self):
r = t.insert().execute()
try:
table.insert().execute(data='row 1')
table.insert().execute(data='row 2')
+ table.insert().execute({'data':'row 3'}, {'data':'row 4'})
+ assert table.select().execute().fetchall() == [(1, "row 1"), (2, "row 2"), (3, "row 3"), (4, "row 4")]
finally:
table.drop()
table.create(checkfirst=True)
try:
- # simulate working on a table that doesn't already exist
meta2 = MetaData(testbase.db)
table2 = Table("aitest", meta2,
Column('id', Integer, Sequence('ai_id_seq', optional=True), primary_key=True),
"""test sequences fire off as defaults on non-pk columns"""
sometable.insert().execute(name="somename")
sometable.insert().execute(name="someother")
+ sometable.insert().execute(
+ {'name':'name3'},
+ {'name':'name4'}
+ )
assert sometable.select().execute().fetchall() == [
(1, "somename", 1),
(2, "someother", 2),
+ (3, "name3", 3),
+ (4, "name4", 4),
]
@testing.supported('postgres', 'oracle')
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
)
-
+ def testinlineinsert(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=func.foobar()))
+ self.assert_compile(table.insert(values={}, inline=True), "INSERT INTO sometable (foo) VALUES (foobar())")
+ self.assert_compile(table.insert(inline=True), "INSERT INTO sometable (foo) VALUES (foobar())", params={})
+
def testinsertexpression(self):
self.assert_compile(insert(table1), "INSERT INTO mytable (myid) VALUES (lala())", params=dict(myid=func.lala()))