from os import path
-__version__ = '0.5.0'
+__version__ = '0.6.0'
package_dir = path.abspath(path.dirname(__file__))
name = '%s.%s' % (schema, tname) if schema else tname
metadata_cols_by_name = dict((c.name, c) for c in metadata_table.c)
conn_col_names = set(conn_table)
- metadata_col_names = set(metadata_cols_by_name)
+ metadata_col_names = OrderedSet(sorted(metadata_cols_by_name))
for cname in metadata_col_names.difference(conn_col_names):
diffs.append(
import sys
+if sys.version_info < (2, 6):
+ raise NotImplementedError("Python 2.6 or greater is required.")
py3k = sys.version_info >= (3, 0)
-py3kwarning = getattr(sys, 'py3kwarning', False) or py3k
-py26 = sys.version_info >= (2, 6)
-jython = sys.platform.startswith('java')
-win32 = sys.platform.startswith('win')
-pypy = hasattr(sys, 'pypy_version_info')
if py3k:
import builtins as compat_builtins
string_types = str,
binary_type = bytes
text_type = str
+ def callable(fn):
+ return hasattr(fn, '__call__')
else:
import __builtin__ as compat_builtins
string_types = basestring,
binary_type = str
text_type = unicode
-
-if py3kwarning:
- def callable(fn):
- return hasattr(fn, '__call__')
-else:
callable = callable
+try:
+ import configparser
+except ImportError:
+ import ConfigParser as configparser
+
+try:
+ exec_ = getattr(compat_builtins, 'exec')
+except AttributeError:
+ # Python 2
+ def exec_(func_text, globals_, lcl):
+ exec('exec func_text in globals_, lcl')
+
################################################
# cross-compatible metaclass implementation
# Copyright (c) 2010-2012 Benjamin Peterson
from argparse import ArgumentParser
-try:
- import configparser
-except ImportError:
- import ConfigParser as configparser
+from .compat import configparser
import inspect
import os
import sys
-try:
- import builtins
-except ImportError:
- import __builtin__ as builtins
-
from sqlalchemy.sql.expression import _BindParamClause
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import schema, text
from sqlalchemy import types as sqltypes
-from ..compat import callable, string_types, text_type, with_metaclass
+from ..compat import string_types, text_type, with_metaclass
from .. import util
from . import base
return _impls[dialect.name]
def static_output(self, text):
- text_ = text_type(text + '\n\n')
- self.output_buffer.write(text_)
- if callable(getattr(self.output_buffer, 'flush', None)):
- self.output_buffer.flush()
+ self.output_buffer.write(text_type(text + "\n\n"))
+ self.output_buffer.flush()
@property
def bind(self):
self._migrations_fn = opts.get('fn')
self.as_sql = as_sql
self.output_buffer = opts.get("output_buffer", sys.stdout)
- if (opts.get('output_encoding') and
- not isinstance(self.output_buffer, io.TextIOBase)):
+
+ if opts.get('output_encoding'):
self.output_buffer = io.TextIOWrapper(
self.output_buffer,
opts['output_encoding']
-from __future__ import with_statement
-
import datetime
import os
import re
-from __future__ import with_statement
-
-try:
- import builtins
-except ImportError:
- import __builtin__ as builtins
import sys
import os
import textwrap
from sqlalchemy.engine import url
from sqlalchemy import __version__
-from .compat import callable
+from .compat import callable, exec_
class CommandError(Exception):
pass
'doc': fn.__doc__,
})
lcl = {}
- try:
- exec_ = getattr(builtins, 'exec')
- except AttributeError:
- # Python 2
- def exec_(func_text, globals_, lcl):
- exec('exec func_text in globals_, lcl')
exec_(func_text, globals_, lcl)
return lcl[name]
module_id = re.sub(r'\W', "_", filename)
path = os.path.join(dir_, filename)
- with open(path, 'r') as f:
+ with open(path, 'rb') as f:
module = imp.load_source(module_id, path, f)
del sys.modules[module_id]
return module
-from __future__ import with_statement
-
-try:
- import builtins
-except ImportError:
- import __builtin__ as builtins
-try:
- import configparser
-except ImportError:
- import ConfigParser as configparser
+
import io
import os
import re
import shutil
import textwrap
+from alembic.compat import configparser
from nose import SkipTest
from sqlalchemy.engine import default
except ImportError as er1:
raise SkipTest("Can't import DBAPI: %s" % er1)
try:
- conn = eng.connect()
+ eng.connect()
except SQLAlchemyError as er2:
raise SkipTest("Can't connect to database: %s" % er2)
_engs[name] = eng
def capture_context_buffer(**kw):
if kw.pop('bytes_io', False):
- raw = io.BytesIO()
- encoding = kw.get('output_encoding', 'utf-8')
- buf = io.TextIOWrapper(raw, encoding)
+ buf = io.BytesIO()
else:
- raw = buf = io.StringIO()
+ buf = io.StringIO()
class capture(object):
def __enter__(self):
EnvironmentContext._default_opts = {
- 'dialect_name':"sqlite",
- 'output_buffer':buf
+ 'dialect_name': "sqlite",
+ 'output_buffer': buf
}
EnvironmentContext._default_opts.update(kw)
- return raw
+ return buf
def __exit__(self, *arg, **kwarg):
- #print(buf.getvalue())
EnvironmentContext._default_opts = None
return capture()
shutil.rmtree(staging_directory, True)
-def write_script(scriptdir, rev_id, content):
+def write_script(scriptdir, rev_id, content, encoding='ascii'):
old = scriptdir._revision_map[rev_id]
path = old.path
- if not hasattr(builtins, 'unicode') and isinstance(content, bytes):
- content = content.decode()
- with open(path, 'w') as fp:
- fp.write(textwrap.dedent(content))
+
+ content = textwrap.dedent(content)
+ if encoding:
+ content = content.encode(encoding)
+ with open(path, 'wb') as fp:
+ fp.write(content)
pyc_path = util.pyc_file_from_path(path)
if os.access(pyc_path, os.F_OK):
os.unlink(pyc_path)
"""Test op functions against MSSQL."""
-from __future__ import with_statement
from unittest import TestCase
assert_raises_message(
TypeError,
- "Unknown arguments: badarg2, badarg1",
+ r"Unknown arguments: badarg\d, badarg\d",
op.alter_column, "t", "c", badarg1="x", badarg2="y"
)
"""Test op functions against ORACLE."""
-from __future__ import with_statement
from unittest import TestCase
-from __future__ import with_statement
-
from unittest import TestCase
from sqlalchemy import DateTime, MetaData, Table, Column, text, Integer, String
def downgrade():
op.execute("drôle de petite voix m’a réveillé")
-""" % a).encode('utf-8'))
+""" % a), encoding='utf-8')
def tearDown(self):
clear_staging_env()
-from __future__ import with_statement
-
import os
import unittest