definition, using strings as column names, as an alternative
to the creation of the index outside of the Table.
+ - A TypeDecorator whose impl is Integer can be used as the type
+ of a primary key column; the "autoincrement" feature of various
+ dialects, as well as the "sqlite_autoincrement" flag, will honor
+ the underlying database type as being Integer-based.
+ [ticket:2005]
+
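A minimal sketch of the behavior described in the entry above, assuming an
in-memory SQLite engine; "MyInteger" is an illustrative decorator (the
equivalent test appears among the sqlite test additions further below, and
the top-level TypeDecorator import is the one noted in a later entry):

    from sqlalchemy import Table, Column, MetaData, Integer, TypeDecorator
    from sqlalchemy.schema import CreateTable
    from sqlalchemy.dialects import sqlite

    class MyInteger(TypeDecorator):
        # _type_affinity is Integer, so "autoincrement" and
        # "sqlite_autoincrement" treat the column as Integer-based
        impl = Integer

    t = Table('autoinctable', MetaData(),
              Column('id', MyInteger, primary_key=True),
              sqlite_autoincrement=True)

    # renders:
    # CREATE TABLE autoinctable (
    #     id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT)
    print(CreateTable(t).compile(dialect=sqlite.dialect()))
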
+ - Result-row processors are applied to pre-executed SQL
+ defaults, as well as cursor.lastrowid, when determining
+ the contents of result.inserted_primary_key.
+ [ticket:2006]
+
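A sketch of the effect on result.inserted_primary_key, assuming an in-memory
SQLite engine so that cursor.lastrowid supplies the primary key value; this
mirrors the SpecialTypePKTest added further below:

    from sqlalchemy import (Table, Column, MetaData, Integer,
                            TypeDecorator, create_engine)

    class MyInteger(TypeDecorator):
        impl = Integer
        def process_bind_param(self, value, dialect):
            return int(value[4:])        # "INT_1" -> 1
        def process_result_value(self, value, dialect):
            return "INT_%d" % value      # 1 -> "INT_1"

    engine = create_engine('sqlite://')
    metadata = MetaData()
    t = Table('x', metadata,
              Column('y', MyInteger, primary_key=True),
              Column('data', Integer))
    metadata.create_all(engine)

    r = engine.execute(t.insert().values(data=5))
    # cursor.lastrowid (1) is run through MyInteger's result processor
    assert list(r.inserted_primary_key) == ['INT_1']
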
+ - Bind parameters present in the "columns clause" of a select
+ are now auto-labeled like other "anonymous" clauses, which
+ among other things allows their "type" to take effect when the
+ row is fetched, so that result-row processors are applied.
+
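A sketch of the new labeling and why it matters for typed literals; the
compiled strings match the updated compiler tests further below, and
"MyInteger" is again an illustrative decorator:

    from sqlalchemy import (select, literal, bindparam, Integer,
                            TypeDecorator, create_engine)

    # bind parameters in the columns clause now receive anonymous labels:
    #   SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3
    print(select([bindparam('a'), bindparam('b'), bindparam('c')]))

    class MyInteger(TypeDecorator):
        impl = Integer
        def process_bind_param(self, value, dialect):
            return int(value[4:])
        def process_result_value(self, value, dialect):
            return "INT_%d" % value

    # because the column is labeled, the literal's type is matched to the
    # result column and its result processor is applied on fetch
    engine = create_engine('sqlite://')
    assert engine.scalar(select([literal("INT_5", type_=MyInteger)])) == "INT_5"
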
+ - TypeDecorator is present in the "sqlalchemy" import space.
+
- mssql
- the String/Unicode types, and their counterparts VARCHAR/
NVARCHAR, emit "max" as the length when no length is specified.
TIMESTAMP,
Text,
Time,
+ TypeDecorator,
Unicode,
UnicodeText,
VARCHAR,
class FBExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq):
+ def fire_sequence(self, seq, proc):
"""Get the next value from the sequence using ``gen_id()``."""
return self._execute_scalar(
"SELECT gen_id(%s, 1) FROM rdb$database" %
- self.dialect.identifier_preparer.format_sequence(seq)
+ self.dialect.identifier_preparer.format_sequence(seq),
+ proc
)
elif column.nullable and is_timestamp and default is None:
colspec.append('NULL')
- if column.primary_key and column.autoincrement:
- try:
- first = [c for c in column.table.primary_key.columns
- if (c.autoincrement and
- isinstance(c.type, sqltypes.Integer) and
- not c.foreign_keys)].pop(0)
- if column is first:
- colspec.append('AUTO_INCREMENT')
- except IndexError:
- pass
+ if column is column.table._autoincrement_column:
+ colspec.append('AUTO_INCREMENT')
return ' '.join(colspec)
arg = "'%s'" % arg.replace("\\", "\\\\").replace("'", "''")
if opt in ('DATA_DIRECTORY', 'INDEX_DIRECTORY',
- 'DEFAULT_CHARACTER_SET', 'CHARACTER_SET', 'DEFAULT_CHARSET',
+ 'DEFAULT_CHARACTER_SET', 'CHARACTER_SET',
+ 'DEFAULT_CHARSET',
'DEFAULT_COLLATE'):
opt = opt.replace('_', ' ')
class OracleExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq):
+ def fire_sequence(self, seq, proc):
return int(self._execute_scalar("SELECT " +
self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval FROM DUAL"))
+ ".nextval FROM DUAL"), proc)
class OracleDialect(default.DefaultDialect):
name = 'oracle'
class PGDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column)
+ type_affinity = column.type._type_affinity
if column.primary_key and \
len(column.foreign_keys)==0 and \
column.autoincrement and \
- isinstance(column.type, sqltypes.Integer) and \
- not isinstance(column.type, sqltypes.SmallInteger) and \
+ issubclass(type_affinity, sqltypes.Integer) and \
+ not issubclass(type_affinity, sqltypes.SmallInteger) and \
(column.default is None or
(isinstance(column.default, schema.Sequence) and
column.default.optional)):
- if isinstance(column.type, sqltypes.BigInteger):
+ if issubclass(type_affinity, sqltypes.BigInteger):
colspec += " BIGSERIAL"
else:
colspec += " SERIAL"
__visit_name__ = "drop_enum_type"
class PGExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq):
+ def fire_sequence(self, seq, proc):
if not seq.optional:
return self._execute_scalar(("select nextval('%s')" % \
- self.dialect.identifier_preparer.format_sequence(seq)))
+ self.dialect.identifier_preparer.format_sequence(seq)), proc)
else:
return None
- def get_insert_default(self, column):
+ def get_insert_default(self, column, proc):
if column.primary_key:
if (isinstance(column.server_default, schema.DefaultClause) and
column.server_default.arg is not None):
# pre-execute passive defaults on primary key columns
return self._execute_scalar("select %s" %
- column.server_default.arg)
+ column.server_default.arg, proc)
elif column is column.table._autoincrement_column \
and (column.default is None or
exc = "select nextval('\"%s_%s_seq\"')" % \
(column.table.name, column.name)
- return self._execute_scalar(exc)
+ return self._execute_scalar(exc, proc)
- return super(PGExecutionContext, self).get_insert_default(column)
+ return super(PGExecutionContext, self).get_insert_default(column, proc)
class PGDialect(default.DefaultDialect):
name = 'postgresql'
if column.primary_key and \
column.table.kwargs.get('sqlite_autoincrement', False) and \
len(column.table.primary_key.columns) == 1 and \
- isinstance(column.type, sqltypes.Integer) and \
+ issubclass(column.type._type_affinity, sqltypes.Integer) and \
not column.foreign_keys:
colspec += " PRIMARY KEY AUTOINCREMENT"
c = list(constraint)[0]
if c.primary_key and \
c.table.kwargs.get('sqlite_autoincrement', False) and \
- isinstance(c.type, sqltypes.Integer) and \
+ issubclass(c.type._type_affinity, sqltypes.Integer) and \
not c.foreign_keys:
return None
self._handle_dbapi_exception(e, None, None, None, None)
raise
- ret = ctx._exec_default(default)
+ ret = ctx._exec_default(default, None)
if self.should_close_with_result:
self.close()
return ret
self.cursor = self.create_cursor()
if self.isinsert or self.isupdate:
- self.__process_defaults()
self.postfetch_cols = self.compiled.postfetch
self.prefetch_cols = self.compiled.prefetch
+ self.__process_defaults()
processors = compiled._bind_processors
else:
return autocommit
- def _execute_scalar(self, stmt):
+ def _execute_scalar(self, stmt, proc):
"""Execute a string statement on the current cursor, returning a
scalar result.
default_params = {}
conn._cursor_execute(self.cursor, stmt, default_params)
- return self.cursor.fetchone()[0]
+ r = self.cursor.fetchone()[0]
+ if proc:
+ return proc(r)
+ else:
+ return r
@property
def connection(self):
table = self.compiled.statement.table
lastrowid = self.get_lastrowid()
self.inserted_primary_key = [
- c is table._autoincrement_column and lastrowid or v
- for c, v in zip(table.primary_key, self.inserted_primary_key)
+ c is table._autoincrement_column and (
+ proc and proc(lastrowid)
+ or lastrowid
+ ) or v
+ for c, v, proc in zip(
+ table.primary_key,
+ self.inserted_primary_key,
+ self.compiled._pk_processors)
]
def _fetch_implicit_returning(self, resultproxy):
self.root_connection._handle_dbapi_exception(e, None, None, None, self)
raise
- def _exec_default(self, default):
+ def _exec_default(self, default, proc):
if default.is_sequence:
- return self.fire_sequence(default)
+ return self.fire_sequence(default, proc)
elif default.is_callable:
return default.arg(self)
elif default.is_clause_element:
else:
return default.arg
- def get_insert_default(self, column):
+ def get_insert_default(self, column, proc):
if column.default is None:
return None
else:
- return self._exec_default(column.default)
+ return self._exec_default(column.default, proc)
- def get_update_default(self, column):
+ def get_update_default(self, column, proc):
if column.onupdate is None:
return None
else:
- return self._exec_default(column.onupdate)
+ return self._exec_default(column.onupdate, proc)
def __process_defaults(self):
"""Generate default values for compiled insert/update statements,
# pre-determine scalar Python-side defaults
# to avoid many calls of get_insert_default()/
# get_update_default()
- for c in self.compiled.prefetch:
+ for c in self.prefetch_cols:
if self.isinsert and c.default and c.default.is_scalar:
scalar_defaults[c] = c.default.arg
elif self.isupdate and c.onupdate and c.onupdate.is_scalar:
for param in self.compiled_parameters:
self.current_parameters = param
- for c in self.compiled.prefetch:
+ for c, proc in zip(self.prefetch_cols,
+ self.compiled._prefetch_processors):
if c in scalar_defaults:
val = scalar_defaults[c]
elif self.isinsert:
- val = self.get_insert_default(c)
+ val = self.get_insert_default(c, proc)
else:
- val = self.get_update_default(c)
+ val = self.get_update_default(c, proc)
if val is not None:
param[c.key] = val
del self.current_parameters
self.current_parameters = compiled_parameters = \
self.compiled_parameters[0]
- for c in self.compiled.prefetch:
+ for c, proc in zip(self.compiled.prefetch,
+ self.compiled._prefetch_processors):
if self.isinsert:
- val = self.get_insert_default(c)
+ val = self.get_insert_default(c, proc)
else:
- val = self.get_update_default(c)
+ val = self.get_update_default(c, proc)
if val is not None:
compiled_parameters[c.key] = val
def _autoincrement_column(self):
for col in self.primary_key:
if col.autoincrement and \
- isinstance(col.type, types.Integer) and \
+ issubclass(col.type._type_affinity, types.Integer) and \
not col.foreign_keys and \
- isinstance(col.default, (type(None), Sequence)):
+ isinstance(col.default, (type(None), Sequence)) and \
+ col.server_default is None:
return col
The setting *only* has an effect for columns which are:
- * Integer derived (i.e. INT, SMALLINT, BIGINT)
+ * Integer derived (i.e. INT, SMALLINT, BIGINT).
* Part of the primary key
if value is not None
)
+ @util.memoized_property
+ def _pk_processors(self):
+ """Result processors for the statement's primary key columns,
+ applied to lastrowid and pre-executed default values when
+ building inserted_primary_key."""
+ return [
+ col.type._cached_result_processor(self.dialect, None)
+ for col in self.statement.table.primary_key
+ ]
+
+ @util.memoized_property
+ def _prefetch_processors(self):
+ """Result processors for pre-executed ("prefetch") default columns."""
+ return [
+ col.type._cached_result_processor(self.dialect, None)
+ for col in self.prefetch
+ ]
+
def is_subquery(self):
return len(self.stack) > 1
)
self.binds[bindparam.key] = self.binds[name] = bindparam
+
return self.bindparam_string(name)
def render_literal_bindparam(self, bindparam, **kw):
not isinstance(column.table, sql.Select):
return _CompileLabel(column, sql._generated_label(column.name))
elif not isinstance(column,
- (sql._UnaryExpression, sql._TextClause,
- sql._BindParamClause)) \
+ (sql._UnaryExpression, sql._TextClause)) \
and (not hasattr(column, 'name') or \
isinstance(column, sql.Function)):
return _CompileLabel(column, column.anon_label)
with-_sqlalchemy = true
exclude = ^examples
first-package-wins = true
+where = test
for expr, compile in [
(
select([literal("x"), literal("y")]),
- "SELECT 'x', 'y'",
+ "SELECT 'x' AS anon_1, 'y' AS anon_2",
),
(
select([t]).where(t.c.foo.in_(['x', 'y', 'z'])),
'CREATE TABLE noautoinctable (id INTEGER '
'NOT NULL, x INTEGER, PRIMARY KEY (id))',
dialect=sqlite.dialect())
+
+ def test_sqlite_autoincrement_int_affinity(self):
+ class MyInteger(TypeDecorator):
+ impl = Integer
+ table = Table(
+ 'autoinctable',
+ MetaData(),
+ Column('id', MyInteger, primary_key=True),
+ sqlite_autoincrement=True,
+ )
+ self.assert_compile(schema.CreateTable(table),
+ 'CREATE TABLE autoinctable (id INTEGER NOT '
+ 'NULL PRIMARY KEY AUTOINCREMENT)',
+ dialect=sqlite.dialect())
def returning(fn):
return _chain_decorators_on(
fn,
- no_support('access', 'not supported by database'),
- no_support('sqlite', 'not supported by database'),
- no_support('mysql', 'not supported by database'),
- no_support('maxdb', 'not supported by database'),
- no_support('sybase', 'not supported by database'),
- no_support('informix', 'not supported by database'),
+ no_support('access', "'returning' not supported by database"),
+ no_support('sqlite', "'returning' not supported by database"),
+ no_support('mysql', "'returning' not supported by database"),
+ no_support('maxdb', "'returning' not supported by database"),
+ no_support('sybase', "'returning' not supported by database"),
+ no_support('informix', "'returning' not supported by database"),
)
def two_phase_transactions(fn):
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
- "SELECT :a, :b, :c"
+ "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3"
, dialect=default.DefaultDialect(paramstyle='named')
)
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
- "SELECT ?, ?, ?"
+ "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3"
, dialect=default.DefaultDialect(paramstyle='qmark'),
)
self.assert_compile(
select([literal("someliteral")]),
- "SELECT 'someliteral'",
+ "SELECT 'someliteral' AS anon_1",
dialect=dialect
)
def test_literal(self):
- self.assert_compile(select([literal('foo')]), "SELECT :param_1")
+ self.assert_compile(select([literal('foo')]), "SELECT :param_1 AS anon_1")
self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
"SELECT :param_1 || :param_2 AS anon_1 FROM mytable")
from test.lib.testing import eq_, assert_raises, assert_raises_message
import datetime
-from sqlalchemy import Sequence, Column, func
from sqlalchemy.schema import CreateSequence, DropSequence
-from sqlalchemy.sql import select, text
+from sqlalchemy.sql import select, text, literal_column
import sqlalchemy as sa
from test.lib import testing, engines
-from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc
+from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc,\
+ Sequence, Column, func, literal
+from sqlalchemy.types import TypeDecorator
from test.lib.schema import Table
from test.lib.testing import eq_
from test.sql import _base
metadata.drop_all()
+class SpecialTypePKTest(testing.TestBase):
+ """test process_result_value in conjunction with primary key columns.
+
+ Also tests that "autoincrement" checks are against column.type._type_affinity,
+ rather than the class of "type" itself.
+
+ """
+
+ @classmethod
+ def setup_class(cls):
+ class MyInteger(TypeDecorator):
+ impl = Integer
+ def process_bind_param(self, value, dialect):
+ return int(value[4:])
+
+ def process_result_value(self, value, dialect):
+ return "INT_%d" % value
+
+ cls.MyInteger = MyInteger
+
+ @testing.provide_metadata
+ def _run_test(self, *arg, **kw):
+ implicit_returning = kw.pop('implicit_returning', True)
+ kw['primary_key'] = True
+ t = Table('x', metadata,
+ Column('y', self.MyInteger, *arg, **kw),
+ Column('data', Integer),
+ implicit_returning=implicit_returning
+ )
+
+ t.create()
+ r = t.insert().values(data=5).execute()
+ eq_(r.inserted_primary_key, ['INT_1'])
+ r.close()
+
+ eq_(
+ t.select().execute().first(),
+ ('INT_1', 5)
+ )
+
+ def test_plain(self):
+ # among other things, tests that autoincrement
+ # is enabled.
+ self._run_test()
+
+ def test_literal_default_label(self):
+ self._run_test(default=literal("INT_1", type_=self.MyInteger).label('foo'))
+
+ def test_literal_default_no_label(self):
+ self._run_test(default=literal("INT_1", type_=self.MyInteger))
+
+ def test_sequence(self):
+ self._run_test(Sequence('foo_seq'))
+
+ @testing.fails_on('mysql', "Pending [ticket:2021]")
+ @testing.fails_on('sqlite', "Pending [ticket:2021]")
+ def test_server_default(self):
+ # note that the MySQL dialect must not render AUTO_INCREMENT for this column
+ self._run_test(server_default='1',)
+
+ @testing.fails_on('mysql', "Pending [ticket:2021]")
+ @testing.fails_on('sqlite', "Pending [ticket:2021]")
+ def test_server_default_no_autoincrement(self):
+ self._run_test(server_default='1', autoincrement=False)
+
+ def test_clause(self):
+ stmt = select([literal("INT_1", type_=self.MyInteger)]).as_scalar()
+ self._run_test(default=stmt)
+
+ @testing.requires.returning
+ def test_no_implicit_returning(self):
+ self._run_test(implicit_returning=False)
+
+ @testing.requires.returning
+ def test_server_default_no_implicit_returning(self):
+ self._run_test(server_default='1', autoincrement=False,
+ implicit_returning=False)
+
+
a_eq(prep(r"(\:that$other)"), "(:that$other)")
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
+ def test_select_from_bindparam(self):
+ """Test result row processing when selecting from a plain bind param."""
+
+ class MyInteger(TypeDecorator):
+ impl = Integer
+ def process_bind_param(self, value, dialect):
+ return int(value[4:])
+
+ def process_result_value(self, value, dialect):
+ return "INT_%d" % value
+
+ eq_(
+ testing.db.scalar(select([literal("INT_5", type_=MyInteger)])),
+ "INT_5"
+ )
+ eq_(
+ testing.db.scalar(select([literal("INT_5", type_=MyInteger).label('foo')])),
+ "INT_5"
+ )
+
+
def test_delete(self):
users.insert().execute(user_id = 7, user_name = 'jack')
users.insert().execute(user_id = 8, user_name = 'fred')