correct_for_autogen_constraints(
conn_uniques, conn_indexes,
metadata_unique_constraints,
- metadata_indexes
- )
+ metadata_indexes)
# 4. organize the constraints into "signature" collections, the
# _constraint_sig() objects provide a consistent facade over both
try:
return _dialects[name]
except KeyError:
- dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name)
+ dialect_mod = getattr(
+ __import__('sqlalchemy.dialects.%s' % name).dialects, name)
_dialects[name] = d = dialect_mod.dialect()
if name == 'postgresql':
d.implicit_returning = True
def _no_sql_testing_config(dialect="postgresql", directives=""):
- """use a postgresql url with no host so that connections guaranteed to fail"""
+ """use a postgresql url with no host so that
+ connections guaranteed to fail"""
dir_ = os.path.join(staging_directory, 'scripts')
return _write_config_file("""
[alembic]
shutil.rmtree(staging_directory, True)
-def write_script(scriptdir, rev_id, content, encoding='ascii', sourceless=False):
+def write_script(
+ scriptdir, rev_id, content, encoding='ascii', sourceless=False):
old = scriptdir._revision_map[rev_id]
path = old.path
UniqueConstraint('order_id', 'user_id',
name='order_order_id_user_id_unique'
),
- Index('order_user_id_amount_idx', 'user_id', 'amount', unique=True),
+ Index(
+ 'order_user_id_amount_idx', 'user_id',
+ 'amount', unique=True),
)
diffs = self._fixture(m1, m2)
Table('nothing_changed_related', m1,
Column('id1', Integer),
Column('id2', Integer),
- ForeignKeyConstraint(['id1', 'id2'],
- ['nothing_changed.id1', 'nothing_changed.id2']),
+ ForeignKeyConstraint(
+ ['id1', 'id2'],
+ ['nothing_changed.id1', 'nothing_changed.id2']),
mysql_engine='InnoDB'
)
Table('nothing_changed_related', m2,
Column('id1', Integer),
Column('id2', Integer),
- ForeignKeyConstraint(['id1', 'id2'],
- ['nothing_changed.id1', 'nothing_changed.id2']),
+ ForeignKeyConstraint(
+ ['id1', 'id2'],
+ ['nothing_changed.id1', 'nothing_changed.id2']),
mysql_engine='InnoDB'
)
eq_(diffs[0][0], "add_table")
eq_(len(diffs), 1)
- assert UniqueConstraint in set(type(c) for c in diffs[0][1].constraints)
+ assert UniqueConstraint in set(
+ type(c) for c in diffs[0][1].constraints)
def test_add_uq_ix_on_table_create(self):
m1 = MetaData()
eq_(diffs[0][0], "add_table")
eq_(len(diffs), 2)
- assert UniqueConstraint not in set(type(c) for c in diffs[0][1].constraints)
+ assert UniqueConstraint not in set(
+ type(c) for c in diffs[0][1].constraints)
eq_(diffs[1][0], "add_index")
eq_(diffs[1][1].unique, True)
eq_(diffs[0][0], "add_table")
eq_(len(diffs), 2)
- assert UniqueConstraint not in set(type(c) for c in diffs[0][1].constraints)
+ assert UniqueConstraint not in set(
+ type(c) for c in diffs[0][1].constraints)
eq_(diffs[1][0], "add_index")
eq_(diffs[1][1].unique, False)
from . import patch
from alembic import autogenerate, util, compat
-from . import eq_, eq_ignore_whitespace, requires_092, requires_09, requires_094
+from . import eq_, eq_ignore_whitespace, requires_092, \
+ requires_09, requires_094
py3k = sys.version_info >= (3, )
if compat.sqla_08:
eq_ignore_whitespace(
autogenerate.render._add_index(idx, autogen_context),
- """op.create_index('foo_idx', 't', ['x', 'y'], unique=False, """
+ """op.create_index('foo_idx', 't', \
+['x', 'y'], unique=False, """
"""postgresql_where=sa.text("t.y = 'something'"))"""
)
else:
eq_ignore_whitespace(
autogenerate.render._add_index(idx, autogen_context),
- """op.create_index('foo_idx', 't', ['x', 'y'], unique=False, """
+ """op.create_index('foo_idx', 't', ['x', 'y'], \
+unique=False, """
"""postgresql_where=sa.text('t.y = %(y_1)s'))"""
)
# Column('active', Boolean()),
# Column('code', String(255)),
# )
- # idx = Index('test_active_lower_code_idx', t.c.active, func.lower(t.c.code))
+ # idx = Index(
+ # 'test_active_lower_code_idx', t.c.active, func.lower(t.c.code))
# eq_ignore_whitespace(
# autogenerate.render._add_index(idx, self.autogen_context),
# ""
)
uq = UniqueConstraint(t.c.code, name='uq_test_code')
eq_ignore_whitespace(
- autogenerate.render._add_unique_constraint(uq, self.autogen_context),
+ autogenerate.render._add_unique_constraint(
+ uq, self.autogen_context),
"op.create_unique_constraint('uq_test_code', 'test', ['code'])"
)
)
uq = UniqueConstraint(t.c.code, name='uq_test_code')
eq_ignore_whitespace(
- autogenerate.render._add_unique_constraint(uq, self.autogen_context),
- "op.create_unique_constraint('uq_test_code', 'test', ['code'], schema='CamelSchema')"
+ autogenerate.render._add_unique_constraint(
+ uq, self.autogen_context),
+ "op.create_unique_constraint('uq_test_code', 'test', "
+ "['code'], schema='CamelSchema')"
)
def test_drop_constraint(self):
"sa.Column('timestamp', sa.DATETIME(), "
"server_default='NOW()', "
"nullable=True),"
- "sa.Column('amount', sa.Numeric(precision=5, scale=2), nullable=True),"
+ "sa.Column('amount', sa.Numeric(precision=5, scale=2), "
+ "nullable=True),"
"sa.ForeignKeyConstraint(['address_id'], ['address.id'], ),"
"sa.PrimaryKeyConstraint('id'),"
"sa.UniqueConstraint('name', name='uq_name'),"
Column('q', Integer, ForeignKey('address.id')),
)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._add_table(t, self.autogen_context)),
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._add_table(t, self.autogen_context)),
"op.create_table('test',"
"sa.Column('id', sa.Integer(), nullable=False),"
"sa.Column('q', sa.Integer(), nullable=True),"
eq_ignore_whitespace(
autogenerate.render._add_table(t, self.autogen_context),
- "op.create_table('test',sa.Column('x', sa.Boolean(), nullable=True))"
+ "op.create_table('test',"
+ "sa.Column('x', sa.Boolean(), nullable=True))"
)
def test_render_empty_pk_vs_nonempty_pk(self):
t1.append_constraint(fk)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)),
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._render_constraint(
+ fk, self.autogen_context)),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], onupdate='CASCADE')"
)
t1.append_constraint(fk)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)),
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._render_constraint(
+ fk, self.autogen_context)),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], ondelete='CASCADE')"
)
if not util.sqla_08:
t1.append_constraint(fk)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)),
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._render_constraint(
+ fk, self.autogen_context)),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], deferrable=True)"
)
if not util.sqla_08:
t1.append_constraint(fk)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)),
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._render_constraint(
+ fk, self.autogen_context)),
"sa.ForeignKeyConstraint(['c'], ['t2.c_rem'], initially='XYZ')"
)
def test_render_fk_constraint_use_alter(self):
m = MetaData()
Table('t', m, Column('c', Integer))
- t2 = Table('t2', m, Column('c_rem', Integer,
- ForeignKey('t.c', name="fk1", use_alter=True)))
+ t2 = Table(
+ 't2', m,
+ Column(
+ 'c_rem', Integer,
+ ForeignKey('t.c', name="fk1", use_alter=True)))
const = list(t2.foreign_keys)[0].constraint
eq_ignore_whitespace(
- autogenerate.render._render_constraint(const, self.autogen_context),
+ autogenerate.render._render_constraint(
+ const, self.autogen_context),
"sa.ForeignKeyConstraint(['c_rem'], ['t.c'], "
"name='fk1', use_alter=True)"
)
t1.append_constraint(fk)
eq_ignore_whitespace(
- re.sub(r"u'", "'", autogenerate.render._render_constraint(fk, self.autogen_context)),
- "sa.ForeignKeyConstraint(['c'], ['foo.t2.c_rem'], onupdate='CASCADE')"
+ re.sub(
+ r"u'", "'",
+ autogenerate.render._render_constraint(
+ fk, self.autogen_context)),
+ "sa.ForeignKeyConstraint(['c'], ['foo.t2.c_rem'], "
+ "onupdate='CASCADE')"
)
def test_render_check_constraint_literal(self):
"ix": 'ix_%(custom)s_%(column_0_label)s',
"uq": "uq_%(custom)s_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(custom)s_%(table_name)s",
- "fk": "fk_%(custom)s_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
+ "fk": "fk_%(custom)s_%(table_name)s_"
+ "%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(custom)s_%(table_name)s",
"custom": lambda const, table: "ct"
}
UniqueConstraint(t.c.c, deferrable='XYZ'),
self.autogen_context
),
- "sa.UniqueConstraint('c', deferrable='XYZ', name=op.f('uq_ct_t_c'))"
+ "sa.UniqueConstraint('c', deferrable='XYZ', "
+ "name=op.f('uq_ct_t_c'))"
)
def test_explicit_named_unique_constraint(self):
)
def test_inline_ck_constraint(self):
- t = Table('t', self.metadata, Column('c', Integer), CheckConstraint("c > 5"))
+ t = Table(
+ 't', self.metadata, Column('c', Integer), CheckConstraint("c > 5"))
eq_ignore_whitespace(
autogenerate.render._add_table(t, self.autogen_context),
"op.create_table('t',sa.Column('c', sa.Integer(), nullable=True),"
eq_ignore_whitespace(
autogenerate.render._add_table(t, self.autogen_context),
"op.create_table('t',sa.Column('c', sa.Integer(), nullable=True),"
- "sa.ForeignKeyConstraint(['c'], ['q.id'], name=op.f('fk_ct_t_c_q')))"
+ "sa.ForeignKeyConstraint(['c'], ['q.id'], "
+ "name=op.f('fk_ct_t_c_q')))"
)
def test_render_check_constraint_renamed(self):
used.
"""
- m1 = MetaData(naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"})
+ m1 = MetaData(naming_convention={
+ "ck": "ck_%(table_name)s_%(constraint_name)s"})
ck = CheckConstraint("im a constraint", name="cc1")
Table('t', m1, Column('x'), ck)
'context': context
}
diffs = []
- autogenerate._produce_net_changes(connection, model_metadata, diffs,
- autogen_context,
- object_filters=_default_object_filters,
- include_schemas=include_schemas
- )
+ autogenerate._produce_net_changes(
+ connection, model_metadata, diffs,
+ autogen_context,
+ object_filters=_default_object_filters,
+ include_schemas=include_schemas
+ )
return diffs
reports_unnamed_constraints = False
metadata = self.m2
connection = self.context.bind
diffs = []
- autogenerate._produce_net_changes(connection, metadata, diffs,
- self.autogen_context,
- object_filters=_default_object_filters,
- )
+ autogenerate._produce_net_changes(
+ connection, metadata, diffs,
+ self.autogen_context,
+ object_filters=_default_object_filters,
+ )
eq_(
diffs[0],
"""test a full render including indentation"""
template_args = {}
- autogenerate._produce_migration_diffs(self.context, template_args, set())
+ autogenerate._produce_migration_diffs(
+ self.context, template_args, set())
eq_(re.sub(r"u'", "'", template_args['upgrades']),
"""### commands auto generated by Alembic - please adjust! ###
sa.PrimaryKeyConstraint('id')
)
op.drop_table('extra')
- op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True))
+ op.add_column('address', sa.Column('street', sa.String(length=50), \
+nullable=True))
op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True))
op.alter_column('order', 'amount',
existing_type=sa.NUMERIC(precision=8, scale=2),
existing_type=sa.TEXT(),
server_default=None,
existing_nullable=True)
- op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), nullable=True))
+ op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), \
+nullable=True))
op.alter_column('order', 'amount',
existing_type=sa.Numeric(precision=10, scale=2),
type_=sa.NUMERIC(precision=8, scale=2),
)
template_args = {}
autogenerate._produce_migration_diffs(context, template_args, set())
- template_args['upgrades'] = template_args['upgrades'].replace("u'", "'")
+ template_args['upgrades'] = \
+ template_args['upgrades'].replace("u'", "'")
template_args['downgrades'] = template_args['downgrades'].\
replace("u'", "'")
assert "alter_column('user'" not in template_args['upgrades']
template_args = {}
autogenerate._produce_migration_diffs(context, template_args, set())
- template_args['upgrades'] = template_args['upgrades'].replace("u'", "'")
+ template_args['upgrades'] = \
+ template_args['upgrades'].replace("u'", "'")
template_args['downgrades'] = template_args['downgrades'].\
replace("u'", "'")
assert "op.create_table('item'" not in template_args['upgrades']
return dialect.type_descriptor(CHAR(32))
diff = []
- autogenerate.compare._compare_type(None, "sometable", "somecol",
- Column("somecol", Integer, nullable=True),
- Column("somecol", MyType()),
- diff, self.autogen_context
- )
+ autogenerate.compare._compare_type(
+ None, "sometable", "somecol",
+ Column("somecol", Integer, nullable=True),
+ Column("somecol", MyType()),
+ diff, self.autogen_context
+ )
assert not diff
def test_dont_barf_on_already_reflected(self):
metadata = self.m2
connection = self.context.bind
diffs = []
- autogenerate._produce_net_changes(connection, metadata, diffs,
- self.autogen_context,
- object_filters=_default_object_filters,
- include_schemas=True
- )
+ autogenerate._produce_net_changes(
+ connection, metadata, diffs,
+ self.autogen_context,
+ object_filters=_default_object_filters,
+ include_schemas=True
+ )
eq_(
diffs[0],
}
)
template_args = {}
- autogenerate._produce_migration_diffs(context, template_args, set(),
- include_symbol=lambda name, schema: False
- )
+ autogenerate._produce_migration_diffs(
+ context, template_args, set(),
+ include_symbol=lambda name, schema: False
+ )
eq_(re.sub(r"u'", "'", template_args['upgrades']),
"""### commands auto generated by Alembic - please adjust! ###
pass
schema='%(schema)s'
)
op.drop_table('extra', schema='%(schema)s')
- op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True), schema='%(schema)s')
- op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True), schema='%(schema)s')
+ op.add_column('address', sa.Column('street', sa.String(length=50), \
+nullable=True), schema='%(schema)s')
+ op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True), \
+schema='%(schema)s')
op.alter_column('order', 'amount',
existing_type=sa.NUMERIC(precision=8, scale=2),
type_=sa.Numeric(precision=10, scale=2),
server_default=None,
existing_nullable=True,
schema='%(schema)s')
- op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), autoincrement=False, nullable=True), schema='%(schema)s')
+ op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), \
+autoincrement=False, nullable=True), schema='%(schema)s')
op.alter_column('order', 'amount',
existing_type=sa.Numeric(precision=10, scale=2),
type_=sa.NUMERIC(precision=8, scale=2),
op.create_table('extra',
sa.Column('x', sa.CHAR(length=1), autoincrement=False, nullable=True),
sa.Column('uid', sa.INTEGER(), autoincrement=False, nullable=True),
- sa.ForeignKeyConstraint(['uid'], ['%(schema)s.user.id'], name='extra_uid_fkey'),
+ sa.ForeignKeyConstraint(['uid'], ['%(schema)s.user.id'], \
+name='extra_uid_fkey'),
schema='%(schema)s'
)
op.drop_table('item', schema='%(schema)s')
# We'll just test the first call
_, args, _ = my_compare_type.mock_calls[0]
- context, inspected_column, metadata_column, inspected_type, metadata_type = args
+ (context, inspected_column, metadata_column,
+ inspected_type, metadata_type) = args
eq_(context, self.context)
eq_(metadata_column, first_column)
eq_(metadata_type, first_column.type)
eq_(inspected_column.name, first_column.name)
eq_(type(inspected_type), INTEGER)
- def test_column_type_not_modified_when_custom_compare_type_returns_False(self):
+ def test_column_type_not_modified_custom_compare_type_returns_False(self):
my_compare_type = Mock()
my_compare_type.return_value = False
self.context._user_compare_type = my_compare_type
eq_(diffs, [])
- def test_column_type_modified_when_custom_compare_type_returns_True(self):
+ def test_column_type_modified_custom_compare_type_returns_True(self):
my_compare_type = Mock()
my_compare_type.return_value = True
self.context._user_compare_type = my_compare_type
from alembic.environment import EnvironmentContext
from alembic.migration import MigrationContext
import unittest
-from . import Mock, call, _no_sql_testing_config, staging_env, clear_staging_env
+from . import Mock, call, _no_sql_testing_config, staging_env, \
+ clear_staging_env
from . import eq_, is_
from alembic import op, command, util
from . import op_fixture, capture_context_buffer, \
_no_sql_testing_config, assert_raises_message, staging_env, \
- three_rev_fixture, clear_staging_env
+ three_rev_fixture, clear_staging_env, eq_
class FullEnvironmentTests(TestCase):
assert "BEGIN TRANSACTION;" in buf.getvalue()
# ensure ends in COMMIT; GO
- assert [x for x in buf.getvalue().splitlines() if x][-2:] == ['COMMIT;', 'GO']
+ eq_(
+ [x for x in buf.getvalue().splitlines() if x][-2:],
+ ['COMMIT;', 'GO']
+ )
def test_batch_separator_default(self):
with capture_context_buffer() as buf:
def test_add_column_with_default(self):
context = op_fixture("mssql")
- op.add_column('t1', Column('c1', Integer, nullable=False, server_default="12"))
+ op.add_column(
+ 't1', Column('c1', Integer, nullable=False, server_default="12"))
context.assert_("ALTER TABLE t1 ADD c1 INTEGER NOT NULL DEFAULT '12'")
def test_alter_column_rename_mssql(self):
context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_default=True)
op.drop_column('t1', 'c2', mssql_drop_default=True)
- context.assert_contains("exec('alter table t1 drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table t1 drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
def test_alter_column_drop_default(self):
context = op_fixture('mssql')
op.alter_column("t", "c", server_default=None)
- context.assert_contains("exec('alter table t drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table t drop constraint ' + @const_name)")
def test_alter_column_dont_drop_default(self):
context = op_fixture('mssql')
context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_check=True)
op.drop_column('t1', 'c2', mssql_drop_check=True)
- context.assert_contains("exec('alter table t1 drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table t1 drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
def test_drop_column_w_check_quoting(self):
context = op_fixture('mssql')
op.drop_column('table', 'column', mssql_drop_check=True)
- context.assert_contains("exec('alter table [table] drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table [table] drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE [table] DROP COLUMN [column]")
def test_alter_column_nullable_w_existing_type(self):
def test_drop_column_w_fk(self):
context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_foreign_key=True)
- context.assert_contains("exec('alter table t1 drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table t1 drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
def test_alter_column_not_nullable_w_existing_type(self):
def test_alter_replace_server_default(self):
context = op_fixture('mssql')
- op.alter_column("t", "c", server_default="5", existing_server_default="6")
- context.assert_contains("exec('alter table t drop constraint ' + @const_name)")
+ op.alter_column(
+ "t", "c", server_default="5", existing_server_default="6")
+ context.assert_contains(
+ "exec('alter table t drop constraint ' + @const_name)")
context.assert_contains(
"ALTER TABLE t ADD DEFAULT '5' FOR c"
)
def test_alter_remove_server_default(self):
context = op_fixture('mssql')
op.alter_column("t", "c", server_default=None)
- context.assert_contains("exec('alter table t drop constraint ' + @const_name)")
+ context.assert_contains(
+ "exec('alter table t drop constraint ' + @const_name)")
def test_alter_do_everything(self):
context = op_fixture('mssql')
def test_rename_column(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', new_column_name="c2", existing_type=Integer)
+ op.alter_column(
+ 't1', 'c1', new_column_name="c2", existing_type=Integer)
context.assert_(
'ALTER TABLE t1 CHANGE c1 c2 INTEGER NULL'
)
op.alter_column('my table', 'column one', new_column_name="column two",
existing_type=Integer)
context.assert_(
- 'ALTER TABLE `my table` CHANGE `column one` `column two` INTEGER NULL'
+ 'ALTER TABLE `my table` CHANGE `column one` '
+ '`column two` INTEGER NULL'
)
def test_rename_column_serv_default(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', new_column_name="c2", existing_type=Integer,
- existing_server_default="q")
+ op.alter_column(
+ 't1', 'c1', new_column_name="c2", existing_type=Integer,
+ existing_server_default="q")
context.assert_(
"ALTER TABLE t1 CHANGE c1 c2 INTEGER NULL DEFAULT 'q'"
)
def test_rename_column_serv_compiled_default(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', existing_type=Integer,
- server_default=func.utc_thing(func.current_timestamp()))
+ op.alter_column(
+ 't1', 'c1', existing_type=Integer,
+ server_default=func.utc_thing(func.current_timestamp()))
# this is not a valid MySQL default but the point is to just
# test SQL expression rendering
context.assert_(
- "ALTER TABLE t1 ALTER COLUMN c1 SET DEFAULT utc_thing(CURRENT_TIMESTAMP)"
+ "ALTER TABLE t1 ALTER COLUMN c1 "
+ "SET DEFAULT utc_thing(CURRENT_TIMESTAMP)"
)
def test_rename_column_autoincrement(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', new_column_name="c2", existing_type=Integer,
- existing_autoincrement=True)
+ op.alter_column(
+ 't1', 'c1', new_column_name="c2", existing_type=Integer,
+ existing_autoincrement=True)
context.assert_(
'ALTER TABLE t1 CHANGE c1 c2 INTEGER NULL AUTO_INCREMENT'
)
def test_col_multi_alter(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', nullable=False, server_default="q", type_=Integer)
+ op.alter_column(
+ 't1', 'c1', nullable=False, server_default="q", type_=Integer)
context.assert_(
"ALTER TABLE t1 MODIFY c1 INTEGER NOT NULL DEFAULT 'q'"
)
def test_alter_column_multi_alter_w_drop_default(self):
context = op_fixture('mysql')
- op.alter_column('t1', 'c1', nullable=False, server_default=None, type_=Integer)
+ op.alter_column(
+ 't1', 'c1', nullable=False, server_default=None, type_=Integer)
context.assert_(
"ALTER TABLE t1 MODIFY c1 INTEGER NOT NULL"
)
else:
alternate = txt
expected = False
- t = Table("test", self.metadata,
- Column("somecol", type_, server_default=text(txt) if txt else None)
- )
+ t = Table(
+ "test", self.metadata,
+ Column(
+ "somecol", type_,
+ server_default=text(txt) if txt else None
+ )
+ )
t2 = Table("test", MetaData(),
Column("somecol", type_, server_default=text(alternate))
)
three_rev_fixture, env_file_fixture,\
assert_raises_message
+a = b = c = None
+
class OfflineEnvironmentTest(TestCase):
def test_add_column_with_default():
context = op_fixture()
- op.add_column('t1', Column('c1', Integer, nullable=False, server_default="12"))
- context.assert_("ALTER TABLE t1 ADD COLUMN c1 INTEGER DEFAULT '12' NOT NULL")
+ op.add_column(
+ 't1', Column('c1', Integer, nullable=False, server_default="12"))
+ context.assert_(
+ "ALTER TABLE t1 ADD COLUMN c1 INTEGER DEFAULT '12' NOT NULL")
def test_add_column_schema_with_default():
op.add_column('t1',
Column('c1', Integer, nullable=False, server_default="12"),
schema='foo')
- context.assert_("ALTER TABLE foo.t1 ADD COLUMN c1 INTEGER DEFAULT '12' NOT NULL")
+ context.assert_(
+ "ALTER TABLE foo.t1 ADD COLUMN c1 INTEGER DEFAULT '12' NOT NULL")
def test_add_column_fk():
context = op_fixture()
- op.add_column('t1', Column('c1', Integer, ForeignKey('c2.id'), nullable=False))
+ op.add_column(
+ 't1', Column('c1', Integer, ForeignKey('c2.id'), nullable=False))
context.assert_(
"ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL",
"ALTER TABLE t1 ADD FOREIGN KEY(c1) REFERENCES c2 (id)"
def test_add_column_fk_self_referential():
context = op_fixture()
- op.add_column('t1', Column('c1', Integer, ForeignKey('t1.c2'), nullable=False))
+ op.add_column(
+ 't1', Column('c1', Integer, ForeignKey('t1.c2'), nullable=False))
context.assert_(
"ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL",
"ALTER TABLE t1 ADD FOREIGN KEY(c1) REFERENCES t1 (c2)"
def test_add_column_schema_fk_self_referential():
context = op_fixture()
- op.add_column('t1',
- Column('c1', Integer, ForeignKey('foo.t1.c2'), nullable=False),
- schema='foo')
+ op.add_column(
+ 't1',
+ Column('c1', Integer, ForeignKey('foo.t1.c2'), nullable=False),
+ schema='foo')
context.assert_(
"ALTER TABLE foo.t1 ADD COLUMN c1 INTEGER NOT NULL",
"ALTER TABLE foo.t1 ADD FOREIGN KEY(c1) REFERENCES foo.t1 (c2)"
def test_add_column_fk_schema():
context = op_fixture()
- op.add_column('t1', Column('c1', Integer, ForeignKey('remote.t2.c2'), nullable=False))
+ op.add_column(
+ 't1',
+ Column('c1', Integer, ForeignKey('remote.t2.c2'), nullable=False))
context.assert_(
'ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL',
'ALTER TABLE t1 ADD FOREIGN KEY(c1) REFERENCES remote.t2 (c2)'
def test_add_column_schema_fk_schema():
context = op_fixture()
- op.add_column('t1',
- Column('c1', Integer, ForeignKey('remote.t2.c2'), nullable=False),
- schema='foo')
+ op.add_column(
+ 't1',
+ Column('c1', Integer, ForeignKey('remote.t2.c2'), nullable=False),
+ schema='foo')
context.assert_(
'ALTER TABLE foo.t1 ADD COLUMN c1 INTEGER NOT NULL',
'ALTER TABLE foo.t1 ADD FOREIGN KEY(c1) REFERENCES remote.t2 (c2)'
server_default=func.utc_thing(func.current_timestamp()),
schema='foo')
context.assert_(
- "ALTER TABLE foo.t ALTER COLUMN c SET DEFAULT utc_thing(CURRENT_TIMESTAMP)"
+ "ALTER TABLE foo.t ALTER COLUMN c "
+ "SET DEFAULT utc_thing(CURRENT_TIMESTAMP)"
)
def test_alter_column_schema_type_existing_type():
context = op_fixture('mssql')
- op.alter_column("t", "c", type_=String(10), existing_type=Boolean(name="xyz"))
+ op.alter_column(
+ "t", "c", type_=String(10), existing_type=Boolean(name="xyz"))
context.assert_(
'ALTER TABLE t DROP CONSTRAINT xyz',
'ALTER TABLE t ALTER COLUMN c VARCHAR(10)'
def test_add_foreign_key_dialect_kw():
context = op_fixture()
- with mock.patch("alembic.operations.sa_schema.ForeignKeyConstraint") as fkc:
+ with mock.patch(
+ "alembic.operations.sa_schema.ForeignKeyConstraint") as fkc:
op.create_foreign_key('fk_test', 't1', 't2',
['foo', 'bar'], ['bat', 'hoho'],
foobar_arg='xyz')
def test_add_unique_constraint_col_event():
context = op_fixture()
- op.create_unique_constraint('ik_test',
- 'tbl_with_auto_appended_column', ['foo', 'bar'])
+ op.create_unique_constraint(
+ 'ik_test',
+ 'tbl_with_auto_appended_column', ['foo', 'bar'])
context.assert_(
"ALTER TABLE tbl_with_auto_appended_column "
"ADD CONSTRAINT ik_test UNIQUE (foo, bar)"
@requires_094
def test_add_check_constraint_already_named_from_schema():
- m1 = MetaData(naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"})
+ m1 = MetaData(
+ naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"})
ck = CheckConstraint("im a constraint", name="cc1")
Table('t', m1, Column('x'), ck)
naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"})
op.create_table(
"some_table",
- Column('x', Integer, CheckConstraint("im a constraint", name=op.f("ck_q_cc1")))
+ Column(
+ 'x', Integer,
+ CheckConstraint("im a constraint", name=op.f("ck_q_cc1")))
)
context.assert_(
"CREATE TABLE some_table "
context = op_fixture(naming_convention={
"ck": "ck_%(table_name)s_%(constraint_name)s"
})
- op.add_column('t1', Column('c1', Boolean(name=op.f('foo')), nullable=False))
+ op.add_column(
+ 't1', Column('c1', Boolean(name=op.f('foo')), nullable=False))
context.assert_(
'ALTER TABLE t1 ADD COLUMN c1 BOOLEAN NOT NULL',
'ALTER TABLE t1 ADD CONSTRAINT foo CHECK (c1 IN (0, 1))'
def test_add_column_with_default(self):
context = op_fixture("oracle")
- op.add_column('t1', Column('c1', Integer, nullable=False, server_default="12"))
+ op.add_column(
+ 't1', Column('c1', Integer, nullable=False, server_default="12"))
context.assert_("ALTER TABLE t1 ADD c1 INTEGER DEFAULT '12' NOT NULL")
def test_alter_column_rename_oracle(self):
def test_alter_replace_server_default(self):
context = op_fixture('oracle')
- op.alter_column("t", "c", server_default="5", existing_server_default="6")
+ op.alter_column(
+ "t", "c", server_default="5", existing_server_default="6")
context.assert_(
"ALTER TABLE t MODIFY c DEFAULT '5'"
)
def test_alter_do_everything(self):
context = op_fixture('oracle')
- op.alter_column("t", "c", name="c2", nullable=True, type_=Integer, server_default="5")
+ op.alter_column(
+ "t", "c", name="c2", nullable=True,
+ type_=Integer, server_default="5")
context.assert_(
'ALTER TABLE t MODIFY c NULL',
"ALTER TABLE t MODIFY c DEFAULT '5'",
self._inline_enum_script()
with capture_context_buffer() as buf:
command.upgrade(self.cfg, self.rid, sql=True)
- assert "CREATE TYPE pgenum AS ENUM ('one', 'two', 'three')" in buf.getvalue()
+ assert "CREATE TYPE pgenum AS "\
+ "ENUM ('one', 'two', 'three')" in buf.getvalue()
assert "CREATE TABLE sometable (\n data pgenum\n)" in buf.getvalue()
def test_offline_inline_enum_drop(self):
self._distinct_enum_script()
with capture_context_buffer() as buf:
command.upgrade(self.cfg, self.rid, sql=True)
- assert "CREATE TYPE pgenum AS ENUM ('one', 'two', 'three')" in buf.getvalue()
+ assert "CREATE TYPE pgenum AS ENUM "\
+ "('one', 'two', 'three')" in buf.getvalue()
assert "CREATE TABLE sometable (\n data pgenum\n)" in buf.getvalue()
def test_offline_distinct_enum_drop(self):
execution_options={'no_parameters': True}
)
eq_(
- self.conn.execute("select count(*) from tab where col='new data'").scalar(),
+ self.conn.execute(
+ "select count(*) from tab where col='new data'").scalar(),
1,
)
-from tests import clear_staging_env, staging_env, eq_, ne_, is_, staging_directory
-from tests import _no_sql_testing_config, env_file_fixture, script_file_fixture, _testing_config
+from tests import clear_staging_env, staging_env, eq_, ne_, is_, \
+ staging_directory
+from tests import _no_sql_testing_config, env_file_fixture, \
+ script_file_fixture, _testing_config
from alembic import command
from alembic.script import ScriptDirectory, Script
from alembic.environment import EnvironmentContext
eq_(script.revision, abc)
eq_(script.down_revision, None)
assert os.access(
- os.path.join(env.dir, 'versions', '%s_this_is_a_message.py' % abc), os.F_OK)
+ os.path.join(env.dir, 'versions',
+ '%s_this_is_a_message.py' % abc), os.F_OK)
assert callable(script.module.upgrade)
eq_(env.get_heads(), [abc])
eq_(env.get_base(), abc)
def test_005_nextrev(self):
- script = env.generate_revision(def_, "this is the next rev", refresh=True)
+ script = env.generate_revision(
+ def_, "this is the next rev", refresh=True)
assert os.access(
- os.path.join(env.dir, 'versions', '%s_this_is_the_next_rev.py' % def_), os.F_OK)
+ os.path.join(
+ env.dir, 'versions',
+ '%s_this_is_the_next_rev.py' % def_), os.F_OK)
eq_(script.revision, def_)
eq_(script.down_revision, abc)
eq_(env._revision_map[abc].nextrev, set([def_]))
"lots of characters and also "
"I'd like it to\nhave\nnewlines")
assert os.access(
- os.path.join(env.dir, 'versions',
- '%s_this_is_a_really_long_name_with_lots_of_.py' % rid),
+ os.path.join(
+ env.dir, 'versions',
+ '%s_this_is_a_really_long_name_with_lots_of_.py' % rid),
os.F_OK)
def test_009_long_name_configurable(self):
def test_stamp(self):
with capture_context_buffer() as buf:
command.stamp(cfg, "head", sql=True)
- assert "UPDATE alembic_version SET version_num='%s';" % c in buf.getvalue()
+ assert "UPDATE alembic_version "\
+ "SET version_num='%s';" % c in buf.getvalue()
class EncodingTest(unittest.TestCase):
_sqlite_testing_config, sqlite_db, eq_, write_script, \
assert_raises_message
+a = b = c = None
+
class VersioningTest(unittest.TestCase):
sourceless = False