help="Database uri. Multiple OK, first one is run by default.")
make_option("--dropfirst", action="store_true", dest="dropfirst",
help="Drop all tables in the target database first")
+ make_option("--backend-only", action="store_true", dest="backend_only",
+ help="Run only tests marked with __backend__")
make_option("--mockpool", action="store_true", dest="mockpool",
help="Use mock pool (asserts only one connection used)")
make_option("--low-connections", action="store_true", dest="low_connections",
return False
elif cls.__name__.startswith('_'):
return False
+ elif config.options.backend_only and not getattr(cls, '__backend__', False):
+ return False
else:
return True
def generate_sub_tests(cls, module):
- if getattr(cls, '__multiple__', False):
+ if getattr(cls, '__backend__', False):
for cfg in config.Config.all_configs():
name = "%s_%s_%s" % (cls.__name__, cfg.db.name, cfg.db.driver)
subcls = type(
(cls, ),
{
"__only_on__": (cfg.db.name, cfg.db.driver),
- "__multiple__": False}
+ "__backend__": False}
)
setattr(module, name, subcls)
yield subcls
def pytest_collection_modifyitems(session, config, items):
- # look for all those classes that specify __multiple__ and
+ # look for all those classes that specify __backend__ and
# expand them out into per-database test cases.
# this is much easier to do within pytest_pycollect_makeitem, however
class TableDDLTest(fixtures.TestBase):
- __multiple__ = True
+ __backend__ = True
def _simple_fixture(self):
return Table('test_table', self.metadata,
class LastrowidTest(fixtures.TablesTest):
run_deletes = 'each'
- __multiple__ = True
+ __backend__ = True
__requires__ = 'implements_get_lastrowid', 'autoincrement_insert'
class InsertBehaviorTest(fixtures.TablesTest):
run_deletes = 'each'
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class ReturningTest(fixtures.TablesTest):
run_create_tables = 'each'
__requires__ = 'returning', 'autoincrement_insert'
- __multiple__ = True
+ __backend__ = True
__engine_options__ = {"implicit_returning": True}
class HasTableTest(fixtures.TablesTest):
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class ComponentReflectionTest(fixtures.TablesTest):
run_inserts = run_deletes = None
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class RowFetchTest(fixtures.TablesTest):
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
__requires__ = ('percent_schema_names', )
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
setting.
"""
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class SequenceTest(fixtures.TablesTest):
__requires__ = ('sequences',)
- __multiple__ = True
+ __backend__ = True
run_create_tables = 'each'
class HasSequenceTest(fixtures.TestBase):
__requires__ = 'sequences',
- __multiple__ = True
+ __backend__ = True
def test_has_sequence(self):
s1 = Sequence('user_id_seq')
class UnicodeVarcharTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data',
- __multiple__ = True
+ __backend__ = True
datatype = Unicode(255)
class UnicodeTextTest(_UnicodeFixture, fixtures.TablesTest):
__requires__ = 'unicode_data', 'text_type'
- __multiple__ = True
+ __backend__ = True
datatype = UnicodeText()
self._test_empty_strings()
class TextTest(_LiteralRoundTripFixture, fixtures.TablesTest):
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
self._literal_round_trip(Text, [data], [data])
class StringTest(_LiteralRoundTripFixture, fixtures.TestBase):
- __multiple__ = True
+ __backend__ = True
@requirements.unbounded_varchar
def test_nolength_string(self):
class DateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime',
- __multiple__ = True
+ __backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
class DateTimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_microseconds',
- __multiple__ = True
+ __backend__ = True
datatype = DateTime
data = datetime.datetime(2012, 10, 15, 12, 57, 18, 396)
class TimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time',
- __multiple__ = True
+ __backend__ = True
datatype = Time
data = datetime.time(12, 57, 18)
class TimeMicrosecondsTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'time_microseconds',
- __multiple__ = True
+ __backend__ = True
datatype = Time
data = datetime.time(12, 57, 18, 396)
class DateTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
- __multiple__ = True
+ __backend__ = True
datatype = Date
data = datetime.date(2012, 10, 15)
class DateTimeCoercedToDateTimeTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date',
- __multiple__ = True
+ __backend__ = True
datatype = Date
data = datetime.datetime(2012, 10, 15, 12, 57, 18)
compare = datetime.date(2012, 10, 15)
class DateTimeHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'datetime_historic',
- __multiple__ = True
+ __backend__ = True
datatype = DateTime
data = datetime.datetime(1850, 11, 10, 11, 52, 35)
class DateHistoricTest(_DateFixture, fixtures.TablesTest):
__requires__ = 'date_historic',
- __multiple__ = True
+ __backend__ = True
datatype = Date
data = datetime.date(1727, 4, 1)
class IntegerTest(_LiteralRoundTripFixture, fixtures.TestBase):
- __multiple__ = True
+ __backend__ = True
def test_literal(self):
self._literal_round_trip(Integer, [5], [5])
class NumericTest(_LiteralRoundTripFixture, fixtures.TestBase):
- __multiple__ = True
+ __backend__ = True
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
@testing.provide_metadata
class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest):
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class SimpleUpdateDeleteTest(fixtures.TablesTest):
run_deletes = 'each'
- __multiple__ = True
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class CompileTest(fixtures.TestBase, AssertsExecutionResults):
__requires__ = 'cpython',
+ __backend__ = True
@classmethod
def setup_class(cls):
class MemUsageTest(EnsureZeroed):
__requires__ = 'cpython',
+ __backend__ = True
# ensure a pure growing test trips the assertion
@testing.fails_if(lambda: True)
import sys
class MergeTest(fixtures.MappedTest):
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
to load 1000 related objects from the identity map.
"""
+ __backend__ = True
@classmethod
s.merge(a)
class DeferOptionsTest(fixtures.MappedTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('a', metadata,
class AttributeOverheadTest(fixtures.MappedTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('parent', metadata, Column('id', Integer,
class ResultSetTest(fixtures.TestBase, AssertsExecutionResults):
+ __backend__ = True
@classmethod
def setup_class(cls):
class ExecutionTest(fixtures.TestBase):
+ __backend__ = True
def test_minimal_connection_execute(self):
# create an engine without any instrumentation.
class RowProxyTest(fixtures.TestBase):
__requires__ = 'cpython',
+ __backend__ = True
def _rowproxy_fixture(self, keys, processors, row):
class MockMeta(object):
users, metadata, users_autoinc = None, None, None
class ExecuteTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global users, users_autoinc, metadata
eng.dispose()
class ConvenienceExecuteTest(fixtures.TablesTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
cls.table = Table('exec_test', metadata,
self._assert_no_data()
class CompiledCacheTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global users, metadata
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
-class LogParamsTest(fixtures.TestBase):
- __only_on__ = 'sqlite'
- __requires__ = 'ad_hoc_engines',
-
- def setup(self):
- self.eng = engines.testing_engine(options={'echo':True})
- self.eng.execute("create table foo (data string)")
- self.buf = logging.handlers.BufferingHandler(100)
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.addHandler(self.buf)
-
- def teardown(self):
- self.eng.execute("drop table foo")
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.removeHandler(self.buf)
-
- def test_log_large_dict(self):
- self.eng.execute(
- "INSERT INTO foo (data) values (:data)",
- [{"data":str(i)} for i in range(100)]
- )
- eq_(
- self.buf.buffer[1].message,
- "[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
- "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
- " ... displaying 10 of 100 total bound "
- "parameter sets ... {'data': '98'}, {'data': '99'}]"
- )
-
- def test_log_large_list(self):
- self.eng.execute(
- "INSERT INTO foo (data) values (?)",
- [(str(i), ) for i in range(100)]
- )
- eq_(
- self.buf.buffer[1].message,
- "[('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
- "('6',), ('7',) ... displaying 10 of 100 total "
- "bound parameter sets ... ('98',), ('99',)]"
- )
-
- def test_error_large_dict(self):
- assert_raises_message(
- tsa.exc.DBAPIError,
- r".*'INSERT INTO nonexistent \(data\) values \(:data\)' "
- "\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
- "{'data': '3'}, {'data': '4'}, {'data': '5'}, "
- "{'data': '6'}, {'data': '7'} ... displaying 10 of "
- "100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
- lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (:data)",
- [{"data":str(i)} for i in range(100)]
- )
- )
-
- def test_error_large_list(self):
- assert_raises_message(
- tsa.exc.DBAPIError,
- r".*INSERT INTO nonexistent \(data\) values "
- "\(\?\)' \[\('0',\), \('1',\), \('2',\), \('3',\), "
- "\('4',\), \('5',\), \('6',\), \('7',\) ... displaying "
- "10 of 100 total bound parameter sets ... "
- "\('98',\), \('99',\)\]",
- lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (?)",
- [(str(i), ) for i in range(100)]
- )
- )
-
-class LoggingNameTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
-
- def _assert_names_in_execute(self, eng, eng_name, pool_name):
- eng.execute(select([1]))
- assert self.buf.buffer
- for name in [b.name for b in self.buf.buffer]:
- assert name in (
- 'sqlalchemy.engine.base.Engine.%s' % eng_name,
- 'sqlalchemy.pool.%s.%s' %
- (eng.pool.__class__.__name__, pool_name)
- )
-
- def _assert_no_name_in_execute(self, eng):
- eng.execute(select([1]))
- assert self.buf.buffer
- for name in [b.name for b in self.buf.buffer]:
- assert name in (
- 'sqlalchemy.engine.base.Engine',
- 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
- )
-
- def _named_engine(self, **kw):
- options = {
- 'logging_name':'myenginename',
- 'pool_logging_name':'mypoolname',
- 'echo':True
- }
- options.update(kw)
- return engines.testing_engine(options=options)
-
- def _unnamed_engine(self, **kw):
- kw.update({'echo':True})
- return engines.testing_engine(options=kw)
-
- def setup(self):
- self.buf = logging.handlers.BufferingHandler(100)
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.addHandler(self.buf)
-
- def teardown(self):
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.removeHandler(self.buf)
-
- def test_named_logger_names(self):
- eng = self._named_engine()
- eq_(eng.logging_name, "myenginename")
- eq_(eng.pool.logging_name, "mypoolname")
-
- def test_named_logger_names_after_dispose(self):
- eng = self._named_engine()
- eng.execute(select([1]))
- eng.dispose()
- eq_(eng.logging_name, "myenginename")
- eq_(eng.pool.logging_name, "mypoolname")
-
- def test_unnamed_logger_names(self):
- eng = self._unnamed_engine()
- eq_(eng.logging_name, None)
- eq_(eng.pool.logging_name, None)
-
- def test_named_logger_execute(self):
- eng = self._named_engine()
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_named_logger_echoflags_execute(self):
- eng = self._named_engine(echo='debug', echo_pool='debug')
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_named_logger_execute_after_dispose(self):
- eng = self._named_engine()
- eng.execute(select([1]))
- eng.dispose()
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_unnamed_logger_execute(self):
- eng = self._unnamed_engine()
- self._assert_no_name_in_execute(eng)
-
- def test_unnamed_logger_echoflags_execute(self):
- eng = self._unnamed_engine(echo='debug', echo_pool='debug')
- self._assert_no_name_in_execute(eng)
-
-class EchoTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
-
- def setup(self):
- self.level = logging.getLogger('sqlalchemy.engine').level
- logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
- self.buf = logging.handlers.BufferingHandler(100)
- logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
-
- def teardown(self):
- logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
- logging.getLogger('sqlalchemy.engine').setLevel(self.level)
-
- def testing_engine(self):
- e = engines.testing_engine()
-
- # do an initial execute to clear out 'first connect'
- # messages
- e.execute(select([10])).close()
- self.buf.flush()
-
- return e
-
- def test_levels(self):
- e1 = engines.testing_engine()
-
- eq_(e1._should_log_info(), False)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), False)
- eq_(e1.logger.getEffectiveLevel(), logging.WARN)
-
- e1.echo = True
- eq_(e1._should_log_info(), True)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), True)
- eq_(e1.logger.getEffectiveLevel(), logging.INFO)
-
- e1.echo = 'debug'
- eq_(e1._should_log_info(), True)
- eq_(e1._should_log_debug(), True)
- eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
- eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
-
- e1.echo = False
- eq_(e1._should_log_info(), False)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), False)
- eq_(e1.logger.getEffectiveLevel(), logging.WARN)
-
- def test_echo_flag_independence(self):
- """test the echo flag's independence to a specific engine."""
-
- e1 = self.testing_engine()
- e2 = self.testing_engine()
-
- e1.echo = True
- e1.execute(select([1])).close()
- e2.execute(select([2])).close()
-
- e1.echo = False
- e1.execute(select([3])).close()
- e2.execute(select([4])).close()
-
- e2.echo = True
- e1.execute(select([5])).close()
- e2.execute(select([6])).close()
-
- assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
- assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
- assert len(self.buf.buffer) == 4
class MockStrategyTest(fixtures.TestBase):
def _engine_fixture(self):
)
class ResultProxyTest(fixtures.TestBase):
+ __backend__ = True
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
+ __backend__ = True
def tearDown(self):
Engine.dispatch._clear()
--- /dev/null
+from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
+ config, is_
+import re
+from sqlalchemy.testing.util import picklers
+from sqlalchemy.interfaces import ConnectionProxy
+from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
+ bindparam, select, event, TypeDecorator, create_engine, Sequence
+from sqlalchemy.sql import column, literal
+from sqlalchemy.testing.schema import Table, Column
+import sqlalchemy as tsa
+from sqlalchemy import testing
+from sqlalchemy.testing import engines
+from sqlalchemy import util
+from sqlalchemy.testing.engines import testing_engine
+import logging.handlers
+from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
+from sqlalchemy.engine import result as _result, default
+from sqlalchemy.engine.base import Engine
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.mock import Mock, call, patch
+
+class LogParamsTest(fixtures.TestBase):
+ __only_on__ = 'sqlite'
+ __requires__ = 'ad_hoc_engines',
+
+ def setup(self):
+ self.eng = engines.testing_engine(options={'echo':True})
+ self.eng.execute("create table foo (data string)")
+ self.buf = logging.handlers.BufferingHandler(100)
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.addHandler(self.buf)
+
+ def teardown(self):
+ self.eng.execute("drop table foo")
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.removeHandler(self.buf)
+
+ def test_log_large_dict(self):
+ self.eng.execute(
+ "INSERT INTO foo (data) values (:data)",
+ [{"data":str(i)} for i in range(100)]
+ )
+ eq_(
+ self.buf.buffer[1].message,
+ "[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
+ "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
+ " ... displaying 10 of 100 total bound "
+ "parameter sets ... {'data': '98'}, {'data': '99'}]"
+ )
+
+ def test_log_large_list(self):
+ self.eng.execute(
+ "INSERT INTO foo (data) values (?)",
+ [(str(i), ) for i in range(100)]
+ )
+ eq_(
+ self.buf.buffer[1].message,
+ "[('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
+ "('6',), ('7',) ... displaying 10 of 100 total "
+ "bound parameter sets ... ('98',), ('99',)]"
+ )
+
+ def test_error_large_dict(self):
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r".*'INSERT INTO nonexistent \(data\) values \(:data\)' "
+ "\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
+ "{'data': '3'}, {'data': '4'}, {'data': '5'}, "
+ "{'data': '6'}, {'data': '7'} ... displaying 10 of "
+ "100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
+ lambda: self.eng.execute(
+ "INSERT INTO nonexistent (data) values (:data)",
+ [{"data":str(i)} for i in range(100)]
+ )
+ )
+
+ def test_error_large_list(self):
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r".*INSERT INTO nonexistent \(data\) values "
+ "\(\?\)' \[\('0',\), \('1',\), \('2',\), \('3',\), "
+ "\('4',\), \('5',\), \('6',\), \('7',\) ... displaying "
+ "10 of 100 total bound parameter sets ... "
+ "\('98',\), \('99',\)\]",
+ lambda: self.eng.execute(
+ "INSERT INTO nonexistent (data) values (?)",
+ [(str(i), ) for i in range(100)]
+ )
+ )
+
+class LoggingNameTest(fixtures.TestBase):
+ __requires__ = 'ad_hoc_engines',
+
+ def _assert_names_in_execute(self, eng, eng_name, pool_name):
+ eng.execute(select([1]))
+ assert self.buf.buffer
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine.%s' % eng_name,
+ 'sqlalchemy.pool.%s.%s' %
+ (eng.pool.__class__.__name__, pool_name)
+ )
+
+ def _assert_no_name_in_execute(self, eng):
+ eng.execute(select([1]))
+ assert self.buf.buffer
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine',
+ 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
+ )
+
+ def _named_engine(self, **kw):
+ options = {
+ 'logging_name':'myenginename',
+ 'pool_logging_name':'mypoolname',
+ 'echo':True
+ }
+ options.update(kw)
+ return engines.testing_engine(options=options)
+
+ def _unnamed_engine(self, **kw):
+ kw.update({'echo':True})
+ return engines.testing_engine(options=kw)
+
+ def setup(self):
+ self.buf = logging.handlers.BufferingHandler(100)
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.addHandler(self.buf)
+
+ def teardown(self):
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.removeHandler(self.buf)
+
+ def test_named_logger_names(self):
+ eng = self._named_engine()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_named_logger_names_after_dispose(self):
+ eng = self._named_engine()
+ eng.execute(select([1]))
+ eng.dispose()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_unnamed_logger_names(self):
+ eng = self._unnamed_engine()
+ eq_(eng.logging_name, None)
+ eq_(eng.pool.logging_name, None)
+
+ def test_named_logger_execute(self):
+ eng = self._named_engine()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_echoflags_execute(self):
+ eng = self._named_engine(echo='debug', echo_pool='debug')
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_execute_after_dispose(self):
+ eng = self._named_engine()
+ eng.execute(select([1]))
+ eng.dispose()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_unnamed_logger_execute(self):
+ eng = self._unnamed_engine()
+ self._assert_no_name_in_execute(eng)
+
+ def test_unnamed_logger_echoflags_execute(self):
+ eng = self._unnamed_engine(echo='debug', echo_pool='debug')
+ self._assert_no_name_in_execute(eng)
+
+class EchoTest(fixtures.TestBase):
+ __requires__ = 'ad_hoc_engines',
+
+ def setup(self):
+ self.level = logging.getLogger('sqlalchemy.engine').level
+ logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
+ self.buf = logging.handlers.BufferingHandler(100)
+ logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
+
+ def teardown(self):
+ logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
+ logging.getLogger('sqlalchemy.engine').setLevel(self.level)
+
+ def testing_engine(self):
+ e = engines.testing_engine()
+
+ # do an initial execute to clear out 'first connect'
+ # messages
+ e.execute(select([10])).close()
+ self.buf.flush()
+
+ return e
+
+ def test_levels(self):
+ e1 = engines.testing_engine()
+
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+
+ e1.echo = True
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.INFO)
+
+ e1.echo = 'debug'
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), True)
+ eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
+
+ e1.echo = False
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+
+ def test_echo_flag_independence(self):
+ """test the echo flag's independence to a specific engine."""
+
+ e1 = self.testing_engine()
+ e2 = self.testing_engine()
+
+ e1.echo = True
+ e1.execute(select([1])).close()
+ e2.execute(select([2])).close()
+
+ e1.echo = False
+ e1.execute(select([3])).close()
+ e2.execute(select([4])).close()
+
+ e2.echo = True
+ e1.execute(select([5])).close()
+ e2.execute(select([6])).close()
+
+ assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
+ assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
+ assert len(self.buf.buffer) == 4
metadata, users = None, None
class ReflectionTest(fixtures.TestBase, ComparesTables):
+ __backend__ = True
@testing.exclude('mssql', '<', (10, 0, 0),
'Date is only supported on MSSQL 2008+')
_drop_views(metadata.bind)
class CreateDropTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
metadata.drop_all(bind=testing.db)
class SchemaManipulationTest(fixtures.TestBase):
+ __backend__ = True
+
def test_append_constraint_unique(self):
meta = MetaData()
assert addresses.constraints == set([addresses.primary_key, fk])
class UnicodeReflectionTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
cls.metadata = metadata = MetaData()
)
class SchemaTest(fixtures.TestBase):
+ __backend__ = True
@testing.requires.schemas
@testing.requires.cross_schema_fk_reflection
class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
+ __backend__ = True
@testing.requires.denormalized_names
def setup(self):
class CaseSensitiveTest(fixtures.TablesTest):
"""Nail down case sensitive behaviors, mostly on MySQL."""
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class ColumnEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
users, metadata = None, None
class TransactionTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global users, metadata
eq_(result.fetchall(), [])
class ResetAgentTest(fixtures.TestBase):
+ __backend__ = True
+
def test_begin_close(self):
with testing.db.connect() as connection:
trans = connection.begin()
assert connection.connection._reset_agent is None
class AutoRollbackTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
class TLTransactionTest(fixtures.TestBase):
__requires__ = ('ad_hoc_engines', )
+ __backend__ = True
@classmethod
def setup_class(cls):
class ForUpdateTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
+ __backend__ = True
@classmethod
def setup_class(cls):
class IsolationLevelTest(fixtures.TestBase):
__requires__ = ('isolation_level', 'ad_hoc_engines')
+ __backend__ = True
def _default_isolation_level(self):
if testing.against('sqlite'):
_PolymorphicAliasedJoins
class _PolymorphicTestBase(object):
+ __backend__ = True
@classmethod
def setup_mappers(cls):
# MySQL 5.5 on Windows crashes (the entire server, not the client)
# if you screw around with ON UPDATE CASCADE type of stuff.
__requires__ = 'skip_mysql_on_windows', 'on_update_or_deferrable_fks'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class TransientExceptionTesst(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def test_transient_exception(self):
"""An object that goes from a pk value to transient/pending
"""reverse the primary keys of two entities and ensure bookkeeping
succeeds."""
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
__unsupported_on__ = ('mssql', 'mysql')
__requires__ = 'on_update_or_deferrable_fks',
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class NonPKCascadeTest(fixtures.MappedTest):
__requires__ = 'skip_mysql_on_windows', 'on_update_or_deferrable_fks'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class CascadeToFKPKTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
"""A primary key mutation cascades onto a foreign key that is itself a
primary key."""
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
__unsupported_on__ = ('mssql',)
__requires__ = 'skip_mysql_on_windows',
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class BindTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def test_mapped_binds(self):
Address, addresses, users, User = (self.classes.Address,
class ExecutionTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
@testing.requires.sequences
def test_sequence_execute(self):
class TransScopingTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def test_no_close_on_flush(self):
"""Flush() doesn't close a connection the session didn't open"""
class SessionUtilTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def test_object_session_raises(self):
User = self.classes.User
class SessionStateTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def test_info(self):
assert object_session(u1) is None
class SessionStateWFixtureTest(_fixtures.FixtureTest):
+ __backend__ = True
def test_autoflush_rollback(self):
Address, addresses, users, User = (self.classes.Address,
"""
run_inserts = None
+ __backend__ = True
def setup(self):
mapper(self.classes.User, self.tables.users)
class WeakIdentityMapTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
@testing.requires.predictable_gc
def test_weakref(self):
class StrongIdentityMapTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
@testing.uses_deprecated()
def test_strong_ref(self):
class IsModifiedTest(_fixtures.FixtureTest):
run_inserts = None
+ __backend__ = True
def _default_mapping_fixture(self):
User, Address = self.classes.User, self.classes.Address
run_setup_mappers = 'once'
run_inserts = 'once'
run_deletes = None
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class TLTransactionTest(fixtures.MappedTest):
run_dispose_bind = 'once'
+ __backend__ = True
@classmethod
def setup_bind(cls):
class FlushWarningsTest(fixtures.MappedTest):
run_setup_mappers = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class SessionTransactionTest(FixtureTest):
run_inserts = None
+ __backend__ = True
def test_no_close_transaction_on_flush(self):
User, users = self.classes.User, self.tables.users
class FixtureDataTest(_LocalFixture):
run_inserts = 'each'
+ __backend__ = True
def test_attrs_on_rollback(self):
User = self.classes.User
"""
run_inserts = None
+ __backend__ = True
def _run_test(self, update_fn):
User, users = self.classes.User, self.tables.users
class ContextManagerTest(FixtureTest):
run_inserts = None
+ __backend__ = True
@testing.requires.savepoints
@engines.close_open_connections
class AutoExpireTest(_LocalFixture):
+ __backend__ = True
def test_expunge_pending_on_rollback(self):
User = self.classes.User
assert u1.name == 'will'
class TwoPhaseTest(_LocalFixture):
+ __backend__ = True
@testing.requires.two_phase_transactions
def test_rollback_on_prepare(self):
assert u not in s
class RollbackRecoverTest(_LocalFixture):
+ __backend__ = True
def test_pk_violation(self):
User, Address = self.classes.User, self.classes.Address
class SavepointTest(_LocalFixture):
+ __backend__ = True
@testing.requires.savepoints
def test_savepoint_rollback(self):
class AccountingFlagsTest(_LocalFixture):
+ __backend__ = True
+
def test_no_expire_on_commit(self):
User, users = self.classes.User, self.tables.users
class AutoCommitTest(_LocalFixture):
+ __backend__ = True
+
def test_begin_nested_requires_trans(self):
sess = create_session(autocommit=True)
assert_raises(sa_exc.InvalidRequestError, sess.begin_nested)
eq_(u1.id, 3)
class NaturalPKRollbackTest(fixtures.MappedTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
return _uuids.pop(0)
class VersioningTest(fixtures.MappedTest):
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
)
class ColumnTypeTest(fixtures.MappedTest):
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
s1.commit()
class RowSwitchTest(fixtures.MappedTest):
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('p', metadata,
session.commit()
class AlternateGeneratorTest(fixtures.MappedTest):
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('p', metadata,
versioning column.
"""
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
Table('base', metadata,
class ServerVersioningTest(fixtures.MappedTest):
run_define_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class ManualVersionTest(fixtures.MappedTest):
run_define_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
__dialect__ = 'default'
+ __backend__ = True
+
@testing.provide_metadata
def test_pk_fk_constraint_create(self):
t = f = f2 = ts = currenttime = metadata = default_generator = None
class DefaultTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
class PKDefaultTest(fixtures.TablesTest):
__requires__ = ('subqueries',)
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class PKIncrementTest(fixtures.TablesTest):
run_define_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class EmptyInsertTest(fixtures.TestBase):
+ __backend__ = True
+
@testing.exclude('sqlite', '<', (3, 3, 8), 'no empty insert support')
@testing.fails_on('oracle', 'FIXME: unknown')
@testing.provide_metadata
class AutoIncrementTest(fixtures.TablesTest):
__requires__ = ('identity',)
run_define_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
+ __backend__ = True
def test_create_drop_ddl(self):
self.assert_compile(
class SequenceExecTest(fixtures.TestBase):
__requires__ = ('sequences',)
+ __backend__ = True
@classmethod
def setup_class(cls):
class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__requires__ = ('sequences',)
+ __backend__ = True
@testing.fails_on('firebird', 'no FB support for start/increment')
def test_start_increment(self):
cartitems = sometable = metadata = None
class TableBoundSequenceTest(fixtures.TestBase):
__requires__ = ('sequences',)
+ __backend__ = True
@classmethod
def setup_class(cls):
rather than the class of "type" itself.
"""
+ __backend__ = True
@classmethod
def setup_class(cls):
self._run_test(server_default='1', autoincrement=False)
class ServerDefaultsOnPKTest(fixtures.TestBase):
+ __backend__ = True
+
@testing.provide_metadata
def test_string_default_none_on_insert(self):
"""Test that without implicit returning, we return None for
)
class UnicodeDefaultsTest(fixtures.TestBase):
+ __backend__ = True
+
def test_no_default(self):
Column(Unicode(32))
class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
"""invoke the SQL on the current backend to ensure compatibility"""
+ __backend__ = True
+
_a_bc = _a_bc_comma_a1_selbc = _a__b_dc = _a_bkeyassoc = \
_a_bkeyassoc_aliased = _a_atobalias_balias_c_w_exists = \
_a_atobalias_balias = _b_ab1_union_c_ab2 = None
# sqlalhcemy.testing.suite
class QueryTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
"""
run_create_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class KeyTargetingTest(fixtures.TablesTest):
run_inserts = 'once'
run_deletes = None
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
class LimitTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
class CompoundTest(fixtures.TestBase):
"""test compound statements like UNION, INTERSECT, particularly their ability to nest on
different databases."""
+
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global metadata, t1, t2, t3
`JOIN rhs ON lhs.col=rhs.col` vs `rhs.col=lhs.col`. At least one
database seems to be sensitive to this.
"""
+ __backend__ = True
@classmethod
def setup_class(cls):
class OperatorTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global metadata, flds
from sqlalchemy.sql.elements import quoted_name, _truncated_label, _anonymous_label
from sqlalchemy.testing.util import picklers
-class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+class QuoteExecTest(fixtures.TestBase):
+ __backend__ = True
@classmethod
def setup_class(cls):
result = select(columns, use_labels=True).execute().fetchall()
assert(result == [(1, 2, 3), (2, 2, 3), (4, 3, 2)])
+class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ @classmethod
+ def setup_class(cls):
+ # TODO: figure out which databases/which identifiers allow special
+ # characters to be used, such as: spaces, quote characters,
+ # punctuation characters, set up tests for those as well.
+
+ global table1, table2
+ metadata = MetaData(testing.db)
+
+ table1 = Table('WorstCase1', metadata,
+ Column('lowercase', Integer, primary_key=True),
+ Column('UPPERCASE', Integer),
+ Column('MixedCase', Integer),
+ Column('ASC', Integer, key='a123'))
+ table2 = Table('WorstCase2', metadata,
+ Column('desc', Integer, primary_key=True, key='d123'),
+ Column('Union', Integer, key='u123'),
+ Column('MixedCase', Integer))
+
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
@testing.requires.subqueries
def test_labels(self):
where the "UPPERCASE" column of "LaLa" doesn't exist.
"""
- x = table1.select(distinct=True).alias('LaLa').select().scalar()
self.assert_compile(
table1.select(distinct=True).alias('LaLa').select(),
'SELECT '
class ReturningTest(fixtures.TestBase, AssertsExecutionResults):
__requires__ = 'returning',
+ __backend__ = True
def setup(self):
meta = MetaData(testing.db)
class SequenceReturningTest(fixtures.TestBase):
__requires__ = 'returning', 'sequences'
+ __backend__ = True
def setup(self):
meta = MetaData(testing.db)
"""test returning() works with columns that define 'key'."""
__requires__ = 'returning',
+ __backend__ = True
def setup(self):
meta = MetaData(testing.db)
class ReturnDefaultsTest(fixtures.TablesTest):
__requires__ = ('returning', )
run_define_tables = 'each'
+ __backend__ = True
@classmethod
def define_tables(cls, metadata):
)
class ImplicitReturningFlag(fixtures.TestBase):
+ __backend__ = True
+
def test_flag_turned_off(self):
e = engines.testing_engine(options={'implicit_returning':False})
assert e.dialect.implicit_returning is False
"""tests rowcount functionality"""
__requires__ = ('sane_rowcount', )
+ __backend__ = True
@classmethod
def setup_class(cls):
sqlalchemy/testing/suite/test_types.py.
"""
+ __backend__ = True
def test_native_unicode(self):
"""assert expected values for 'native unicode' mode"""
class UnicodeSchemaTest(fixtures.TestBase):
__requires__ = ('unicode_ddl',)
+ __backend__ = True
@classmethod
def setup_class(cls):
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
+ __backend__ = True
@testing.requires.update_from
def test_exec_two_table(self):
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
fixtures.TablesTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,