text
from sqlalchemy import schema, create_engine
from sqlalchemy.util import importlater
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.sql.expression import _BindParamClause
import logging
base = importlater("alembic.ddl", "base")
if self.as_sql and self.transactional_ddl:
print "COMMIT;\n"
- def _exec(self, construct):
+ def _exec(self, construct, *args, **kw):
if isinstance(construct, basestring):
construct = text(construct)
if self.as_sql:
+ if args or kw:
+ raise Exception("Execution arguments not allowed with as_sql")
print unicode(
- construct.compile(dialect=self.connection.dialect)
+ construct.compile(dialect=self.dialect)
).replace("\t", " ") + ";"
else:
- self.connection.execute(construct)
+ self.connection.execute(construct, *args, **kw)
+
+ @property
+ def dialect(self):
+ return self.connection.dialect
def execute(self, sql):
self._exec(sql)
def drop_table(self, table):
self._exec(schema.DropTable(table))
+ def bulk_insert(self, table, rows):
+ if self.as_sql:
+ for row in rows:
+ self._exec(table.insert().values(**dict(
+ (k, _literal_bindparam(k, v, type_=table.c[k].type))
+ for k, v in row.items()
+ )))
+ else:
+ self._exec(table.insert(), *rows)
+
+class _literal_bindparam(_BindParamClause):
+ pass
+
+@compiles(_literal_bindparam)
+def _render_literal_bindparam(element, compiler, **kw):
+ return compiler.render_literal_bindparam(element, **kw)
+
def opts(cfg, **kw):
global _context_opts, config
_context_opts = kw
-import postgresql, mysql, sqlite
\ No newline at end of file
+import postgresql, mysql, sqlite, mssql
\ No newline at end of file
--- /dev/null
+from alembic.context import DefaultContext
+
+class MSSQLContext(DefaultContext):
+ __dialect__ = 'mssql'
+ transactional_ddl = True
+
+ def bulk_insert(self, table, rows):
+ if self.as_sql:
+ self._exec(
+ "SET IDENTITY_INSERT %s ON" %
+ self.dialect.identifier_preparer.format_table(table)
+ )
+ super(MSSQLContext, self).bulk_insert(table, rows)
+ self._exec(
+ "SET IDENTITY_INSERT %s OFF" %
+ self.dialect.identifier_preparer.format_table(table)
+ )
+ else:
+ super(MSSQLContext, self).bulk_insert(table, rows)
\ No newline at end of file
_table(name, *columns, **kw)
)
+def bulk_insert(table, rows):
+ get_context().bulk_insert(table, rows)
+
def execute(sql):
get_context().execute(sql)
-from sqlalchemy.util import defaultdict
from sqlalchemy.engine import url, default
import shutil
import os
import itertools
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, text
from alembic import context
import re
+from alembic.context import _context_impls
+from alembic import ddl
staging_directory = os.path.join(os.path.dirname(__file__), 'scratch')
-_dialects = defaultdict(lambda name:url.URL(drivername).get_dialect()())
+_dialects = {}
def _get_dialect(name):
- if name is None:
+ if name is None or name == 'default':
return default.DefaultDialect()
else:
- return _dialects[name]
-
+ try:
+ return _dialects[name]
+ except KeyError:
+ dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name)
+ _dialects[name] = d = dialect_mod.dialect()
+ return d
def assert_compiled(element, assert_string, dialect=None):
dialect = _get_dialect(dialect)
os.mkdir(staging_directory)
return Config(os.path.join(staging_directory, 'test_alembic.ini'))
-class _op_fixture(context.DefaultContext):
- def __init__(self):
- # TODO: accept dialect here.
- context._context = self
- self.assertion = []
-
- def _exec(self, construct):
- sql = unicode(construct.compile())
- sql = re.sub(r'[\n\t]', '', sql)
- self.assertion.append(
- sql
- )
-
- def assert_(self, *sql):
- # TODO: make this more flexible about
- # whitespace and such
- eq_(self.assertion, list(sql))
+def _op_fixture(dialect='default', as_sql=False):
+ _base = _context_impls[dialect]
+ class ctx(_base):
+ def __init__(self, dialect='default', as_sql=False):
+ self._dialect = _get_dialect(dialect)
+
+ context._context = self
+ self.as_sql = as_sql
+ self.assertion = []
+
+ @property
+ def dialect(self):
+ return self._dialect
+
+ def _exec(self, construct, *args, **kw):
+ if isinstance(construct, basestring):
+ construct = text(construct)
+ sql = unicode(construct.compile(dialect=self.dialect))
+ sql = re.sub(r'[\n\t]', '', sql)
+ self.assertion.append(
+ sql
+ )
+
+ def assert_(self, *sql):
+ # TODO: make this more flexible about
+ # whitespace and such
+ eq_(self.assertion, list(sql))
+ return ctx(dialect, as_sql)
def _sqlite_testing_config():
cfg = _testing_config()
--- /dev/null
+from tests import _op_fixture
+from alembic import op
+from sqlalchemy import Integer, Column, ForeignKey, \
+ UniqueConstraint, Table, MetaData, String
+from sqlalchemy.sql import table
+
+def _test_bulk_insert(dialect, as_sql):
+ context = _op_fixture(dialect, as_sql)
+ t1 = table("ins_table",
+ Column('id', Integer, primary_key=True),
+ Column('v1', String()),
+ Column('v2', String()),
+ )
+ op.bulk_insert(t1, [
+ {'id':1, 'v1':'row v1', 'v2':'row v5'},
+ {'id':2, 'v1':'row v2', 'v2':'row v6'},
+ {'id':3, 'v1':'row v3', 'v2':'row v7'},
+ {'id':4, 'v1':'row v4', 'v2':'row v8'},
+ ])
+ return context
+
+def test_bulk_insert():
+ context = _test_bulk_insert('default', False)
+ context.assert_(
+ 'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
+ )
+
+def test_bulk_insert_wrong_cols():
+ context = _op_fixture('postgresql')
+ t1 = Table("ins_table", MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('v1', String()),
+ Column('v2', String()),
+ )
+ op.bulk_insert(t1, [
+ {'v1':'row v1', },
+ ])
+ # TODO: this is wrong because the test fixture isn't actually
+ # doing what the real context would do. Sending this to
+ # PG is going to produce a RETURNING clause. fixture would
+ # need to be beefed up
+ context.assert_(
+ 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
+ )
+
+def test_bulk_insert_pg():
+ context = _test_bulk_insert('postgresql', False)
+ context.assert_(
+ 'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
+ )
+
+def test_bulk_insert_mssql():
+ context = _test_bulk_insert('mssql', False)
+ context.assert_(
+ 'INSERT INTO ins_table (id, v1, v2) VALUES (:id, :v1, :v2)'
+ )
+
+def test_bulk_insert_as_sql():
+ context = _test_bulk_insert('default', True)
+ context.assert_(
+ "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')"
+ )
+
+def test_bulk_insert_as_sql_pg():
+ context = _test_bulk_insert('postgresql', True)
+ context.assert_(
+ "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')"
+ )
+
+def test_bulk_insert_as_sql_mssql():
+ context = _test_bulk_insert('mssql', True)
+ # SQL server requires IDENTITY_INSERT
+ # TODO: figure out if this is safe to enable for a table that
+ # doesn't have an IDENTITY column
+ context.assert_(
+ 'SET IDENTITY_INSERT ins_table ON',
+ "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (2, 'row v2', 'row v6')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (3, 'row v3', 'row v7')",
+ "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')",
+ 'SET IDENTITY_INSERT ins_table OFF'
+ )
from tests import _op_fixture
from alembic import op
from sqlalchemy import Integer, Column, ForeignKey, \
- UniqueConstraint, Table, MetaData
+ UniqueConstraint, Table, MetaData, String
+from sqlalchemy.sql import table
def test_add_column():
context = _op_fixture()
"FOREIGN KEY(foo_bar) REFERENCES foo (bar))"
)
+