replaced by inserted_primary_key.
- sql
+ - [feature] Major rework of operator system
+ in Core, to allow redefinition of existing
+ operators as well as addition of new operators
+ at the type level. New types can be created
+ from existing ones which add or redefine
+ operations that are exported out to column
+ expressions, in a similar manner to how the
+ ORM has allowed comparator_factory. The new
+ architecture moves this capability into the
+ Core so that it is consistently usable in
+ all cases, propagating cleanly using existing
+ type propagation behavior. [ticket:2547]
+
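For illustration, a minimal sketch of the kind of type-level operator
redefinition this enables (hypothetical MyInt type; names here are
illustrative only):

    from sqlalchemy import Integer

    class MyInt(Integer):
        # redefine "+" for expressions of this type to emit a custom
        # "goofy" operator in place of the usual addition operator
        class comparator_factory(Integer.Comparator):
            def __add__(self, other):
                return self.op("goofy")(other)

A column of type MyInt then renders ``somecol + 5`` using the ``goofy``
operator rather than ``+``, wherever the type propagates.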
+ - [feature] To complement [ticket:2547], types
+ can now provide "bind expressions" and
+ "column expressions" which allow compile-time
+ injection of SQL expressions into statements
+ on a per-column or per-bind level. This is
+ to suit the use case of a type which needs
+ to augment bind- and result-handling behavior
+ at the SQL level, as opposed to at the Python level.
+ Allows for schemes like transparent encryption/
+ decryption, usage of Postgis functions, etc.
+ [ticket:1534]
+
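A minimal sketch of these two hooks, modeled on the String subclass used by
the new tests added in this patch (the lower() calls are just illustrative):

    from sqlalchemy import String, func

    class MyString(String):
        # wrap bound parameters of this type in lower() at compile time
        def bind_expression(self, bindvalue):
            return func.lower(bindvalue)

        # wrap the column in lower() when it appears in the columns
        # clause of a SELECT
        def column_expression(self, col):
            return func.lower(col)

With this type, an INSERT renders its bind as ``lower(:y)`` and a SELECT
renders the column as ``lower(test_table.y)`` under a label, as the new
compile tests further below assert.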
- [feature] Revised the rules used to determine
the operator precedence for the user-defined
operator, i.e. that granted using the ``op()``
"""
def __init__(self, desc, srid=-1):
- assert isinstance(desc, basestring)
self.desc = desc
expression.Function.__init__(self, "ST_GeomFromText", desc, srid)
# override the __eq__() operator
def __eq__(self, other):
- return self.op('~=')(_to_postgis(other))
+ return self.op('~=')(other)
# add a custom operator
def intersects(self, other):
- return self.op('&&')(_to_postgis(other))
+ return self.op('&&')(other)
# any number of GIS operators can be overridden/added here
# using the techniques above.
+ def _coerce_compared_value(self, op, value):
+ return self
+
def get_col_spec(self):
return self.name
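+ # compile-time SQL hooks: bound geometry values are rendered through
+ # ST_GeomFromText() (via TextualGisElement) and selected geometry columns
+ # are rendered through ST_AsText()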
+ def bind_expression(self, bindvalue):
+ return TextualGisElement(bindvalue)
+
+ def column_expression(self, col):
+ return func.ST_AsText(col, type_=self)
+
def bind_processor(self, dialect):
def process(value):
- if value is not None:
+ if isinstance(value, GisElement):
return value.desc
else:
return value
table.columns = table.info.pop('_saved_columns')
setup_ddl_events()
-# ORM integration
-
def _to_postgis(value):
"""Interpret a value as a GIS-compatible construct.
else:
raise Exception("Invalid type")
-# without importing "orm", the "attribute_instrument"
-# event isn't even set up.
-from sqlalchemy import orm
-
-@event.listens_for(type, "attribute_instrument")
-def attribute_instrument(cls, key, inst):
- type_ = getattr(inst, "type", None)
- if isinstance(type_, Geometry):
- @event.listens_for(inst, "set", retval=True)
- def set_value(state, value, oldvalue, initiator):
- return _to_postgis(value)
# illustrate usage
session.commit()
# after flush and/or commit, all the TextualGisElements become PersistentGisElements.
- assert str(r.road_geom) == "01020000000200000000000000B832084100000000E813104100000000283208410000000088601041"
+ assert str(r.road_geom) == "LINESTRING(198231 263418,198213 268322)"
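+ # the Geometry type's column_expression wraps the column in ST_AsText(),
+ # so the geometry comes back as readable WKT text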
r1 = session.query(Road).filter(Road.road_name=='Graeme Ave').one()
"""Access uses "mod" instead of "%" """
return binary.operator == '%' and 'mod' or binary.operator
- def label_select_column(self, select, column, asfrom):
- if isinstance(column, expression.Function):
- return column.label()
- else:
- return super(AccessCompiler, self).\
- label_select_column(select, column, asfrom)
-
function_rewrites = {'current_date': 'now',
'current_timestamp': 'now',
'length': 'len',
return ""
def returning_clause(self, stmt, returning_cols):
-
columns = [
- self.process(
- self.label_select_column(None, c, asfrom=False),
- within_columns_clause=True,
- result_map=self.result_map
- )
+ self._label_select_column(None, c, True, False, {})
for c in expression._select_iterables(returning_cols)
]
+
return 'RETURNING ' + ', '.join(columns)
return ("ROLLBACK TRANSACTION %s"
% self.preparer.format_savepoint(savepoint_stmt))
- def visit_column(self, column, result_map=None, **kwargs):
+ def visit_column(self, column, add_to_result_map=None, **kwargs):
if column.table is not None and \
(not self.isupdate and not self.isdelete) or self.is_subquery():
# translate for schema-qualified table aliases
converted = expression._corresponding_column_or_error(
t, column)
- if result_map is not None:
- result_map[column.name
+ if add_to_result_map is not None:
+ self.result_map[column.name
if self.dialect.case_sensitive
else column.name.lower()] = \
- (column.name, (column, ),
+ (column.name, (column, ) + add_to_result_map,
column.type)
return super(MSSQLCompiler, self).\
visit_column(converted,
add_to_result_map=None, **kwargs)
- return super(MSSQLCompiler, self).visit_column(column,
- result_map=result_map,
- **kwargs)
+ return super(MSSQLCompiler, self).visit_column(
+ column, add_to_result_map=add_to_result_map, **kwargs)
def visit_binary(self, binary, **kwargs):
"""Move bind parameters to the right-hand side of an operator, where
target = stmt.table.alias("deleted")
adapter = sql_util.ClauseAdapter(target)
- def col_label(col):
- adapted = adapter.traverse(col)
- if isinstance(col, expression.Label):
- return adapted.label(c.key)
- else:
- return self.label_select_column(None, adapted, asfrom=False)
columns = [
- self.process(
- col_label(c),
- within_columns_clause=True,
- result_map=self.result_map
- )
- for c in expression._select_iterables(returning_cols)
- ]
+ self._label_select_column(None, adapter.traverse(c),
+ True, False, {})
+ for c in expression._select_iterables(returning_cols)
+ ]
+
return 'OUTPUT ' + ', '.join(columns)
def get_cte_preamble(self, recursive):
def returning_clause(self, stmt, returning_cols):
columns = [
- self.process(
- self.label_select_column(None, c, asfrom=False),
- within_columns_clause=True,
- result_map=self.result_map)
+ self._label_select_column(None, c, True, False, {})
for c in expression._select_iterables(returning_cols)
]
def quote(self):
return self.element.quote
+
class SQLCompiler(engine.Compiled):
"""Default implementation of Compiled.
def visit_grouping(self, grouping, asfrom=False, **kwargs):
return "(" + grouping.element._compiler_dispatch(self, **kwargs) + ")"
- def visit_label(self, label, result_map=None,
+ def visit_label(self, label,
+ add_to_result_map=None,
within_label_clause=False,
within_columns_clause=False, **kw):
# only render labels within the columns clause
else:
labelname = label.name
- if result_map is not None:
- result_map[labelname
+ if add_to_result_map is not None:
+ self.result_map[
+ labelname
if self.dialect.case_sensitive
- else labelname.lower()] = (
- label.name,
- (label, label.element, labelname, ) +
- label._alt_names,
- label.type)
+ else labelname.lower()
+ ] = (
+ label.name,
+ (label, label.element, labelname, ) +
+ label._alt_names +
+ add_to_result_map,
+ label.type,
+ )
return label.element._compiler_dispatch(self,
within_columns_clause=True,
within_label_clause=True,
**kw)
- def visit_column(self, column, result_map=None, **kwargs):
+ def visit_column(self, column, add_to_result_map=None, **kwargs):
name = orig_name = column.name
if name is None:
raise exc.CompileError("Cannot compile Column object until "
if not is_literal and isinstance(name, sql._truncated_label):
name = self._truncated_identifier("colident", name)
- if result_map is not None:
- result_map[name
+ if add_to_result_map is not None:
+ self.result_map[
+ name
if self.dialect.case_sensitive
- else name.lower()] = (orig_name,
- (column, name, column.key),
- column.type)
+ else name.lower()
+ ] = (
+ orig_name,
+ (column, name, column.key) + add_to_result_map,
+ column.type
+ )
if is_literal:
name = self.escape_literal_column(name)
cast.typeclause._compiler_dispatch(self, **kwargs))
def visit_over(self, over, **kwargs):
- x ="%s OVER (" % over.func._compiler_dispatch(self, **kwargs)
+ x = "%s OVER (" % over.func._compiler_dispatch(self, **kwargs)
if over.partition_by is not None:
x += "PARTITION BY %s" % \
over.partition_by._compiler_dispatch(self, **kwargs)
return "EXTRACT(%s FROM %s)" % (field,
extract.expr._compiler_dispatch(self, **kwargs))
- def visit_function(self, func, result_map=None, **kwargs):
- if result_map is not None:
- result_map[func.name
+ def visit_function(self, func, add_to_result_map=None, **kwargs):
+ if add_to_result_map is not None:
+ self.result_map[
+ func.name
if self.dialect.case_sensitive
- else func.name.lower()] = \
- (func.name, None, func.type)
+ else func.name.lower()
+ ] = (func.name, add_to_result_map, func.type)
disp = getattr(self, "visit_%s_func" % func.name.lower(), None)
if disp:
else:
name = FUNCTIONS.get(func.__class__, func.name + "%(expr)s")
return ".".join(list(func.packagenames) + [name]) % \
- {'expr':self.function_argspec(func, **kwargs)}
+ {'expr': self.function_argspec(func, **kwargs)}
def visit_next_value_func(self, next_value, **kw):
return self.visit_sequence(next_value.sequence)
def visit_sequence(self, sequence):
raise NotImplementedError(
- "Dialect '%s' does not support sequence increments." % self.dialect.name
+ "Dialect '%s' does not support sequence increments." %
+ self.dialect.name
)
def function_argspec(self, func, **kwargs):
def visit_bindparam(self, bindparam, within_columns_clause=False,
- literal_binds=False, **kwargs):
+ literal_binds=False,
+ skip_bind_expression=False,
+ **kwargs):
+
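+ # if the bind's type provides a bind_expression(), compile that wrapping
+ # SQL construct in place of the plain parameter; skip_bind_expression=True
+ # ensures the bindparam embedded inside the wrapper isn't wrapped again
+ # on the recursive process() call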
+ if not skip_bind_expression and bindparam.type._has_bind_expression:
+ bind_expression = bindparam.type.bind_expression(bindparam)
+ return self.process(bind_expression,
+ skip_bind_expression=True)
if literal_binds or \
(within_columns_clause and \
else:
return alias.original._compiler_dispatch(self, **kwargs)
- def label_select_column(self, select, column, asfrom):
- """label columns present in a select()."""
+ def _label_select_column(self, select, column, populate_result_map,
+ asfrom, column_clause_args):
+ """produce labeled columns present in a select()."""
+
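+ # when the column's type provides a column_expression(), compile that
+ # wrapping expression in place of the raw column; the original column is
+ # carried along in add_to_result_map so result rows can still be targeted
+ # by the Column object itself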
+ if column.type._has_column_expression:
+ col_expr = column.type.column_expression(column)
+ if populate_result_map:
+ add_to_result_map = (column, )
+ else:
+ add_to_result_map = None
+ else:
+ col_expr = column
+ if populate_result_map:
+ add_to_result_map = ()
+ else:
+ add_to_result_map = None
- if isinstance(column, sql.Label):
- return column
+ if isinstance(col_expr, sql.Label):
+ result_expr = col_expr
elif select is not None and \
select.use_labels and \
column._label:
- return _CompileLabel(
- column,
+ result_expr = _CompileLabel(
+ col_expr,
column._label,
alt_names=(column._key_label, )
)
not column.is_literal and \
column.table is not None and \
not isinstance(column.table, sql.Select):
- return _CompileLabel(column, sql._as_truncated(column.name),
- alt_names=(column.key,))
+ result_expr = _CompileLabel(col_expr,
+ sql._as_truncated(column.name),
+ alt_names=(column.key,))
elif not isinstance(column,
(sql.UnaryExpression, sql.TextClause)) \
and (not hasattr(column, 'name') or \
isinstance(column, sql.Function)):
- return _CompileLabel(column, column.anon_label)
+ result_expr = _CompileLabel(col_expr, column.anon_label)
+ elif col_expr is not column:
+ result_expr = _CompileLabel(col_expr, column.anon_label)
else:
- return column
+ result_expr = col_expr
+
+ return result_expr._compiler_dispatch(
+ self, within_columns_clause=True,
+ add_to_result_map=add_to_result_map,
+ **column_clause_args
+ )
+
def format_from_hint_text(self, sqltext, table, hint, iscrud):
hinttext = self.get_from_hint_text(table, hint)
# to outermost if existingfroms: correlate_froms =
# correlate_froms.union(existingfroms)
- self.stack.append({'from': correlate_froms, 'iswrapper'
- : iswrapper})
+ self.stack.append({'from': correlate_froms,
+ 'iswrapper': iswrapper})
- if compound_index==1 and not entry or entry.get('iswrapper', False):
- column_clause_args = {'result_map':self.result_map,
- 'positional_names':positional_names}
- else:
- column_clause_args = {'positional_names':positional_names}
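+ # whether this select()'s columns should populate self.result_map is
+ # computed here, then applied per-column by _label_select_column()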
+ populate_result_map = compound_index == 1 and not entry or \
+ entry.get('iswrapper', False)
+ column_clause_args = {'positional_names': positional_names}
# the actual list of columns to print in the SELECT column list.
inner_columns = [
c for c in [
- self.label_select_column(select, co, asfrom=asfrom).\
- _compiler_dispatch(self,
- within_columns_clause=True,
- **column_clause_args)
- for co in util.unique_list(select.inner_columns)
- ]
+ self._label_select_column(select, column,
+ populate_result_map, asfrom,
+ column_clause_args)
+ for column in util.unique_list(select.inner_columns)
+ ]
if c is not None
]
text += self.for_update_clause(select)
if self.ctes and \
- compound_index==1 and not entry:
- text = self._render_cte_clause() + text
+ compound_index == 1 and not entry:
+ text = self._render_cte_clause() + text
self.stack.pop(-1)
from . import exc, schema, util, processors, events, event
from .sql import operators
-from .sql.expression import _DefaultColumnComparator
+from .sql.expression import _DefaultColumnComparator, column, bindparam
from .util import pickle
from .util.compat import decimal
from .sql.visitors import Visitable
"""
return None
+ def column_expression(self, colexpr):
+ """Given a SELECT column expression, return a wrapping SQL expression."""
+
+ return None
+
+ @util.memoized_property
+ def _has_column_expression(self):
+ """memoized boolean, check if column_expression is implemented."""
+ return self.column_expression(column('x')) is not None
+
+ def bind_expression(self, bindvalue):
+ """Given a bind value (i.e. a :class:`.BindParameter` instance),
+ return a SQL expression in its place.
+
+ This is typically a SQL function that wraps the existing bound
+ value within the statement. It is used for special data types
+ that require bound values to be wrapped in a database function
+ in all cases, such as Postgis GEOMETRY types.
+
+ The method is evaluated at statement compile time, as opposed
+ to statement construction time.
+
+ Note that this method, when implemented, should always return
+ the exact same structure, without any conditional logic, as it
+ will be used during executemany() calls as well.
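+
+ For example (an illustrative sketch only), a Postgis-oriented type
+ might render every bound value through ``ST_GeomFromText()``::
+
+     def bind_expression(self, bindvalue):
+         return func.ST_GeomFromText(bindvalue, type_=self)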
+
+ """
+ return None
+
+ @util.memoized_property
+ def _has_bind_expression(self):
+ """memoized boolean, check if bind_expression is implemented."""
+ return self.bind_expression(bindparam('x')) is not None
+
def compare_values(self, x, y):
"""Compare two values for equality."""
cls.dialect = default.DefaultDialect()
- @profiling.function_call_count(62)
+ @profiling.function_call_count()
def test_insert(self):
t1.insert().compile(dialect=self.dialect)
- @profiling.function_call_count(56)
+ @profiling.function_call_count()
def test_update(self):
t1.update().compile(dialect=self.dialect)
- @profiling.function_call_count(110)
def test_update_whereclause(self):
- t1.update().where(t1.c.c2==12).compile(dialect=self.dialect)
+ t1.update().where(t1.c.c2 == 12).compile(dialect=self.dialect)
+
+ @profiling.function_call_count()
+ def go():
+ t1.update().where(t1.c.c2 == 12).compile(dialect=self.dialect)
+ go()
- @profiling.function_call_count(139)
def test_select(self):
- s = select([t1], t1.c.c2==t2.c.c1)
+ # give some of the cached type values
+ # a chance to warm up
+ s = select([t1], t1.c.c2 == t2.c.c1)
s.compile(dialect=self.dialect)
+ @profiling.function_call_count()
+ def go():
+ s = select([t1], t1.c.c2 == t2.c.c1)
+ s.compile(dialect=self.dialect)
+ go()
\ No newline at end of file
-# /Users/classic/dev/sqla_comparators/./test/lib/profiles.txt
+# /Users/classic/dev/sqlalchemy/./test/lib/profiles.txt
# This file is written out on a per-environment basis.
-# For each test in aaa_profiling, the corresponding function and
+# For each test in aaa_profiling, the corresponding function and
# environment is located within this file. If it doesn't exist,
# the test is skipped.
-# If a callcount does exist, it is compared to what we received.
+# If a callcount does exist, it is compared to what we received.
# assertions are raised if the counts do not match.
-#
-# To add a new callcount test, apply the function_call_count
-# decorator and re-run the tests using the --write-profiles option -
+#
+# To add a new callcount test, apply the function_call_count
+# decorator and re-run the tests using the --write-profiles option -
# this file will be rewritten including the new count.
-#
+#
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_select
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.5_sqlite_pysqlite_nocextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.6_sqlite_pysqlite_nocextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_mysql_mysqldb_cextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_mysql_mysqldb_nocextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_cextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_nocextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_cextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_nocextensions 149
-test.aaa_profiling.test_compiler.CompileTest.test_select 3.2_postgresql_psycopg2_nocextensions 161
-test.aaa_profiling.test_compiler.CompileTest.test_select 3.2_sqlite_pysqlite_nocextensions 161
+test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_postgresql_psycopg2_nocextensions 133
+test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_nocextensions 133
+
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_select_second_time
+
+test.aaa_profiling.test_compiler.CompileTest.test_select_second_time 2.7_sqlite_pysqlite_nocextensions 133
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_postgresql_psycopg2_nocextensions 14
+test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile 2.7_sqlite_pysqlite_nocextensions 14
# TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_string
# TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_3_properties
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_3_properties 2.7_postgresql_psycopg2_cextensions 3093
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_3_properties 2.7_postgresql_psycopg2_nocextensions 3317
-test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_3_properties 3.2_postgresql_psycopg2_nocextensions 3094
+test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_3_properties 2.7_postgresql_psycopg2_nocextensions 3526
# TEST: test.aaa_profiling.test_zoomark.ZooMarkTest.test_profile_4_expressions
eq_(result2.fetchall(), [(2,False),])
class SequenceReturningTest(fixtures.TestBase):
- __requires__ = 'returning',
+ __requires__ = 'returning', 'sequences'
def setup(self):
meta = MetaData(testing.db)
--- /dev/null
+from sqlalchemy import Table, Column, String, func, MetaData, select
+from test.lib import fixtures, AssertsCompiledSQL, testing
+from test.lib.testing import eq_
+
+class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def _fixture(self):
+ class MyString(String):
+ def bind_expression(self, bindvalue):
+ return func.lower(bindvalue)
+
+ def column_expression(self, col):
+ return func.lower(col)
+
+ test_table = Table(
+ 'test_table',
+ MetaData(), Column('x', String), Column('y', MyString)
+ )
+ return test_table
+
+ def test_select_cols(self):
+ table = self._fixture()
+
+ self.assert_compile(
+ select([table]),
+ "SELECT test_table.x, lower(test_table.y) AS y_1 FROM test_table"
+ )
+
+ def test_select_cols_use_labels(self):
+ table = self._fixture()
+
+ self.assert_compile(
+ select([table]).apply_labels(),
+ "SELECT test_table.x AS test_table_x, "
+ "lower(test_table.y) AS test_table_y FROM test_table"
+ )
+
+ def test_select_cols_use_labels_result_map_targeting(self):
+ table = self._fixture()
+
+ compiled = select([table]).apply_labels().compile()
+ assert table.c.y in compiled.result_map['test_table_y'][1]
+ assert table.c.x in compiled.result_map['test_table_x'][1]
+
+ # the lower() function goes into the result_map, we don't really
+ # need this but it's fine
+ self.assert_compile(
+ compiled.result_map['test_table_y'][1][1],
+ "lower(test_table.y)"
+ )
+ # then the original column gets put in there as well.
+ # it's not important that it's the last value.
+ self.assert_compile(
+ compiled.result_map['test_table_y'][1][-1],
+ "test_table.y"
+ )
+
+ def test_insert_binds(self):
+ table = self._fixture()
+
+ self.assert_compile(
+ table.insert(),
+ "INSERT INTO test_table (x, y) VALUES (:x, lower(:y))"
+ )
+
+ self.assert_compile(
+ table.insert().values(y="hi"),
+ "INSERT INTO test_table (y) VALUES (lower(:y))"
+ )
+
+ def test_select_binds(self):
+ table = self._fixture()
+ self.assert_compile(
+ select([table]).where(table.c.y == "hi"),
+ "SELECT test_table.x, lower(test_table.y) AS y_1 FROM "
+ "test_table WHERE test_table.y = lower(:y_2)"
+ )
+
+class RoundTripTest(fixtures.TablesTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ class MyString(String):
+ def bind_expression(self, bindvalue):
+ return func.lower(bindvalue)
+
+ def column_expression(self, col):
+ return func.upper(col)
+
+ Table(
+ 'test_table',
+ metadata,
+ Column('x', String(50)),
+ Column('y', MyString(50))
+ )
+
+ def test_round_trip(self):
+ testing.db.execute(
+ self.tables.test_table.insert(),
+ {"x": "X1", "y": "Y1"},
+ {"x": "X2", "y": "Y2"},
+ {"x": "X3", "y": "Y3"},
+ )
+
+ # test insert coercion alone
+ eq_(
+ testing.db.execute(
+ "select * from test_table order by y").fetchall(),
+ [
+ ("X1", "y1"),
+ ("X2", "y2"),
+ ("X3", "y3"),
+ ]
+ )
+
+ # conversion back to upper
+ eq_(
+ testing.db.execute(
+ select([self.tables.test_table]).\
+ order_by(self.tables.test_table.c.y)
+ ).fetchall(),
+ [
+ ("X1", "Y1"),
+ ("X2", "Y2"),
+ ("X3", "Y3"),
+ ]
+ )
+
+ def test_targeting_no_labels(self):
+ testing.db.execute(
+ self.tables.test_table.insert(),
+ {"x": "X1", "y": "Y1"},
+ )
+ row = testing.db.execute(select([self.tables.test_table])).first()
+ eq_(
+ row[self.tables.test_table.c.y],
+ "Y1"
+ )
+
+ def test_targeting_apply_labels(self):
+ testing.db.execute(
+ self.tables.test_table.insert(),
+ {"x": "X1", "y": "Y1"},
+ )
+ row = testing.db.execute(select([self.tables.test_table]).
+ apply_labels()).first()
+ eq_(
+ row[self.tables.test_table.c.y],
+ "Y1"
+ )
+
+ @testing.fails_if(lambda: True, "still need to propagate "
+ "result_map more effectively")
+ def test_targeting_individual_labels(self):
+ testing.db.execute(
+ self.tables.test_table.insert(),
+ {"x": "X1", "y": "Y1"},
+ )
+ row = testing.db.execute(select([
+ self.tables.test_table.c.x.label('xbar'),
+ self.tables.test_table.c.y.label('ybar')
+ ])).first()
+ eq_(
+ row[self.tables.test_table.c.y],
+ "Y1"
+ )
+
+class ReturningTest(fixtures.TablesTest):
+ __requires__ = 'returning',
+
+ @classmethod
+ def define_tables(cls, metadata):
+ class MyString(String):
+ def column_expression(self, col):
+ return func.lower(col)
+
+ Table(
+ 'test_table',
+ metadata, Column('x', String(50)),
+ Column('y', MyString(50), server_default="YVALUE")
+ )
+
+ @testing.provide_metadata
+ def test_insert_returning(self):
+ table = self.tables.test_table
+ result = testing.db.execute(
+ table.insert().returning(table.c.y),
+ {"x": "xvalue"}
+ )
+ eq_(
+ result.first(),
+ ("yvalue",)
+ )
+
+