--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 5004
+
+ Revised the :paramref:`.Connection.execution_options.schema_translate_map`
+ feature such that the processing of the SQL statement to receive a specific
+ schema name occurs within the execution phase of the statement, rather than
+ at the compile phase. This is to support the statement being efficiently
+ cached. Previously, the current schema being rendered into the statement
+ for a particular run would be considered as part of the cache key itself,
+ meaning that for a run against hundreds of schemas, there would be hundreds
+ of cache keys, rendering the cache much less performant. The new behavior
+ is that the rendering is done in a similar manner as the "post compile"
+ rendering added in 1.4 as part of :ticket:`4645`, :ticket:`4808`.
)
return bool(info)
+ def _get_default_schema_name(self, connection):
+ return "main"
+
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
if schema is not None:
from .. import log
from .. import util
from ..sql import compiler
-from ..sql import schema
from ..sql import util as sql_util
"""
- schema_for_object = schema._schema_getter(None)
- """Return the ".schema" attribute for an object.
-
- Used for :class:`.Table`, :class:`.Sequence` and similar objects,
- and takes into account
- the :paramref:`.Connection.execution_options.schema_translate_map`
- parameter.
-
- .. versionadded:: 1.1
-
- .. seealso::
-
- :ref:`schema_translating`
-
- """
+ _schema_translate_map = None
def __init__(
self,
self.should_close_with_result = False
self.dispatch = _dispatch
self._has_events = _branch_from._has_events
- self.schema_for_object = _branch_from.schema_for_object
+ self._schema_translate_map = _branch_from._schema_translate_map
else:
self.__connection = (
connection
if self._has_events or self.engine._has_events:
self.dispatch.engine_connect(self, self.__branch)
+ def schema_for_object(self, obj):
+ """return the schema name for the given schema item taking into
+ account current schema translate map.
+
+ """
+
+ name = obj.schema
+ schema_translate_map = self._schema_translate_map
+
+ if (
+ schema_translate_map
+ and name in schema_translate_map
+ and obj._use_schema_map
+ ):
+ return schema_translate_map[name]
+ else:
+ return name
+
def _branch(self):
"""Return a new Connection which references this Connection's
engine and connection; but does not have close_with_result enabled,
dialect = self.dialect
compiled = ddl.compile(
- dialect=dialect,
- schema_translate_map=self.schema_for_object
- if not self.schema_for_object.is_default
- else None,
+ dialect=dialect, schema_translate_map=self._schema_translate_map
)
ret = self._execute_context(
dialect,
dialect,
elem,
tuple(sorted(keys)),
- self.schema_for_object.hash_key,
+ bool(self._schema_translate_map),
len(distilled_params) > 1,
)
compiled_sql = self._execution_options["compiled_cache"].get(key)
dialect=dialect,
column_keys=keys,
inline=len(distilled_params) > 1,
- schema_translate_map=self.schema_for_object
- if not self.schema_for_object.is_default
- else None,
+ schema_translate_map=self._schema_translate_map,
linting=self.dialect.compiler_linting
| compiler.WARN_LINTING,
)
dialect=dialect,
column_keys=keys,
inline=len(distilled_params) > 1,
- schema_translate_map=self.schema_for_object
- if not self.schema_for_object.is_default
- else None,
+ schema_translate_map=self._schema_translate_map,
linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
)
_has_events = False
_connection_cls = Connection
- schema_for_object = schema._schema_getter(None)
- """Return the ".schema" attribute for an object.
-
- Used for :class:`.Table`, :class:`.Sequence` and similar objects,
- and takes into account
- the :paramref:`.Connection.execution_options.schema_translate_map`
- parameter.
-
- .. versionadded:: 1.1
-
- .. seealso::
-
- :ref:`schema_translating`
-
- """
+ _schema_translate_map = None
def __init__(
self,
from .. import util
from ..sql import compiler
from ..sql import expression
-from ..sql import schema
from ..sql.elements import quoted_name
AUTOCOMMIT_REGEXP = re.compile(
server_version_info = None
+ default_schema_name = None
+
construct_arguments = None
"""Optional set of argument specifiers for various SQLAlchemy
constructs, typically schema items.
self._set_connection_isolation(connection, isolation_level)
if "schema_translate_map" in opts:
- getter = schema._schema_getter(opts["schema_translate_map"])
- engine.schema_for_object = getter
+ engine._schema_translate_map = map_ = opts["schema_translate_map"]
@event.listens_for(engine, "engine_connect")
def set_schema_translate_map(connection, branch):
- connection.schema_for_object = getter
+ connection._schema_translate_map = map_
def set_connection_execution_options(self, connection, opts):
if "isolation_level" in opts:
self._set_connection_isolation(connection, opts["isolation_level"])
if "schema_translate_map" in opts:
- getter = schema._schema_getter(opts["schema_translate_map"])
- connection.schema_for_object = getter
+ connection._schema_translate_map = opts["schema_translate_map"]
def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
self.execution_options = dict(self.execution_options)
self.execution_options.update(connection._execution_options)
+ self.unicode_statement = util.text_type(compiled)
+ if compiled.schema_translate_map:
+ rst = compiled.preparer._render_schema_translates
+ self.unicode_statement = rst(
+ self.unicode_statement, connection._schema_translate_map
+ )
+
if not dialect.supports_unicode_statements:
- self.unicode_statement = util.text_type(compiled)
self.statement = dialect._encoder(self.unicode_statement)[0]
else:
- self.statement = self.unicode_statement = util.text_type(compiled)
+ self.statement = self.unicode_statement
self.cursor = self.create_cursor()
self.compiled_parameters = []
elif compiled.positional:
positiontup = self.compiled.positiontup
+ if compiled.schema_translate_map:
+ rst = compiled.preparer._render_schema_translates
+ self.unicode_statement = rst(
+ self.unicode_statement, connection._schema_translate_map
+ )
+
# final self.unicode_statement is now assigned, encode if needed
# by dialect
if not dialect.supports_unicode_statements:
from . import url as _url
from .. import util
from ..sql import ddl
-from ..sql import schema
class MockConnection(base.Connectable):
dialect = property(attrgetter("_dialect"))
name = property(lambda s: s._dialect.name)
- schema_for_object = schema._schema_getter(None)
+ def schema_for_object(self, obj):
+ return obj.schema
def connect(self, **kwargs):
return self
dialect = self.bind.dialect
- schema = self.bind.schema_for_object(table)
+ with self._operation_context() as conn:
+ schema = conn.schema_for_object(table)
table_name = table.name
import collections
import contextlib
import itertools
+import operator
import re
from . import base
from . import selectable
from . import sqltypes
from .base import NO_ARG
+from .elements import quoted_name
from .. import exc
from .. import util
_cached_metadata = None
+ schema_translate_map = None
+
execution_options = util.immutabledict()
"""
Execution options propagated from the statement. In some cases,
statement,
bind=None,
schema_translate_map=None,
+ render_schema_translate=False,
compile_kwargs=util.immutabledict(),
):
"""Construct a new :class:`.Compiled` object.
self.bind = bind
self.preparer = self.dialect.identifier_preparer
if schema_translate_map:
+ self.schema_translate_map = schema_translate_map
self.preparer = self.preparer._with_schema_translate(
schema_translate_map
)
self.execution_options = statement._execution_options
self.string = self.process(self.statement, **compile_kwargs)
+ if render_schema_translate:
+ self.string = self.preparer._render_schema_translates(
+ self.string, schema_translate_map
+ )
+
@util.deprecated(
"0.7",
"The :meth:`.Compiled.compile` method is deprecated and will be "
return self.sql_compiler.post_process_text(ddl.statement % context)
- def visit_create_schema(self, create):
+ def visit_create_schema(self, create, **kw):
schema = self.preparer.format_schema(create.element)
return "CREATE SCHEMA " + schema
- def visit_drop_schema(self, drop):
+ def visit_drop_schema(self, drop, **kw):
schema = self.preparer.format_schema(drop.element)
text = "DROP SCHEMA " + schema
if drop.cascade:
text += " CASCADE"
return text
- def visit_create_table(self, create):
+ def visit_create_table(self, create, **kw):
table = create.element
preparer = self.preparer
text += "\n)%s\n\n" % self.post_create_table(table)
return text
- def visit_create_column(self, create, first_pk=False):
+ def visit_create_column(self, create, first_pk=False, **kw):
column = create.element
if column.system:
return text
def create_table_constraints(
- self, table, _include_foreign_key_constraints=None
+ self, table, _include_foreign_key_constraints=None, **kw
):
# On some DB order is significant: visit PK first, then the
if p is not None
)
- def visit_drop_table(self, drop):
+ def visit_drop_table(self, drop, **kw):
return "\nDROP TABLE " + self.preparer.format_table(drop.element)
- def visit_drop_view(self, drop):
+ def visit_drop_view(self, drop, **kw):
return "\nDROP VIEW " + self.preparer.format_table(drop.element)
def _verify_index_table(self, index):
)
def visit_create_index(
- self, create, include_schema=False, include_table_schema=True
+ self, create, include_schema=False, include_table_schema=True, **kw
):
index = create.element
self._verify_index_table(index)
)
return text
- def visit_drop_index(self, drop):
+ def visit_drop_index(self, drop, **kw):
index = drop.element
if index.name is None:
index_name = schema_name + "." + index_name
return index_name
- def visit_add_constraint(self, create):
+ def visit_add_constraint(self, create, **kw):
return "ALTER TABLE %s ADD %s" % (
self.preparer.format_table(create.element.table),
self.process(create.element),
)
- def visit_set_table_comment(self, create):
+ def visit_set_table_comment(self, create, **kw):
return "COMMENT ON TABLE %s IS %s" % (
self.preparer.format_table(create.element),
self.sql_compiler.render_literal_value(
),
)
- def visit_drop_table_comment(self, drop):
+ def visit_drop_table_comment(self, drop, **kw):
return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
drop.element
)
- def visit_set_column_comment(self, create):
+ def visit_set_column_comment(self, create, **kw):
return "COMMENT ON COLUMN %s IS %s" % (
self.preparer.format_column(
create.element, use_table=True, use_schema=True
),
)
- def visit_drop_column_comment(self, drop):
+ def visit_drop_column_comment(self, drop, **kw):
return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column(
drop.element, use_table=True
)
- def visit_create_sequence(self, create):
+ def visit_create_sequence(self, create, **kw):
text = "CREATE SEQUENCE %s" % self.preparer.format_sequence(
create.element
)
text += " CYCLE"
return text
- def visit_drop_sequence(self, drop):
+ def visit_drop_sequence(self, drop, **kw):
return "DROP SEQUENCE %s" % self.preparer.format_sequence(drop.element)
- def visit_drop_constraint(self, drop):
+ def visit_drop_constraint(self, drop, **kw):
constraint = drop.element
if constraint.name is not None:
formatted_name = self.preparer.format_constraint(constraint)
else:
return self.visit_check_constraint(constraint)
- def visit_check_constraint(self, constraint):
+ def visit_check_constraint(self, constraint, **kw):
text = ""
if constraint.name is not None:
formatted_name = self.preparer.format_constraint(constraint)
text += self.define_constraint_deferrability(constraint)
return text
- def visit_column_check_constraint(self, constraint):
+ def visit_column_check_constraint(self, constraint, **kw):
text = ""
if constraint.name is not None:
formatted_name = self.preparer.format_constraint(constraint)
text += self.define_constraint_deferrability(constraint)
return text
- def visit_primary_key_constraint(self, constraint):
+ def visit_primary_key_constraint(self, constraint, **kw):
if len(constraint) == 0:
return ""
text = ""
text += self.define_constraint_deferrability(constraint)
return text
- def visit_foreign_key_constraint(self, constraint):
+ def visit_foreign_key_constraint(self, constraint, **kw):
preparer = self.preparer
text = ""
if constraint.name is not None:
return preparer.format_table(table)
- def visit_unique_constraint(self, constraint):
+ def visit_unique_constraint(self, constraint, **kw):
if len(constraint) == 0:
return ""
text = ""
text += " MATCH %s" % constraint.match
return text
- def visit_computed_column(self, generated):
+ def visit_computed_column(self, generated, **kw):
text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
generated.sqltext, include_table=False, literal_binds=True
)
illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
- schema_for_object = schema._schema_getter(None)
+ schema_for_object = operator.attrgetter("schema")
+ """Return the .schema attribute for an object.
+
+ For the default IdentifierPreparer, the schema for an object is always
+ the value of the ".schema" attribute. if the preparer is replaced
+ with one that has a non-empty schema_translate_map, the value of the
+ ".schema" attribute is rendered a symbol that will be converted to a
+ real schema name from the mapping post-compile.
+
+ """
def __init__(
self,
def _with_schema_translate(self, schema_translate_map):
prep = self.__class__.__new__(self.__class__)
prep.__dict__.update(self.__dict__)
- prep.schema_for_object = schema._schema_getter(schema_translate_map)
+
+ def symbol_getter(obj):
+ name = obj.schema
+ if name in schema_translate_map and obj._use_schema_map:
+ return quoted_name(
+ "[SCHEMA_%s]" % (name or "_none"), quote=False
+ )
+ else:
+ return obj.schema
+
+ prep.schema_for_object = symbol_getter
return prep
+ def _render_schema_translates(self, statement, schema_translate_map):
+ d = schema_translate_map
+ if None in d:
+ d["_none"] = d[None]
+
+ def replace(m):
+ name = m.group(2)
+ effective_schema = d[name]
+ if not effective_schema:
+ effective_schema = self.dialect.default_schema_name
+ if not effective_schema:
+ # TODO: no coverage here
+ raise exc.CompileError(
+ "Dialect has no default schema name; can't "
+ "use None as dynamic schema target."
+ )
+ return self.quote(effective_schema)
+
+ return re.sub(r"(\[SCHEMA_([\w\d_]+)\])", replace, statement)
+
def _escape_identifier(self, value):
"""Escape an identifier.
from __future__ import absolute_import
import collections
-import operator
import sqlalchemy
from . import coercions
schema_item.dispatch._update(self.dispatch)
return schema_item
- def _translate_schema(self, effective_schema, map_):
- return map_.get(effective_schema, effective_schema)
+ _use_schema_map = True
class Table(DialectKWArgs, SchemaItem, TableClause):
e.dispose()
-class _SchemaTranslateMap(object):
- """Provide translation of schema names based on a mapping.
-
- Also provides helpers for producing cache keys and optimized
- access when no mapping is present.
-
- Used by the :paramref:`.Connection.execution_options.schema_translate_map`
- feature.
-
- .. versionadded:: 1.1
-
-
- """
-
- __slots__ = "map_", "__call__", "hash_key", "is_default"
-
- _default_schema_getter = operator.attrgetter("schema")
-
- def __init__(self, map_):
- self.map_ = map_
- if map_ is not None:
-
- def schema_for_object(obj):
- effective_schema = self._default_schema_getter(obj)
- effective_schema = obj._translate_schema(
- effective_schema, map_
- )
- return effective_schema
-
- self.__call__ = schema_for_object
- self.hash_key = ";".join(
- "%s=%s" % (k, map_[k]) for k in sorted(map_, key=str)
- )
- self.is_default = False
- else:
- self.hash_key = 0
- self.__call__ = self._default_schema_getter
- self.is_default = True
-
- @classmethod
- def _schema_getter(cls, map_):
- if map_ is None:
- return _default_schema_map
- elif isinstance(map_, _SchemaTranslateMap):
- return map_
- else:
- return _SchemaTranslateMap(map_)
-
-
-_default_schema_map = _SchemaTranslateMap(None)
-_schema_getter = _SchemaTranslateMap._schema_getter
-
-
class Computed(FetchedValue, SchemaItem):
"""Defines a generated column, i.e. "GENERATED ALWAYS AS" syntax.
_is_from_clause = True
_is_join = False
- def _translate_schema(self, effective_schema, map_):
- return effective_schema
+ _use_schema_map = False
_memoized_property = util.group_expirable_memoized_property(["_columns"])
"""
+ _use_schema_map = True
+
def __init__(
self,
name=None,
util.portable_instancemethod(self._on_metadata_drop),
)
- def _translate_schema(self, effective_schema, map_):
- return map_.get(effective_schema, effective_schema)
-
def _set_parent(self, column):
column._on_table_attach(util.portable_instancemethod(self._set_table))
literal_binds=False,
render_postcompile=False,
schema_translate_map=None,
+ render_schema_translate=False,
+ default_schema_name=None,
inline_flag=None,
):
if use_default_dialect:
elif isinstance(dialect, util.string_types):
dialect = url.URL(dialect).get_dialect()()
+ if default_schema_name:
+ dialect.default_schema_name = default_schema_name
+
kw = {}
compile_kwargs = {}
if render_postcompile:
compile_kwargs["render_postcompile"] = True
+ if render_schema_translate:
+ kw["render_schema_translate"] = True
+
from sqlalchemy import orm
if isinstance(clause, orm.Query):
context = execute_observed.context
compare_dialect = self._compile_dialect(execute_observed)
+
+ if "schema_translate_map" in context.execution_options:
+ map_ = context.execution_options["schema_translate_map"]
+ else:
+ map_ = None
+
if isinstance(context.compiled.statement, _DDLCompiles):
+
compiled = context.compiled.statement.compile(
- dialect=compare_dialect,
- schema_translate_map=context.execution_options.get(
- "schema_translate_map"
- ),
+ dialect=compare_dialect, schema_translate_map=map_
)
else:
compiled = context.compiled.statement.compile(
dialect=compare_dialect,
column_keys=context.compiled.column_keys,
inline=context.compiled.inline,
- schema_translate_map=context.execution_options.get(
- "schema_translate_map"
- ),
+ schema_translate_map=map_,
)
_received_statement = re.sub(r"[\n\t]", "", util.text_type(compiled))
parameters = execute_observed.parameters
@testing.requires.schema_reflection
def test_dialect_initialize(self):
engine = engines.testing_engine()
- assert not hasattr(engine.dialect, "default_schema_name")
inspect(engine)
assert hasattr(engine.dialect, "default_schema_name")
postgresql.CreateEnumType(e1),
"CREATE TYPE foo.somename AS ENUM ('x', 'y', 'z')",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
postgresql.CreateEnumType(e2),
"CREATE TYPE bar.somename AS ENUM ('x', 'y', 'z')",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
def test_create_table_with_schema_type_schema_translate(self):
CreateTable(table),
"CREATE TABLE foo.some_table (q foo.somename, p bar.somename)",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
def test_create_table_with_tablespace(self):
conn.execute(ins, {"q": 2})
eq_(conn.scalar(stmt), 2)
+ with config.db.connect().execution_options(
+ compiled_cache=cache, schema_translate_map={None: None},
+ ) as conn:
+ # should use default schema again even though statement
+ # was compiled with test_schema in the map
+ eq_(conn.scalar(stmt), 1)
+
with config.db.connect().execution_options(
compiled_cache=cache
) as conn:
t1.drop(conn)
asserter.assert_(
- CompiledSQL("CREATE TABLE %s.t1 (x INTEGER)" % config.test_schema),
- CompiledSQL("CREATE TABLE %s.t2 (x INTEGER)" % config.test_schema),
- CompiledSQL("CREATE TABLE t3 (x INTEGER)"),
- CompiledSQL("DROP TABLE t3"),
- CompiledSQL("DROP TABLE %s.t2" % config.test_schema),
- CompiledSQL("DROP TABLE %s.t1" % config.test_schema),
+ CompiledSQL("CREATE TABLE [SCHEMA__none].t1 (x INTEGER)"),
+ CompiledSQL("CREATE TABLE [SCHEMA_foo].t2 (x INTEGER)"),
+ CompiledSQL("CREATE TABLE [SCHEMA_bar].t3 (x INTEGER)"),
+ CompiledSQL("DROP TABLE [SCHEMA_bar].t3"),
+ CompiledSQL("DROP TABLE [SCHEMA_foo].t2"),
+ CompiledSQL("DROP TABLE [SCHEMA__none].t1"),
)
def _fixture(self):
conn.execute(t3.delete())
asserter.assert_(
+ CompiledSQL("INSERT INTO [SCHEMA__none].t1 (x) VALUES (:x)"),
+ CompiledSQL("INSERT INTO [SCHEMA_foo].t2 (x) VALUES (:x)"),
+ CompiledSQL("INSERT INTO [SCHEMA_bar].t3 (x) VALUES (:x)"),
CompiledSQL(
- "INSERT INTO %s.t1 (x) VALUES (:x)" % config.test_schema
- ),
- CompiledSQL(
- "INSERT INTO %s.t2 (x) VALUES (:x)" % config.test_schema
+ "UPDATE [SCHEMA__none].t1 SET x=:x WHERE "
+ "[SCHEMA__none].t1.x = :x_1"
),
- CompiledSQL("INSERT INTO t3 (x) VALUES (:x)"),
CompiledSQL(
- "UPDATE %s.t1 SET x=:x WHERE %s.t1.x = :x_1"
- % (config.test_schema, config.test_schema)
+ "UPDATE [SCHEMA_foo].t2 SET x=:x WHERE "
+ "[SCHEMA_foo].t2.x = :x_1"
),
CompiledSQL(
- "UPDATE %s.t2 SET x=:x WHERE %s.t2.x = :x_1"
- % (config.test_schema, config.test_schema)
+ "UPDATE [SCHEMA_bar].t3 SET x=:x WHERE "
+ "[SCHEMA_bar].t3.x = :x_1"
),
- CompiledSQL("UPDATE t3 SET x=:x WHERE t3.x = :x_1"),
- CompiledSQL(
- "SELECT %s.t1.x FROM %s.t1"
- % (config.test_schema, config.test_schema)
- ),
- CompiledSQL(
- "SELECT %s.t2.x FROM %s.t2"
- % (config.test_schema, config.test_schema)
- ),
- CompiledSQL("SELECT t3.x FROM t3"),
- CompiledSQL("DELETE FROM %s.t1" % config.test_schema),
- CompiledSQL("DELETE FROM %s.t2" % config.test_schema),
- CompiledSQL("DELETE FROM t3"),
+ CompiledSQL("SELECT [SCHEMA__none].t1.x FROM [SCHEMA__none].t1"),
+ CompiledSQL("SELECT [SCHEMA_foo].t2.x FROM [SCHEMA_foo].t2"),
+ CompiledSQL("SELECT [SCHEMA_bar].t3.x FROM [SCHEMA_bar].t3"),
+ CompiledSQL("DELETE FROM [SCHEMA__none].t1"),
+ CompiledSQL("DELETE FROM [SCHEMA_foo].t2"),
+ CompiledSQL("DELETE FROM [SCHEMA_bar].t3"),
)
@testing.provide_metadata
conn = eng.connect()
conn.execute(select([t2.c.x]))
asserter.assert_(
- CompiledSQL(
- "SELECT %s.t2.x FROM %s.t2"
- % (config.test_schema, config.test_schema)
- )
+ CompiledSQL("SELECT [SCHEMA_foo].t2.x FROM [SCHEMA_foo].t2")
)
schema_translate_map = {None: "z", "bar": None, "foo": "bat"}
+ self.assert_compile(
+ schema.CreateTable(t1),
+ "CREATE TABLE [SCHEMA__none].t1 (q INTEGER)",
+ schema_translate_map=schema_translate_map,
+ )
self.assert_compile(
schema.CreateTable(t1),
"CREATE TABLE z.t1 (q INTEGER)",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
+ self.assert_compile(
+ schema.CreateTable(t2),
+ "CREATE TABLE [SCHEMA_foo].t2 (q INTEGER)",
+ schema_translate_map=schema_translate_map,
+ )
self.assert_compile(
schema.CreateTable(t2),
"CREATE TABLE bat.t2 (q INTEGER)",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
schema.CreateTable(t3),
- "CREATE TABLE t3 (q INTEGER)",
+ "CREATE TABLE [SCHEMA_bar].t3 (q INTEGER)",
schema_translate_map=schema_translate_map,
)
+ self.assert_compile(
+ schema.CreateTable(t3),
+ "CREATE TABLE main.t3 (q INTEGER)",
+ schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
+ default_schema_name="main",
+ )
def test_schema_translate_map_sequence(self):
s1 = schema.Sequence("s1")
self.assert_compile(
schema.CreateSequence(s1),
- "CREATE SEQUENCE z.s1",
+ "CREATE SEQUENCE [SCHEMA__none].s1",
schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s2),
- "CREATE SEQUENCE bat.s2",
+ "CREATE SEQUENCE [SCHEMA_foo].s2",
schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s3),
- "CREATE SEQUENCE s3",
+ "CREATE SEQUENCE [SCHEMA_bar].s3",
schema_translate_map=schema_translate_map,
)
"bar.mytable.description FROM bar.mytable "
"WHERE bar.mytable.name = :name_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
"foob.remotetable.value FROM foob.remotetable "
"WHERE foob.remotetable.value = :value_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
schema_translate_map = {"remote_owner": "foob"}
"foob.remotetable.value FROM mytable JOIN foob.remotetable "
"ON mytable.myid = foob.remotetable.rem_id",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
def test_schema_translate_aliases(self):
.where(alias.c.name == "foo")
)
+ self.assert_compile(
+ stmt,
+ "SELECT [SCHEMA__none].myothertable.otherid, "
+ "[SCHEMA__none].myothertable.othername, "
+ "mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM [SCHEMA__none].myothertable JOIN "
+ "[SCHEMA__none].mytable AS mytable_1 "
+ "ON [SCHEMA__none].myothertable.otherid = mytable_1.myid "
+ "WHERE mytable_1.name = :name_1",
+ schema_translate_map=schema_translate_map,
+ )
+
self.assert_compile(
stmt,
"SELECT bar.myothertable.otherid, bar.myothertable.othername, "
"ON bar.myothertable.otherid = mytable_1.myid "
"WHERE mytable_1.name = :name_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
def test_schema_translate_crud(self):
table1.insert().values(description="foo"),
"INSERT INTO bar.mytable (description) VALUES (:description)",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
"UPDATE bar.mytable SET description=:description "
"WHERE bar.mytable.name = :name_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
table1.delete().where(table1.c.name == "hi"),
"DELETE FROM bar.mytable WHERE bar.mytable.name = :name_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
table4.insert().values(value="there"),
"INSERT INTO foob.remotetable (value) VALUES (:value)",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
"UPDATE foob.remotetable SET value=:value "
"WHERE foob.remotetable.value = :value_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
self.assert_compile(
"DELETE FROM foob.remotetable WHERE "
"foob.remotetable.value = :value_1",
schema_translate_map=schema_translate_map,
+ render_schema_translate=True,
)
def test_alias(self):
has_index=Mock(side_effect=has_index),
supports_comments=True,
inline_comments=False,
- )
+ ),
+ _schema_translate_map=None,
)
def _mock_create_fixture(