=====
- [feature] Informative error message when op.XYZ
directives are invoked at module import time.
-
+
+- [bug] implement 'tablename' parameter on
+ drop_index() as this is needed by some
+ backends.
+
+- [feature] Added execution_options parameter
+ to op.execute(), will call execution_options()
+ on the Connection before executing.
+
+ The immediate use case here is to allow
+ access to the new no_parameters option
+ in SQLAlchemy 0.7.6, which allows
+ some DBAPIs (psycopg2, MySQLdb) to allow
+ percent signs straight through without
+ escaping, thus providing cross-compatible
+ operation with DBAPI execution and
+ static script generation.
+
- [bug] setup.py won't install argparse if on
Python 2.7/3.2
from alembic.ddl import base
from alembic import util
from sqlalchemy import types as sqltypes
+from sqlalchemy import util as sqla_util
class ImplMeta(type):
def __init__(cls, classname, bases, dict_):
def bind(self):
return self.connection
- def _exec(self, construct, *args, **kw):
+ def _exec(self, construct, execution_options=None,
+ multiparams=(),
+ params=sqla_util.immutabledict()):
if isinstance(construct, basestring):
construct = text(construct)
if self.as_sql:
- if args or kw:
+ if multiparams or params:
# TODO: coverage
raise Exception("Execution arguments not allowed with as_sql")
self.static_output(unicode(
construct.compile(dialect=self.dialect)
).replace("\t", " ").strip() + ";")
else:
- self.connection.execute(construct, *args, **kw)
+ conn = self.connection
+ if execution_options:
+ conn = conn.execution_options(**execution_options)
+ conn.execute(construct, *multiparams, **params)
- def execute(self, sql):
- self._exec(sql)
+ def execute(self, sql, execution_options=None):
+ self._exec(sql, execution_options)
def alter_column(self, table_name, column_name,
nullable=None,
with Operations.context(self._migration_context):
self.get_context().run_migrations(**kw)
- def execute(self, sql):
+ def execute(self, sql, execution_options=None):
"""Execute the given SQL using the current change context.
The behavior of :meth:`.execute` is the same
first been made available via :meth:`.configure`.
"""
- self.get_context().execute(sql)
+ self.get_context().execute(sql,
+ execution_options=execution_options)
def static_output(self, text):
"""Emit text directly to the "offline" SQL stream.
return schema.Column(name, type_, **kw)
def _index(self, name, tablename, columns, **kw):
- t = schema.Table(tablename, schema.MetaData(),
+ t = schema.Table(tablename or 'no_table', schema.MetaData(),
*[schema.Column(n, NULLTYPE) for n in columns]
)
return schema.Index(name, *list(t.c), **kw)
self._index(name, tablename, *columns, **kw)
)
- def drop_index(self, name):
+ def drop_index(self, name, tablename=None):
"""Issue a "drop index" instruction using the current
migration context.
e.g.::
drop_index("accounts")
+
+ :param tablename: name of the owning table. Some
+ backends such as Microsoft SQL Server require this.
"""
# need a dummy column name here since SQLAlchemy
# 0.7.6 and further raises on Index with no columns
- self.impl.drop_index(self._index(name, 'foo', ['x']))
+ self.impl.drop_index(self._index(name, tablename, ['x']))
def drop_constraint(self, name, tablename):
"""Drop a constraint of the given name"""
"""
return impl._literal_bindparam(None, value, type_=type_)
- def execute(self, sql):
+ def execute(self, sql, execution_options=None):
"""Execute the given SQL using the current migration context.
In a SQL script context, the statement is emitted directly to the
* Pretty much anything that's "executable" as described
in :ref:`sqlexpression_toplevel`.
-
+ :param execution_options: Optional dictionary of
+ execution options, will be passed to
+ :meth:`sqlalchemy.engine.base.Connection.execution_options`.
"""
- self.migration_context.impl.execute(sql)
+ self.migration_context.impl.execute(sql,
+ execution_options=execution_options)
def get_bind(self):
"""Return the current 'bind'.
'ALTER TABLE t ALTER COLUMN c INTEGER'
)
+ def test_drop_index(self):
+ context = op_fixture('mssql')
+ op.drop_index('my_idx', 'my_table')
+ # TODO: annoying that SQLA escapes unconditionally
+ context.assert_contains("DROP INDEX [my_table].my_idx")
+
def test_drop_column_w_default(self):
context = op_fixture('mssql')
op.drop_column('t1', 'c1', mssql_drop_default=True)
def downgrade():
op.drop_table("sometable")
ENUM(name="pgenum").drop(op.get_bind(), checkfirst=False)
-
+
""" % self.rid)
def test_offline_inline_enum_create(self):
assert "DROP TABLE sometable" in buf.getvalue()
assert "DROP TYPE pgenum" in buf.getvalue()
+from alembic.migration import MigrationContext
+from alembic.operations import Operations
+from sqlalchemy.sql import table, column
+
+class PostgresqlInlineLiteralTest(TestCase):
+ @classmethod
+ def setup_class(cls):
+ cls.bind = db_for_dialect("postgresql")
+ cls.bind.execute("""
+ create table tab (
+ col varchar(50)
+ )
+ """)
+ cls.bind.execute("""
+ insert into tab (col) values
+ ('old data 1'),
+ ('old data 2.1'),
+ ('old data 3')
+ """)
+
+ @classmethod
+ def teardown_class(cls):
+ cls.bind.execute("drop table tab")
+
+ def setUp(self):
+ self.conn = self.bind.connect()
+ ctx = MigrationContext.configure(self.conn)
+ self.op = Operations(ctx)
+ def tearDown(self):
+ self.conn.close()
+
+ def test_inline_percent(self):
+ # TODO: here's the issue, you need to escape this.
+ tab = table('tab', column('col'))
+ self.op.execute(
+ tab.update().where(
+ tab.c.col.like(self.op.inline_literal('%.%'))
+ ).values(col=self.op.inline_literal('new data')),
+ execution_options={'no_parameters':True}
+ )
+ eq_(
+ self.conn.execute("select count(*) from tab where col='new data'").scalar(),
+ 1,
+ )
class PostgresqlDefaultCompareTest(TestCase):
@classmethod