-from .api import compare_metadata, _produce_migration_diffs, _produce_net_changes
+from .api import compare_metadata, _produce_migration_diffs, \
+ _produce_net_changes
template_args['imports'] = "\n".join(sorted(imports))
-def _get_object_filters(context_opts, include_symbol=None, include_object=None):
+def _get_object_filters(
+ context_opts, include_symbol=None, include_object=None):
include_symbol = context_opts.get('include_symbol', include_symbol)
include_object = context_opts.get('include_object', include_object)
inspector, metadata, diffs, autogen_context)
-###################################################
-# element comparison
-
-
-###################################################
-# render python
-
-
-###################################################
-# produce command structure
-
def _produce_upgrade_commands(diffs, autogen_context):
buf = []
for diff in diffs:
-from sqlalchemy.exc import NoSuchTableError
from sqlalchemy import schema as sa_schema, types as sqltypes
import logging
from .. import compat
for s, tname in metadata_table_names.difference(conn_table_names):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = tname_to_table[(s, tname)]
- if _run_filters(metadata_table, tname, "table", False, None, object_filters):
+ if _run_filters(
+ metadata_table, tname, "table", False, None, object_filters):
diffs.append(("add_table", metadata_table))
log.info("Detected added table %r", name)
_compare_indexes_and_uniques(s, tname, object_filters,
metadata_table = tname_to_table[(s, tname)]
conn_table = existing_metadata.tables[name]
- if _run_filters(metadata_table, tname, "table", False, conn_table, object_filters):
+ if _run_filters(
+ metadata_table, tname, "table", False,
+ conn_table, object_filters):
_compare_columns(s, tname, object_filters,
conn_table,
metadata_table,
metadata_col = metadata_cols_by_name[colname]
conn_col = conn_table.c[colname]
if not _run_filters(
- metadata_col, colname, "column", False, conn_col, object_filters):
+ metadata_col, colname, "column", False,
+ conn_col, object_filters):
continue
col_diff = []
_compare_type(schema, tname, colname,
def _compare_indexes_and_uniques(schema, tname, object_filters, conn_table,
- metadata_table, diffs, autogen_context, inspector):
+ metadata_table, diffs,
+ autogen_context, inspector):
is_create_table = conn_table is None
# 1a. get raw indexes and unique constraints from metadata ...
- metadata_unique_constraints = set(uq for uq in metadata_table.constraints
- if isinstance(uq, sa_schema.UniqueConstraint)
- )
+ metadata_unique_constraints = set(
+ uq for uq in metadata_table.constraints
+ if isinstance(uq, sa_schema.UniqueConstraint)
+ )
metadata_indexes = set(metadata_table.indexes)
conn_uniques = conn_indexes = frozenset()
# can't accurately report on
autogen_context['context'].impl.\
correct_for_autogen_constraints(
- conn_uniques, conn_indexes,
- metadata_unique_constraints,
- metadata_indexes
- )
+ conn_uniques, conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes
+ )
# 4. organize the constraints into "signature" collections, the
# _constraint_sig() objects provide a consistent facade over both
metadata_indexes = set(_ix_constraint_sig(ix) for ix in metadata_indexes)
- conn_unique_constraints = set(_uq_constraint_sig(uq) for uq in conn_uniques)
+ conn_unique_constraints = set(
+ _uq_constraint_sig(uq) for uq in conn_uniques)
conn_indexes = set(_ix_constraint_sig(ix) for ix in conn_indexes)
doubled_constraints = dict(
(name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
- for name in set(conn_uniques_by_name).intersection(conn_indexes_by_name)
+ for name in set(
+ conn_uniques_by_name).intersection(conn_indexes_by_name)
)
# 6. index things by "column signature", to help with unnamed unique
(uq.sig, uq) for uq in metadata_unique_constraints)
metadata_indexes_by_sig = dict(
(ix.sig, ix) for ix in metadata_indexes)
- unnamed_metadata_uniques = dict((uq.sig, uq) for uq in
- metadata_unique_constraints if uq.name is None)
+ unnamed_metadata_uniques = dict(
+ (uq.sig, uq) for uq in
+ metadata_unique_constraints if uq.name is None)
# assumptions:
# 1. a unique constraint or an index from the connection *always*
'table': index.table.name,
'columns': _get_index_column_names(index),
'unique': index.unique or False,
- 'schema': (", schema='%s'" % index.table.schema) if index.table.schema else '',
- 'kwargs': (', ' + ', '.join(
- ["%s=%s" % (key, _render_potential_expr(val, autogen_context))
- for key, val in index.kwargs.items()]))
+ 'schema': (", schema='%s'" % index.table.schema)
+ if index.table.schema else '',
+ 'kwargs': (
+ ', ' +
+ ', '.join(
+ ["%s=%s" %
+ (key, _render_potential_expr(val, autogen_context))
+ for key, val in index.kwargs.items()]))
if len(index.kwargs) else ''
}
return text
if alter and constraint.table.schema:
opts.append(("schema", str(constraint.table.schema)))
if not alter and constraint.name:
- opts.append(("name", _render_gen_name(autogen_context, constraint.name)))
+ opts.append(
+ ("name", _render_gen_name(autogen_context, constraint.name)))
if alter:
args = [repr(_render_gen_name(autogen_context, constraint.name)),
Generate Alembic operations for the ALTER TABLE ... DROP CONSTRAINT
of a :class:`~sqlalchemy.schema.UniqueConstraint` instance.
"""
- text = "%(prefix)sdrop_constraint(%(name)r, '%(table_name)s'%(schema)s)" % {
- 'prefix': _alembic_autogenerate_prefix(autogen_context),
- 'name': _render_gen_name(autogen_context, constraint.name),
- 'table_name': constraint.table.name,
- 'schema': (", schema='%s'" % constraint.table.schema)
- if constraint.table.schema else '',
- }
+ text = "%(prefix)sdrop_constraint"\
+ "(%(name)r, '%(table_name)s'%(schema)s)" % {
+ 'prefix': _alembic_autogenerate_prefix(autogen_context),
+ 'name': _render_gen_name(autogen_context, constraint.name),
+ 'table_name': constraint.table.name,
+ 'schema': (", schema='%s'" % constraint.table.schema)
+ if constraint.table.schema else '',
+ }
return text
autogen_context),
'tname': tname,
'cname': cname}
- text += ",\n%sexisting_type=%s" % (indent,
- _repr_type(existing_type, autogen_context))
+ text += ",\n%sexisting_type=%s" % (
+ indent,
+ _repr_type(existing_type, autogen_context))
if server_default is not False:
rendered = _render_server_default(
server_default, autogen_context)
opts = []
if constraint.name:
- opts.append(("name", repr(_render_gen_name(autogen_context, constraint.name))))
+ opts.append(("name", repr(
+ _render_gen_name(autogen_context, constraint.name))))
return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
"args": ", ".join(
opts = []
if constraint.name:
- opts.append(("name", repr(_render_gen_name(autogen_context, constraint.name))))
+ opts.append(("name", repr(
+ _render_gen_name(autogen_context, constraint.name))))
if constraint.onupdate:
opts.append(("onupdate", repr(constraint.onupdate)))
if constraint.ondelete:
return "%(prefix)sForeignKeyConstraint([%(cols)s], "\
"[%(refcols)s], %(args)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
- "cols": ", ".join("'%s'" % f.parent.key for f in constraint.elements),
+ "cols": ", ".join(
+ "'%s'" % f.parent.key for f in constraint.elements),
"refcols": ", ".join(repr(_fk_colspec(f, apply_metadata_schema))
for f in constraint.elements),
"args": ", ".join(
return None
opts = []
if constraint.name:
- opts.append(("name", repr(_render_gen_name(autogen_context, constraint.name))))
+ opts.append(
+ (
+ "name",
+ repr(_render_gen_name(autogen_context, constraint.name))
+ )
+ )
return "%(prefix)sCheckConstraint(%(sqltext)r%(opts)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
"opts": ", " + (", ".join("%s=%s" % (k, v)
raise util.CommandError("Range revision not allowed")
starting_rev, revision = revision.split(':', 2)
elif sql:
- raise util.CommandError("downgrade with --sql requires <fromrev>:<torev>")
+ raise util.CommandError(
+ "downgrade with --sql requires <fromrev>:<torev>")
def downgrade(rev, context):
return script._downgrade_revs(revision, rev)
from importlib import machinery
def load_module_py(module_id, path):
- return machinery.SourceFileLoader(module_id, path).load_module(module_id)
+ return machinery.SourceFileLoader(
+ module_id, path).load_module(module_id)
def load_module_pyc(module_id, path):
- return machinery.SourcelessFileLoader(module_id, path).load_module(module_id)
+ return machinery.SourcelessFileLoader(
+ module_id, path).load_module(module_id)
else:
import imp
type=str,
help="Setup template for use with 'init'")
if 'message' in kwargs:
- parser.add_argument("-m", "--message",
- type=str,
- help="Message string to use with 'revision'")
+ parser.add_argument(
+ "-m", "--message",
+ type=str,
+ help="Message string to use with 'revision'")
if 'sql' in kwargs:
- parser.add_argument("--sql",
- action="store_true",
- help="Don't emit SQL to database - dump to "
- "standard output/file instead")
+ parser.add_argument(
+ "--sql",
+ action="store_true",
+ help="Don't emit SQL to database - dump to "
+ "standard output/file instead")
if 'tag' in kwargs:
- parser.add_argument("--tag",
- type=str,
- help="Arbitrary 'tag' name - can be used by "
- "custom env.py scripts.")
+ parser.add_argument(
+ "--tag",
+ type=str,
+ help="Arbitrary 'tag' name - can be used by "
+ "custom env.py scripts.")
if 'autogenerate' in kwargs:
- parser.add_argument("--autogenerate",
- action="store_true",
- help="Populate revision script with candidate "
- "migration operations, based on comparison "
- "of database to model.")
+ parser.add_argument(
+ "--autogenerate",
+ action="store_true",
+ help="Populate revision script with candidate "
+ "migration operations, based on comparison "
+ "of database to model.")
# "current" command
if 'head_only' in kwargs:
- parser.add_argument("--head-only",
- action="store_true",
- help="Only show current version and "
- "whether or not this is the head revision.")
+ parser.add_argument(
+ "--head-only",
+ action="store_true",
+ help="Only show current version and "
+ "whether or not this is the head revision.")
if 'rev_range' in kwargs:
parser.add_argument("-r", "--rev-range",
existing_autoincrement=None
):
if autoincrement is not None or existing_autoincrement is not None:
- util.warn("nautoincrement and existing_autoincrement only make sense for MySQL")
+ util.warn(
+ "autoincrement and existing_autoincrement "
+ "only make sense for MySQL")
if nullable is not None:
- self._exec(base.ColumnNullable(table_name, column_name,
- nullable, schema=schema,
- existing_type=existing_type,
- existing_server_default=existing_server_default,
- existing_nullable=existing_nullable,
- ))
+ self._exec(base.ColumnNullable(
+ table_name, column_name,
+ nullable, schema=schema,
+ existing_type=existing_type,
+ existing_server_default=existing_server_default,
+ existing_nullable=existing_nullable,
+ ))
if server_default is not False:
self._exec(base.ColumnDefault(
table_name, column_name, server_default,
is the same length as the .expressions collection. Ultimately
SQLAlchemy should support text() expressions in indexes.
- See https://bitbucket.org/zzzeek/sqlalchemy/issue/3174/support-text-sent-to-indexes
+ See https://bitbucket.org/zzzeek/sqlalchemy/issue/3174/\
+ support-text-sent-to-indexes
"""
__visit_name__ = '_textual_idx_element'
@compiles(_ExecDropConstraint, 'mssql')
def _exec_drop_col_constraint(element, compiler, **kw):
tname, colname, type_ = element.tname, element.colname, element.type_
- # from http://www.mssqltips.com/sqlservertip/1425/working-with-default-constraints-in-sql-server/
+ # from http://www.mssqltips.com/sqlservertip/1425/\
+ # working-with-default-constraints-in-sql-server/
# TODO: needs table formatting, etc.
return """declare @const_name varchar(256)
select @const_name = [name] from %(type)s
import re
-from sqlalchemy import types as sqltypes
from .. import compat
from .base import compiles, alter_table, format_table_name, RenameTable
from .impl import DefaultImpl
from .impl import DefaultImpl
import re
-#from sqlalchemy.ext.compiler import compiles
-#from .base import AddColumn, alter_table
-#from sqlalchemy.schema import AddConstraint
-
class SQLiteImpl(DefaultImpl):
__dialect__ = 'sqlite'
rendered_metadata_default,
rendered_inspector_default):
- rendered_metadata_default = re.sub(r"^'|'$", "", rendered_metadata_default)
+ rendered_metadata_default = re.sub(
+ r"^'|'$", "", rendered_metadata_default)
return rendered_inspector_default != repr(rendered_metadata_default)
- def correct_for_autogen_constraints(self, conn_unique_constraints, conn_indexes,
- metadata_unique_constraints,
- metadata_indexes):
+ def correct_for_autogen_constraints(
+ self, conn_unique_constraints, conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes):
def uq_sig(uq):
return tuple(sorted(uq.columns.keys()))
# they will come up as removed. if the backend supports this now,
# add a version check here for the dialect.
if idx.name is None:
- conn_uniques.remove(idx)
+ conn_unique_constraints.remove(idx)
-#@compiles(AddColumn, 'sqlite')
+# @compiles(AddColumn, 'sqlite')
# def visit_add_column(element, compiler, **kw):
# return "%s %s" % (
# alter_table(compiler, element.table_name, element.schema),
For example, to support passing a database URL on the command line,
the standard ``env.py`` script can be modified like this::
- cmd_line_url = context.get_x_argument(as_dictionary=True).get('dbname')
+ cmd_line_url = context.get_x_argument(
+ as_dictionary=True).get('dbname')
if cmd_line_url:
engine = create_engine(cmd_line_url)
else:
)
- ``inspected_column`` is a :class:`sqlalchemy.schema.Column` as returned by
- :meth:`sqlalchemy.engine.reflection.Inspector.reflecttable`, whereas
- ``metadata_column`` is a :class:`sqlalchemy.schema.Column` from
- the local model environment.
+ ``inspected_column`` is a :class:`sqlalchemy.schema.Column` as
+ returned by
+ :meth:`sqlalchemy.engine.reflection.Inspector.reflecttable`,
+ whereas ``metadata_column`` is a
+ :class:`sqlalchemy.schema.Column` from the local model
+ environment.
A return value of ``None`` indicates to allow default type
comparison to proceed.
The function accepts the following positional arguments:
- * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such as a
- :class:`~sqlalchemy.schema.Table` or :class:`~sqlalchemy.schema.Column`
- object
+ * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such
+ as a :class:`~sqlalchemy.schema.Table` or
+ :class:`~sqlalchemy.schema.Column` object
* ``name``: the name of the object. This is typically available
via ``object.name``.
* ``type``: a string describing the type of object; currently
:paramref:`.EnvironmentContext.configure.include_schemas`
:param include_symbol: A callable function which, given a table name
- and schema name (may be ``None``), returns ``True`` or ``False``, indicating
- if the given table should be considered in the autogenerate sweep.
+ and schema name (may be ``None``), returns ``True`` or ``False``,
+ indicating if the given table should be considered in the
+ autogenerate sweep.
- .. deprecated:: 0.6.0 :paramref:`.EnvironmentContext.configure.include_symbol`
+ .. deprecated:: 0.6.0
+ :paramref:`.EnvironmentContext.configure.include_symbol`
     is superseded by the more generic
:paramref:`.EnvironmentContext.configure.include_object`
parameter.
-import io
import logging
import sys
from contextlib import contextmanager
)
def run_migrations(self, **kw):
- """Run the migration scripts established for this :class:`.MigrationContext`,
- if any.
+ """Run the migration scripts established for this
+ :class:`.MigrationContext`, if any.
The commands in :mod:`alembic.command` will set up a function
that is ultimately passed to the :class:`.MigrationContext`
if self.as_sql and not current_rev:
self._version.create(self.connection)
if doc:
- log.info("Running %s %s -> %s, %s", change.__name__, prev_rev,
- rev, doc)
+ log.info(
+ "Running %s %s -> %s, %s", change.__name__, prev_rev,
+ rev, doc)
else:
- log.info("Running %s %s -> %s", change.__name__, prev_rev, rev)
+ log.info(
+ "Running %s %s -> %s", change.__name__, prev_rev, rev)
if self.as_sql:
self.impl.static_output(
"-- Running %s %s -> %s" %
in :ref:`sqlexpression_toplevel` as well as
for usage with the :meth:`sqlalchemy.schema.Table.create`
and :meth:`sqlalchemy.schema.MetaData.create_all` methods
- of :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.MetaData`.
+ of :class:`~sqlalchemy.schema.Table`,
+ :class:`~sqlalchemy.schema.MetaData`.
Note that when "standard output" mode is enabled,
this bind will be a "mock" connection handler that cannot
t1_cols = local_cols + remote_cols
else:
t1_cols = local_cols
- sa_schema.Table(referent, m,
- *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
- schema=referent_schema)
+ sa_schema.Table(
+ referent, m,
+ *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
+ schema=referent_schema)
- t1 = sa_schema.Table(source, m,
- *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
- schema=source_schema)
+ t1 = sa_schema.Table(
+ source, m,
+ *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
+ schema=source_schema)
tname = "%s.%s" % (referent_schema, referent) if referent_schema \
else referent
return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw):
- t = sa_schema.Table(source, self._metadata(),
- *[sa_schema.Column(n, NULLTYPE) for n in local_cols],
- schema=schema)
+ t = sa_schema.Table(
+ source, self._metadata(),
+ *[sa_schema.Column(n, NULLTYPE) for n in local_cols],
+ schema=schema)
kw['name'] = name
uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw)
# TODO: need event tests to ensure the event
)
def _count_constraint(constraint):
- return not isinstance(constraint, sa_schema.PrimaryKeyConstraint) and \
+ return not isinstance(
+ constraint,
+ sa_schema.PrimaryKeyConstraint) and \
(not constraint._create_rule or
constraint._create_rule(compiler))
op.add_column('t', 'x', Boolean(name=op.f('ck_bool_t_x')))
- Above, the CHECK constraint generated will have the name ``ck_bool_t_x``
- regardless of whether or not a naming convention is in use.
+ Above, the CHECK constraint generated will have the name
+ ``ck_bool_t_x`` regardless of whether or not a naming convention is
+ in use.
Alternatively, if a naming convention is in use, and 'f' is not used,
names will be converted along conventions. If the ``target_metadata``
This internally generates a :class:`~sqlalchemy.schema.Table` object
containing the necessary columns, then generates a new
:class:`~sqlalchemy.schema.PrimaryKeyConstraint`
- object which it then associates with the :class:`~sqlalchemy.schema.Table`.
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
Any event listeners associated with this action will be fired
off normally. The :class:`~sqlalchemy.schema.AddConstraint`
construct is ultimately used to generate the ALTER statement.
:param name: Name of the primary key constraint. The name is necessary
so that an ALTER statement can be emitted. For setups that
use an automated naming scheme such as that described at
- `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/NamingConventions>`_,
+ `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/\
+ NamingConventions>`_,
``name`` here can be ``None``, as the event listener will
apply the name to the constraint object when it is associated
with the table.
This internally generates a :class:`~sqlalchemy.schema.Table` object
containing the necessary columns, then generates a new
:class:`~sqlalchemy.schema.ForeignKeyConstraint`
- object which it then associates with the :class:`~sqlalchemy.schema.Table`.
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
Any event listeners associated with this action will be fired
off normally. The :class:`~sqlalchemy.schema.AddConstraint`
construct is ultimately used to generate the ALTER statement.
:param name: Name of the foreign key constraint. The name is necessary
so that an ALTER statement can be emitted. For setups that
use an automated naming scheme such as that described at
- `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/NamingConventions>`_,
+ `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/\
+ NamingConventions>`_,
``name`` here can be ``None``, as the event listener will
apply the name to the constraint object when it is associated
with the table.
self._foreign_key_constraint(name, source, referent,
local_cols, remote_cols,
onupdate=onupdate, ondelete=ondelete,
- deferrable=deferrable, source_schema=source_schema,
+ deferrable=deferrable,
+ source_schema=source_schema,
referent_schema=referent_schema,
- initially=initially, match=match, **dialect_kw)
+ initially=initially, match=match,
+ **dialect_kw)
)
def create_unique_constraint(self, name, source, local_cols,
This internally generates a :class:`~sqlalchemy.schema.Table` object
containing the necessary columns, then generates a new
:class:`~sqlalchemy.schema.UniqueConstraint`
- object which it then associates with the :class:`~sqlalchemy.schema.Table`.
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
Any event listeners associated with this action will be fired
off normally. The :class:`~sqlalchemy.schema.AddConstraint`
construct is ultimately used to generate the ALTER statement.
:param name: Name of the unique constraint. The name is necessary
so that an ALTER statement can be emitted. For setups that
use an automated naming scheme such as that described at
- `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/NamingConventions>`_,
+ `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/\
+ NamingConventions>`_,
``name`` here can be ``None``, as the event listener will
apply the name to the constraint object when it is associated
with the table.
supported.
:param local_cols: a list of string column names in the
source table.
- :param deferrable: optional bool. If set, emit DEFERRABLE or NOT DEFERRABLE when
- issuing DDL for this constraint.
- :param initially: optional string. If set, emit INITIALLY <value> when issuing DDL
- for this constraint.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
:param schema: Optional schema name to operate within.
.. versionadded:: 0.4.0
:param name: Name of the check constraint. The name is necessary
so that an ALTER statement can be emitted. For setups that
use an automated naming scheme such as that described at
- `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/NamingConventions>`_,
+ `NamingConventions <http://www.sqlalchemy.org/trac/wiki/UsageRecipes/\
+ NamingConventions>`_,
``name`` here can be ``None``, as the event listener will
apply the name to the constraint object when it is associated
with the table.
:param source: String name of the source table.
- :param condition: SQL expression that's the condition of the constraint.
- Can be a string or SQLAlchemy expression language structure.
- :param deferrable: optional bool. If set, emit DEFERRABLE or NOT DEFERRABLE when
- issuing DDL for this constraint.
- :param initially: optional string. If set, emit INITIALLY <value> when issuing DDL
- for this constraint.
+ :param condition: SQL expression that's the condition of the
+ constraint. Can be a string or SQLAlchemy expression language
+ structure.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
:param schema: Optional schema name to operate within.
         .. versionadded:: 0.4.0
"""
self.impl.add_constraint(
- self._check_constraint(name, source, condition, schema=schema, **kw)
+ self._check_constraint(
+ name, source, condition, schema=schema, **kw)
)
def create_table(self, name, *columns, **kw):
- """Issue a "create table" instruction using the current migration context.
+ """Issue a "create table" instruction using the current migration
+ context.
This directive receives an argument list similar to that of the
traditional :class:`sqlalchemy.schema.Table` construct, but without the
Column('timestamp', TIMESTAMP, server_default=func.now())
)
- Note that :meth:`.create_table` accepts :class:`~sqlalchemy.schema.Column`
+ Note that :meth:`.create_table` accepts
+ :class:`~sqlalchemy.schema.Column`
constructs directly from the SQLAlchemy library. In particular,
default values to be created on the database side are
specified using the ``server_default`` parameter, and not
]
)
- When using --sql mode, some datatypes may not render inline automatically,
- such as dates and other special types. When this issue is present,
- :meth:`.Operations.inline_literal` may be used::
+ When using --sql mode, some datatypes may not render inline
+ automatically, such as dates and other special types. When this
+ issue is present, :meth:`.Operations.inline_literal` may be used::
op.bulk_insert(accounts_table,
[
:param multiinsert: when at its default of True and --sql mode is not
enabled, the INSERT statement will be executed using
- "executemany()" style, where all elements in the list of dictionaries
- are passed as bound parameters in a single list. Setting this
- to False results in individual INSERT statements being emitted
- per parameter set, and is needed in those cases where non-literal
- values are present in the parameter sets.
+ "executemany()" style, where all elements in the list of
+ dictionaries are passed as bound parameters in a single
+ list. Setting this to False results in individual INSERT
+ statements being emitted per parameter set, and is needed
+ in those cases where non-literal values are present in the
+ parameter sets.
.. versionadded:: 0.6.4
Note above we also used the SQLAlchemy
:func:`sqlalchemy.sql.expression.table`
- and :func:`sqlalchemy.sql.expression.column` constructs to make a brief,
- ad-hoc table construct just for our UPDATE statement. A full
- :class:`~sqlalchemy.schema.Table` construct of course works perfectly
- fine as well, though note it's a recommended practice to at least ensure
- the definition of a table is self-contained within the migration script,
- rather than imported from a module that may break compatibility with
+ and :func:`sqlalchemy.sql.expression.column` constructs to
+ make a brief, ad-hoc table construct just for our UPDATE
+ statement. A full :class:`~sqlalchemy.schema.Table` construct
+ of course works perfectly fine as well, though note it's a
+ recommended practice to at least ensure the definition of a
+ table is self-contained within the migration script, rather
+ than imported from a module that may break compatibility with
older migrations.
:param sql: Any legal SQLAlchemy expression, including:
execution options, will be passed to
:meth:`sqlalchemy.engine.Connection.execution_options`.
"""
- self.migration_context.impl.execute(sql,
- execution_options=execution_options)
+ self.migration_context.impl.execute(
+ sql,
+ execution_options=execution_options)
def get_bind(self):
"""Return the current 'bind'.
revs = list(self._iterate_revisions("head", lower))
revs = revs[-relative:]
if len(revs) != abs(relative):
- raise util.CommandError("Relative revision %s didn't "
- "produce %d migrations" % (upper, abs(relative)))
+ raise util.CommandError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (upper, abs(relative)))
return iter(revs)
elif lower is not None and _relative_destination.match(lower):
relative = int(lower)
revs = list(self._iterate_revisions(upper, "base"))
revs = revs[0:-relative]
if len(revs) != abs(relative):
- raise util.CommandError("Relative revision %s didn't "
- "produce %d migrations" % (lower, abs(relative)))
+ raise util.CommandError(
+ "Relative revision %s didn't "
+ "produce %d migrations" % (lower, abs(relative)))
return iter(revs)
else:
return self._iterate_revisions(upper, lower)
"""
current_heads = self.get_heads()
if len(current_heads) > 1:
- raise util.CommandError('Only a single head is supported. The '
- 'script directory has multiple heads (due to branching), which '
- 'must be resolved by manually editing the revision files to '
- 'form a linear sequence. Run `alembic branches` to see the '
- 'divergence(s).')
+ raise util.CommandError(
+ 'Only a single head is supported. The '
+ 'script directory has multiple heads (due to branching), '
+ 'which must be resolved by manually editing the revision '
+ 'files to form a linear sequence. Run `alembic branches` to '
+ 'see the divergence(s).')
if current_heads:
return current_heads[0]
from sqlalchemy.engine import url
from sqlalchemy import __version__
-from .compat import callable, exec_, load_module_py, load_module_pyc, binary_type
+from .compat import callable, exec_, load_module_py, load_module_pyc, \
+ binary_type
class CommandError(Exception):
return int(value)
except:
return value
-_vers = tuple([_safe_int(x) for x in re.findall(r'(\d+|[abc]\d)', __version__)])
+_vers = tuple(
+ [_safe_int(x) for x in re.findall(r'(\d+|[abc]\d)', __version__)])
sqla_07 = _vers > (0, 7, 2)
sqla_08 = _vers >= (0, 8, 0, 'b2')
sqla_09 = _vers >= (0, 9, 0)
def coerce_resource_to_filename(fname):
- """Interpret a filename as either a filesystem location or as a package resource.
+ """Interpret a filename as either a filesystem location or as a package
+ resource.
Names that are non absolute paths and contain a colon
are interpreted as resources and coerced to a file location.
module = load_module_py(module_id, path)
elif os.path.exists(simple_pyc_file_from_path(path)):
# look for sourceless load
- module = load_module_pyc(module_id, simple_pyc_file_from_path(path))
+ module = load_module_pyc(
+ module_id, simple_pyc_file_from_path(path))
else:
raise ImportError("Can't find Python file %s" % path)
elif ext in (".pyc", ".pyo"):
[flake8]
show-source = True
-ignore = E711,E712,E721,F841,F811
+ignore = E711,E712,E721,F841,F811,F401
exclude=.venv,.git,.tox,dist,doc,*egg,build