- add support for actual DB connections, test.cfg, etc.
- add PG server default comparison tests, #6
if metadata_column.type._type_affinity is not sqltypes.String:
rendered_metadata_default = re.sub(r"^'|'$", "", rendered_metadata_default)
- return not self.connection.execute(
+ return not self.connection.scalar(
"SELECT %s = %s" % (
conn_col_default,
rendered_metadata_default
--- /dev/null
+[db]
+postgresql = postgresql://scott:tiger@localhost/test
+mysql = mysql://scott:tiger@localhost/test
+mssql = mssql+pyodbc://scott:tiger@ms_2005/
+oracle=oracle://scott:tiger@172.16.248.129/xe
+sybase=sybase+pyodbc://scott:tiger7@sybase/
+firebird=firebird://scott:tiger@localhost/foo.gdb?type_conv=300
+oursql=mysql+oursql://scott:tiger@localhost/test
+pymssql=mssql+pymssql://scott:tiger@ms_2005/
+
from alembic import ddl
import StringIO
from alembic.ddl.impl import _impls
+import ConfigParser
+from nose import SkipTest
+from sqlalchemy.exc import SQLAlchemyError
staging_directory = os.path.join(os.path.dirname(__file__), 'scratch')
files_directory = os.path.join(os.path.dirname(__file__), 'files')
+testing_config = ConfigParser.ConfigParser()
+testing_config.read(['test.cfg'])
+
+def sqlite_db():
+ # sqlite caches table pragma info
+ # per connection, so create a new
+ # engine for each assertion
+ dir_ = os.path.join(staging_directory, 'scripts')
+ return create_engine('sqlite:///%s/foo.db' % dir_)
+
+_engs = {}
+def db_for_dialect(name):
+ if name in _engs:
+ return _engs[name]
+ else:
+ try:
+ cfg = testing_config.get("db", name)
+ except ConfigParser.NoOptionError:
+ raise SkipTest("No dialect %r in test.cfg" % name)
+ try:
+ eng = create_engine(cfg, echo=True)
+ except ImportError, er1:
+ raise SkipTest("Can't import DBAPI: %s" % er1)
+ try:
+ conn = eng.connect()
+ except SQLAlchemyError, er2:
+ raise SkipTest("Can't connect to database: %s" % er2)
+ _engs[name] = eng
+ return eng
+
_dialects = {}
def _get_dialect(name):
if name is None or name == 'default':
os.mkdir(staging_directory)
return Config(os.path.join(staging_directory, 'test_alembic.ini'))
-def _op_fixture(dialect='default', as_sql=False):
+def op_fixture(dialect='default', as_sql=False):
impl = _impls[dialect]
class Impl(impl):
def __init__(self, dialect, as_sql):
)
return ctx(dialect, as_sql)
-def _env_file_fixture(txt):
+def env_file_fixture(txt):
dir_ = os.path.join(staging_directory, 'scripts')
txt = """
from alembic import context
""" % (dir_, dir_))
-def _no_sql_testing_config():
+def no_sql_testing_config():
"""use a postgresql url with no host so that connections guaranteed to fail"""
dir_ = os.path.join(staging_directory, 'scripts')
return _write_config_file("""
open(cfg.config_file_name, 'w').write(text)
return cfg
-def sqlite_db():
- # sqlite caches table pragma info
- # per connection, so create a new
- # engine for each assertion
- dir_ = os.path.join(staging_directory, 'scripts')
- return create_engine('sqlite:///%s/foo.db' % dir_)
def staging_env(create=True, template="generic"):
from alembic import command, script
-from tests import _op_fixture
+from tests import op_fixture
from alembic import op
from sqlalchemy import Integer, \
UniqueConstraint, String
from sqlalchemy.sql import table, column
def _test_bulk_insert(dialect, as_sql):
- context = _op_fixture(dialect, as_sql)
+ context = op_fixture(dialect, as_sql)
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
)
def test_bulk_insert_wrong_cols():
- context = _op_fixture('postgresql')
+ context = op_fixture('postgresql')
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
"""Test op functions against MSSQL."""
-from tests import _op_fixture
+from tests import op_fixture
from alembic import op
from sqlalchemy import Integer, Column, ForeignKey, \
UniqueConstraint, Table, MetaData, String
from sqlalchemy.sql import table
def test_add_column():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.add_column('t1', Column('c1', Integer, nullable=False))
context.assert_("ALTER TABLE t1 ADD c1 INTEGER NOT NULL")
def test_add_column_with_default():
- context = _op_fixture("mssql")
+ context = op_fixture("mssql")
op.add_column('t1', Column('c1', Integer, nullable=False, server_default="12"))
context.assert_("ALTER TABLE t1 ADD c1 INTEGER NOT NULL DEFAULT '12'")
def test_alter_column_rename_mssql():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", name="x")
context.assert_(
"EXEC sp_rename 't.c', 'x', 'COLUMN'"
)
def test_drop_column_w_default():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_default=True)
context.assert_contains("exec('alter table t1 drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
def test_drop_column_w_check():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_check=True)
context.assert_contains("exec('alter table t1 drop constraint ' + @const_name)")
context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
def test_alter_column_nullable():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", nullable=True)
context.assert_(
"ALTER TABLE t ALTER COLUMN c NULL"
)
def test_alter_column_not_nullable():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", nullable=False)
context.assert_(
"ALTER TABLE t ALTER COLUMN c SET NOT NULL"
# TODO: when we add schema support
#def test_alter_column_rename_mssql_schema():
-# context = _op_fixture('mssql')
+# context = op_fixture('mssql')
# op.alter_column("t", "c", name="x", schema="y")
# context.assert_(
# "EXEC sp_rename 'y.t.c', 'x', 'COLUMN'"
-from tests import _op_fixture, assert_raises_message
+from tests import op_fixture, assert_raises_message
from alembic import op, util
from sqlalchemy import Integer, Column, ForeignKey, \
UniqueConstraint, Table, MetaData, String
from sqlalchemy.sql import table
def test_rename_column():
- context = _op_fixture('mysql')
+ context = op_fixture('mysql')
op.alter_column('t1', 'c1', name="c2", existing_type=Integer)
context.assert_(
'ALTER TABLE t1 CHANGE c1 c2 INTEGER NULL'
)
def test_rename_column_serv_default():
- context = _op_fixture('mysql')
+ context = op_fixture('mysql')
op.alter_column('t1', 'c1', name="c2", existing_type=Integer, existing_server_default="q")
context.assert_(
"ALTER TABLE t1 CHANGE c1 c2 INTEGER NULL DEFAULT 'q'"
)
def test_col_nullable():
- context = _op_fixture('mysql')
+ context = op_fixture('mysql')
op.alter_column('t1', 'c1', nullable=False, existing_type=Integer)
context.assert_(
'ALTER TABLE t1 CHANGE c1 c1 INTEGER NOT NULL'
)
def test_col_multi_alter():
- context = _op_fixture('mysql')
+ context = op_fixture('mysql')
op.alter_column('t1', 'c1', nullable=False, server_default="q", type_=Integer)
context.assert_(
"ALTER TABLE t1 CHANGE c1 c1 INTEGER NOT NULL DEFAULT 'q'"
def test_col_alter_type_required():
- context = _op_fixture('mysql')
+ context = op_fixture('mysql')
assert_raises_message(
util.CommandError,
"All MySQL ALTER COLUMN operations require the existing type.",
from tests import clear_staging_env, staging_env, \
- _no_sql_testing_config, sqlite_db, eq_, ne_, \
- capture_context_buffer, three_rev_fixture, _env_file_fixture,\
+ no_sql_testing_config, sqlite_db, eq_, ne_, \
+ capture_context_buffer, three_rev_fixture, env_file_fixture,\
assert_raises_message
from alembic import command, util
from unittest import TestCase
class OfflineEnvironmentTest(TestCase):
def setUp(self):
env = staging_env()
- self.cfg = _no_sql_testing_config()
+ self.cfg = no_sql_testing_config()
global a, b, c
a, b, c = three_rev_fixture(self.cfg)
clear_staging_env()
def test_not_requires_connection(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert not context.requires_connection()
""")
command.upgrade(self.cfg, a, sql=True)
command.downgrade(self.cfg, a, sql=True)
def test_requires_connection(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.requires_connection()
""")
command.upgrade(self.cfg, a)
def test_starting_rev_post_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite', starting_rev='x')
assert context.get_starting_revision_argument() == 'x'
""")
command.stamp(self.cfg, a)
def test_starting_rev_pre_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_starting_revision_argument() == 'x'
""")
command.upgrade(self.cfg, "x:y", sql=True)
command.stamp(self.cfg, a)
def test_starting_rev_current_pre_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_starting_revision_argument() is None
""")
assert_raises_message(
)
def test_destination_rev_pre_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_revision_argument() == '%s'
""" % b)
command.upgrade(self.cfg, b, sql=True)
command.stamp(self.cfg, b, sql=True)
def test_destination_rev_post_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite')
assert context.get_revision_argument() == '%s'
""" % b)
command.stamp(self.cfg, b, sql=True)
def test_head_rev_pre_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_head_revision() == '%s'
""" % c)
command.upgrade(self.cfg, b, sql=True)
command.current(self.cfg)
def test_head_rev_post_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite')
assert context.get_head_revision() == '%s'
""" % c)
command.current(self.cfg)
def test_tag_pre_context(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_tag_argument() == 'hi'
""")
command.upgrade(self.cfg, b, sql=True, tag='hi')
command.downgrade(self.cfg, b, sql=True, tag='hi')
def test_tag_pre_context_None(self):
- _env_file_fixture("""
+ env_file_fixture("""
assert context.get_tag_argument() is None
""")
command.upgrade(self.cfg, b, sql=True)
command.downgrade(self.cfg, b, sql=True)
def test_tag_cmd_arg(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite')
assert context.get_tag_argument() == 'hi'
""")
command.downgrade(self.cfg, b, sql=True, tag='hi')
def test_tag_cfg_arg(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite', tag='there')
assert context.get_tag_argument() == 'there'
""")
command.downgrade(self.cfg, b, sql=True, tag='hi')
def test_tag_None(self):
- _env_file_fixture("""
+ env_file_fixture("""
context.configure(dialect_name='sqlite')
assert context.get_tag_argument() is None
""")
"""Test against the builders in the op.* module."""
-from tests import _op_fixture
+from tests import op_fixture
from alembic import op
from sqlalchemy import Integer, Column, ForeignKey, \
UniqueConstraint, Table, MetaData, String,\
from sqlalchemy.sql import table, column, func
def test_rename_table():
- context = _op_fixture()
+ context = op_fixture()
op.rename_table('t1', 't2')
context.assert_("ALTER TABLE t1 RENAME TO t2")
def test_rename_table_schema():
- context = _op_fixture()
+ context = op_fixture()
op.rename_table('t1', 't2', schema="foo")
context.assert_("ALTER TABLE foo.t1 RENAME TO foo.t2")
def test_add_column():
- context = _op_fixture()
+ context = op_fixture()
op.add_column('t1', Column('c1', Integer, nullable=False))
context.assert_("ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL")
def test_add_column_with_default():
- context = _op_fixture()
+ context = op_fixture()
op.add_column('t1', Column('c1', Integer, nullable=False, server_default="12"))
context.assert_("ALTER TABLE t1 ADD COLUMN c1 INTEGER DEFAULT '12' NOT NULL")
def test_add_column_fk():
- context = _op_fixture()
+ context = op_fixture()
op.add_column('t1', Column('c1', Integer, ForeignKey('c2.id'), nullable=False))
context.assert_(
"ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL",
def test_add_column_schema_type():
"""Test that a schema type generates its constraints...."""
- context = _op_fixture()
+ context = op_fixture()
op.add_column('t1', Column('c1', Boolean, nullable=False))
context.assert_(
'ALTER TABLE t1 ADD COLUMN c1 BOOLEAN NOT NULL',
def test_add_column_schema_type_checks_rule():
"""Test that a schema type doesn't generate a
constraint based on check rule."""
- context = _op_fixture('postgresql')
+ context = op_fixture('postgresql')
op.add_column('t1', Column('c1', Boolean, nullable=False))
context.assert_(
'ALTER TABLE t1 ADD COLUMN c1 BOOLEAN NOT NULL',
)
def test_add_column_fk_self_referential():
- context = _op_fixture()
+ context = op_fixture()
op.add_column('t1', Column('c1', Integer, ForeignKey('t1.c2'), nullable=False))
context.assert_(
"ALTER TABLE t1 ADD COLUMN c1 INTEGER NOT NULL",
)
def test_drop_column():
- context = _op_fixture()
+ context = op_fixture()
op.drop_column('t1', 'c1')
context.assert_("ALTER TABLE t1 DROP COLUMN c1")
def test_alter_column_nullable():
- context = _op_fixture()
+ context = op_fixture()
op.alter_column("t", "c", nullable=True)
context.assert_(
# TODO: not sure if this is PG only or standard
)
def test_alter_column_not_nullable():
- context = _op_fixture()
+ context = op_fixture()
op.alter_column("t", "c", nullable=False)
context.assert_(
# TODO: not sure if this is PG only or standard
)
def test_alter_column_rename():
- context = _op_fixture()
+ context = op_fixture()
op.alter_column("t", "c", name="x")
context.assert_(
"ALTER TABLE t RENAME c TO x"
)
def test_alter_column_type():
- context = _op_fixture()
+ context = op_fixture()
op.alter_column("t", "c", type_=String(50))
context.assert_(
'ALTER TABLE t ALTER COLUMN c TYPE VARCHAR(50)'
)
def test_alter_column_schema_type_unnamed():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", type_=Boolean())
context.assert_(
'ALTER TABLE t ALTER COLUMN c TYPE BIT',
)
def test_alter_column_schema_type_named():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", type_=Boolean(name="xyz"))
context.assert_(
'ALTER TABLE t ALTER COLUMN c TYPE BIT',
)
def test_alter_column_schema_type_existing_type():
- context = _op_fixture('mssql')
+ context = op_fixture('mssql')
op.alter_column("t", "c", type_=String(10), existing_type=Boolean(name="xyz"))
context.assert_(
'ALTER TABLE t DROP CONSTRAINT xyz',
)
def test_add_foreign_key():
- context = _op_fixture()
+ context = op_fixture()
op.create_foreign_key('fk_test', 't1', 't2',
['foo', 'bar'], ['bat', 'hoho'])
context.assert_(
)
def test_add_check_constraint():
- context = _op_fixture()
+ context = op_fixture()
op.create_check_constraint(
"ck_user_name_len",
"user_table",
)
def test_add_unique_constraint():
- context = _op_fixture()
+ context = op_fixture()
op.create_unique_constraint('uk_test', 't1', ['foo', 'bar'])
context.assert_(
"ALTER TABLE t1 ADD CONSTRAINT uk_test UNIQUE (foo, bar)"
)
def test_drop_constraint():
- context = _op_fixture()
+ context = op_fixture()
op.drop_constraint('foo_bar_bat', 't1')
context.assert_(
"ALTER TABLE t1 DROP CONSTRAINT foo_bar_bat"
)
def test_create_index():
- context = _op_fixture()
+ context = op_fixture()
op.create_index('ik_test', 't1', ['foo', 'bar'])
context.assert_(
"CREATE INDEX ik_test ON t1 (foo, bar)"
def test_drop_index():
- context = _op_fixture()
+ context = op_fixture()
op.drop_index('ik_test')
context.assert_(
"DROP INDEX ik_test"
)
def test_drop_table():
- context = _op_fixture()
+ context = op_fixture()
op.drop_table('tb_test')
context.assert_(
"DROP TABLE tb_test"
)
def test_create_table_fk_and_schema():
- context = _op_fixture()
+ context = op_fixture()
op.create_table(
"some_table",
Column('id', Integer, primary_key=True),
)
def test_create_table_two_fk():
- context = _op_fixture()
+ context = op_fixture()
op.create_table(
"some_table",
Column('id', Integer, primary_key=True),
)
def test_inline_literal():
- context = _op_fixture()
+ context = op_fixture()
from sqlalchemy.sql import table, column
from sqlalchemy import String, Integer
--- /dev/null
+
+from tests import op_fixture, db_for_dialect, eq_, staging_env, clear_staging_env
+from unittest import TestCase
+from sqlalchemy import DateTime, MetaData, Table, Column, text, Integer, String
+from sqlalchemy.engine.reflection import Inspector
+from alembic import context
+
+class PostgresqlDefaultCompareTest(TestCase):
+ @classmethod
+ def setup_class(cls):
+ cls.bind = db_for_dialect("postgresql")
+ staging_env()
+ context.configure(
+ connection = cls.bind.connect(),
+ compare_type = True,
+ compare_server_default = True,
+ )
+ connection = context.get_bind()
+ cls.autogen_context = {
+ 'imports':set(),
+ 'connection':connection,
+ 'dialect':connection.dialect,
+ 'context':context.get_context()
+ }
+
+ @classmethod
+ def teardown_class(cls):
+ clear_staging_env()
+
+ def setUp(self):
+ self.metadata = MetaData(self.bind)
+
+ def tearDown(self):
+ self.metadata.drop_all()
+
+ def _compare_default_roundtrip(
+ self, type_, txt, alternate=None):
+ if alternate:
+ expected = True
+ else:
+ alternate = txt
+ expected = False
+ t = Table("test", self.metadata,
+ Column("somecol", type_, server_default=text(txt))
+ )
+ t2 = Table("test", MetaData(),
+ Column("somecol", type_, server_default=text(alternate))
+ )
+ assert self._compare_default(
+ t, t2, t2.c.somecol, alternate
+ ) is expected
+# t.create(self.bind)
+# insp = Inspector.from_engine(self.bind)
+# cols = insp.get_columns("test")
+# ctx = context.get_context()
+# assert ctx.impl.compare_server_default(
+# cols[0],
+# t2.c.somecol,
+# alternate) is expected
+
+ def _compare_default(
+ self,
+ t1, t2, col,
+ rendered
+ ):
+ t1.create(self.bind)
+ insp = Inspector.from_engine(self.bind)
+ cols = insp.get_columns(t1.name)
+ ctx = context.get_context()
+ return ctx.impl.compare_server_default(
+ cols[0],
+ col,
+ rendered)
+
+ def test_compare_current_timestamp(self):
+ self._compare_default_roundtrip(
+ DateTime(),
+ "TIMEZONE('utc', CURRENT_TIMESTAMP)",
+ )
+
+ def test_compare_current_timestamp(self):
+ self._compare_default_roundtrip(
+ DateTime(),
+ "TIMEZONE('utc', CURRENT_TIMESTAMP)",
+ )
+
+ def test_compare_integer(self):
+ self._compare_default_roundtrip(
+ Integer(),
+ "5",
+ )
+
+ def test_compare_integer_diff(self):
+ self._compare_default_roundtrip(
+ Integer(),
+ "5", "7"
+ )
+
+ def test_compare_character_diff(self):
+ self._compare_default_roundtrip(
+ String(),
+ "'hello'",
+ "'there'"
+ )
+
+ def test_primary_key_skip(self):
+ """Test that SERIAL cols are just skipped"""
+ t1 = Table("sometable", self.metadata,
+ Column("id", Integer, primary_key=True)
+ )
+ t2 = Table("sometable", MetaData(),
+ Column("id", Integer, primary_key=True)
+ )
+ assert not self._compare_default(
+ t1, t2, t2.c.id, ""
+ )
\ No newline at end of file
-from tests import clear_staging_env, staging_env, _no_sql_testing_config, sqlite_db, eq_, ne_, capture_context_buffer, three_rev_fixture
+from tests import clear_staging_env, staging_env, no_sql_testing_config, sqlite_db, eq_, ne_, capture_context_buffer, three_rev_fixture
from alembic import command, util
def setup():
global cfg, env
env = staging_env()
- cfg = _no_sql_testing_config()
+ cfg = no_sql_testing_config()
global a, b, c
a, b, c = three_rev_fixture(cfg)