to grow and mature it should become a more thorough and efficient system
of testing new dialects.
+As of SQLAlchemy 0.9.4, both nose and pytest are supported for running tests,
+and pytest is now preferred.
+
Dialect Layout
===============
<dbapi>.py
requirements.py
test/
+ conftest.py
__init__.py
test_suite.py
test_<dialect_specific_test>.py
create_engine("access+pyodbc://user:pw@dsn")
-* setup.cfg - this file contains the traditional contents such as [egg_info]
- and [nosetests] directives, but also contains new directives that are used
+* setup.cfg - this file contains the traditional contents such as [egg_info],
+ [pytest] and [nosetests] directives, but also contains new directives that are used
by SQLAlchemy's testing framework. E.g. for Access::
[egg_info]
tag_build = dev
+ [pytest]
+ addopts= --tb native -v -r fxX
+ python_files=test/*test_*.py
+
[nosetests]
with-sqla_testing = true
where = test
sqlite=sqlite:///:memory:
Above, the ``[sqla_testing]`` section contains configuration used by
- SQLAlchemy's test plugin.The ``[nosetests]`` section includes the
- directive ``with-sql_testing = true``, which indicates to Nose that
- the SQLAlchemy nose plugin should be used.
-
-* run_tests.py - The plugin is provided with SQLAlchemy, however is not
- plugged into Nose automatically; instead, a ``run_tests.py`` script
- should be composed as a front end to Nose, such that SQLAlchemy's plugin
- will be correctly installed.
-
- run_tests.py has two parts. One optional, but probably helpful, step
- is that it installs your third party dialect into SQLAlchemy without
- using the setuptools entrypoint system; this allows your dialect to
- be present without any explicit setup.py step needed. The other
- step is to import SQLAlchemy's nose runner and invoke it. An
- example run_tests.py file looks like the following::
+ SQLAlchemy's test plugin. The ``[pytest]`` and ``[nosetests]`` sections
+ include directives to help with these runners; in the case of
+ Nose, the directive ``with-sqla_testing = true`` indicates to Nose that
+ the SQLAlchemy nose plugin should be used. In the case of pytest, the
+ test/conftest.py file will bootstrap SQLAlchemy's plugin.
+
+* test/conftest.py - This script bootstraps SQLAlchemy's pytest plugin
+ into the pytest runner. It can also install your third party dialect
+ into SQLAlchemy without using the setuptools entrypoint system; this
+ allows your dialect to be present without any explicit setup.py step
+ needed. The remainder of the script invokes SQLAlchemy's pytest
+ plugin::
+
+ from sqlalchemy.dialects import registry
+
+ registry.register("access", "sqlalchemy_access.pyodbc", "AccessDialect_pyodbc")
+ registry.register("access.pyodbc", "sqlalchemy_access.pyodbc", "AccessDialect_pyodbc")
+
+ from sqlalchemy.testing.plugin.pytestplugin import *
+
+ Where above, the ``registry`` module, introduced in SQLAlchemy 0.8, provides
+ an in-Python means of installing the dialect entrypoints without the use
+ of setuptools, using the ``registry.register()`` function in a way that
+ is similar to the ``entry_points`` directive we placed in our ``setup.py``.
+
+* run_tests.py - This script is used when running the tests via Nose.
+ The purpose of the script is to plug in SQLAlchemy's nose plugin into
+ the Nose environment before the tests run.
+
+ The format of this file is similar to that of conftest.py; first,
+ the optional but helpful step of registering your third party dialect,
+ then an import of SQLAlchemy's nose runner, which is then invoked::
from sqlalchemy.dialects import registry
from sqlalchemy.testing import runner
if __name__ == '__main__':
runner.main()
- Where above, the ``registry`` module, introduced in SQLAlchemy 0.8, provides
- an in-Python means of installing the dialect entrypoints without the use
- of setuptools, using the ``registry.register()`` function in a way that
- is similar to the ``entry_points`` directive we placed in our ``setup.py``.
The call to ``runner.main()`` then runs the Nose front end, which installs
SQLAlchemy's testing plugins. Invoking our custom runner looks like the
following::
dialect take place::
cd /path/to/sqlalchemy
- python ./sqla_nose.py -v \
+ py.test -v \
--requirements sqlalchemy_access.requirements:Requirements \
--dburi access+pyodbc://admin@access_test
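The ``requirements.py`` module named in the ``--requirements`` option subclasses SQLAlchemy's ``SuiteRequirements`` and turns individual requirements on or off for the backend. A minimal sketch, again assuming the hypothetical ``sqlalchemy_access`` package (the particular properties overridden here are only examples):

```python
# sqlalchemy_access/requirements.py - sketch only; which properties you
# override depends on what the target backend actually supports.
from sqlalchemy.testing import exclusions
from sqlalchemy.testing.requirements import SuiteRequirements


class Requirements(SuiteRequirements):
    @property
    def returning(self):
        # exclusions.closed() marks the feature as unsupported, so
        # tests that require it will be skipped on this backend.
        return exclusions.closed()

    @property
    def window_functions(self):
        return exclusions.closed()
```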
* test_suite.py - Finally, the ``test_suite.py`` module represents a
- Nose test suite, which pulls in the actual SQLAlchemy test suite.
+ stub test suite, which pulls in the actual SQLAlchemy test suite.
To pull in the suite as a whole, it can be imported in one step::
# test/test_suite.py

    from sqlalchemy.testing.suite import *
SQLALCHEMY UNIT TESTS
=====================
+**NOTE:** SQLAlchemy as of 0.9.4 now standardizes on `pytest <http://pytest.org/>`_
+for test running! However, the existing support for Nose **still remains**!
+That is, you can now run the tests via pytest or nose. We hope to keep the
+suite nose-compatible indefinitely; however, this might change at some point.
+
SQLAlchemy unit tests by default run using Python's built-in sqlite3
-module. If running on Python 2.4, pysqlite must be installed.
+module. If running on a Python installation that doesn't include this
+module, then pysqlite or a compatible driver must be installed.
+
+Unit tests can be run with pytest or nose:
+
+ py.test: http://pytest.org/
-Unit tests are run using nose. Nose is available at::
+ nose: https://pypi.python.org/pypi/nose/
- https://pypi.python.org/pypi/nose/
+The suite includes enhanced support when running with pytest.
-SQLAlchemy implements a nose plugin that must be present when tests are run.
-This plugin is invoked when the test runner script provided with
-SQLAlchemy is used.
+SQLAlchemy implements plugins for both pytest and nose that must be
+present when tests are run. In the case of pytest, this plugin is automatically
+used when pytest is run against the SQLAlchemy source tree. However,
+for Nose support, a special test runner script must be used.
-The test suite as of version 0.8.2 also requires the mock library. While
+
+The test suite also requires the mock library. While
mock is part of the Python standard library as of 3.3, previous versions
will need to have it installed, and is available at::
https://pypi.python.org/pypi/mock
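Since mock became part of the standard library in Python 3.3, a common import shim (a general convention, not something specific to SQLAlchemy's suite) covers both cases:

```python
# Prefer the stdlib unittest.mock (Python 3.3+); fall back to the
# third-party "mock" distribution on older interpreters.
try:
    from unittest import mock
except ImportError:
    import mock

# Either way, the same API is available:
m = mock.Mock(return_value=42)
assert m() == 42
m.assert_called_once_with()
```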
-**NOTE:** - the nose plugin is no longer installed by setuptools as of
-version 0.7 ! Use "python setup.py test" or "./sqla_nose.py".
-
RUNNING TESTS VIA SETUP.PY
--------------------------
-A plain vanilla run of all tests using sqlite can be run via setup.py:
+A plain vanilla run of all tests using sqlite can be run via setup.py, and
+requires that pytest is installed::
$ python setup.py test
-The -v flag also works here::
- $ python setup.py test -v
-
-RUNNING ALL TESTS
-------------------
+RUNNING ALL TESTS - PYTEST
+--------------------------
To run all tests::
- $ ./sqla_nose.py
+ $ py.test
+
+The pytest configuration in setup.cfg will point the runner at the
+test/ directory, where it consumes a conftest.py file that gets everything
+else up and running.
-If you're running the tests on Microsoft Windows, then there is an additional
-argument that must be passed to ./sqla_nose.py::
- > ./sqla_nose.py --first-package-wins
+RUNNING ALL TESTS - NOSE
+--------------------------
+
+When using Nose, a bootstrap script is provided which sets up sys.path
+and installs the nose plugin::
-This is required because nose's importer will normally evict a package from
-sys.modules if it sees a package with the same name in a different location.
-Setting this argument disables that behavior.
+ $ ./sqla_nose.py
Assuming all tests pass, this is a very unexciting output. To make it more
interesting::
$ ./sqla_nose.py -v
RUNNING INDIVIDUAL TESTS
--------------------------
+---------------------------------
+
Any directory of test modules can be run at once by specifying the directory
-path::
+path, and a specific file can be specified as well::
+
+ $ py.test test/dialect
+
+ $ py.test test/orm/test_mapper.py
+
+When using nose, the setup.cfg currently sets "where" to "test/", so the
+"test/" prefix is omitted::
+
+ $ ./sqla_nose.py dialect/
- $ ./sqla_nose.py test/dialect
+ $ ./sqla_nose.py orm/test_mapper.py
-Any test module can be run directly by specifying its module name::
+With Nose, it is often more intuitive to specify tests as module paths::
$ ./sqla_nose.py test.orm.test_mapper
-To run a specific test within the module, specify it as module:ClassName.methodname::
+Nose can also specify a test class and optional method using this syntax::
$ ./sqla_nose.py test.orm.test_mapper:MapperTest.test_utils
+With pytest, the -k flag is used to limit tests::
+
+ $ py.test test/orm/test_mapper.py -k "MapperTest and test_utils"
+
COMMAND LINE OPTIONS
--------------------
-Help is available via --help::
+
+SQLAlchemy-specific options are added to both runners, which are viewable
+within the help screen. With pytest, these options are easier to locate
+as they are underneath the "sqlalchemy" grouping::
+
+ $ py.test --help
$ ./sqla_nose.py --help
the SQLAlchemy nose plugin adds. The most commonly used
SQLAlchemy-specific options are '--db' and '--dburi'.
+Both pytest and nose support the same set of SQLAlchemy options, though
+pytest features a bit more capability with them.
+
DATABASE TARGETS
----------------
--dburi=postgresql://user:password@localhost/test
+If you'll be running the tests frequently, database aliases can save a lot of
+typing. The --dbs option lists the built-in aliases and their matching URLs::
+
+ $ py.test --dbs
+ Available --db options (use --dburi to override)
+ mysql mysql://scott:tiger@127.0.0.1:3306/test
+ oracle oracle://scott:tiger@127.0.0.1:1521
+ postgresql postgresql://scott:tiger@127.0.0.1:5432/test
+ [...]
+
+To run tests against an aliased database::
+
+ $ py.test --db postgresql
+
+This list of database URLs is present in the setup.cfg file. The list
+can be modified/extended by adding a file ``test.cfg`` at the
+top level of the SQLAlchemy source distribution which includes
+additional entries::
+
+ [db]
+ postgresql=postgresql://myuser:mypass@localhost/mydb
+
+Your custom entries will override the defaults and you'll see them reflected
+in the output of --dbs.
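Both files are plain INI format, and later reads override earlier ones, which is how a test.cfg entry shadows the setup.cfg default. A quick sketch of that layering using only the standard library (the URLs are the examples from above):

```python
# Demonstrates how a test.cfg [db] entry overrides the setup.cfg default.
# Uses the Python 3 configparser; the plugin itself stays 2/3 compatible.
import configparser

setup_cfg = """\
[db]
default = sqlite:///:memory:
postgresql = postgresql://scott:tiger@127.0.0.1:5432/test
"""

test_cfg = """\
[db]
postgresql = postgresql://myuser:mypass@localhost/mydb
"""

cfg = configparser.ConfigParser()
cfg.read_string(setup_cfg)   # defaults shipped with SQLAlchemy
cfg.read_string(test_cfg)    # later reads win, overriding the default

assert cfg.get("db", "postgresql") == "postgresql://myuser:mypass@localhost/mydb"
assert cfg.get("db", "default") == "sqlite:///:memory:"
```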
+
+MULTIPLE DATABASE TARGETS
+-------------------------
+
+As of SQLAlchemy 0.9.4, the test runner supports **multiple databases at once**.
+This doesn't mean that the entire test suite runs for each database, but
+instead specific test suites may do so, while other tests may choose to
+run on a specific target out of those available. For example, if the tests underneath
+test/dialect/ are run, the majority of these tests are either specific to
+a particular backend, or are marked as "multiple", meaning they will run repeatedly
+for each database in use. If one runs the test suite as follows::
+
+ $ py.test test/dialect --db sqlite --db postgresql --db mysql
+
+The tests underneath test/dialect/test_suite.py will be tripled up, running
+as appropriate for each target database, whereas dialect-specific tests
+within test/dialect/mysql/, test/dialect/postgresql/, and test/dialect/test_sqlite.py
+should run fully with no skips, as each suite has its target database available.
+
+The multiple targets feature is available under both pytest and nose;
+however, when running nose, the "multiple runner" feature won't be available;
+instead, the first database target will be used.
+
+When running with multiple targets, tests that don't prefer a specific target
+will be run against the first target specified. Putting sqlite first in
+the list will lead to a much faster suite as the in-memory database is
+extremely fast for setting up and tearing down tables.
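The dispatch rules described above can be pictured with a small schematic (this illustrates the behavior only; it is not the actual plugin code):

```python
# Schematic of how tests map onto multiple --db targets; illustrative
# only, not the real plugin internals.
db_targets = ["sqlite", "postgresql", "mysql"]  # command-line order

def targets_for(test_backend=None, multiple=False):
    """Return the database targets a given test would run against."""
    if multiple:
        # suites marked "multiple" repeat once per configured target
        return list(db_targets)
    if test_backend is not None:
        # backend-specific tests run only when their target is configured
        return [db for db in db_targets if db == test_backend]
    # all other tests fall back to the first target given
    return db_targets[:1]

assert targets_for(multiple=True) == ["sqlite", "postgresql", "mysql"]
assert targets_for(test_backend="mysql") == ["mysql"]
assert targets_for() == ["sqlite"]  # why listing sqlite first is fastest
```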
+
+DATABASE CONFIGURATION
+----------------------
+
Use an empty database and a database user with general DBA privileges.
The test suite will be creating and dropping many tables and other DDL, and
preexisting tables will interfere with the tests.
requires using a test.cfg configuration file as the cmd.exe shell won't
properly pass the URL arguments into the nose test runner.
-If you'll be running the tests frequently, database aliases can save a lot of
-typing. The --dbs option lists the built-in aliases and their matching URLs::
-
- $ ./sqla_nose.py --dbs
- Available --db options (use --dburi to override)
- mysql mysql://scott:tiger@127.0.0.1:3306/test
- oracle oracle://scott:tiger@127.0.0.1:1521
- postgresql postgresql://scott:tiger@127.0.0.1:5432/test
- [...]
-
-To run tests against an aliased database::
-
- $ ./sqla_nose.py --db=postgresql
-
-To customize the URLs with your own users or hostnames, create a file
-called `test.cfg` at the top level of the SQLAlchemy source distribution.
-This file is in Python config format, and contains a [db] section which
-lists out additional database configurations::
-
- [db]
- postgresql=postgresql://myuser:mypass@localhost/mydb
-
-Your custom entries will override the defaults and you'll see them reflected
-in the output of --dbs.
CONFIGURING LOGGING
-------------------
in particular the logic used to wrap "column default" callables
wouldn't work properly for Python built-ins.
+ .. change::
+ :tags: feature, general
+
+ Support has been added for pytest to run tests. This runner
+ is currently being supported in addition to nose, and will likely
+ be preferred to nose going forward. The nose plugin system used
+ by SQLAlchemy has been split out so that it works under pytest as
+ well. There are no plans to drop support for nose at the moment
+ and we hope that the test suite itself can continue to remain as
+ agnostic of testing platform as possible. See the file
+ README.unittests.rst for updated information on running tests
+ with pytest.
+
+ The test plugin system has also been enhanced to support running
+    tests against multiple database URLs at once, by specifying the ``--db``
+ and/or ``--dburi`` flags multiple times. This does not run the entire test
+ suite for each database, but instead allows test cases that are specific
+    to certain backends to make use of that backend as the test is run.
+ When using pytest as the test runner, the system will also run
+ specific test suites multiple times, once for each database, particularly
+ those tests within the "dialect suite". The plan is that the enhanced
+ system will also be used by Alembic, and allow Alembic to run
+ migration operation tests against multiple backends in one run, including
+ third-party backends not included within Alembic itself.
+ Third party dialects and extensions are also encouraged to standardize
+ on SQLAlchemy's test suite as a basis; see the file README.dialects.rst
+ for background on building out from SQLAlchemy's test platform.
+
.. change::
:tags: orm feature
:tickets: 2976
engine = None
-def setup():
+def setup_module():
global engine
engine = create_engine('sqlite://', echo=True)
)
instance = session_identity_map.get(identitykey)
+
if instance is not None:
state = attributes.instance_state(instance)
dict_ = attributes.instance_dict(instance)
self.bind = bind
if reflect:
- util.warn("reflect=True is deprecate; please "
+ util.warn_deprecated("reflect=True is deprecated; please "
"use the reflect() method.")
if not bind:
raise exc.ArgumentError(
from . import config
from .exclusions import db_spec, _is_excluded, fails_if, skip_if, future,\
- fails_on, fails_on_everything_except, skip, only_on, exclude, against,\
- _server_version, only_if
+ fails_on, fails_on_everything_except, skip, only_on, exclude, \
+ against as _against, _server_version, only_if
+
+
+def against(*queries):
+ return _against(config._current, *queries)
from .assertions import emits_warning, emits_warning_on, uses_deprecated, \
eq_, ne_, is_, is_not_, startswith_, assert_raises, \
crashes = skip
-from .config import db, requirements as requires
+from .config import db
+from .config import requirements as requires
from . import mock
\ No newline at end of file
@decorator
def decorate(fn, *args, **kw):
if isinstance(db, util.string_types):
- if not spec(config.db):
+ if not spec(config._current):
return fn(*args, **kw)
else:
wrapped = emits_warning(*warnings)(fn)
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+import collections
+
requirements = None
db = None
+db_url = None
+db_opts = None
+file_config = None
+
+_current = None
+
+class Config(object):
+ def __init__(self, db, db_opts, options, file_config):
+ self.db = db
+ self.db_opts = db_opts
+ self.options = options
+ self.file_config = file_config
+
+ _stack = collections.deque()
+ _configs = {}
+
+ @classmethod
+ def register(cls, db, db_opts, options, file_config, namespace):
+ """add a config as one of the global configs.
+
+ If there are no configs set up yet, this config also
+ gets set as the "_current".
+ """
+ cfg = Config(db, db_opts, options, file_config)
+
+ global _current
+ if not _current:
+ cls.set_as_current(cfg, namespace)
+ cls._configs[cfg.db.name] = cfg
+ cls._configs[(cfg.db.name, cfg.db.dialect)] = cfg
+ cls._configs[cfg.db] = cfg
+
+ @classmethod
+ def set_as_current(cls, config, namespace):
+ global db, _current, db_url
+ _current = config
+ db_url = config.db.url
+ namespace.db = db = config.db
+
+ @classmethod
+ def push_engine(cls, db, namespace):
+ assert _current, "Can't push without a default Config set up"
+ cls.push(
+ Config(db, _current.db_opts, _current.options, _current.file_config),
+ namespace
+ )
+
+ @classmethod
+ def push(cls, config, namespace):
+ cls._stack.append(_current)
+ cls.set_as_current(config, namespace)
+
+ @classmethod
+ def reset(cls, namespace):
+ if cls._stack:
+ cls.set_as_current(cls._stack[0], namespace)
+ cls._stack.clear()
+
+ @classmethod
+ def all_configs(cls):
+ for cfg in set(cls._configs.values()):
+ yield cfg
+
+ @classmethod
+ def all_dbs(cls):
+ for cfg in cls.all_configs():
+ yield cfg.db
def reconnecting_engine(url=None, options=None):
- url = url or config.db_url
+ url = url or config.db.url
dbapi = config.db.dialect.dbapi
if not options:
options = {}
else:
use_reaper = options.pop('use_reaper', True)
- url = url or config.db_url
+ url = url or config.db.url
if options is None:
options = config.db_opts
return engine
-def utf8_engine(url=None, options=None):
- """Hook for dialects or drivers that don't handle utf8 by default."""
-
- from sqlalchemy.engine import url as engine_url
-
- if config.db.dialect.name == 'mysql' and \
- config.db.driver in ['mysqldb', 'pymysql', 'cymysql']:
- # note 1.2.1.gamma.6 or greater of MySQLdb
- # needed here
- url = url or config.db_url
- url = engine_url.make_url(url)
- url.query['charset'] = 'utf8'
- url.query['use_unicode'] = '0'
- url = str(url)
-
- return testing_engine(url, options)
def mock_engine(dialect_name=None):
from . import config
from .. import util
import contextlib
-
+import inspect
class skip_if(object):
def __init__(self, predicate, reason=None):
_fails_on = None
- @property
- def enabled(self):
- return not self.predicate()
-
def __add__(self, other):
def decorate(fn):
return other(self(fn))
return decorate
+ @property
+ def enabled(self):
+ return self.enabled_for_config(config._current)
+
+ def enabled_for_config(self, config):
+ return not self.predicate(config)
+
@contextlib.contextmanager
def fail_if(self, name='block'):
try:
yield
except Exception as ex:
- if self.predicate():
+ if self.predicate(config._current):
print(("%s failed as expected (%s): %s " % (
name, self.predicate, str(ex))))
else:
raise
else:
- if self.predicate():
+ if self.predicate(config._current):
raise AssertionError(
"Unexpected success for '%s' (%s)" %
(name, self.predicate))
def __call__(self, fn):
@decorator
def decorate(fn, *args, **kw):
- if self.predicate():
+ if self.predicate(config._current):
if self.reason:
msg = "'%s' : %s" % (
fn.__name__,
self.value = value
self.description = description or "boolean %s" % value
- def __call__(self):
+ def __call__(self, config):
return self.value
def _as_string(self, negate=False):
'between': lambda val, pair: val >= pair[0] and val <= pair[1],
}
- def __call__(self, engine=None):
- if engine is None:
- engine = config.db
+ def __call__(self, config):
+ engine = config.db
if "+" in self.db:
dialect, driver = self.db.split('+')
class LambdaPredicate(Predicate):
def __init__(self, lambda_, description=None, args=None, kw=None):
- self.lambda_ = lambda_
+ spec = inspect.getargspec(lambda_)
+ if not spec[0]:
+ self.lambda_ = lambda db: lambda_()
+ else:
+ self.lambda_ = lambda_
self.args = args or ()
self.kw = kw or {}
if description:
else:
self.description = "custom function"
- def __call__(self):
- return self.lambda_(*self.args, **self.kw)
+ def __call__(self, config):
+ return self.lambda_(config)
def _as_string(self, negate=False):
if negate:
def __init__(self, predicate):
self.predicate = predicate
- def __call__(self, *arg, **kw):
- return not self.predicate(*arg, **kw)
+ def __call__(self, config):
+ return not self.predicate(config)
def __str__(self):
return self.predicate._as_string(True)
self.predicates = predicates
self.description = description
- def __call__(self, *arg, **kw):
+ def __call__(self, config):
for pred in self.predicates:
- if pred(*arg, **kw):
+ if pred(config):
self._str = pred
return True
return False
def _is_excluded(db, op, spec):
- return SpecPredicate(db, op, spec)()
+ return SpecPredicate(db, op, spec)(config._current)
def _server_version(engine):
def db_spec(*dbs):
return OrPredicate(
- Predicate.as_predicate(db) for db in dbs
+ [Predicate.as_predicate(db) for db in dbs]
)
@decorator
-def future(fn, *args, **kw):
- return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")
+def future(fn, *arg):
+ return fails_if(LambdaPredicate(fn), "Future feature")
def fails_on(db, reason=None):
return skip_if(SpecPredicate(db, op, spec), reason)
-def against(*queries):
+def against(config, *queries):
+ assert queries, "no queries sent!"
return OrPredicate([
Predicate.as_predicate(query)
for query in queries
- ])()
+ ])(config)
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
+# whether or not we use unittest changes things dramatically,
+# as far as how py.test collection works.
class TestBase(object):
# A sequence of database names to always run, regardless of the
def assert_(self, val, msg=None):
assert val, msg
+ # apparently a handful of tests are doing this....OK
+ def setup(self):
+ if hasattr(self, "setUp"):
+ self.setUp()
+
+ def teardown(self):
+ if hasattr(self, "tearDown"):
+ self.tearDown()
class TablesTest(TestBase):
"""Enhance nose with extra options and behaviors for running SQLAlchemy tests.
-When running ./sqla_nose.py, this module is imported relative to the
-"plugins" package as a top level package by the sqla_nose.py runner,
-so that the plugin can be loaded with the rest of nose including the coverage
-plugin before any of SQLAlchemy itself is imported, so that coverage works.
-
-When third party libraries use this plugin, it can be imported
-normally as "from sqlalchemy.testing.plugin import noseplugin".
+Must be run via ./sqla_nose.py so that it is imported in the expected
+way (e.g. as a package-less import).
"""
-from __future__ import absolute_import
-
import os
-import sys
-py3k = sys.version_info >= (3, 0)
-
-if py3k:
- import configparser
-else:
- import ConfigParser as configparser
from nose.plugins import Plugin
-from nose import SkipTest
-import sys
-import re
-
-# late imports
fixtures = None
-engines = None
-exclusions = None
-warnings = None
-profiling = None
-assertions = None
-requirements = None
-config = None
-util = None
-file_config = None
-
-
-logging = None
-db = None
-db_label = None
-db_url = None
-db_opts = {}
-options = None
-_existing_engine = None
-
-
-def _log(option, opt_str, value, parser):
- global logging
- if not logging:
- import logging
- logging.basicConfig()
-
- if opt_str.endswith('-info'):
- logging.getLogger(value).setLevel(logging.INFO)
- elif opt_str.endswith('-debug'):
- logging.getLogger(value).setLevel(logging.DEBUG)
-
-
-def _list_dbs(*args):
- print("Available --db options (use --dburi to override)")
- for macro in sorted(file_config.options('db')):
- print("%20s\t%s" % (macro, file_config.get('db', macro)))
- sys.exit(0)
-
-
-def _server_side_cursors(options, opt_str, value, parser):
- db_opts['server_side_cursors'] = True
-
-
-def _engine_strategy(options, opt_str, value, parser):
- if value:
- db_opts['strategy'] = value
-
-pre_configure = []
-post_configure = []
-
-
-def pre(fn):
- pre_configure.append(fn)
- return fn
-
-
-def post(fn):
- post_configure.append(fn)
- return fn
-
-
-@pre
-def _setup_options(opt, file_config):
- global options
- options = opt
-
-
-@pre
-def _monkeypatch_cdecimal(options, file_config):
- if options.cdecimal:
- import cdecimal
- sys.modules['decimal'] = cdecimal
-
-
-@post
-def _engine_uri(options, file_config):
- global db_label, db_url
-
- if options.dburi:
- db_url = options.dburi
- db_label = db_url[:db_url.index(':')]
- elif options.db:
- db_label = options.db
- db_url = None
-
- if db_url is None:
- if db_label not in file_config.options('db'):
- raise RuntimeError(
- "Unknown URI specifier '%s'. Specify --dbs for known uris."
- % db_label)
- db_url = file_config.get('db', db_label)
-
-
-@post
-def _require(options, file_config):
- if not(options.require or
- (file_config.has_section('require') and
- file_config.items('require'))):
- return
-
- try:
- import pkg_resources
- except ImportError:
- raise RuntimeError("setuptools is required for version requirements")
-
- cmdline = []
- for requirement in options.require:
- pkg_resources.require(requirement)
- cmdline.append(re.split('\s*(<!>=)', requirement, 1)[0])
-
- if file_config.has_section('require'):
- for label, requirement in file_config.items('require'):
- if not label == db_label or label.startswith('%s.' % db_label):
- continue
- seen = [c for c in cmdline if requirement.startswith(c)]
- if seen:
- continue
- pkg_resources.require(requirement)
-
-
-@post
-def _engine_pool(options, file_config):
- if options.mockpool:
- from sqlalchemy import pool
- db_opts['poolclass'] = pool.AssertionPool
-
-
-@post
-def _create_testing_engine(options, file_config):
- from sqlalchemy.testing import engines, config
- from sqlalchemy import testing
- global db
- config.db = testing.db = db = engines.testing_engine(db_url, db_opts)
- config.db.connect().close()
- config.db_opts = db_opts
- config.db_url = db_url
-
-
-@post
-def _prep_testing_database(options, file_config):
- from sqlalchemy.testing import engines
- from sqlalchemy import schema, inspect
-
- # also create alt schemas etc. here?
- if options.dropfirst:
- e = engines.utf8_engine()
- inspector = inspect(e)
- try:
- view_names = inspector.get_view_names()
- except NotImplementedError:
- pass
- else:
- for vname in view_names:
- e.execute(schema._DropView(schema.Table(vname, schema.MetaData())))
-
- try:
- view_names = inspector.get_view_names(schema="test_schema")
- except NotImplementedError:
- pass
- else:
- for vname in view_names:
- e.execute(schema._DropView(
- schema.Table(vname,
- schema.MetaData(), schema="test_schema")))
-
- for tname in reversed(inspector.get_table_names(order_by="foreign_key")):
- e.execute(schema.DropTable(schema.Table(tname, schema.MetaData())))
-
- for tname in reversed(inspector.get_table_names(
- order_by="foreign_key", schema="test_schema")):
- e.execute(schema.DropTable(
- schema.Table(tname, schema.MetaData(), schema="test_schema")))
-
- e.dispose()
-
-
-@post
-def _set_table_options(options, file_config):
- from sqlalchemy.testing import schema
-
- table_options = schema.table_options
- for spec in options.tableopts:
- key, value = spec.split('=')
- table_options[key] = value
-
- if options.mysql_engine:
- table_options['mysql_engine'] = options.mysql_engine
-
-
-@post
-def _reverse_topological(options, file_config):
- if options.reversetop:
- from sqlalchemy.orm.util import randomize_unitofwork
- randomize_unitofwork()
-
-
-def _requirements_opt(options, opt_str, value, parser):
- _setup_requirements(value)
-
-@post
-def _requirements(options, file_config):
-
- requirement_cls = file_config.get('sqla_testing', "requirement_cls")
- _setup_requirements(requirement_cls)
-
-def _setup_requirements(argument):
- from sqlalchemy.testing import config
- from sqlalchemy import testing
-
- if config.requirements is not None:
- return
-
- modname, clsname = argument.split(":")
-
- # importlib.import_module() only introduced in 2.7, a little
- # late
- mod = __import__(modname)
- for component in modname.split(".")[1:]:
- mod = getattr(mod, component)
- req_cls = getattr(mod, clsname)
- config.requirements = testing.requires = req_cls(config)
-
-
-@post
-def _post_setup_options(opt, file_config):
- from sqlalchemy.testing import config
- config.options = options
- config.file_config = file_config
-
-
-@post
-def _setup_profiling(options, file_config):
- from sqlalchemy.testing import profiling
- profiling._profile_stats = profiling.ProfileStatsFile(
- file_config.get('sqla_testing', 'profile_file'))
+# no package imports yet! this prevents us from tripping coverage
+# too soon.
+import imp
+path = os.path.join(os.path.dirname(__file__), "plugin_base.py")
+plugin_base = imp.load_source("plugin_base", path)
class NoseSQLAlchemy(Plugin):
- """
- Handles the setup and extra properties required for testing SQLAlchemy
- """
enabled = True
name = 'sqla_testing'
def options(self, parser, env=os.environ):
Plugin.options(self, parser, env)
opt = parser.add_option
- opt("--log-info", action="callback", type="string", callback=_log,
- help="turn on info logging for <LOG> (multiple OK)")
- opt("--log-debug", action="callback", type="string", callback=_log,
- help="turn on debug logging for <LOG> (multiple OK)")
- opt("--require", action="append", dest="require", default=[],
- help="require a particular driver or module version (multiple OK)")
- opt("--db", action="store", dest="db", default="default",
- help="Use prefab database uri")
- opt('--dbs', action='callback', callback=_list_dbs,
- help="List available prefab dbs")
- opt("--dburi", action="store", dest="dburi",
- help="Database uri (overrides --db)")
- opt("--dropfirst", action="store_true", dest="dropfirst",
- help="Drop all tables in the target database first")
- opt("--mockpool", action="store_true", dest="mockpool",
- help="Use mock pool (asserts only one connection used)")
- opt("--low-connections", action="store_true", dest="low_connections",
- help="Use a low number of distinct connections - i.e. for Oracle TNS"
- )
- opt("--enginestrategy", action="callback", type="string",
- callback=_engine_strategy,
- help="Engine strategy (plain or threadlocal, defaults to plain)")
- opt("--reversetop", action="store_true", dest="reversetop", default=False,
- help="Use a random-ordering set implementation in the ORM (helps "
- "reveal dependency issues)")
- opt("--requirements", action="callback", type="string",
- callback=_requirements_opt,
- help="requirements class for testing, overrides setup.cfg")
- opt("--with-cdecimal", action="store_true", dest="cdecimal", default=False,
- help="Monkeypatch the cdecimal library into Python 'decimal' for all tests")
- opt("--unhashable", action="store_true", dest="unhashable", default=False,
- help="Disallow SQLAlchemy from performing a hash() on mapped test objects.")
- opt("--noncomparable", action="store_true", dest="noncomparable", default=False,
- help="Disallow SQLAlchemy from performing == on mapped test objects.")
- opt("--truthless", action="store_true", dest="truthless", default=False,
- help="Disallow SQLAlchemy from truth-evaluating mapped test objects.")
- opt("--serverside", action="callback", callback=_server_side_cursors,
- help="Turn on server side cursors for PG")
- opt("--mysql-engine", action="store", dest="mysql_engine", default=None,
- help="Use the specified MySQL storage engine for all tables, default is "
- "a db-default/InnoDB combo.")
- opt("--table-option", action="append", dest="tableopts", default=[],
- help="Add a dialect-specific table option, key=value")
- opt("--write-profiles", action="store_true", dest="write_profiles", default=False,
- help="Write/update profiling data.")
- global file_config
- file_config = configparser.ConfigParser()
- file_config.read(['setup.cfg', 'test.cfg'])
+
+ def make_option(name, **kw):
+ callback_ = kw.pop("callback", None)
+ if callback_:
+ def wrap_(option, opt_str, value, parser):
+ callback_(opt_str, value, parser)
+ kw["callback"] = wrap_
+ opt(name, **kw)
+
+ plugin_base.setup_options(make_option)
+ plugin_base.read_config()
def configure(self, options, conf):
- Plugin.configure(self, options, conf)
- self.options = options
- for fn in pre_configure:
- fn(self.options, file_config)
+ super(NoseSQLAlchemy, self).configure(options, conf)
+ plugin_base.pre_begin(options)
- def begin(self):
- # Lazy setup of other options (post coverage)
- for fn in post_configure:
- fn(self.options, file_config)
+ plugin_base.set_coverage_flag(options.enable_plugin_coverage)
- # late imports, has to happen after config as well
- # as nose plugins like coverage
- global util, fixtures, engines, exclusions, \
- assertions, warnings, profiling,\
- config
- from sqlalchemy.testing import fixtures, engines, exclusions, \
- assertions, warnings, profiling, config
- from sqlalchemy import util
+ global fixtures
+ from sqlalchemy.testing import fixtures
+
+ def begin(self):
+ plugin_base.post_begin()
def describeTest(self, test):
return ""
return False
def wantClass(self, cls):
- """Return true if you want the main test selector to collect
- tests from this class, false if you don't, and None if you don't
- care.
-
- :Parameters:
- cls : class
- The class being examined by the selector
-
- """
- if not issubclass(cls, fixtures.TestBase):
- return False
- elif cls.__name__.startswith('_'):
- return False
- else:
- return True
-
- def _do_skips(self, cls):
- from sqlalchemy.testing import config
- if hasattr(cls, '__requires__'):
- def test_suite():
- return 'ok'
- test_suite.__name__ = cls.__name__
- for requirement in cls.__requires__:
- check = getattr(config.requirements, requirement)
-
- if not check.enabled:
- raise SkipTest(
- check.reason if check.reason
- else
- (
- "'%s' unsupported on DB implementation '%s' == %s" % (
- cls.__name__, config.db.name,
- config.db.dialect.server_version_info
- )
- )
- )
-
- if cls.__unsupported_on__:
- spec = exclusions.db_spec(*cls.__unsupported_on__)
- if spec(config.db):
- raise SkipTest(
- "'%s' unsupported on DB implementation '%s' == %s" % (
- cls.__name__, config.db.name,
- config.db.dialect.server_version_info)
- )
-
- if getattr(cls, '__only_on__', None):
- spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
- if not spec(config.db):
- raise SkipTest(
- "'%s' unsupported on DB implementation '%s' == %s" % (
- cls.__name__, config.db.name,
- config.db.dialect.server_version_info)
- )
-
- if getattr(cls, '__skip_if__', False):
- for c in getattr(cls, '__skip_if__'):
- if c():
- raise SkipTest("'%s' skipped by %s" % (
- cls.__name__, c.__name__)
- )
-
- for db, op, spec in getattr(cls, '__excluded_on__', ()):
- exclusions.exclude(db, op, spec,
- "'%s' unsupported on DB %s version %s" % (
- cls.__name__, config.db.name,
- exclusions._server_version(config.db)))
+ return plugin_base.want_class(cls)
def beforeTest(self, test):
- warnings.resetwarnings()
- profiling._current_test = test.id()
+ plugin_base.before_test(test, test.id())
def afterTest(self, test):
- engines.testing_reaper._after_test_ctx()
- warnings.resetwarnings()
-
- def _setup_engine(self, ctx):
- if getattr(ctx, '__engine_options__', None):
- global _existing_engine
- _existing_engine = config.db
- config.db = engines.testing_engine(options=ctx.__engine_options__)
-
- def _restore_engine(self, ctx):
- global _existing_engine
- if _existing_engine is not None:
- config.db = _existing_engine
- _existing_engine = None
+ plugin_base.after_test(test)
def startContext(self, ctx):
if not isinstance(ctx, type) \
or not issubclass(ctx, fixtures.TestBase):
return
- self._do_skips(ctx)
- self._setup_engine(ctx)
+ plugin_base.start_test_class(ctx)
def stopContext(self, ctx):
if not isinstance(ctx, type) \
or not issubclass(ctx, fixtures.TestBase):
return
- engines.testing_reaper._stop_test_ctx()
- if not options.low_connections:
- assertions.global_cleanup_assertions()
- self._restore_engine(ctx)
+ plugin_base.stop_test_class(ctx)
--- /dev/null
+# plugin/plugin_base.py
+# Copyright (C) 2005-2014 the SQLAlchemy authors and contributors <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+"""Testing extensions.
+
+this module is designed to work as a testing-framework-agnostic library,
+so that we can continue to support nose and also begin adding new functionality
+via py.test.
+
+"""
+
+from __future__ import absolute_import
+from unittest import SkipTest
+import sys
+import re
+
+py3k = sys.version_info >= (3, 0)
+
+if py3k:
+ import configparser
+else:
+ import ConfigParser as configparser
+
+
+# late imports
+fixtures = None
+engines = None
+exclusions = None
+warnings = None
+profiling = None
+assertions = None
+requirements = None
+config = None
+testing = None
+util = None
+file_config = None
+
+
+logging = None
+db_opts = {}
+options = None
+
+def setup_options(make_option):
+ make_option("--log-info", action="callback", type="string", callback=_log,
+ help="turn on info logging for <LOG> (multiple OK)")
+ make_option("--log-debug", action="callback", type="string", callback=_log,
+ help="turn on debug logging for <LOG> (multiple OK)")
+ make_option("--db", action="append", type="string", dest="db",
+ help="Use prefab database uri. Multiple OK, "
+ "first one is run by default.")
+ make_option('--dbs', action='callback', callback=_list_dbs,
+ help="List available prefab dbs")
+ make_option("--dburi", action="append", type="string", dest="dburi",
+ help="Database uri. Multiple OK, first one is run by default.")
+ make_option("--dropfirst", action="store_true", dest="dropfirst",
+ help="Drop all tables in the target database first")
+ make_option("--mockpool", action="store_true", dest="mockpool",
+ help="Use mock pool (asserts only one connection used)")
+ make_option("--low-connections", action="store_true", dest="low_connections",
                help="Use a low number of distinct connections - e.g. for Oracle TNS"
+ )
+ make_option("--reversetop", action="store_true", dest="reversetop", default=False,
+ help="Use a random-ordering set implementation in the ORM (helps "
+ "reveal dependency issues)")
+ make_option("--requirements", action="callback", type="string",
+ callback=_requirements_opt,
+ help="requirements class for testing, overrides setup.cfg")
+ make_option("--with-cdecimal", action="store_true", dest="cdecimal", default=False,
+ help="Monkeypatch the cdecimal library into Python 'decimal' for all tests")
+ make_option("--serverside", action="callback", callback=_server_side_cursors,
+ help="Turn on server side cursors for PG")
+ make_option("--mysql-engine", action="store", dest="mysql_engine", default=None,
+ help="Use the specified MySQL storage engine for all tables, default is "
+ "a db-default/InnoDB combo.")
+ make_option("--tableopts", action="append", dest="tableopts", default=[],
+ help="Add a dialect-specific table option, key=value")
+ make_option("--write-profiles", action="store_true", dest="write_profiles", default=False,
+ help="Write/update profiling data.")
+
+def read_config():
+ global file_config
+ file_config = configparser.ConfigParser()
+ file_config.read(['setup.cfg', 'test.cfg'])
+
+def pre_begin(opt):
+ """things to set up early, before coverage might be setup."""
+ global options
+ options = opt
+ for fn in pre_configure:
+ fn(options, file_config)
+
+def set_coverage_flag(value):
+ options.has_coverage = value
+
+def post_begin():
+ """things to set up later, once we know coverage is running."""
+ # Lazy setup of other options (post coverage)
+ for fn in post_configure:
+ fn(options, file_config)
+
+ # late imports, has to happen after config as well
+ # as nose plugins like coverage
+ global util, fixtures, engines, exclusions, \
+ assertions, warnings, profiling,\
+ config, testing
+ from sqlalchemy import testing
+ from sqlalchemy.testing import fixtures, engines, exclusions, \
+ assertions, warnings, profiling, config
+ from sqlalchemy import util
+
+
+def _log(opt_str, value, parser):
+ global logging
+ if not logging:
+ import logging
+ logging.basicConfig()
+
+ if opt_str.endswith('-info'):
+ logging.getLogger(value).setLevel(logging.INFO)
+ elif opt_str.endswith('-debug'):
+ logging.getLogger(value).setLevel(logging.DEBUG)
+
+
+def _list_dbs(*args):
+ print("Available --db options (use --dburi to override)")
+ for macro in sorted(file_config.options('db')):
+ print("%20s\t%s" % (macro, file_config.get('db', macro)))
+ sys.exit(0)
+
+
+def _server_side_cursors(opt_str, value, parser):
+ db_opts['server_side_cursors'] = True
+
+def _requirements_opt(opt_str, value, parser):
+ _setup_requirements(value)
+
+
+pre_configure = []
+post_configure = []
+
+
+def pre(fn):
+ pre_configure.append(fn)
+ return fn
+
+
+def post(fn):
+ post_configure.append(fn)
+ return fn
+
+
+@pre
+def _setup_options(opt, file_config):
+ global options
+ options = opt
+
+
+@pre
+def _monkeypatch_cdecimal(options, file_config):
+ if options.cdecimal:
+ import cdecimal
+ sys.modules['decimal'] = cdecimal
+
+
+@post
+def _engine_uri(options, file_config):
+ from sqlalchemy.testing import engines, config
+ from sqlalchemy import testing
+
+ if options.dburi:
+ db_urls = list(options.dburi)
+ else:
+ db_urls = []
+
+ if options.db:
+ for db_token in options.db:
+ for db in re.split(r'[,\s]+', db_token):
+ if db not in file_config.options('db'):
+ raise RuntimeError(
+ "Unknown URI specifier '%s'. Specify --dbs for known uris."
+ % db)
+ else:
+ db_urls.append(file_config.get('db', db))
+
+ if not db_urls:
+ db_urls.append(file_config.get('db', 'default'))
+
+ for db_url in db_urls:
+ eng = engines.testing_engine(db_url, db_opts)
+ eng.connect().close()
+ config.Config.register(eng, db_opts, options, file_config, testing)
+
+ config.db_opts = db_opts
+
+
+@post
+def _engine_pool(options, file_config):
+ if options.mockpool:
+ from sqlalchemy import pool
+ db_opts['poolclass'] = pool.AssertionPool
+
+@post
+def _requirements(options, file_config):
+
+ requirement_cls = file_config.get('sqla_testing', "requirement_cls")
+ _setup_requirements(requirement_cls)
+
+def _setup_requirements(argument):
+ from sqlalchemy.testing import config
+ from sqlalchemy import testing
+
+ if config.requirements is not None:
+ return
+
+ modname, clsname = argument.split(":")
+
+    # importlib.import_module() was only introduced in Python 2.7,
+    # a little too late for us to rely on here
+ mod = __import__(modname)
+ for component in modname.split(".")[1:]:
+ mod = getattr(mod, component)
+ req_cls = getattr(mod, clsname)
+
+ config.requirements = testing.requires = req_cls()
+
+@post
+def _prep_testing_database(options, file_config):
+ from sqlalchemy.testing import config
+ from sqlalchemy import schema, inspect
+
+ if options.dropfirst:
+ for cfg in config.Config.all_configs():
+ e = cfg.db
+ inspector = inspect(e)
+ try:
+ view_names = inspector.get_view_names()
+ except NotImplementedError:
+ pass
+ else:
+ for vname in view_names:
+ e.execute(schema._DropView(schema.Table(vname, schema.MetaData())))
+
+ if config.requirements.schemas.enabled_for_config(cfg):
+ try:
+ view_names = inspector.get_view_names(schema="test_schema")
+ except NotImplementedError:
+ pass
+ else:
+ for vname in view_names:
+ e.execute(schema._DropView(
+ schema.Table(vname,
+ schema.MetaData(), schema="test_schema")))
+
+ for tname in reversed(inspector.get_table_names(order_by="foreign_key")):
+ e.execute(schema.DropTable(schema.Table(tname, schema.MetaData())))
+
+ if config.requirements.schemas.enabled_for_config(cfg):
+ for tname in reversed(inspector.get_table_names(
+ order_by="foreign_key", schema="test_schema")):
+ e.execute(schema.DropTable(
+ schema.Table(tname, schema.MetaData(), schema="test_schema")))
+
+
+@post
+def _set_table_options(options, file_config):
+ from sqlalchemy.testing import schema
+
+ table_options = schema.table_options
+ for spec in options.tableopts:
+ key, value = spec.split('=')
+ table_options[key] = value
+
+ if options.mysql_engine:
+ table_options['mysql_engine'] = options.mysql_engine
+
+
+@post
+def _reverse_topological(options, file_config):
+ if options.reversetop:
+ from sqlalchemy.orm.util import randomize_unitofwork
+ randomize_unitofwork()
+
+
+@post
+def _post_setup_options(opt, file_config):
+ from sqlalchemy.testing import config
+ config.options = options
+ config.file_config = file_config
+
+
+@post
+def _setup_profiling(options, file_config):
+ from sqlalchemy.testing import profiling
+ profiling._profile_stats = profiling.ProfileStatsFile(
+ file_config.get('sqla_testing', 'profile_file'))
+
+
+def want_class(cls):
+ if not issubclass(cls, fixtures.TestBase):
+ return False
+ elif cls.__name__.startswith('_'):
+ return False
+ else:
+ return True
+
+def generate_sub_tests(cls, module):
+ if getattr(cls, '__multiple__', False):
+ for cfg in config.Config.all_configs():
+ name = "%s_%s_%s" % (cls.__name__, cfg.db.name, cfg.db.driver)
+ subcls = type(
+ name,
+ (cls, ),
+ {
+ "__only_on__": (cfg.db.name, cfg.db.driver),
+ "__multiple__": False}
+ )
+ setattr(module, name, subcls)
+ yield subcls
+ else:
+ yield cls
+
+
+def start_test_class(cls):
+ _do_skips(cls)
+ _setup_engine(cls)
+
+def stop_test_class(cls):
+ engines.testing_reaper._stop_test_ctx()
+ if not options.low_connections:
+ assertions.global_cleanup_assertions()
+ _restore_engine()
+
+def _restore_engine():
+ config._current.reset(testing)
+
+def _setup_engine(cls):
+ if getattr(cls, '__engine_options__', None):
+ eng = engines.testing_engine(options=cls.__engine_options__)
+ config._current.push_engine(eng, testing)
+
+def before_test(test, id_):
+ warnings.resetwarnings()
+ profiling._current_test = id_
+
+def after_test(test):
+ engines.testing_reaper._after_test_ctx()
+ warnings.resetwarnings()
+
+def _do_skips(cls):
+ all_configs = set(config.Config.all_configs())
+ reasons = []
+
+ if hasattr(cls, '__requires__'):
+ requirements = config.requirements
+ for config_obj in list(all_configs):
+ for requirement in cls.__requires__:
+ check = getattr(requirements, requirement)
+
+ if check.predicate(config_obj):
+ all_configs.remove(config_obj)
+ if check.reason:
+ reasons.append(check.reason)
+ break
+
+ if cls.__unsupported_on__:
+ spec = exclusions.db_spec(*cls.__unsupported_on__)
+ for config_obj in list(all_configs):
+ if spec(config_obj):
+ all_configs.remove(config_obj)
+
+ if getattr(cls, '__only_on__', None):
+ spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
+ for config_obj in list(all_configs):
+ if not spec(config_obj):
+ all_configs.remove(config_obj)
+
+
+ if getattr(cls, '__skip_if__', False):
+ for c in getattr(cls, '__skip_if__'):
+ if c():
+ raise SkipTest("'%s' skipped by %s" % (
+ cls.__name__, c.__name__)
+ )
+
+ for db_spec, op, spec in getattr(cls, '__excluded_on__', ()):
+ for config_obj in list(all_configs):
+ if exclusions.skip_if(
+ exclusions.SpecPredicate(db_spec, op, spec)
+ ).predicate(config_obj):
+ all_configs.remove(config_obj)
+
+
+ if not all_configs:
+ raise SkipTest(
+ "'%s' unsupported on DB implementation %s%s" % (
+ cls.__name__,
+ ", ".join("'%s' = %s" % (
+ config_obj.db.name,
+ config_obj.db.dialect.server_version_info)
+ for config_obj in config.Config.all_configs()
+ ),
+                "; reasons: %s" % ", ".join(reasons) if reasons else ""
+ )
+ )
+ elif hasattr(cls, '__prefer__'):
+ non_preferred = set()
+ spec = exclusions.db_spec(*util.to_list(cls.__prefer__))
+ for config_obj in all_configs:
+ if not spec(config_obj):
+ non_preferred.add(config_obj)
+ if all_configs.difference(non_preferred):
+ all_configs.difference_update(non_preferred)
+
+ if config._current not in all_configs:
+ _setup_config(all_configs.pop(), cls)
+
+def _setup_config(config_obj, ctx):
+ config._current.push(config_obj, testing)
+
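The `pre`/`post` decorators in `plugin_base` above implement a small two-phase hook registry: functions decorated with `@pre` run in `pre_begin()` before coverage tooling starts up, while `@post` functions run in `post_begin()` once coverage is known to be in place. A minimal standalone sketch of the same pattern (hook names and the dict-based options object here are illustrative, not SQLAlchemy's API):

```python
# Two-phase configuration-hook registry, in the style of plugin_base:
# @pre functions run early (before coverage starts), @post functions
# run only once coverage is known to be active.
pre_configure = []
post_configure = []

def pre(fn):
    pre_configure.append(fn)
    return fn

def post(fn):
    post_configure.append(fn)
    return fn

@pre
def _setup_options(options, file_config):
    # runs in the early phase
    options.setdefault("phase_log", []).append("pre:options")

@post
def _engine_uri(options, file_config):
    # runs in the late phase, after coverage is configured
    options["phase_log"].append("post:engine")

def pre_begin(options, file_config=None):
    for fn in pre_configure:
        fn(options, file_config)

def post_begin(options, file_config=None):
    for fn in post_configure:
        fn(options, file_config)

opts = {}
pre_begin(opts)
post_begin(opts)
# opts["phase_log"] == ["pre:options", "post:engine"]
```

Keeping the registry in a framework-agnostic module is what lets both the nose plugin and the pytest plugin drive the same configuration steps.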
--- /dev/null
+import pytest
+import argparse
+import inspect
+from . import plugin_base
+import collections
+
+def pytest_addoption(parser):
+ group = parser.getgroup("sqlalchemy")
+
+ def make_option(name, **kw):
+ callback_ = kw.pop("callback", None)
+ if callback_:
+ class CallableAction(argparse.Action):
+ def __call__(self, parser, namespace, values, option_string=None):
+ callback_(option_string, values, parser)
+ kw["action"] = CallableAction
+
+ group.addoption(name, **kw)
+
+ plugin_base.setup_options(make_option)
+ plugin_base.read_config()
+
+def pytest_configure(config):
+ plugin_base.pre_begin(config.option)
+
+ plugin_base.set_coverage_flag(bool(getattr(config.option, "cov_source", False)))
+
+ plugin_base.post_begin()
+
+
+def pytest_collection_modifyitems(session, config, items):
+ # look for all those classes that specify __multiple__ and
+ # expand them out into per-database test cases.
+
+    # this is much easier to do within pytest_pycollect_makeitem; however,
+    # pytest iterates through cls.__dict__ while makeitem is being
+    # called, which causes a "dictionary changed size" error on py3k.
+    # I'd submit a pullreq for them to turn it into a list first, but that
+    # would only suit the rather odd use case here, which is that we are
+    # adding new classes to a module on the fly.
+
+ rebuilt_items = collections.defaultdict(list)
+ test_classes = set(item.parent for item in items)
+ for test_class in test_classes:
+ for sub_cls in plugin_base.generate_sub_tests(test_class.cls, test_class.parent.module):
+ if sub_cls is not test_class.cls:
+ list_ = rebuilt_items[test_class.cls]
+
+ for inst in pytest.Class(sub_cls.__name__,
+ parent=test_class.parent.parent).collect():
+ list_.extend(inst.collect())
+
+ newitems = []
+ for item in items:
+ if item.parent.cls in rebuilt_items:
+ newitems.extend(rebuilt_items[item.parent.cls])
+ rebuilt_items[item.parent.cls][:] = []
+ else:
+ newitems.append(item)
+
+ # seems like the functions attached to a test class aren't sorted already?
+ # is that true and why's that? (when using unittest, they're sorted)
+ items[:] = sorted(newitems, key=lambda item: (
+ item.parent.parent.parent.name,
+ item.parent.parent.name,
+ item.name
+ )
+ )
+
+
+
+def pytest_pycollect_makeitem(collector, name, obj):
+
+ if inspect.isclass(obj) and plugin_base.want_class(obj):
+ return pytest.Class(name, parent=collector)
+ elif inspect.isfunction(obj) and \
+ name.startswith("test_") and \
+ isinstance(collector, pytest.Instance):
+ return pytest.Function(name, parent=collector)
+ else:
+ return []
+
+_current_class = None
+
+def pytest_runtest_setup(item):
+ # here we seem to get called only based on what we collected
+    # in pytest_collection_modifyitems.  So to do class-based stuff
+    # we have to tease that out ourselves.
+ global _current_class
+
+ if not isinstance(item, pytest.Function):
+ return
+
+ # ... so we're doing a little dance here to figure it out...
+ if item.parent.parent is not _current_class:
+
+ class_setup(item.parent.parent)
+ _current_class = item.parent.parent
+
+ # this is needed for the class-level, to ensure that the
+ # teardown runs after the class is completed with its own
+ # class-level teardown...
+ item.parent.parent.addfinalizer(lambda: class_teardown(item.parent.parent))
+
+ test_setup(item)
+
+def pytest_runtest_teardown(item):
+ # ...but this works better as the hook here rather than
+ # using a finalizer, as the finalizer seems to get in the way
+ # of the test reporting failures correctly (you get a bunch of
+ # py.test assertion stuff instead)
+ test_teardown(item)
+
+def test_setup(item):
+ id_ = "%s.%s:%s" % (item.parent.module.__name__, item.parent.name, item.name)
+ plugin_base.before_test(item, id_)
+
+def test_teardown(item):
+ plugin_base.after_test(item)
+
+def class_setup(item):
+ plugin_base.start_test_class(item.cls)
+
+def class_teardown(item):
+ plugin_base.stop_test_class(item.cls)
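The `make_option` adapter in `pytest_addoption` above is what lets the optparse-style callbacks defined in `plugin_base.setup_options()` work under pytest's argparse-based parser: each callback is wrapped in an `argparse.Action` subclass. A simplified standalone sketch of the trick (the option name and callback are illustrative):

```python
import argparse

def make_option(parser, name, **kw):
    # optparse-style callbacks take (opt_str, value, parser); wrap
    # them in an argparse.Action so the same option definitions can
    # be registered with an argparse-based runner such as pytest.
    callback_ = kw.pop("callback", None)
    if callback_:
        class CallableAction(argparse.Action):
            def __call__(self, parser, namespace,
                         values, option_string=None):
                callback_(option_string, values, parser)
        kw["action"] = CallableAction
    parser.add_argument(name, **kw)

seen = []
parser = argparse.ArgumentParser()
make_option(parser, "--log-info", type=str,
            callback=lambda opt_str, value, p: seen.append((opt_str, value)))
parser.parse_args(["--log-info", "sqlalchemy.engine"])
# seen == [("--log-info", "sqlalchemy.engine")]
```

Because the class is defined inside `make_option`, each option's `Action` closes over its own `callback_`, so multiple callback options can coexist safely.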
"""
-from . import exclusions, config
+from . import exclusions
class Requirements(object):
- def __init__(self, config):
- self.config = config
-
- @property
- def db(self):
- return config.db
+ pass
class SuiteRequirements(Requirements):
INSERT DEFAULT VALUES or equivalent."""
return exclusions.only_if(
- lambda: self.config.db.dialect.supports_empty_insert or \
- self.config.db.dialect.supports_default_values,
+ lambda config: config.db.dialect.supports_empty_insert or \
+ config.db.dialect.supports_default_values,
"empty inserts not supported"
)
"""target platform supports RETURNING."""
return exclusions.only_if(
- lambda: self.config.db.dialect.implicit_returning,
+ lambda config: config.db.dialect.implicit_returning,
"'returning' not supported by database"
)
UPPERCASE as case insensitive names."""
return exclusions.skip_if(
- lambda: not self.db.dialect.requires_name_normalize,
+ lambda config: not config.db.dialect.requires_name_normalize,
"Backend does not require denormalized names."
)
INSERT statement."""
return exclusions.skip_if(
- lambda: not self.db.dialect.supports_multivalues_insert,
+ lambda config: not config.db.dialect.supports_multivalues_insert,
"Backend does not support multirow inserts."
)
"""Target database must support SEQUENCEs."""
return exclusions.only_if([
- lambda: self.config.db.dialect.supports_sequences
+ lambda config: config.db.dialect.supports_sequences
], "no sequence support")
@property
as a means of generating new PK values."""
return exclusions.only_if([
- lambda: self.config.db.dialect.supports_sequences and \
- self.config.db.dialect.sequences_optional
+ lambda config: config.db.dialect.supports_sequences and \
+ config.db.dialect.sequences_optional
], "no sequence support, or sequences not optional")
"""Catchall for a large variety of MySQL on Windows failures"""
return exclusions.open()
- def _has_mysql_on_windows(self):
+ def _has_mysql_on_windows(self, config):
return False
- def _has_mysql_fully_case_sensitive(self):
+ def _has_mysql_fully_case_sensitive(self, config):
return False
kw.update(table_options)
- if exclusions.against('mysql'):
+ if exclusions.against(config._current, 'mysql'):
if 'mysql_engine' not in kw and 'mysql_type' not in kw:
if 'test_needs_fk' in test_opts or 'test_needs_acid' in test_opts:
kw['mysql_engine'] = 'InnoDB'
# Apply some default cascading rules for self-referential foreign keys.
 # MySQL InnoDB has some issues around selecting self-refs too.
- if exclusions.against('firebird'):
+ if exclusions.against(config._current, 'firebird'):
table_name = args[0]
unpack = (config.db.dialect.
identifier_preparer.unformat_identifiers)
test_opts = dict([(k, kw.pop(k)) for k in list(kw)
if k.startswith('test_')])
- if not config.requirements.foreign_key_ddl.enabled:
+ if config.requirements.foreign_key_ddl.predicate(config):
args = [arg for arg in args if not isinstance(arg, schema.ForeignKey)]
col = schema.Column(*args, **kw)
# hardcoded rule for firebird, oracle; this should
# be moved out
- if exclusions.against('firebird', 'oracle'):
+ if exclusions.against(config._current, 'firebird', 'oracle'):
def add_seq(c, tbl):
c._init_items(
schema.Sequence(_truncate_name(
class TableDDLTest(fixtures.TestBase):
+ __multiple__ = True
def _simple_fixture(self):
return Table('test_table', self.metadata,
class LastrowidTest(fixtures.TablesTest):
run_deletes = 'each'
+ __multiple__ = True
+
__requires__ = 'implements_get_lastrowid', 'autoincrement_insert'
__engine_options__ = {"implicit_returning": False}
class InsertBehaviorTest(fixtures.TablesTest):
run_deletes = 'each'
+ __multiple__ = True
@classmethod
def define_tables(cls, metadata):
)
class ReturningTest(fixtures.TablesTest):
- run_deletes = 'each'
+ run_create_tables = 'each'
__requires__ = 'returning', 'autoincrement_insert'
+ __multiple__ = True
__engine_options__ = {"implicit_returning": True}
class HasTableTest(fixtures.TablesTest):
+ __multiple__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('test_table', metadata,
+
class ComponentReflectionTest(fixtures.TablesTest):
run_inserts = run_deletes = None
+ __multiple__ = True
+
@classmethod
def define_tables(cls, metadata):
cls.define_reflected_tables(metadata, None)
class RowFetchTest(fixtures.TablesTest):
+ __multiple__ = True
@classmethod
def define_tables(cls, metadata):
setting.
"""
+ __multiple__ = True
+
@classmethod
def define_tables(cls, metadata):
Table("some_table", metadata,
class SequenceTest(fixtures.TablesTest):
__requires__ = ('sequences',)
+ __multiple__ = True
run_create_tables = 'each'
class HasSequenceTest(fixtures.TestBase):
__requires__ = 'sequences',
+ __multiple__ = True
def test_has_sequence(self):
s1 = Sequence('user_id_seq')
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data',
+ __multiple__ = True
datatype = Unicode(255)
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data', 'text_type'
+ __multiple__ = True
datatype = UnicodeText()
self._test_empty_strings()
class TextTest(_LiteralRoundTripFixture, fixtures.TablesTest):
+ __multiple__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('text_table', metadata,
self._literal_round_trip(Text, [data], [data])
class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
+ __multiple__ = True
+
@requirements.unbounded_varchar
def test_nolength_string(self):
metadata = MetaData()
class DateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime',
+ __multiple__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_microseconds',
+ __multiple__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time',
+ __multiple__ = True
datatype = Time
data = datetime.time(12, 57, 18)
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time_microseconds',
+ __multiple__ = True
datatype = Time
data = datetime.time(12, 57, 18, 396)
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
+ __multiple__ = True
datatype = Date
data = datetime.date(2012, 10, 15)
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
+ __multiple__ = True
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_historic',
+ __multiple__ = True
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date_historic',
+ __multiple__ = True
datatype = Date
data = datetime.date(1727, 4, 1)
class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
+ __multiple__ = True
def test_literal(self):
self._literal_round_trip(Integer, [5], [5])
class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
+ __multiple__ = True
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
@testing.provide_metadata
class SimpleUpdateDeleteTest(fixtures.TablesTest):
run_deletes = 'each'
+ __multiple__ = True
@classmethod
def define_tables(cls, metadata):
{targ_name: target, fn_name: fn},
fn.__name__)
decorated.__defaults__ = getattr(fn, 'im_func', fn).__defaults__
+ decorated.__wrapped__ = fn
return update_wrapper(decorated, fn)
return update_wrapper(decorate, target)
first-package-wins = true
where = test
+[pytest]
+addopts= --tb native -v -r fxX
+python_files=test/*test_*.py
+
[upload]
sign = 1
identity = C4DAFEE1
sqlite_file=sqlite:///querytest.db
postgresql=postgresql://scott:tiger@127.0.0.1:5432/test
postgres=postgresql://scott:tiger@127.0.0.1:5432/test
-pg8000=postgresql+pg8000://scott:tiger@127.0.0.1:5432/test
postgresql_jython=postgresql+zxjdbc://scott:tiger@127.0.0.1:5432/test
-mysql_jython=mysql+zxjdbc://scott:tiger@127.0.0.1:5432/test
mysql=mysql://scott:tiger@127.0.0.1:3306/test
mssql=mssql+pyodbc://scott:tiger@ms_2005
oursql=mysql+oursql://scott:tiger@127.0.0.1:3306/test
pymysql=mysql+pymysql://scott:tiger@127.0.0.1:3306/test?charset=utf8
oracle=oracle://scott:tiger@127.0.0.1:1521
oracle8=oracle://scott:tiger@127.0.0.1:1521/?use_ansi=0
-maxdb=maxdb://MONA:RED@/maxdb1
firebird=firebird://sysdba:masterkey@localhost//Users/classic/foo.fdb
package_dir={'': 'lib'},
license="MIT License",
cmdclass=cmdclass,
- tests_require=['nose >= 0.11', 'mock'],
- test_suite="sqla_nose",
+ tests_require=['pytest >= 2.5.2', 'mock'],
+ test_suite="pytest.main",
long_description=readme,
classifiers=[
"Development Status :: 5 - Production/Stable",
import sys
import imp
import nose
+import warnings
+warnings.warn(
+    "SQLAlchemy now includes py.test support which, going forward, should "
+    "be preferred over nose. Please see README.unittests.rst for updated "
+    "testing information!")
from os import path
for pth in ['./lib']:
--- /dev/null
+#!/usr/bin/env python
+"""
+pytest plugin script.
+
+This script is an extension to py.test which
+installs SQLAlchemy's testing plugin into the local environment.
+
+"""
+import sys
+import imp
+
+from os import path
+for pth in ['../lib']:
+ sys.path.insert(0, path.join(path.dirname(path.abspath(__file__)), pth))
+
+from sqlalchemy.testing.plugin.pytestplugin import *
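The `__multiple__ = True` markers added throughout the suite work together with `plugin_base.generate_sub_tests()`: a marked class is fanned out into one dynamically generated subclass per configured database, each pinned via `__only_on__`. A standalone sketch of the fan-out idiom, decoupled from the real config registry (class and backend names here are illustrative):

```python
# Fan a test class out into one subclass per backend configuration,
# mirroring plugin_base.generate_sub_tests().  The configs argument
# stands in for config.Config.all_configs().
class SomeTest(object):
    __multiple__ = True

def generate_sub_tests(cls, configs):
    if getattr(cls, "__multiple__", False):
        for db_name, driver in configs:
            name = "%s_%s_%s" % (cls.__name__, db_name, driver)
            # pin the generated subclass to one backend and clear the
            # flag so it is not expanded a second time
            yield type(name, (cls,), {
                "__only_on__": (db_name, driver),
                "__multiple__": False,
            })
    else:
        yield cls

subclasses = list(generate_sub_tests(
    SomeTest, [("sqlite", "pysqlite"), ("postgresql", "psycopg2")]))
names = [c.__name__ for c in subclasses]
# names == ['SomeTest_sqlite_pysqlite', 'SomeTest_postgresql_psycopg2']
```

In the real plugin the generated subclasses are also attached to the test module with `setattr(module, name, subcls)` so the collector can find them.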
gen(True, ['high_priority', sql.text('sql_cache')]),
'SELECT high_priority sql_cache DISTINCT q')
- @testing.uses_deprecated
- def test_deprecated_distinct(self):
- dialect = self.__dialect__
-
- self.assert_compile(
- select(['q'], distinct='ALL'),
- 'SELECT ALL q',
- )
-
- self.assert_compile(
- select(['q'], distinct='distinctROW'),
- 'SELECT DISTINCTROW q',
- )
-
- self.assert_compile(
- select(['q'], distinct='ALL',
- prefixes=['HIGH_PRIORITY', 'SQL_SMALL_RESULT']),
- 'SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q'
- )
def test_backslash_escaping(self):
self.assert_compile(
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, AssertsExecutionResults
from sqlalchemy import testing
-from sqlalchemy.testing.engines import utf8_engine
import datetime
import decimal
found = set([frozenset(row[0]) for row in rows])
eq_(found, set([frozenset(['5']), frozenset(['5', '7'])]))
+ @testing.provide_metadata
def test_unicode_enum(self):
- unicode_engine = utf8_engine()
- metadata = MetaData(unicode_engine)
+ metadata = self.metadata
t1 = Table('table', metadata,
Column('id', Integer, primary_key=True),
Column('value', Enum(u('réveillé'), u('drôle'), u('S’il'))),
Column('value2', mysql.ENUM(u('réveillé'), u('drôle'), u('S’il')))
)
metadata.create_all()
- try:
- t1.insert().execute(value=u('drôle'), value2=u('drôle'))
- t1.insert().execute(value=u('réveillé'), value2=u('réveillé'))
- t1.insert().execute(value=u('S’il'), value2=u('S’il'))
- eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, u('drôle'), u('drôle')), (2, u('réveillé'), u('réveillé')),
- (3, u('S’il'), u('S’il'))]
- )
+ t1.insert().execute(value=u('drôle'), value2=u('drôle'))
+ t1.insert().execute(value=u('réveillé'), value2=u('réveillé'))
+ t1.insert().execute(value=u('S’il'), value2=u('S’il'))
+ eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
+ [(1, u('drôle'), u('drôle')), (2, u('réveillé'), u('réveillé')),
+ (3, u('S’il'), u('S’il'))]
+ )
- # test reflection of the enum labels
+ # test reflection of the enum labels
- m2 = MetaData(testing.db)
- t2 = Table('table', m2, autoload=True)
+ m2 = MetaData(testing.db)
+ t2 = Table('table', m2, autoload=True)
- # TODO: what's wrong with the last element ? is there
- # latin-1 stuff forcing its way in ?
+ # TODO: what's wrong with the last element ? is there
+ # latin-1 stuff forcing its way in ?
- assert t2.c.value.type.enums[0:2] == \
- (u('réveillé'), u('drôle')) # u'S’il') # eh ?
+ assert t2.c.value.type.enums[0:2] == \
+ (u('réveillé'), u('drôle')) # u'S’il') # eh ?
- assert t2.c.value2.type.enums[0:2] == \
- (u('réveillé'), u('drôle')) # u'S’il') # eh ?
- finally:
- metadata.drop_all()
+ assert t2.c.value2.type.enums[0:2] == \
+ (u('réveillé'), u('drôle')) # u'S’il') # eh ?
def test_enum_compile(self):
e1 = Enum('x', 'y', 'z', name='somename')
from sqlalchemy.util import u
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
+ __prefer__ = 'postgresql'
def test_format(self):
seq = Sequence('my_seq_no_schema')
class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = postgresql.dialect()
+ __prefer__ = 'postgresql'
def test_compile(self):
for type_, expected in [
from sqlalchemy import testing
from sqlalchemy.testing import engines
from sqlalchemy.testing import AssertsCompiledSQL, eq_
-from nose import SkipTest
from sqlalchemy.testing import fixtures
strings = ' '.join(str(x) for x in pg_mock.mock)
assert 'my_test_constraint' in strings
+ @testing.requires.sqlite
def test_ddl_execute(self):
- try:
- engine = create_engine('sqlite:///')
- except ImportError:
- raise SkipTest('Requires sqlite')
+ engine = create_engine('sqlite:///')
cx = engine.connect()
table = self.users
ddl = DDL('SELECT 1')
e = engines.testing_engine(options={"poolclass": AssertionPool})
fn(e)
- @testing.uses_deprecated
+ @testing.uses_deprecated()
def test_reflect_uses_bind_constructor_conn(self):
self._test_reflect_uses_bind(lambda e: MetaData(e.connect(),
reflect=True))
- @testing.uses_deprecated
+ @testing.uses_deprecated()
def test_reflect_uses_bind_constructor_engine(self):
self._test_reflect_uses_bind(lambda e: MetaData(e, reflect=True))
class UnicodeReflectionTest(fixtures.TestBase):
@classmethod
def setup_class(cls):
- # trigger mysql _server_casing check...
- testing.db.connect().close()
-
- cls.bind = bind = engines.utf8_engine(
- options={'convert_unicode': True})
-
cls.metadata = metadata = MetaData()
no_multibyte_period = set([
names = no_multibyte_period
# mysql can't handle casing usually
elif testing.against("mysql") and \
- not testing.requires._has_mysql_fully_case_sensitive():
+ not testing.requires.mysql_fully_case_sensitive.enabled:
names = no_multibyte_period.union(no_case_sensitivity)
# mssql + pyodbc + freetds can't compare multibyte names to
# information_schema.tables.table_name
)
schema.Index(ixname, t.c[cname])
- metadata.create_all(bind)
+ metadata.create_all(testing.db)
cls.names = names
@classmethod
def teardown_class(cls):
- cls.metadata.drop_all(cls.bind, checkfirst=False)
- cls.bind.dispose()
+ cls.metadata.drop_all(testing.db, checkfirst=False)
@testing.requires.unicode_connections
def test_has_table(self):
for tname, cname, ixname in self.names:
- assert self.bind.has_table(tname), "Can't detect name %s" % tname
+ assert testing.db.has_table(tname), "Can't detect name %s" % tname
@testing.requires.unicode_connections
def test_basic(self):
# (others?) expect non-unicode strings in result sets/bind
# params
- bind = self.bind
+ bind = testing.db
names = set([rec[0] for rec in self.names])
reflected = set(bind.table_names())
@testing.requires.unicode_connections
def test_get_names(self):
- inspector = inspect(self.bind)
+ inspector = inspect(testing.db)
names = dict(
(tname, (cname, ixname)) for tname, cname, ixname in self.names
)
-import datetime, os
+import datetime
+import os
from sqlalchemy import *
from sqlalchemy import event
from sqlalchemy import sql, util
from sqlalchemy.orm import *
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.sql import operators
-from sqlalchemy import pool
from sqlalchemy.testing import fixtures
-from sqlalchemy import testing
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing import eq_
-from nose import SkipTest
# TODO: ShardTest can be turned into a base for further subclasses
s = Session()
u2 = s.merge(User(name='u2'))
s.commit()
- s.query(User).first()
+ s.query(User).order_by(User.id).first()
eq_(canary, ['load', 'load', 'load'])
def test_inheritance(self):
assert u2.name =='jack'
assert a not in u2.addresses
+ @testing.provide_metadata
@testing.requires.unicode_connections
def test_unicode(self):
"""test that Query.get properly sets up the type for the bind
parameter. using unicode would normally fail on postgresql, mysql and
oracle unless it is converted to an encoded string"""
- metadata = MetaData(engines.utf8_engine())
+ metadata = self.metadata
table = Table('unicode_data', metadata,
Column('id', Unicode(40), primary_key=True, test_needs_autoincrement=True),
Column('data', Unicode(40)))
- try:
- metadata.create_all()
- ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8')
-
- table.insert().execute(id=ustring, data=ustring)
- class LocalFoo(self.classes.Base):
- pass
- mapper(LocalFoo, table)
- eq_(create_session().query(LocalFoo).get(ustring),
- LocalFoo(id=ustring, data=ustring))
- finally:
- metadata.drop_all()
+ metadata.create_all()
+ ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8')
+
+ table.insert().execute(id=ustring, data=ustring)
+ class LocalFoo(self.classes.Base):
+ pass
+ mapper(LocalFoo, table)
+ eq_(create_session().query(LocalFoo).get(ustring),
+ LocalFoo(id=ustring, data=ustring))
def test_populate_existing(self):
User, Address = self.classes.User, self.classes.Address
run_dispose_bind = 'once'
- @classmethod
- def create_engine(cls):
- return engines.utf8_engine()
-
@classmethod
def define_tables(cls, metadata):
t1 = Table('unitable1', metadata,
test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.3_postgresql_psycopg2_nocextensions 151
test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.3_sqlite_pysqlite_cextensions 151
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
+
+test.aaa_profiling.test_compiler.CompileTest.test_insert 2.7_sqlite_pysqlite_cextensions 73
+
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_select
+
+test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_cextensions 145
+
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_select_labels
+
+test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_sqlite_pysqlite_cextensions 179
+
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update
+
+test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 78
+
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause
+
+test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_cextensions 147
+
# TEST: test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set
test.aaa_profiling.test_orm.AttributeOverheadTest.test_attribute_set 2.7_sqlite_pysqlite_cextensions 4265
@property
def sane_rowcount(self):
return skip_if(
- lambda: not self.db.dialect.supports_sane_rowcount,
+ lambda config: not config.db.dialect.supports_sane_rowcount,
"driver doesn't support 'sane' rowcount"
)
@property
def sane_multi_rowcount(self):
return skip_if(
- lambda: not self.db.dialect.supports_sane_multi_rowcount,
+ lambda config: not config.db.dialect.supports_sane_multi_rowcount,
"driver doesn't support 'sane' multi row count"
)
@property
def hstore(self):
- def check_hstore():
- if not against("postgresql"):
+ def check_hstore(config):
+ if not against(config, "postgresql"):
return False
try:
- self.db.execute("SELECT 'a=>1,a=>2'::hstore;")
+ config.db.execute("SELECT 'a=>1,a=>2'::hstore;")
return True
except:
return False
@property
def range_types(self):
- def check_range_types():
- if not against("postgresql+psycopg2"):
+ def check_range_types(config):
+ if not against(config, "postgresql+psycopg2"):
return False
try:
- self.db.execute("select '[1,2)'::int4range;")
+ config.db.execute("select '[1,2)'::int4range;")
# only supported in psycopg 2.5+
from psycopg2.extras import NumericRange
return True
@property
def oracle_test_dblink(self):
return skip_if(
- lambda: not self.config.file_config.has_option(
+ lambda config: not config.file_config.has_option(
'sqla_testing', 'oracle_db_link'),
"oracle_db_link option not specified in config"
)
as not present.
"""
- return skip_if(lambda: self.config.options.low_connections)
+ return skip_if(lambda config: config.options.low_connections)
@property
def skip_mysql_on_windows(self):
"""
return skip_if(
- lambda: util.py3k and
- self.config.options.enable_plugin_coverage,
+ lambda config: util.py3k and
+ config.options.has_coverage,
"Stability issues with coverage + py3k"
)
except ImportError:
return False
- def _has_mysql_on_windows(self):
- return against('mysql') and \
- self.db.dialect._detect_casing(self.db) == 1
+ @property
+ def mysql_fully_case_sensitive(self):
+ return only_if(self._has_mysql_fully_case_sensitive)
+
+ def _has_mysql_on_windows(self, config):
+ return against(config, 'mysql') and \
+ config.db.dialect._detect_casing(config.db) == 1
- def _has_mysql_fully_case_sensitive(self):
- return against('mysql') and \
- self.db.dialect._detect_casing(self.db) == 0
+ def _has_mysql_fully_case_sensitive(self, config):
+ return against(config, 'mysql') and \
+ config.db.dialect._detect_casing(config.db) == 0
sqltypes.DateTime)
]:
assert isinstance(fn(*args).type, type_), \
- "%s / %s" % (fn(), type_)
+ "%s / %r != %s" % (fn(), fn(*args).type, type_)
assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
def tearDown(self):
pass
- @testing.uses_deprecated
- def test_standalone_execute(self):
- x = testing.db.func.current_date().execute().scalar()
- y = testing.db.func.current_date().select().execute().scalar()
- z = testing.db.func.current_date().scalar()
- assert (x == y == z) is True
-
- # ansi func
- x = testing.db.func.current_date()
- assert isinstance(x.type, Date)
- assert isinstance(x.execute().scalar(), datetime.date)
def test_conn_execute(self):
from sqlalchemy.sql.expression import FunctionElement
go
)
- @testing.uses_deprecated
- def test_deprecated_useexisting(self):
- meta2 = self._useexisting_fixture()
- users = Table('users', meta2, Column('name', Unicode),
- autoload=True, useexisting=True)
- assert isinstance(users.c.name.type, Unicode)
- assert not users.quote
- users = Table('users', meta2, quote=True, autoload=True,
- useexisting=True)
- assert users.quote
def test_keep_plus_existing_raises(self):
meta2 = self._useexisting_fixture()
extend_existing=True
)
- @testing.uses_deprecated
+ @testing.uses_deprecated()
def test_existing_plus_useexisting_raises(self):
meta2 = self._useexisting_fixture()
assert_raises(
self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
self.assert_(r['_parent'] == 'Hidden parent')
self.assert_(r['_row'] == 'Hidden row')
- try:
- print(r._parent, r._row)
- self.fail('Should not allow access to private attributes')
- except AttributeError:
- pass # expected
finally:
shadowed.drop(checkfirst=True)
expr = column('foo', CHAR) == "asdf"
eq_(expr.right.type.__class__, CHAR)
- @testing.uses_deprecated
- @testing.fails_on('firebird', 'Data type unknown on the parameter')
- @testing.fails_on('mssql', 'int is unsigned ? not clear')
- def test_operator_adapt(self):
- """test type-based overloading of operators"""
-
- # test string concatenation
- expr = test_table.c.data + "somedata"
- eq_(testing.db.execute(select([expr])).scalar(), "somedatasomedata")
-
- expr = test_table.c.id + 15
- eq_(testing.db.execute(select([expr])).scalar(), 16)
-
- # test custom operator conversion
- expr = test_table.c.avalue + 40
- assert expr.type.__class__ is test_table.c.avalue.type.__class__
-
- # value here is calculated as (250 - 40) / 10 = 21
- # because "40" is an integer, not an "avalue"
- eq_(testing.db.execute(select([expr.label('foo')])).scalar(), 21)
-
- expr = test_table.c.avalue + literal(40, type_=MyCustomType)
-
- # + operator converted to -
- # value is calculated as: (250 - (40 * 10)) / 10 == -15
- eq_(testing.db.execute(select([expr.label('foo')])).scalar(), -15)
-
- # this one relies upon anonymous labeling to assemble result
- # processing rules on the column.
- eq_(testing.db.execute(select([expr])).scalar(), -15)
def test_typedec_operator_adapt(self):
expr = test_table.c.bvalue + "hi"
from sqlalchemy import *
from sqlalchemy.testing import fixtures, engines, eq_
from sqlalchemy import testing
-from sqlalchemy.testing.engines import utf8_engine
-from sqlalchemy.sql import column
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.util import u, ue
@classmethod
def setup_class(cls):
- global unicode_bind, metadata, t1, t2, t3
+ global metadata, t1, t2, t3
- unicode_bind = utf8_engine()
-
- metadata = MetaData(unicode_bind)
+ metadata = MetaData(testing.db)
t1 = Table(u('unitable1'), metadata,
Column(u('méil'), Integer, primary_key=True),
Column(ue('\u6e2c\u8a66'), Integer),
@classmethod
def teardown_class(cls):
- global unicode_bind
metadata.drop_all()
- del unicode_bind
def test_insert(self):
t1.insert().execute({u('méil'):1, ue('\u6e2c\u8a66'):5})
assert t3.select().execute().fetchall() == [(1, 5, 1, 1)]
def test_reflect(self):
- t1.insert().execute({u('méil'):2, ue('\u6e2c\u8a66'):7})
- t2.insert().execute({u('a'):2, u('b'):2})
+ t1.insert().execute({u('méil'): 2, ue('\u6e2c\u8a66'): 7})
+ t2.insert().execute({u('a'): 2, u('b'): 2})
t3.insert().execute({ue('\u6e2c\u8a66_id'): 2,
ue('unitable1_\u6e2c\u8a66'): 7,
u('Unitéble2_b'): 2,
ue('\u6e2c\u8a66_self'): 2})
- meta = MetaData(unicode_bind)
+ meta = MetaData(testing.db)
tt1 = Table(t1.name, meta, autoload=True)
tt2 = Table(t2.name, meta, autoload=True)
tt3 = Table(t3.name, meta, autoload=True)
- tt1.insert().execute({u('méil'):1, ue('\u6e2c\u8a66'):5})
- tt2.insert().execute({u('méil'):1, ue('\u6e2c\u8a66'):1})
+ tt1.insert().execute({u('méil'): 1, ue('\u6e2c\u8a66'): 5})
+ tt2.insert().execute({u('méil'): 1, ue('\u6e2c\u8a66'): 1})
tt3.insert().execute({ue('\u6e2c\u8a66_id'): 1,
ue('unitable1_\u6e2c\u8a66'): 5,
u('Unitéble2_b'): 1,
self.assert_(tt3.select(order_by=desc(ue('\u6e2c\u8a66_id'))).
execute().fetchall() ==
[(2, 7, 2, 2), (1, 5, 1, 1)])
- meta.drop_all()
- metadata.create_all()
def test_repr(self):
"Column('\\u6e2c\\u8a66_id', Integer(), table=<\u6e2c\u8a66>), "
"schema=None)"))
-class EscapesDefaultsTest(fixtures.TestBase):
- def test_default_exec(self):
- metadata = MetaData(testing.db)
- t1 = Table('t1', metadata,
- Column('special_col', Integer, Sequence('special_col'), primary_key=True),
- Column('data', String(50)) # to appease SQLite without DEFAULT VALUES
- )
- metadata.create_all()
-
- try:
- engine = metadata.bind
-
- # reset the identifier preparer, so that we can force it to cache
- # a unicode identifier
- engine.dialect.identifier_preparer = engine.dialect.preparer(engine.dialect)
- select([column('special_col')]).select_from(t1).execute().close()
- assert isinstance(engine.dialect.identifier_preparer.format_sequence(Sequence('special_col')), str)
-
- # now execute, run the sequence. it should run in u"Special_col.nextid" or similar as
- # a unicode object; cx_oracle asserts that this is None or a String (postgresql lets it pass thru).
- # ensure that executioncontext._exec_default() is encoding.
- t1.insert().execute(data='foo')
- finally:
- metadata.drop_all()
-
-