self.name = name
def __repr__(self):
- return "%sf(%r)" % (self.prefix, self.name)
+ return "%sf(%r)" % (self.prefix, _ident(self.name))
+
+
+def _ident(name):
+ """produce a __repr__() object for a string identifier that may
+ use quoted_name() in SQLAlchemy 0.9 and greater.
+
+ The issue worked around here is that quoted_name() doesn't have
+ very good repr() behavior by itself when unicode is involved.
+
+ """
+ if name is None:
+ return name
+ elif compat.sqla_09 and isinstance(name, sql.elements.quoted_name):
+ if compat.py2k:
+ # the attempt to encode to ascii here isn't super ideal,
+ # however we are trying to cut down on an explosion of
+ # u'' literals only when py2k + SQLA 0.9, in particular
+ # makes unit tests testing code generation very difficult
+ try:
+ return name.encode('ascii')
+ except UnicodeError:
+ return compat.text_type(name)
+ else:
+ return compat.text_type(name)
+ elif isinstance(name, compat.string_types):
+ return name
def _render_potential_expr(value, autogen_context, wrap_in_text=True):
return template % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
- "sql": str(
+ "sql": compat.text_type(
value.compile(dialect=autogen_context['dialect'],
**compile_kw)
)
args = ',\n'.join(args)
text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
- 'tablename': table.name,
+ 'tablename': _ident(table.name),
'prefix': _alembic_autogenerate_prefix(autogen_context),
'args': args,
}
if table.schema:
- text += ",\nschema=%r" % table.schema
+ text += ",\nschema=%r" % _ident(table.schema)
for k in sorted(table.kwargs):
text += ",\n%s=%r" % (k.replace(" ", "_"), table.kwargs[k])
text += "\n)"
def _drop_table(table, autogen_context):
text = "%(prefix)sdrop_table(%(tname)r" % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
- "tname": table.name
+ "tname": _ident(table.name)
}
if table.schema:
- text += ", schema=%r" % table.schema
+ text += ", schema=%r" % _ident(table.schema)
text += ")"
return text
def _get_index_rendered_expressions(idx, autogen_context):
if compat.sqla_08:
- return [repr(getattr(exp, "name", None))
+ return [repr(_ident(getattr(exp, "name", None)))
if isinstance(exp, sa_schema.Column)
else _render_potential_expr(exp, autogen_context)
for exp in idx.expressions]
else:
- return [repr(getattr(col, "name", None)) for col in idx.columns]
+ return [
+ repr(_ident(getattr(col, "name", None))) for col in idx.columns]
def _add_index(index, autogen_context):
:class:`~sqlalchemy.schema.Index` instance.
"""
- text = "%(prefix)screate_index(%(name)r, '%(table)s', [%(columns)s], "\
+ text = "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "\
"unique=%(unique)r%(schema)s%(kwargs)s)" % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
'name': _render_gen_name(autogen_context, index.name),
- 'table': index.table.name,
+ 'table': _ident(index.table.name),
'columns': ", ".join(
_get_index_rendered_expressions(index, autogen_context)),
'unique': index.unique or False,
- 'schema': (", schema='%s'" % index.table.schema)
+ 'schema': (", schema=%r" % _ident(index.table.schema))
if index.table.schema else '',
'kwargs': (
', ' +
:class:`~sqlalchemy.schema.Index` instance.
"""
text = "%(prefix)sdrop_index(%(name)r, "\
- "table_name='%(table_name)s'%(schema)s)" % {
+ "table_name=%(table_name)r%(schema)s)" % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
'name': _render_gen_name(autogen_context, index.name),
- 'table_name': index.table.name,
- 'schema': ((", schema='%s'" % index.table.schema)
+ 'table_name': _ident(index.table.name),
+ 'schema': ((", schema=%r" % _ident(index.table.schema))
if index.table.schema else '')
}
return text
if constraint.initially:
opts.append(("initially", str(constraint.initially)))
if not has_batch and alter and constraint.table.schema:
- opts.append(("schema", str(constraint.table.schema)))
+ opts.append(("schema", _ident(constraint.table.schema)))
if not alter and constraint.name:
opts.append(
- ("name", _render_gen_name(autogen_context, constraint.name)))
+ ("name",
+ _render_gen_name(autogen_context, constraint.name)))
if alter:
- args = [repr(_render_gen_name(autogen_context, constraint.name))]
+ args = [
+ repr(_render_gen_name(autogen_context, constraint.name))]
if not has_batch:
- args += [repr(constraint.table.name)]
- args.append(repr([col.name for col in constraint.columns]))
+ args += [repr(_ident(constraint.table.name))]
+ args.append(repr([_ident(col.name) for col in constraint.columns]))
args.extend(["%s=%r" % (k, v) for k, v in opts])
return "%(prefix)screate_unique_constraint(%(args)s)" % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
'args': ", ".join(args)
}
else:
- args = [repr(col.name) for col in constraint.columns]
+ args = [repr(_ident(col.name)) for col in constraint.columns]
args.extend(["%s=%r" % (k, v) for k, v in opts])
return "%(prefix)sUniqueConstraint(%(args)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
text = template % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
'name': _render_gen_name(autogen_context, constraint.name),
- 'table_name': constraint.table.name,
- 'schema': (", schema='%s'" % constraint.table.schema)
+ 'table_name': _ident(constraint.table.name),
+ 'schema': (", schema='%s'" % _ident(constraint.table.schema))
if constraint.table.schema else '',
}
return text
text = template % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
- "tname": tname,
- "cname": column.name,
- "schema": schema
+ "tname": _ident(tname),
+ "cname": _ident(column.name),
+ "schema": _ident(schema)
}
return text
# TODO: for non-ascii colname, assign a "key"
return "%(prefix)sColumn(%(name)r, %(type)s, %(kw)s)" % {
'prefix': _sqlalchemy_autogenerate_prefix(autogen_context),
- 'name': column.name,
+ 'name': _ident(column.name),
'type': _repr_type(column.type, autogen_context),
'kw': ", ".join(["%s=%s" % (kwname, val) for kwname, val in opts])
}
"[%(refcols)s], %(args)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
"cols": ", ".join(
- "'%s'" % f.parent.key for f in constraint.elements),
+ "%r" % f.parent.key for f in constraint.elements),
"refcols": ", ".join(repr(_fk_colspec(f, apply_metadata_schema))
for f in constraint.elements),
"args": ", ".join(
opts.append(
(
"name",
- repr(_render_gen_name(autogen_context, constraint.name))
+ repr(
+ _render_gen_name(autogen_context, constraint.name))
)
)
return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
autogenerate.render._add_index(idx, autogen_context),
"""op.create_index('foo_idx', 't', \
['x', 'y'], unique=False, """
- """postgresql_where=sa.text("t.y = 'something'"))"""
+ """postgresql_where=sa.text(!U"t.y = 'something'"))"""
)
else:
eq_ignore_whitespace(
autogenerate.render._add_index(idx, autogen_context),
"""op.create_index('foo_idx', 't', ['x', 'y'], \
unique=False, """
- """postgresql_where=sa.text('t.y = %(y_1)s'))"""
+ """postgresql_where=sa.text(!U't.y = %(y_1)s'))"""
)
@config.requirements.fail_before_sqla_080
eq_ignore_whitespace(
autogenerate.render._add_index(idx, self.autogen_context),
"op.create_index('test_lower_code_idx', 'test', "
- "[sa.text('lower(test.code)')], unique=False)"
+ "[sa.text(!U'lower(test.code)')], unique=False)"
)
@config.requirements.fail_before_sqla_080
eq_ignore_whitespace(
autogenerate.render._add_index(idx, self.autogen_context),
"op.create_index('test_lower_code_idx', 'test', "
- "[sa.text('CAST(test.code AS CHAR)')], unique=False)"
+ "[sa.text(!U'CAST(test.code AS CHAR)')], unique=False)"
)
@config.requirements.fail_before_sqla_080
eq_ignore_whitespace(
autogenerate.render._add_index(idx, self.autogen_context),
"op.create_index('test_desc_code_idx', 'test', "
- "[sa.text('test.code DESC')], unique=False)"
+ "[sa.text(!U'test.code DESC')], unique=False)"
)
def test_drop_index(self):
")"
)
+ def test_render_table_w_unicode_name(self):
+ m = MetaData()
+ t = Table(compat.ue('\u0411\u0435\u0437'), m,
+ Column('id', Integer, primary_key=True),
+ )
+ eq_ignore_whitespace(
+ autogenerate.render._add_table(t, self.autogen_context),
+ "op.create_table(%r,"
+ "sa.Column('id', sa.Integer(), nullable=False),"
+ "sa.PrimaryKeyConstraint('id'))" % compat.ue('\u0411\u0435\u0437')
+ )
+
+ def test_render_table_w_unicode_schema(self):
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ schema=compat.ue('\u0411\u0435\u0437')
+ )
+ eq_ignore_whitespace(
+ autogenerate.render._add_table(t, self.autogen_context),
+ "op.create_table('test',"
+ "sa.Column('id', sa.Integer(), nullable=False),"
+ "sa.PrimaryKeyConstraint('id'),"
+ "schema=%r)" % compat.ue('\u0411\u0435\u0437')
+ )
+
@patch("alembic.autogenerate.render.MAX_PYTHON_ARGS", 3)
def test_render_table_max_cols(self):
m = MetaData()
eq_ignore_whitespace(
re.sub(
r"u'", "'",
- autogenerate.render._add_table(t, self.autogen_context)),
+ autogenerate.render._add_table(t, self.autogen_context)
+ ),
"op.create_table('test',"
"sa.Column('id', sa.Integer(), nullable=False),"
"sa.Column('q', sa.Integer(), nullable=True),"
)
def test_render_drop_table(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._drop_table(Table("sometable", MetaData()),
self.autogen_context),
"op.drop_table('sometable')"
)
def test_render_drop_table_w_schema(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._drop_table(
Table("sometable", MetaData(), schema='foo'),
self.autogen_context),
eq_ignore_whitespace(
autogenerate.render._add_table(t1, self.autogen_context),
- "op.create_table('t1',sa.Column('x', sa.Integer(), nullable=True))"
+ "op.create_table('t1',"
+ "sa.Column('x', sa.Integer(), nullable=True))"
)
eq_ignore_whitespace(
)
def test_render_add_column(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._add_column(
None, "foo", Column("x", Integer, server_default="5"),
self.autogen_context),
)
def test_render_add_column_w_schema(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._add_column(
"foo", "bar", Column("x", Integer, server_default="5"),
self.autogen_context),
)
def test_render_drop_column(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._drop_column(
None, "foo", Column("x", Integer, server_default="5"),
self.autogen_context),
)
def test_render_drop_column_w_schema(self):
- eq_(
+ eq_ignore_whitespace(
autogenerate.render._drop_column(
"foo", "bar", Column("x", Integer, server_default="5"),
self.autogen_context),
'"nextval(\'group_to_perm_group_to_perm_id_seq\'::regclass)"'
)
+ def test_render_unicode_server_default(self):
+ default = compat.ue(
+ '\u0411\u0435\u0437 '
+ '\u043d\u0430\u0437\u0432\u0430\u043d\u0438\u044f'
+ )
+
+ c = Column(
+ 'x', Unicode,
+ server_default=text(default)
+ )
+
+ eq_ignore_whitespace(
+ autogenerate.render._render_server_default(
+ c.server_default, self.autogen_context
+ ),
+ "sa.text(%r)" % default
+ )
+
def test_render_col_with_server_default(self):
c = Column('updated_at', TIMESTAMP(),
server_default='TIMEZONE("utc", CURRENT_TIMESTAMP)',
result = autogenerate.render._render_column(
c, self.autogen_context
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'updated_at\', sa.TIMESTAMP(), '
'server_default=\'TIMEZONE("utc", CURRENT_TIMESTAMP)\', '
result = autogenerate.render._render_column(
c, self.autogen_context
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'some_key\', sa.Integer(), '
'autoincrement=False, '
result = autogenerate.render._add_table(
t, autogen_context
)
- eq_(
- result, """sa.create_table('t',
-col(x),
-render:primary_key\n)"""
+ eq_ignore_whitespace(
+ result,
+ "sa.create_table('t',"
+ "col(x),"
+ "render:primary_key)"
)
def test_render_modify_type(self):
if not util.sqla_08:
t1.append_constraint(fk)
+ # SQLA 0.9 generates a u'' here for remote cols while 0.8 does not,
+ # so just whack out "'u" here from the generated
+
eq_ignore_whitespace(
re.sub(
r"u'", "'",
re.sub(
r"u'", "'",
autogenerate.render._render_constraint(
- fk, self.autogen_context)),
+ fk, self.autogen_context),
+ ),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], deferrable=True)"
)
re.sub(
r"u'", "'",
autogenerate.render._render_constraint(
- fk, self.autogen_context)),
+ fk, self.autogen_context)
+ ),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], initially='XYZ')"
)
re.sub(
r"u'", "'",
autogenerate.render._render_constraint(
- fk, self.autogen_context)),
+ fk, self.autogen_context)
+ ),
"sa.ForeignKeyConstraint(['c'], ['foo.t2.c_rem'], "
"onupdate='CASCADE')"
)
CheckConstraint("im a constraint", name='cc1'),
self.autogen_context
),
- "sa.CheckConstraint('im a constraint', name='cc1')"
+ "sa.CheckConstraint(!U'im a constraint', name='cc1')"
)
def test_render_check_constraint_sqlexpr(self):
CheckConstraint(and_(c > five, c < ten)),
self.autogen_context
),
- "sa.CheckConstraint('c > 5 AND c < 10')"
+ "sa.CheckConstraint(!U'c > 5 AND c < 10')"
)
@config.requirements.fail_before_sqla_080
CheckConstraint(and_(c > 5, c < 10)),
self.autogen_context
),
- "sa.CheckConstraint('c > 5 AND c < 10')"
+ "sa.CheckConstraint(!U'c > 5 AND c < 10')"
)
def test_render_unique_constraint_opts(self):
"sa.UniqueConstraint('c', deferrable='XYZ', name='uq_1')"
)
+ def test_add_unique_constraint_unicode_schema(self):
+ m = MetaData()
+ t = Table(
+ 't', m, Column('c', Integer),
+ schema=compat.ue('\u0411\u0435\u0437')
+ )
+ eq_ignore_whitespace(
+ autogenerate.render._add_unique_constraint(
+ UniqueConstraint(t.c.c),
+ self.autogen_context
+ ),
+ "op.create_unique_constraint(None, 't', ['c'], "
+ "schema=%r)" % compat.ue('\u0411\u0435\u0437')
+ )
+
def test_render_modify_nullable_w_default(self):
eq_ignore_whitespace(
autogenerate.render._modify_col(
result = autogenerate.render._render_column(
c, self.autogen_context
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'updated_at\', sa.TIMESTAMP(), '
- 'server_default=sa.text(\'now()\'), '
+ 'server_default=sa.text(!U\'now()\'), '
'nullable=False)'
)
result = autogenerate.render._render_column(
c, autogen_context,
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'updated_at\', sa.Boolean(), '
- 'server_default=sa.text(\'false\'), '
+ 'server_default=sa.text(!U\'false\'), '
'nullable=False)'
)
result = autogenerate.render._render_column(
c, autogen_context
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'updated_at\', sa.Boolean(), '
- 'server_default=sa.text(\'0\'), '
+ 'server_default=sa.text(!U\'0\'), '
'nullable=False)'
)
result = autogenerate.render._render_column(
c, self.autogen_context
)
- eq_(
+ eq_ignore_whitespace(
result,
'sa.Column(\'updated_at\', sa.TIMESTAMP(), '
- 'server_default=sa.text(\'now()\'), '
+ 'server_default=sa.text(!U\'now()\'), '
'nullable=False)'
)
nullable=True),
"op.alter_column('sometable', 'somecolumn', "
"existing_type=sa.Integer(), nullable=True, "
- "existing_server_default=sa.text('5'))"
+ "existing_server_default=sa.text(!U'5'))"
)
-
class RenderNamingConventionTest(TestBase):
__requires__ = ('sqlalchemy_094',)
eq_ignore_whitespace(
autogenerate.render._add_table(t, self.autogen_context),
"op.create_table('t',sa.Column('c', sa.Integer(), nullable=True),"
- "sa.CheckConstraint('c > 5', name=op.f('ck_ct_t')))"
+ "sa.CheckConstraint(!U'c > 5', name=op.f('ck_ct_t')))"
)
def test_inline_fk(self):
ck,
self.autogen_context
),
- "sa.CheckConstraint('im a constraint', name=op.f('ck_t_cc1'))"
+ "sa.CheckConstraint(!U'im a constraint', name=op.f('ck_t_cc1'))"
)