text += ")"
return text
-def _get_index_rendered_expressions(idx, autogen_context=None):
+
++def _get_index_rendered_expressions(idx, autogen_context):
+ if compat.sqla_08:
+ return [repr(getattr(exp, "name", None))
+ if isinstance(exp, sql.schema.Column)
+ else _render_potential_expr(exp, autogen_context)
+ for exp in idx.expressions]
+ else:
+ return [repr(getattr(col, "name", None)) for col in idx.columns]
+
++
def _add_index(index, autogen_context):
"""
Generate Alembic operations for the CREATE INDEX of an
:class:`~sqlalchemy.schema.Index` instance.
"""
- from .compare import _get_index_column_names
+
- text = "%(prefix)screate_index(%(name)r, '%(table)s', %(columns)s, "\
+ text = "%(prefix)screate_index(%(name)r, '%(table)s', [%(columns)s], "\
- "unique=%(unique)r%(schema)s%(kwargs)s)" % {
- 'prefix': _alembic_autogenerate_prefix(autogen_context),
- 'name': _render_gen_name(autogen_context, index.name),
- 'table': index.table.name,
- 'columns': ", ".join(_get_index_rendered_expressions(index, autogen_context)),
- 'unique': index.unique or False,
- 'schema': (", schema='%s'" % index.table.schema) if index.table.schema else '',
- 'kwargs': (', '+', '.join(
- ["%s=%s" % (key, _render_potential_expr(val, autogen_context))
- for key, val in index.kwargs.items()]))\
+ "unique=%(unique)r%(schema)s%(kwargs)s)" % {
+ 'prefix': _alembic_autogenerate_prefix(autogen_context),
+ 'name': _render_gen_name(autogen_context, index.name),
+ 'table': index.table.name,
- 'columns': _get_index_column_names(index),
++ 'columns': ", ".join(
++ _get_index_rendered_expressions(index, autogen_context)),
+ 'unique': index.unique or False,
+ 'schema': (", schema='%s'" % index.table.schema)
+ if index.table.schema else '',
+ 'kwargs': (
+ ', ' +
+ ', '.join(
+ ["%s=%s" %
+ (key, _render_potential_expr(val, autogen_context))
+ for key, val in index.kwargs.items()]))
if len(index.kwargs) else ''
- }
+ }
return text
+
def _drop_index(index, autogen_context):
"""
Generate Alembic operations for the DROP INDEX of an
return rendered
if isinstance(default, sa_schema.DefaultClause):
- if isinstance(default.arg, string_types):
- default = default.arg
- else:
- default = str(default.arg.compile(
- dialect=autogen_context['dialect']))
- if isinstance(default, string_types):
- if repr_:
- default = re.sub(r"^'|'$", "", default)
- return repr(default)
- else:
- return default
- else:
- return None
+ default = _render_potential_expr(default.arg, autogen_context)
+
+ elif isinstance(default, string_types) and repr_:
+ default = repr(re.sub(r"^'|'$", "", default))
+
+ return default
+
def _repr_type(type_, autogen_context):
rendered = _user_defined_render("type", type_, autogen_context)
if rendered is not False:
import re
import sys
-from unittest import TestCase
+from alembic.testing import TestBase
-from sqlalchemy import MetaData, Column, Table, Integer, String, Text, \
- Numeric, CHAR, ForeignKey, DATETIME, INTEGER, \
- TypeDecorator, CheckConstraint, Unicode, Enum,\
+from sqlalchemy import MetaData, Column, Table, String, \
+ Numeric, CHAR, ForeignKey, DATETIME, Integer, \
+ CheckConstraint, Unicode, Enum,\
UniqueConstraint, Boolean, ForeignKeyConstraint,\
- PrimaryKeyConstraint, Index
+ PrimaryKeyConstraint, Index, func, text
++
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.dialects import mysql, postgresql
- from sqlalchemy.sql import and_, column, literal_column
+ from sqlalchemy.sql import and_, column, literal_column, false
-from . import patch
+from alembic.testing.mock import patch
from alembic import autogenerate, util, compat
-from . import eq_, eq_ignore_whitespace, requires_092, requires_09, requires_094
+from alembic.testing import eq_, eq_ignore_whitespace, config
+
py3k = sys.version_info >= (3, )
else:
eq_ignore_whitespace(
autogenerate.render._add_index(idx, autogen_context),
- """op.create_index('foo_idx', 't', ['x', 'y'], unique=False, """
- """postgresql_where=sa.text('t.y = %(y_1)s'))"""
+ """op.create_index('foo_idx', 't', ['x', 'y'], \
+unique=False, """
+ """postgresql_where=sa.text('t.y = %(y_1)s'))"""
)
- # def test_render_add_index_func(self):
- # """
- # autogenerate.render._drop_index using func -- TODO: SQLA needs to
- # reflect expressions as well as columns
- # """
- # m = MetaData()
- # t = Table('test', m,
- # Column('id', Integer, primary_key=True),
- # Column('active', Boolean()),
- # Column('code', String(255)),
- # )
- # idx = Index(
- # 'test_active_lower_code_idx', t.c.active, func.lower(t.c.code))
- # eq_ignore_whitespace(
- # autogenerate.render._add_index(idx, self.autogen_context),
- # ""
- # )
+ def test_render_add_index_func(self):
+ m = MetaData()
- t = Table('test', m,
++ t = Table(
++ 'test', m,
+ Column('id', Integer, primary_key=True),
+ Column('code', String(255))
+ )
+ idx = Index('test_lower_code_idx', func.lower(t.c.code))
+ eq_ignore_whitespace(
+ autogenerate.render._add_index(idx, self.autogen_context),
+ "op.create_index('test_lower_code_idx', 'test', "
+ "[sa.text('lower(test.code)')], unique=False)"
+ )
+
+ def test_render_add_index_desc(self):
+ m = MetaData()
- t = Table('test', m,
++ t = Table(
++ 'test', m,
+ Column('id', Integer, primary_key=True),
+ Column('code', String(255))
+ )
+ idx = Index('test_desc_code_idx', t.c.code.desc())
+ eq_ignore_whitespace(
+ autogenerate.render._add_index(idx, self.autogen_context),
+ "op.create_index('test_desc_code_idx', 'test', "
+ "[sa.text('test.code DESC')], unique=False)"
+ )
-
def test_drop_index(self):
"""
autogenerate.render._drop_index