.. changelog::
:version: 1.1.0b1
+ .. change::
+ :tags: feature, orm
+ :tickets: 3631
+
+ Calling str() on a core SQL construct has been made more "friendly",
+ when the construct contains non-standard SQL elements such as
+ RETURNING, array index operations, or dialect-specific or custom
+ datatypes. A string is now returned in these cases rendering an
+ approximation of the construct (typically the Postgresql-style
+ version of it) rather than raising an error.
+
+ .. seealso::
+
+ :ref:`change_3631`
+
.. change::
:tags: bug, orm
:tickets: 3630
associated with any bound :class:`.Engine`, then the fallback to the
"default" dialect is used to generate the SQL string.
+.. seealso::
+
+ :ref:`change_3631`
+
:ticket:`3081`
New Features and Improvements - Core
:ticket:`2685`
+.. _change_3631:
+
+"Friendly" stringification of Core SQL constructs without a dialect
+-------------------------------------------------------------------
+
+Calling ``str()`` on a Core SQL construct will now produce a string
+in more cases than before, supporting various SQL constructs not normally
+present in default SQL such as RETURNING, array indexes, and non-standard
+datatypes::
+
+ >>> from sqlalchemy import table, column
+ t>>> t = table('x', column('a'), column('b'))
+ >>> print(t.insert().returning(t.c.a, t.c.b))
+ INSERT INTO x (a, b) VALUES (:a, :b) RETURNING x.a, x.b
+
+The ``str()`` function now calls upon an entirely separate dialect / compiler
+intended just for plain string printing without a specific dialect set up,
+so as more "just show me a string!" cases come up, these can be added
+to this dialect/compiler without impacting behaviors on real dialects.
+
+.. seealso::
+
+ :ref:`change_3081`
+
+:ticket:`3631`
+
.. _change_3531:
The type_coerce function is now a persistent SQL element
union,
union_all,
update,
+ within_group,
)
from .types import (
self.set_isolation_level(dbapi_conn, self.default_isolation_level)
+class StrCompileDialect(DefaultDialect):
+
+ statement_compiler = compiler.StrSQLCompiler
+ ddl_compiler = compiler.DDLCompiler
+ type_compiler = compiler.StrSQLTypeCompiler
+ preparer = compiler.IdentifierPreparer
+
+ supports_sequences = True
+ sequences_optional = True
+ preexecute_autoincrement_sequences = False
+ implicit_returning = False
+
+ supports_native_boolean = True
+
+ supports_simple_order_by_label = True
+
+
class DefaultExecutionContext(interfaces.ExecutionContext):
isinsert = False
isupdate = False
union,
union_all,
update,
+ within_group
)
from .visitors import ClauseVisitor
self.preparer.format_savepoint(savepoint_stmt)
+class StrSQLCompiler(SQLCompiler):
+ """"a compiler subclass with a few non-standard SQL features allowed.
+
+ Used for stringification of SQL statements when a real dialect is not
+ available.
+
+ """
+
+ def visit_getitem_binary(self, binary, operator, **kw):
+ return "%s[%s]" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw)
+ )
+
+ def returning_clause(self, stmt, returning_cols):
+
+ columns = [
+ self._label_select_column(None, c, True, False, {})
+ for c in elements._select_iterables(returning_cols)
+ ]
+
+ return 'RETURNING ' + ', '.join(columns)
+
+
class DDLCompiler(Compiled):
@util.memoized_property
return type_.get_col_spec(**kw)
+class StrSQLTypeCompiler(GenericTypeCompiler):
+ def __getattr__(self, key):
+ if key.startswith("visit_"):
+ return self._visit_unknown
+ else:
+ raise AttributeError(key)
+
+ def _visit_unknown(self, type_, **kw):
+ return "%s" % type_.__class__.__name__
+
+
class IdentifierPreparer(object):
"""Handle quoting and case-folding of identifiers based on options."""
dialect = self.bind.dialect
bind = self.bind
else:
- dialect = default.DefaultDialect()
+ dialect = default.StrCompileDialect()
return self._compiler(dialect, bind=bind, **kw)
def _compiler(self, dialect, **kw):
eq_, ne_, le_, is_, is_not_, startswith_, assert_raises, \
assert_raises_message, AssertsCompiledSQL, ComparesTables, \
AssertsExecutionResults, expect_deprecated, expect_warnings, \
- in_, not_in_
+ in_, not_in_, eq_ignore_whitespace
from .util import run_as_contextmanager, rowset, fail, \
provide_metadata, adict, force_drop_names, \
"""
-from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
+from sqlalchemy.testing import eq_, is_, assert_raises, \
+ assert_raises_message, eq_ignore_whitespace
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
assert_raises_message(
exc.UnsupportedCompilationError,
- r"Compiler <sqlalchemy.sql.compiler.SQLCompiler .*"
+ r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <class '.*SomeElement'>",
SomeElement().compile
)
assert_raises_message(
exc.UnsupportedCompilationError,
- r"Compiler <sqlalchemy.sql.compiler.SQLCompiler .*"
+ r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <class '.*SomeElement'>",
SomeElement().compile
)
binary = BinaryExpression(column("foo"), column("bar"), myop)
assert_raises_message(
exc.UnsupportedCompilationError,
- r"Compiler <sqlalchemy.sql.compiler.SQLCompiler .*"
+ r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <function.*",
binary.compile
)
+class StringifySpecialTest(fixtures.TestBase):
+ def test_basic(self):
+ stmt = select([table1]).where(table1.c.myid == 10)
+ eq_ignore_whitespace(
+ str(stmt),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = :myid_1"
+ )
+
+ def test_cte(self):
+ # stringify of these was supported anyway by defaultdialect.
+ stmt = select([table1.c.myid]).cte()
+ stmt = select([stmt])
+ eq_ignore_whitespace(
+ str(stmt),
+ "WITH anon_1 AS (SELECT mytable.myid AS myid FROM mytable) "
+ "SELECT anon_1.myid FROM anon_1"
+ )
+
+ def test_returning(self):
+ stmt = table1.insert().returning(table1.c.myid)
+
+ eq_ignore_whitespace(
+ str(stmt),
+ "INSERT INTO mytable (myid, name, description) "
+ "VALUES (:myid, :name, :description) RETURNING mytable.myid"
+ )
+
+ def test_array_index(self):
+ stmt = select([column('foo', types.ARRAY(Integer))[5]])
+
+ eq_ignore_whitespace(
+ str(stmt),
+ "SELECT foo[:foo_1] AS anon_1"
+ )
+
+ def test_unknown_type(self):
+ class MyType(types.TypeEngine):
+ __visit_name__ = 'mytype'
+
+ stmt = select([cast(table1.c.myid, MyType)])
+
+ eq_ignore_whitespace(
+ str(stmt),
+ "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable"
+ )
+
+ def test_within_group(self):
+ # stringify of these was supported anyway by defaultdialect.
+ from sqlalchemy import within_group
+ stmt = select([
+ table1.c.myid,
+ within_group(
+ func.percentile_cont(0.5),
+ table1.c.name.desc()
+ )
+ ])
+ eq_ignore_whitespace(
+ str(stmt),
+ "SELECT mytable.myid, percentile_cont(:percentile_cont_1) "
+ "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable"
+ )
+
+
class KwargPropagationTest(fixtures.TestBase):
@classmethod
events, Unicode, types as sqltypes, bindparam, \
Table, Column, Boolean, Enum, func, text, TypeDecorator
from sqlalchemy import schema, exc
+from sqlalchemy.engine import default
from sqlalchemy.sql import elements, naming
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
exc.InvalidRequestError,
"Naming convention including \%\(constraint_name\)s token "
"requires that constraint is explicitly named.",
- schema.CreateTable(u1).compile
+ schema.CreateTable(u1).compile, dialect=default.DefaultDialect()
)
def test_schematype_no_ck_name_boolean_no_name(self):
assert c in s.c.bar.proxy_set
def test_no_error_on_unsupported_expr_key(self):
- from sqlalchemy.dialects.postgresql import ARRAY
+ from sqlalchemy.sql.expression import BinaryExpression
- t = table('t', column('x', ARRAY(Integer)))
+ def myop(x, y):
+ pass
+
+ t = table('t', column('x'), column('y'))
+
+ expr = BinaryExpression(t.c.x, t.c.y, myop)
- expr = t.c.x[5]
s = select([t, expr])
eq_(
s.c.keys(),
- ['x', expr.anon_label]
+ ['x', 'y', expr.anon_label]
)
def test_cloned_intersection(self):