"""Initialize execution context for a DDLElement construct."""
self = cls.__new__(cls)
- self.dialect = dialect
self.root_connection = connection
self._dbapi_connection = dbapi_connection
- self.engine = connection.engine
+ self.dialect = connection.dialect
self.compiled = compiled = compiled_ddl
self.isddl = True
"""Initialize execution context for a Compiled construct."""
self = cls.__new__(cls)
- self.dialect = dialect
self.root_connection = connection
self._dbapi_connection = dbapi_connection
- self.engine = connection.engine
+ self.dialect = connection.dialect
self.compiled = compiled
self.isupdate = compiled.isupdate
self.isdelete = compiled.isdelete
- if self.isinsert or self.isupdate or self.isdelete:
- self._is_explicit_returning = bool(compiled.statement._returning)
- self._is_implicit_returning = bool(
- compiled.returning and not compiled.statement._returning)
-
if not parameters:
self.compiled_parameters = [compiled.construct_params()]
else:
self.executemany = len(parameters) > 1
self.cursor = self.create_cursor()
- if self.isinsert or self.isupdate:
- self.postfetch_cols = self.compiled.postfetch
- self.prefetch_cols = self.compiled.prefetch
- self.returning_cols = self.compiled.returning
- self.__process_defaults()
+
+ if self.isinsert or self.isupdate or self.isdelete:
+ self._is_explicit_returning = bool(compiled.statement._returning)
+ self._is_implicit_returning = bool(
+ compiled.returning and not compiled.statement._returning)
+
+ if not self.isdelete:
+ if self.compiled.prefetch:
+ if self.executemany:
+ self._process_executemany_defaults()
+ else:
+ self._process_executesingle_defaults()
processors = compiled._bind_processors
else:
encode = not dialect.supports_unicode_statements
for compiled_params in self.compiled_parameters:
- param = {}
+
if encode:
- for key in compiled_params:
- if key in processors:
- param[dialect._encoder(key)[0]] = \
- processors[key](compiled_params[key])
- else:
- param[dialect._encoder(key)[0]] = \
- compiled_params[key]
+ param = dict(
+ (
+ dialect._encoder(key)[0],
+ processors[key](compiled_params[key])
+ if key in processors
+ else compiled_params[key]
+ )
+ for key in compiled_params
+ )
else:
- for key in compiled_params:
- if key in processors:
- param[key] = processors[key](compiled_params[key])
- else:
- param[key] = compiled_params[key]
+ param = dict(
+ (
+ key,
+ processors[key](compiled_params[key])
+ if key in processors
+ else compiled_params[key]
+ )
+ for key in compiled_params
+ )
+
parameters.append(param)
self.parameters = dialect.execute_sequence_format(parameters)
"""Initialize execution context for a string SQL statement."""
self = cls.__new__(cls)
- self.dialect = dialect
self.root_connection = connection
self._dbapi_connection = dbapi_connection
- self.engine = connection.engine
+ self.dialect = connection.dialect
# plain text statement
self.execution_options = connection._execution_options
"""Initialize execution context for a ColumnDefault construct."""
self = cls.__new__(cls)
- self.dialect = dialect
self.root_connection = connection
self._dbapi_connection = dbapi_connection
- self.engine = connection.engine
+ self.dialect = connection.dialect
self.execution_options = connection._execution_options
self.cursor = self.create_cursor()
return self
+ @util.memoized_property
+ def engine(self):
+ return self.root_connection.engine
+
+ @util.memoized_property
+ def postfetch_cols(self):
+ return self.compiled.postfetch
+
+ @util.memoized_property
+ def prefetch_cols(self):
+ return self.compiled.prefetch
+
+ @util.memoized_property
+ def returning_cols(self):
+ self.compiled.returning
+
@util.memoized_property
def no_parameters(self):
return self.execution_options.get("no_parameters", False)
return self.dialect.supports_sane_multi_rowcount
def post_insert(self):
- if not self._is_implicit_returning and \
- not self._is_explicit_returning and \
- not self.compiled.inline and \
- self.dialect.postfetch_lastrowid and \
- (not self.inserted_primary_key or
- None in self.inserted_primary_key):
-
- table = self.compiled.statement.table
- lastrowid = self.get_lastrowid()
- autoinc_col = table._autoincrement_column
- if autoinc_col is not None:
- # apply type post processors to the lastrowid
- proc = autoinc_col.type._cached_result_processor(
- self.dialect, None)
- if proc is not None:
- lastrowid = proc(lastrowid)
+ key_getter = self.compiled._key_getters_for_crud_column[2]
+ table = self.compiled.statement.table
+
+ if not self._is_implicit_returning and \
+ not self._is_explicit_returning and \
+ not self.compiled.inline and \
+ self.dialect.postfetch_lastrowid:
+
+ lastrowid = self.get_lastrowid()
+ autoinc_col = table._autoincrement_column
+ if autoinc_col is not None:
+ # apply type post processors to the lastrowid
+ proc = autoinc_col.type._cached_result_processor(
+ self.dialect, None)
+ if proc is not None:
+ lastrowid = proc(lastrowid)
+ self.inserted_primary_key = [
+ lastrowid if c is autoinc_col else
+ self.compiled_parameters[0].get(key_getter(c), None)
+ for c in table.primary_key
+ ]
+ else:
self.inserted_primary_key = [
- lastrowid if c is autoinc_col else v
- for c, v in zip(
- table.primary_key,
- self.inserted_primary_key)
+ self.compiled_parameters[0].get(key_getter(c), None)
+ for c in table.primary_key
]
def _fetch_implicit_returning(self, resultproxy):
def lastrow_has_defaults(self):
return (self.isinsert or self.isupdate) and \
- bool(self.postfetch_cols)
+ bool(self.compiled.postfetch)
def set_input_sizes(self, translate=None, exclude_types=None):
"""Given a cursor and ClauseParameters, call the appropriate
else:
return self._exec_default(column.onupdate, column.type)
- def __process_defaults(self):
- """Generate default values for compiled insert/update statements,
- and generate inserted_primary_key collection.
- """
-
+ def _process_executemany_defaults(self):
key_getter = self.compiled._key_getters_for_crud_column[2]
- if self.executemany:
- if len(self.compiled.prefetch):
- scalar_defaults = {}
-
- # pre-determine scalar Python-side defaults
- # to avoid many calls of get_insert_default()/
- # get_update_default()
- for c in self.prefetch_cols:
- if self.isinsert and c.default and c.default.is_scalar:
- scalar_defaults[c] = c.default.arg
- elif self.isupdate and c.onupdate and c.onupdate.is_scalar:
- scalar_defaults[c] = c.onupdate.arg
-
- for param in self.compiled_parameters:
- self.current_parameters = param
- for c in self.prefetch_cols:
- if c in scalar_defaults:
- val = scalar_defaults[c]
- elif self.isinsert:
- val = self.get_insert_default(c)
- else:
- val = self.get_update_default(c)
- if val is not None:
- param[key_getter(c)] = val
- del self.current_parameters
- else:
- self.current_parameters = compiled_parameters = \
- self.compiled_parameters[0]
-
- for c in self.compiled.prefetch:
- if self.isinsert:
+ prefetch = self.compiled.prefetch
+ scalar_defaults = {}
+
+ # pre-determine scalar Python-side defaults
+ # to avoid many calls of get_insert_default()/
+ # get_update_default()
+ for c in prefetch:
+ if self.isinsert and c.default and c.default.is_scalar:
+ scalar_defaults[c] = c.default.arg
+ elif self.isupdate and c.onupdate and c.onupdate.is_scalar:
+ scalar_defaults[c] = c.onupdate.arg
+
+ for param in self.compiled_parameters:
+ self.current_parameters = param
+ for c in prefetch:
+ if c in scalar_defaults:
+ val = scalar_defaults[c]
+ elif self.isinsert:
val = self.get_insert_default(c)
else:
val = self.get_update_default(c)
-
if val is not None:
- compiled_parameters[key_getter(c)] = val
- del self.current_parameters
+ param[key_getter(c)] = val
+ del self.current_parameters
+
+ def _process_executesingle_defaults(self):
+ key_getter = self.compiled._key_getters_for_crud_column[2]
+ prefetch = self.compiled.prefetch
+ self.current_parameters = compiled_parameters = \
+ self.compiled_parameters[0]
+
+ for c in prefetch:
if self.isinsert:
- self.inserted_primary_key = [
- self.compiled_parameters[0].get(key_getter(c), None)
- for c in self.compiled.
- statement.table.primary_key
- ]
+ val = self.get_insert_default(c)
+ else:
+ val = self.get_update_default(c)
+
+ if val is not None:
+ compiled_parameters[key_getter(c)] = val
+ del self.current_parameters
+
+
DefaultDialect.execution_ctx_cls = DefaultExecutionContext
-from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_
+from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_, \
+ bindparam
from sqlalchemy.engine import default
from sqlalchemy.sql import table, column
+from sqlalchemy.sql.elements import _truncated_label
from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\
- fixtures
+ fixtures, eq_
from sqlalchemy.testing.schema import Table, Column
IDENT_LENGTH = 29
dialect=self._length_fixture(positional=True)
)
+ def test_bind_param_non_truncated(self):
+ table1 = self.table1
+ stmt = table1.insert().values(
+ this_is_the_data_column=
+ bindparam("this_is_the_long_bindparam_name")
+ )
+ compiled = stmt.compile(dialect=self._length_fixture(length=10))
+ eq_(
+ compiled.construct_params(
+ params={"this_is_the_long_bindparam_name": 5}),
+ {'this_is_the_long_bindparam_name': 5}
+ )
+
+ def test_bind_param_truncated_named(self):
+ table1 = self.table1
+ bp = bindparam(_truncated_label("this_is_the_long_bindparam_name"))
+ stmt = table1.insert().values(
+ this_is_the_data_column=bp
+ )
+ compiled = stmt.compile(dialect=self._length_fixture(length=10))
+ eq_(
+ compiled.construct_params(params={
+ "this_is_the_long_bindparam_name": 5}),
+ {"this_1": 5}
+ )
+
+ def test_bind_param_truncated_positional(self):
+ table1 = self.table1
+ bp = bindparam(_truncated_label("this_is_the_long_bindparam_name"))
+ stmt = table1.insert().values(
+ this_is_the_data_column=bp
+ )
+ compiled = stmt.compile(
+ dialect=self._length_fixture(length=10, positional=True))
+
+ eq_(
+ compiled.construct_params(params={
+ "this_is_the_long_bindparam_name": 5}),
+ {"this_1": 5}
+ )
+
class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'DefaultDialect'