raise exc.CompileError("Postgresql ENUM type requires a name.")
name = self.quote(type_.name)
- effective_schema = self._get_effective_schema(type_)
+ effective_schema = self.schema_for_object(type_)
if not self.omit_schema and use_schema and \
effective_schema is not None:
column._postgresql_seq_name = seq_name = name
if column.table is not None:
- effective_schema = self.connection._get_effective_schema(
+ effective_schema = self.connection.schema_for_object(
column.table)
else:
effective_schema = None
import sys
from .. import exc, util, log, interfaces
from ..sql import util as sql_util
+from ..sql import schema
from .interfaces import Connectable, ExceptionContext
from .util import _distill_params
import contextlib
"""
- _schema_translate_map = None
+ schema_for_object = schema._schema_getter(None)
+ """Return the ".schema" attribute for an object.
+
+ Used for :class:`.Table`, :class:`.Sequence` and similar objects,
+ and takes into account
+ the :paramref:`.Connection.execution_options.schema_translate_map`
+ parameter.
+
+ .. versionadded:: 1.1
+
+ .. seealso::
+
+ :ref:`schema_translating`
+
+ """
def __init__(self, engine, connection=None, close_with_result=False,
_branch_from=None, _execution_options=None,
self.should_close_with_result = False
self.dispatch = _dispatch
self._has_events = _branch_from._has_events
- self._schema_translate_map = _branch_from._schema_translate_map
+ self.schema_for_object = _branch_from.schema_for_object
else:
self.__connection = connection \
if connection is not None else engine.raw_connection()
c.__dict__ = self.__dict__.copy()
return c
- def _get_effective_schema(self, table):
- effective_schema = table.schema
- if self._schema_translate_map:
- effective_schema = self._schema_translate_map.get(
- effective_schema, effective_schema)
- return effective_schema
-
def __enter__(self):
return self
compiled = ddl.compile(
dialect=dialect,
- schema_translate_map=self._schema_translate_map)
+ schema_translate_map=self.schema_for_object
+ if not self.schema_for_object.is_default else None)
ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_ddl,
if 'compiled_cache' in self._execution_options:
key = (
dialect, elem, tuple(sorted(keys)),
- tuple(
- (k, self._schema_translate_map[k])
- for k in sorted(self._schema_translate_map)
- ) if self._schema_translate_map else None,
+ self.schema_for_object.hash_key,
len(distilled_params) > 1
)
compiled_sql = self._execution_options['compiled_cache'].get(key)
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
inline=len(distilled_params) > 1,
- schema_translate_map=self._schema_translate_map
+ schema_translate_map=self.schema_for_object
+ if not self.schema_for_object.is_default else None
)
self._execution_options['compiled_cache'][key] = compiled_sql
else:
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
inline=len(distilled_params) > 1,
- schema_translate_map=self._schema_translate_map)
+ schema_translate_map=self.schema_for_object
+ if not self.schema_for_object.is_default else None)
ret = self._execute_context(
dialect,
_has_events = False
_connection_cls = Connection
+ schema_for_object = schema._schema_getter(None)
+ """Return the ".schema" attribute for an object.
+
+ Used for :class:`.Table`, :class:`.Sequence` and similar objects,
+ and takes into account
+ the :paramref:`.Connection.execution_options.schema_translate_map`
+ parameter.
+
+ .. versionadded:: 1.1
+
+ .. seealso::
+
+ :ref:`schema_translating`
+
+ """
+
def __init__(self, pool, dialect, url,
logging_name=None, echo=None, proxy=None,
execution_options=None
import re
import random
from . import reflection, interfaces, result
-from ..sql import compiler, expression
+from ..sql import compiler, expression, schema
from .. import types as sqltypes
from .. import exc, util, pool, processors
import codecs
self._set_connection_isolation(connection, isolation_level)
if 'schema_translate_map' in opts:
+ getter = schema._schema_getter(opts['schema_translate_map'])
+ engine.schema_for_object = getter
+
@event.listens_for(engine, "engine_connect")
def set_schema_translate_map(connection, branch):
- connection._schema_translate_map = opts['schema_translate_map']
+ connection.schema_for_object = getter
def set_connection_execution_options(self, connection, opts):
if 'isolation_level' in opts:
self._set_connection_isolation(connection, opts['isolation_level'])
if 'schema_translate_map' in opts:
- connection._schema_translate_map = opts['schema_translate_map']
+ getter = schema._schema_getter(opts['schema_translate_map'])
+ connection.schema_for_object = getter
def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
"""Define core interfaces used by the engine system."""
-from .. import util, event
+from .. import util
# backwards compat
from ..sql.compiler import Compiled, TypeCompiler
"""
dialect = self.bind.dialect
- with self.bind.connect() as conn:
- schema = conn._get_effective_schema(table)
+ schema = self.bind.schema_for_object(table)
table_name = table.name
from operator import attrgetter
from sqlalchemy.engine import base, threadlocal, url
-from sqlalchemy import util, exc, event
+from sqlalchemy import util, event
from sqlalchemy import pool as poollib
+from sqlalchemy.sql import schema
strategies = {}
dialect = property(attrgetter('_dialect'))
name = property(lambda s: s._dialect.name)
- def _get_effective_schema(self, table):
- return table.schema
+ schema_for_object = schema._schema_getter(None)
def contextual_connect(self, **kwargs):
return self
.. versionadded:: 1.1
+ .. seealso::
+
+ :ref:`schema_translating`
+
:param compile_kwargs: additional kwargs that will be
passed to the initial call to :meth:`.Compiled.process`.
if table is None or not include_table or not table.named_with_column:
return name
else:
-
- # inlining of preparer._get_effective_schema
- effective_schema = table.schema
- if self.preparer.schema_translate_map:
- effective_schema = self.preparer.schema_translate_map.get(
- effective_schema, effective_schema)
+ effective_schema = self.preparer.schema_for_object(table)
if effective_schema:
schema_prefix = self.preparer.quote_schema(
def visit_table(self, table, asfrom=False, iscrud=False, ashint=False,
fromhints=None, use_schema=True, **kwargs):
if asfrom or ashint:
-
- # inlining of preparer._get_effective_schema
- effective_schema = table.schema
- if self.preparer.schema_translate_map:
- effective_schema = self.preparer.schema_translate_map.get(
- effective_schema, effective_schema)
+ effective_schema = self.preparer.schema_for_object(table)
if use_schema and effective_schema:
ret = self.preparer.quote_schema(effective_schema) + \
def _prepared_index_name(self, index, include_schema=False):
if index.table is not None:
- effective_schema = self.preparer._get_effective_schema(index.table)
+ effective_schema = self.preparer.schema_for_object(index.table)
else:
effective_schema = None
if include_schema and effective_schema:
illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS
- schema_translate_map = util.immutabledict()
+ schema_for_object = schema._schema_getter(None)
def __init__(self, dialect, initial_quote='"',
final_quote=None, escape_quote='"', omit_schema=False):
def _with_schema_translate(self, schema_translate_map):
prep = self.__class__.__new__(self.__class__)
prep.__dict__.update(self.__dict__)
- prep.schema_translate_map = schema_translate_map
+ prep.schema_for_object = schema._schema_getter(schema_translate_map)
return prep
def _escape_identifier(self, value):
def format_sequence(self, sequence, use_schema=True):
name = self.quote(sequence.name)
- effective_schema = self._get_effective_schema(sequence)
+ effective_schema = self.schema_for_object(sequence)
if (not self.omit_schema and use_schema and
effective_schema is not None):
return None
return self.quote(constraint.name)
- def _get_effective_schema(self, table):
- effective_schema = table.schema
- if self.schema_translate_map:
- effective_schema = self.schema_translate_map.get(
- effective_schema, effective_schema)
- return effective_schema
-
def format_table(self, table, use_schema=True, name=None):
"""Prepare a quoted table and schema name."""
name = table.name
result = self.quote(name)
- effective_schema = self._get_effective_schema(table)
+ effective_schema = self.schema_for_object(table)
if not self.omit_schema and use_schema \
and effective_schema:
# ('database', 'owner', etc.) could override this and return
# a longer sequence.
- effective_schema = self._get_effective_schema(table)
+ effective_schema = self.schema_for_object(table)
if not self.omit_schema and use_schema and \
effective_schema:
def _can_create_table(self, table):
self.dialect.validate_identifier(table.name)
- effective_schema = self.connection._get_effective_schema(table)
+ effective_schema = self.connection.schema_for_object(table)
if effective_schema:
self.dialect.validate_identifier(effective_schema)
return not self.checkfirst or \
table.name, schema=effective_schema)
def _can_create_sequence(self, sequence):
- effective_schema = self.connection._get_effective_schema(sequence)
+ effective_schema = self.connection.schema_for_object(sequence)
return self.dialect.supports_sequences and \
(
def _can_drop_table(self, table):
self.dialect.validate_identifier(table.name)
- effective_schema = self.connection._get_effective_schema(table)
+ effective_schema = self.connection.schema_for_object(table)
if effective_schema:
self.dialect.validate_identifier(effective_schema)
return not self.checkfirst or self.dialect.has_table(
self.connection, table.name, schema=effective_schema)
def _can_drop_sequence(self, sequence):
- effective_schema = self.connection._get_effective_schema(sequence)
+ effective_schema = self.connection.schema_for_object(sequence)
return self.dialect.supports_sequences and \
((not self.dialect.sequences_optional or
not sequence.optional) and
"""
from __future__ import absolute_import
-import inspect
from .. import exc, util, event, inspection
from .base import SchemaEventTarget, DialectKWArgs
+import operator
from . import visitors
from . import type_api
from .base import _bind_or_error, ColumnCollection
-from .elements import ClauseElement, ColumnClause, _truncated_label, \
+from .elements import ClauseElement, ColumnClause, \
_as_truncated, TextClause, _literal_as_text,\
- ColumnElement, _find_columns, quoted_name
+ ColumnElement, quoted_name
from .selectable import TableClause
import collections
import sqlalchemy
from . import ddl
-import types
RETAIN_SCHEMA = util.symbol('retain_schema')
for e in self.__engines.values():
if hasattr(e, 'dispose'):
e.dispose()
+
+
+class _SchemaTranslateMap(object):
+ """Provide translation of schema names based on a mapping.
+
+ Also provides helpers for producing cache keys and optimized
+ access when no mapping is present.
+
+ Used by the :paramref:`.Connection.execution_options.schema_translate_map`
+ feature.
+
+ .. versionadded:: 1.1
+
+
+ """
+ __slots__ = 'map_', '__call__', 'hash_key', 'is_default'
+
+ _default_schema_getter = operator.attrgetter("schema")
+
+ def __init__(self, map_):
+ self.map_ = map_
+ if map_ is not None:
+ def schema_for_object(obj):
+ effective_schema = self._default_schema_getter(obj)
+ effective_schema = map_.get(effective_schema, effective_schema)
+ return effective_schema
+ self.__call__ = schema_for_object
+ self.hash_key = ";".join(
+ "%s=%s" % (k, map_[k])
+ for k in sorted(map_)
+ )
+ self.is_default = False
+ else:
+ self.hash_key = 0
+ self.__call__ = self._default_schema_getter
+ self.is_default = True
+
+ @classmethod
+ def _schema_getter(cls, map_):
+ if map_ is None:
+ return _default_schema_map
+ elif isinstance(map_, _SchemaTranslateMap):
+ return map_
+ else:
+ return _SchemaTranslateMap(map_)
+
+_default_schema_map = _SchemaTranslateMap(None)
+_schema_getter = _SchemaTranslateMap._schema_getter
+
context.compiled.statement.compile(
dialect=compare_dialect,
schema_translate_map=context.
- compiled.preparer.schema_translate_map)
+ execution_options.get('schema_translate_map'))
else:
compiled = (
context.compiled.statement.compile(
column_keys=context.compiled.column_keys,
inline=context.compiled.inline,
schema_translate_map=context.
- compiled.preparer.schema_translate_map)
+ execution_options.get('schema_translate_map'))
)
_received_statement = re.sub(r'[\n\t]', '', util.text_type(compiled))
parameters = execute_observed.parameters