From: Mike Bayer Date: Thu, 27 Sep 2012 20:11:32 +0000 (-0400) Subject: - more tests, move some tests out of test_reflection, test_query X-Git-Tag: rel_0_8_0b1~118 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=b9ea55f8616156820dca31ae0c65ba0115086e1e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - more tests, move some tests out of test_reflection, test_query --- diff --git a/CHANGES b/CHANGES index a727501a76..e3a893f34b 100644 --- a/CHANGES +++ b/CHANGES @@ -349,6 +349,21 @@ underneath "0.7.xx". phrases that invoke separately when RETURNING is not used with INSERT. [ticket:2459] + - [feature] The libraries used by the test suite + have been moved around a bit so that they are + part of the SQLAlchemy install again. In addition, + a new suite of tests is present in the + new sqlalchemy.testing.suite package. This is + an under-development system that hopes to provide + a universal testing suite for external dialects. + Dialects which are maintained outside of SQLAlchemy + can use the new test fixture as the framework + for their own tests, and will get for free a + "compliance" suite of dialect-focused tests, + including an improved "requirements" system + where specific capabilities and features can + be enabled or disabled for testing. + - [bug] Fixed bug whereby if a database restart affected multiple connections, each connection would individually invoke a new diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py index 897f4b3b1d..08ee55d571 100644 --- a/lib/sqlalchemy/testing/assertsql.py +++ b/lib/sqlalchemy/testing/assertsql.py @@ -1,8 +1,6 @@ -from sqlalchemy.interfaces import ConnectionProxy -from sqlalchemy.engine.default import DefaultDialect -from sqlalchemy.engine.base import Connection -from sqlalchemy import util +from ..engine.default import DefaultDialect +from .. import util import re class AssertRule(object): @@ -262,16 +260,16 @@ def _process_assertion_statement(query, context): paramstyle = context.dialect.paramstyle if paramstyle == 'named': pass - elif paramstyle =='pyformat': + elif paramstyle == 'pyformat': query = re.sub(r':([\w_]+)', r"%(\1)s", query) else: # positional params repl = None - if paramstyle=='qmark': + if paramstyle == 'qmark': repl = "?" - elif paramstyle=='format': + elif paramstyle == 'format': repl = r"%s" - elif paramstyle=='numeric': + elif paramstyle == 'numeric': repl = None query = re.sub(r':([\w_]+)', repl, query) diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index f7401550ec..74e22adf1e 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -5,7 +5,7 @@ import weakref from collections import deque from . import config from .util import decorator -from sqlalchemy import event, pool +from .. import event, pool import re import warnings diff --git a/lib/sqlalchemy/testing/exclusions.py b/lib/sqlalchemy/testing/exclusions.py index ba2eebe4f7..96dd0d6934 100644 --- a/lib/sqlalchemy/testing/exclusions.py +++ b/lib/sqlalchemy/testing/exclusions.py @@ -1,49 +1,63 @@ import operator from nose import SkipTest -from sqlalchemy.util import decorator +from ..util import decorator from . import config -from sqlalchemy import util +from .. import util -def fails_if(predicate, reason=None): - predicate = _as_predicate(predicate) - - @decorator - def decorate(fn, *args, **kw): - if not predicate(): - return fn(*args, **kw) - else: - try: - fn(*args, **kw) - except Exception, ex: - print ("'%s' failed as expected (%s): %s " % ( - fn.__name__, predicate, str(ex))) - return True - else: - raise AssertionError( - "Unexpected success for '%s' (%s)" % - (fn.__name__, predicate)) - return decorate +class fails_if(object): + def __init__(self, predicate, reason=None): + self.predicate = _as_predicate(predicate) + self.reason = reason -def skip_if(predicate, reason=None): - predicate = _as_predicate(predicate) + @property + def enabled(self): + return not self.predicate() - @decorator - def decorate(fn, *args, **kw): - if predicate(): - if reason: - msg = "'%s' : %s" % ( - fn.__name__, - reason - ) + def __call__(self, fn): + @decorator + def decorate(fn, *args, **kw): + if not self.predicate(): + return fn(*args, **kw) else: - msg = "'%s': %s" % ( - fn.__name__, predicate - ) - raise SkipTest(msg) - else: - return fn(*args, **kw) - return decorate + try: + fn(*args, **kw) + except Exception, ex: + print ("'%s' failed as expected (%s): %s " % ( + fn.__name__, self.predicate, str(ex))) + return True + else: + raise AssertionError( + "Unexpected success for '%s' (%s)" % + (fn.__name__, self.predicate)) + return decorate(fn) + +class skip_if(object): + def __init__(self, predicate, reason=None): + self.predicate = _as_predicate(predicate) + self.reason = reason + + @property + def enabled(self): + return not self.predicate() + + def __call__(self, fn): + @decorator + def decorate(fn, *args, **kw): + if self.predicate(): + if self.reason: + msg = "'%s' : %s" % ( + fn.__name__, + self.reason + ) + else: + msg = "'%s': %s" % ( + fn.__name__, self.predicate + ) + raise SkipTest(msg) + else: + return fn(*args, **kw) + return decorate(fn) def only_if(predicate, reason=None): predicate = _as_predicate(predicate) @@ -69,6 +83,23 @@ class Predicate(object): else: assert False, "unknown predicate type: %s" % predicate +class BooleanPredicate(Predicate): + def __init__(self, value, description=None): + self.value = value + self.description = description + + def __call__(self): + return self.value + + def _as_string(self, negate=False): + if negate: + return "not " + self.description + else: + return self.description + + def __str__(self): + return self._as_string() + class SpecPredicate(Predicate): def __init__(self, db, op=None, spec=None, description=None): self.db = db @@ -232,8 +263,11 @@ def db_spec(*dbs): Predicate.as_predicate(db) for db in dbs ) -def open(fn): - return fn +def open(): + return skip_if(BooleanPredicate(False)) + +def closed(): + return skip_if(BooleanPredicate(True)) @decorator def future(fn, *args, **kw): diff --git a/lib/sqlalchemy/testing/plugin/config.py b/lib/sqlalchemy/testing/plugin/config.py index 946a856ade..6c92928643 100644 --- a/lib/sqlalchemy/testing/plugin/config.py +++ b/lib/sqlalchemy/testing/plugin/config.py @@ -44,19 +44,26 @@ def _engine_strategy(options, opt_str, value, parser): pre_configure = [] post_configure = [] - +def pre(fn): + pre_configure.append(fn) + return fn +def post(fn): + post_configure.append(fn) + return fn + +@pre def _setup_options(opt, file_config): global options options = opt -pre_configure.append(_setup_options) +@pre def _monkeypatch_cdecimal(options, file_config): if options.cdecimal: import sys import cdecimal sys.modules['decimal'] = cdecimal -pre_configure.append(_monkeypatch_cdecimal) +@post def _engine_uri(options, file_config): global db_label, db_url @@ -73,8 +80,8 @@ def _engine_uri(options, file_config): "Unknown URI specifier '%s'. Specify --dbs for known uris." % db_label) db_url = file_config.get('db', db_label) -post_configure.append(_engine_uri) +@post def _require(options, file_config): if not(options.require or (file_config.has_section('require') and @@ -99,14 +106,14 @@ def _require(options, file_config): if seen: continue pkg_resources.require(requirement) -post_configure.append(_require) +@post def _engine_pool(options, file_config): if options.mockpool: from sqlalchemy import pool db_opts['poolclass'] = pool.AssertionPool -post_configure.append(_engine_pool) +@post def _create_testing_engine(options, file_config): from sqlalchemy.testing import engines, config from sqlalchemy import testing @@ -115,8 +122,8 @@ def _create_testing_engine(options, file_config): config.db_opts = db_opts config.db_url = db_url -post_configure.append(_create_testing_engine) +@post def _prep_testing_database(options, file_config): from sqlalchemy.testing import engines from sqlalchemy import schema @@ -137,8 +144,8 @@ def _prep_testing_database(options, file_config): md.drop_all() e.dispose() -post_configure.append(_prep_testing_database) +@post def _set_table_options(options, file_config): from sqlalchemy.testing import schema @@ -149,8 +156,8 @@ def _set_table_options(options, file_config): if options.mysql_engine: table_options['mysql_engine'] = options.mysql_engine -post_configure.append(_set_table_options) +@post def _reverse_topological(options, file_config): if options.reversetop: from sqlalchemy.orm import unitofwork, session, mapper, dependency @@ -158,8 +165,8 @@ def _reverse_topological(options, file_config): from sqlalchemy.testing.util import RandomSet topological.set = unitofwork.set = session.set = mapper.set = \ dependency.set = RandomSet -post_configure.append(_reverse_topological) +@post def _requirements(options, file_config): from sqlalchemy.testing import config from sqlalchemy import testing @@ -175,17 +182,15 @@ def _requirements(options, file_config): req_cls = getattr(mod, clsname) config.requirements = testing.requires = req_cls(db, config) -post_configure.append(_requirements) +@post def _post_setup_options(opt, file_config): from sqlalchemy.testing import config config.options = options -post_configure.append(_post_setup_options) +@post def _setup_profiling(options, file_config): from sqlalchemy.testing import profiling profiling._profile_stats = profiling.ProfileStatsFile( file_config.get('sqla_testing', 'profile_file')) -post_configure.append(_setup_profiling) - diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py index be32b1d1da..a22e83cbc7 100644 --- a/lib/sqlalchemy/testing/profiling.py +++ b/lib/sqlalchemy/testing/profiling.py @@ -13,25 +13,24 @@ from nose import SkipTest import pstats import time import collections -from sqlalchemy import util +from .. import util try: import cProfile except ImportError: cProfile = None -from sqlalchemy.util.compat import jython, pypy, win32 +from ..util.compat import jython, pypy, win32 _current_test = None def profiled(target=None, **target_opts): """Function profiling. - @profiled('label') + @profiled() or - @profiled('label', report=True, sort=('calls',), limit=20) + @profiled(report=True, sort=('calls',), limit=20) + + Outputs profiling info for a decorated function. - Enables profiling for a function when 'label' is targetted for - profiling. Report options can be supplied, and override the global - configuration and command-line options. """ profile_config = {'targets': set(), diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index eca883d4e1..90385c391e 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -3,32 +3,12 @@ Provides decorators to mark tests requiring specific feature support from the target database. -""" - -from .exclusions import \ - skip, \ - skip_if,\ - only_if,\ - only_on,\ - fails_on,\ - fails_on_everything_except,\ - fails_if,\ - SpecPredicate,\ - against - -def no_support(db, reason): - return SpecPredicate(db, description=reason) - -def exclude(db, op, spec, description=None): - return SpecPredicate(db, op, spec, description=description) +External dialect test suites should subclass SuiteRequirements +to provide specific inclusion/exlusions. +""" -def _chain_decorators_on(*decorators): - def decorate(fn): - for decorator in reversed(decorators): - fn = decorator(fn) - return fn - return decorate +from . import exclusions class Requirements(object): def __init__(self, db, config): @@ -36,3 +16,56 @@ class Requirements(object): self.config = config +class SuiteRequirements(Requirements): + + @property + def create_table(self): + """target platform can emit basic CreateTable DDL.""" + + return exclusions.open() + + @property + def drop_table(self): + """target platform can emit basic DropTable DDL.""" + + return exclusions.open() + + @property + def autoincrement_insert(self): + """target platform generates new surrogate integer primary key values + when insert() is executed, excluding the pk column.""" + + return exclusions.open() + + @property + def returning(self): + """target platform supports RETURNING.""" + + return exclusions.closed() + + @property + def dbapi_lastrowid(self): + """"target platform includes a 'lastrowid' accessor on the DBAPI + cursor object. + + """ + return exclusions.closed() + + @property + def views(self): + """Target database must support VIEWs.""" + + return exclusions.closed() + + @property + def schemas(self): + """Target database must support external schemas, and have one + named 'test_schema'.""" + + return exclusions.closed() + + @property + def sequences(self): + """Target database must support SEQUENCEs.""" + + return self.config.db.dialect.supports_sequences diff --git a/lib/sqlalchemy/testing/schema.py b/lib/sqlalchemy/testing/schema.py index 03da78c646..805c8e5677 100644 --- a/lib/sqlalchemy/testing/schema.py +++ b/lib/sqlalchemy/testing/schema.py @@ -1,9 +1,6 @@ -"""Enhanced versions of schema.Table and schema.Column which establish -desired state for different backends. -""" from . import exclusions -from sqlalchemy import schema, event +from .. import schema, event from . import config __all__ = 'Table', 'Column', diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py index e69de29bb2..a92ecb4699 100644 --- a/lib/sqlalchemy/testing/suite/__init__.py +++ b/lib/sqlalchemy/testing/suite/__init__.py @@ -0,0 +1,4 @@ +from .test_ddl import * +from .test_insert import * +from .test_update_delete import * +from .test_reflection import * diff --git a/lib/sqlalchemy/testing/suite/requirements.py b/lib/sqlalchemy/testing/suite/requirements.py deleted file mode 100644 index 3ea72adcd1..0000000000 --- a/lib/sqlalchemy/testing/suite/requirements.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Requirement definitions used by the generic dialect suite. - -External dialect test suites should subclass SuiteRequirements -to provide specific inclusion/exlusions. - -""" -from ..requirements import Requirements -from .. import exclusions - - -class SuiteRequirements(Requirements): - - @property - def create_table(self): - """target platform can emit basic CreateTable DDL.""" - - return exclusions.open - - @property - def drop_table(self): - """target platform can emit basic DropTable DDL.""" - - return exclusions.open - - @property - def autoincrement_insert(self): - """target platform generates new surrogate integer primary key values - when insert() is executed, excluding the pk column.""" - - return exclusions.open diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py index 1285c41963..c9637cd70e 100644 --- a/lib/sqlalchemy/testing/suite/test_ddl.py +++ b/lib/sqlalchemy/testing/suite/test_ddl.py @@ -45,4 +45,5 @@ class TableDDLTest(fixtures.TestBase): config.db, checkfirst=False ) -__all__ = ('TableDDLTest',) \ No newline at end of file + +__all__ = ('TableDDLTest', ) \ No newline at end of file diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py new file mode 100644 index 0000000000..53a70e0c6a --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_insert.py @@ -0,0 +1,111 @@ +from .. import fixtures, config +from ..config import requirements +from .. import exclusions +from ..assertions import eq_ +from .. import engines + +from sqlalchemy import Integer, String, select, util + +from ..schema import Table, Column + +class InsertSequencingTest(fixtures.TablesTest): + run_deletes = 'each' + + @classmethod + def define_tables(cls, metadata): + Table('autoinc_pk', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('data', String(50)) + ) + + Table('manual_pk', metadata, + Column('id', Integer, primary_key=True, autoincrement=False), + Column('data', String(50)) + ) + + def _assert_round_trip(self, table): + row = config.db.execute(table.select()).first() + eq_( + row, + (1, "some data") + ) + + @requirements.autoincrement_insert + def test_autoincrement_on_insert(self): + + config.db.execute( + self.tables.autoinc_pk.insert(), + data="some data" + ) + self._assert_round_trip(self.tables.autoinc_pk) + + @requirements.autoincrement_insert + def test_last_inserted_id(self): + + r = config.db.execute( + self.tables.autoinc_pk.insert(), + data="some data" + ) + pk = config.db.scalar(select([self.tables.autoinc_pk.c.id])) + eq_( + r.inserted_primary_key, + [pk] + ) + + @exclusions.fails_if(lambda: util.pypy, "lastrowid not maintained after " + "connection close") + @requirements.dbapi_lastrowid + def test_native_lastrowid_autoinc(self): + r = config.db.execute( + self.tables.autoinc_pk.insert(), + data="some data" + ) + lastrowid = r.lastrowid + pk = config.db.scalar(select([self.tables.autoinc_pk.c.id])) + eq_( + lastrowid, pk + ) + + +class InsertBehaviorTest(fixtures.TablesTest): + run_deletes = 'each' + + @classmethod + def define_tables(cls, metadata): + Table('autoinc_pk', metadata, + Column('id', Integer, primary_key=True, \ + test_needs_autoincrement=True), + Column('data', String(50)) + ) + + def test_autoclose_on_insert(self): + if requirements.returning.enabled: + engine = engines.testing_engine( + options={'implicit_returning': False}) + else: + engine = config.db + + + r = engine.execute( + self.tables.autoinc_pk.insert(), + data="some data" + ) + assert r.closed + assert r.is_insert + assert not r.returns_rows + + @requirements.returning + def test_autoclose_on_insert_implicit_returning(self): + r = config.db.execute( + self.tables.autoinc_pk.insert(), + data="some data" + ) + assert r.closed + assert r.is_insert + assert r.returns_rows + + +__all__ = ('InsertSequencingTest', 'InsertBehaviorTest') + + diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index e69de29bb2..f816895a46 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -0,0 +1,424 @@ +import sqlalchemy as sa +from sqlalchemy import exc as sa_exc +from sqlalchemy import types as sql_types +from sqlalchemy import schema +from sqlalchemy import inspect +from sqlalchemy import MetaData, Integer, String +from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.testing import engines, fixtures +from sqlalchemy.testing.schema import Table, Column +from sqlalchemy.testing import eq_, assert_raises_message +from sqlalchemy import testing +from .. import config + +metadata, users = None, None + + +class HasTableTest(fixtures.TablesTest): + @classmethod + def define_tables(cls, metadata): + Table('test_table', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)) + ) + + def test_has_table(self): + with config.db.begin() as conn: + assert config.db.dialect.has_table(conn, "test_table") + assert not config.db.dialect.has_table(conn, "nonexistent_table") + +class HasSequenceTest(fixtures.TestBase): + __requires__ = 'sequences', + + def test_has_sequence(self): + metadata = MetaData() + Table('users', metadata, Column('user_id', sa.Integer, + sa.Sequence('user_id_seq'), primary_key=True), + Column('user_name', sa.String(40))) + metadata.create_all(bind=testing.db) + try: + eq_(testing.db.dialect.has_sequence(testing.db, + 'user_id_seq'), True) + finally: + metadata.drop_all(bind=testing.db) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) + + @testing.requires.schemas + def test_has_sequence_schema(self): + test_schema = 'test_schema' + s1 = sa.Sequence('user_id_seq', schema=test_schema) + s2 = sa.Sequence('user_id_seq') + testing.db.execute(schema.CreateSequence(s1)) + testing.db.execute(schema.CreateSequence(s2)) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), True) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) + testing.db.execute(schema.DropSequence(s1)) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + True) + testing.db.execute(schema.DropSequence(s2)) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', + schema=test_schema), False) + eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), + False) + + +def createTables(meta, schema=None): + if schema: + schema_prefix = schema + "." + else: + schema_prefix = "" + + users = Table('users', meta, + Column('user_id', sa.INT, primary_key=True), + Column('user_name', sa.VARCHAR(20), nullable=False), + Column('test1', sa.CHAR(5), nullable=False), + Column('test2', sa.Float(5), nullable=False), + Column('test3', sa.Text), + Column('test4', sa.Numeric(10, 2), nullable=False), + Column('test5', sa.Date), + Column('test5_1', sa.TIMESTAMP), + Column('parent_user_id', sa.Integer, + sa.ForeignKey('%susers.user_id' % schema_prefix)), + Column('test6', sa.Date, nullable=False), + Column('test7', sa.Text), + Column('test8', sa.LargeBinary), + Column('test_passivedefault2', sa.Integer, server_default='5'), + Column('test9', sa.LargeBinary(100)), + Column('test10', sa.Numeric(10, 2)), + schema=schema, + test_needs_fk=True, + ) + dingalings = Table("dingalings", meta, + Column('dingaling_id', sa.Integer, primary_key=True), + Column('address_id', sa.Integer, + sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)), + Column('data', sa.String(30)), + schema=schema, + test_needs_fk=True, + ) + addresses = Table('email_addresses', meta, + Column('address_id', sa.Integer), + Column('remote_user_id', sa.Integer, + sa.ForeignKey(users.c.user_id)), + Column('email_address', sa.String(20)), + sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'), + schema=schema, + test_needs_fk=True, + ) + + return (users, addresses, dingalings) + +def createIndexes(con, schema=None): + fullname = 'users' + if schema: + fullname = "%s.%s" % (schema, 'users') + query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname + con.execute(sa.sql.text(query)) + +@testing.requires.views +def _create_views(con, schema=None): + for table_name in ('users', 'email_addresses'): + fullname = table_name + if schema: + fullname = "%s.%s" % (schema, table_name) + view_name = fullname + '_v' + query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, + fullname) + con.execute(sa.sql.text(query)) + +@testing.requires.views +def _drop_views(con, schema=None): + for table_name in ('email_addresses', 'users'): + fullname = table_name + if schema: + fullname = "%s.%s" % (schema, table_name) + view_name = fullname + '_v' + query = "DROP VIEW %s" % view_name + con.execute(sa.sql.text(query)) + +class ComponentReflectionTest(fixtures.TestBase): + + @testing.requires.schemas + def test_get_schema_names(self): + insp = inspect(testing.db) + + self.assert_('test_schema' in insp.get_schema_names()) + + def test_dialect_initialize(self): + engine = engines.testing_engine() + assert not hasattr(engine.dialect, 'default_schema_name') + inspect(engine) + assert hasattr(engine.dialect, 'default_schema_name') + + def test_get_default_schema_name(self): + insp = inspect(testing.db) + eq_(insp.default_schema_name, testing.db.dialect.default_schema_name) + + @testing.provide_metadata + def _test_get_table_names(self, schema=None, table_type='table', + order_by=None): + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + _create_views(meta.bind, schema) + try: + insp = inspect(meta.bind) + if table_type == 'view': + table_names = insp.get_view_names(schema) + table_names.sort() + answer = ['email_addresses_v', 'users_v'] + else: + table_names = insp.get_table_names(schema, + order_by=order_by) + if order_by == 'foreign_key': + answer = ['dingalings', 'email_addresses', 'users'] + eq_(table_names, answer) + else: + answer = ['dingalings', 'email_addresses', 'users'] + eq_(sorted(table_names), answer) + finally: + _drop_views(meta.bind, schema) + + def test_get_table_names(self): + self._test_get_table_names() + + def test_get_table_names_fks(self): + self._test_get_table_names(order_by='foreign_key') + + @testing.requires.schemas + def test_get_table_names_with_schema(self): + self._test_get_table_names('test_schema') + + @testing.requires.views + def test_get_view_names(self): + self._test_get_table_names(table_type='view') + + @testing.requires.schemas + def test_get_view_names_with_schema(self): + self._test_get_table_names('test_schema', table_type='view') + + def _test_get_columns(self, schema=None, table_type='table'): + meta = MetaData(testing.db) + users, addresses, dingalings = createTables(meta, schema) + table_names = ['users', 'email_addresses'] + meta.create_all() + if table_type == 'view': + _create_views(meta.bind, schema) + table_names = ['users_v', 'email_addresses_v'] + try: + insp = inspect(meta.bind) + for table_name, table in zip(table_names, (users, + addresses)): + schema_name = schema + cols = insp.get_columns(table_name, schema=schema_name) + self.assert_(len(cols) > 0, len(cols)) + + # should be in order + + for i, col in enumerate(table.columns): + eq_(col.name, cols[i]['name']) + ctype = cols[i]['type'].__class__ + ctype_def = col.type + if isinstance(ctype_def, sa.types.TypeEngine): + ctype_def = ctype_def.__class__ + + # Oracle returns Date for DateTime. + + if testing.against('oracle') and ctype_def \ + in (sql_types.Date, sql_types.DateTime): + ctype_def = sql_types.Date + + # assert that the desired type and return type share + # a base within one of the generic types. + + self.assert_(len(set(ctype.__mro__). + intersection(ctype_def.__mro__).intersection([ + sql_types.Integer, + sql_types.Numeric, + sql_types.DateTime, + sql_types.Date, + sql_types.Time, + sql_types.String, + sql_types._Binary, + ])) > 0, '%s(%s), %s(%s)' % (col.name, + col.type, cols[i]['name'], ctype)) + finally: + if table_type == 'view': + _drop_views(meta.bind, schema) + meta.drop_all() + + def test_get_columns(self): + self._test_get_columns() + + @testing.requires.schemas + def test_get_columns_with_schema(self): + self._test_get_columns(schema='test_schema') + + @testing.requires.views + def test_get_view_columns(self): + self._test_get_columns(table_type='view') + + @testing.requires.views + @testing.requires.schemas + def test_get_view_columns_with_schema(self): + self._test_get_columns(schema='test_schema', table_type='view') + + @testing.provide_metadata + def _test_get_pk_constraint(self, schema=None): + meta = self.metadata + users, addresses, _ = createTables(meta, schema) + meta.create_all() + insp = inspect(meta.bind) + + users_cons = insp.get_pk_constraint(users.name, schema=schema) + users_pkeys = users_cons['constrained_columns'] + eq_(users_pkeys, ['user_id']) + + addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) + addr_pkeys = addr_cons['constrained_columns'] + eq_(addr_pkeys, ['address_id']) + + @testing.requires.reflects_pk_names + def go(): + eq_(addr_cons['name'], 'email_ad_pk') + go() + + def test_get_pk_constraint(self): + self._test_get_pk_constraint() + + @testing.fails_on('sqlite', 'no schemas') + def test_get_pk_constraint_with_schema(self): + self._test_get_pk_constraint(schema='test_schema') + + @testing.provide_metadata + def test_deprecated_get_primary_keys(self): + meta = self.metadata + users, _, _ = createTables(meta, schema=None) + meta.create_all() + insp = Inspector(meta.bind) + assert_raises_message( + sa_exc.SADeprecationWarning, + "Call to deprecated method get_primary_keys." + " Use get_pk_constraint instead.", + insp.get_primary_keys, users.name + ) + + @testing.provide_metadata + def _test_get_foreign_keys(self, schema=None): + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + insp = inspect(meta.bind) + expected_schema = schema + # users + users_fkeys = insp.get_foreign_keys(users.name, + schema=schema) + fkey1 = users_fkeys[0] + + @testing.fails_on('sqlite', 'no support for constraint names') + def go(): + self.assert_(fkey1['name'] is not None) + go() + + eq_(fkey1['referred_schema'], expected_schema) + eq_(fkey1['referred_table'], users.name) + eq_(fkey1['referred_columns'], ['user_id', ]) + eq_(fkey1['constrained_columns'], ['parent_user_id']) + #addresses + addr_fkeys = insp.get_foreign_keys(addresses.name, + schema=schema) + fkey1 = addr_fkeys[0] + @testing.fails_on('sqlite', 'no support for constraint names') + def go(): + self.assert_(fkey1['name'] is not None) + go() + eq_(fkey1['referred_schema'], expected_schema) + eq_(fkey1['referred_table'], users.name) + eq_(fkey1['referred_columns'], ['user_id', ]) + eq_(fkey1['constrained_columns'], ['remote_user_id']) + + def test_get_foreign_keys(self): + self._test_get_foreign_keys() + + @testing.requires.schemas + def test_get_foreign_keys_with_schema(self): + self._test_get_foreign_keys(schema='test_schema') + + @testing.provide_metadata + def _test_get_indexes(self, schema=None): + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + createIndexes(meta.bind, schema) + # The database may decide to create indexes for foreign keys, etc. + # so there may be more indexes than expected. + insp = inspect(meta.bind) + indexes = insp.get_indexes('users', schema=schema) + expected_indexes = [ + {'unique': False, + 'column_names': ['test1', 'test2'], + 'name': 'users_t_idx'}] + index_names = [d['name'] for d in indexes] + for e_index in expected_indexes: + assert e_index['name'] in index_names + index = indexes[index_names.index(e_index['name'])] + for key in e_index: + eq_(e_index[key], index[key]) + + def test_get_indexes(self): + self._test_get_indexes() + + @testing.requires.schemas + def test_get_indexes_with_schema(self): + self._test_get_indexes(schema='test_schema') + + @testing.provide_metadata + def _test_get_view_definition(self, schema=None): + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + _create_views(meta.bind, schema) + view_name1 = 'users_v' + view_name2 = 'email_addresses_v' + try: + insp = inspect(meta.bind) + v1 = insp.get_view_definition(view_name1, schema=schema) + self.assert_(v1) + v2 = insp.get_view_definition(view_name2, schema=schema) + self.assert_(v2) + finally: + _drop_views(meta.bind, schema) + + @testing.requires.views + def test_get_view_definition(self): + self._test_get_view_definition() + + @testing.requires.views + @testing.requires.schemas + def test_get_view_definition_with_schema(self): + self._test_get_view_definition(schema='test_schema') + + @testing.only_on("postgresql", "PG specific feature") + @testing.provide_metadata + def _test_get_table_oid(self, table_name, schema=None): + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + insp = inspect(meta.bind) + oid = insp.get_table_oid(table_name, schema) + self.assert_(isinstance(oid, (int, long))) + + def test_get_table_oid(self): + self._test_get_table_oid('users') + + @testing.requires.schemas + def test_get_table_oid_with_schema(self): + self._test_get_table_oid('users', schema='test_schema') + + +__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest') \ No newline at end of file diff --git a/lib/sqlalchemy/testing/suite/test_sequencing.py b/lib/sqlalchemy/testing/suite/test_sequencing.py deleted file mode 100644 index 7b09ecb761..0000000000 --- a/lib/sqlalchemy/testing/suite/test_sequencing.py +++ /dev/null @@ -1,36 +0,0 @@ -from .. import fixtures, config, util -from ..config import requirements -from ..assertions import eq_ - -from sqlalchemy import Table, Column, Integer, String - - -class InsertSequencingTest(fixtures.TablesTest): - run_deletes = 'each' - - @classmethod - def define_tables(cls, metadata): - Table('plain_pk', metadata, - Column('id', Integer, primary_key=True), - Column('data', String(50)) - ) - - def _assert_round_trip(self, table): - row = config.db.execute(table.select()).first() - eq_( - row, - (1, "some data") - ) - - @requirements.autoincrement_insert - def test_autoincrement_on_insert(self): - - config.db.execute( - self.tables.plain_pk.insert(), - data="some data" - ) - self._assert_round_trip(self.tables.plain_pk) - - - -__all__ = ('InsertSequencingTest',) \ No newline at end of file diff --git a/lib/sqlalchemy/testing/suite/test_update_delete.py b/lib/sqlalchemy/testing/suite/test_update_delete.py new file mode 100644 index 0000000000..e73b05485b --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_update_delete.py @@ -0,0 +1,64 @@ +from .. import fixtures, config +from ..config import requirements +from ..assertions import eq_ +from .. import engines + +from sqlalchemy import Integer, String, select +from ..schema import Table, Column + + +class SimpleUpdateDeleteTest(fixtures.TablesTest): + run_deletes = 'each' + + @classmethod + def define_tables(cls, metadata): + Table('plain_pk', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)) + ) + + @classmethod + def insert_data(cls): + config.db.execute( + cls.tables.plain_pk.insert(), + [ + {"id":1, "data":"d1"}, + {"id":2, "data":"d2"}, + {"id":3, "data":"d3"}, + ] + ) + + def test_update(self): + t = self.tables.plain_pk + r = config.db.execute( + t.update().where(t.c.id == 2), + data="d2_new" + ) + assert not r.is_insert + assert not r.returns_rows + + eq_( + config.db.execute(t.select().order_by(t.c.id)).fetchall(), + [ + (1, "d1"), + (2, "d2_new"), + (3, "d3") + ] + ) + + def test_delete(self): + t = self.tables.plain_pk + r = config.db.execute( + t.delete().where(t.c.id == 2) + ) + assert not r.is_insert + assert not r.returns_rows + eq_( + config.db.execute(t.select().order_by(t.c.id)).fetchall(), + [ + (1, "d1"), + (3, "d3") + ] + ) + +__all__ = ('SimpleUpdateDeleteTest', ) \ No newline at end of file diff --git a/lib/sqlalchemy/testing/util.py b/lib/sqlalchemy/testing/util.py index 625b9e6a5f..a02053dfb5 100644 --- a/lib/sqlalchemy/testing/util.py +++ b/lib/sqlalchemy/testing/util.py @@ -1,5 +1,5 @@ -from sqlalchemy.util import jython, pypy, defaultdict, decorator -from sqlalchemy.util.compat import decimal +from ..util import jython, pypy, defaultdict, decorator +from ..util.compat import decimal import gc import time diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py index 799fca128d..7afcc63c5a 100644 --- a/lib/sqlalchemy/testing/warnings.py +++ b/lib/sqlalchemy/testing/warnings.py @@ -1,8 +1,8 @@ from __future__ import absolute_import import warnings -from sqlalchemy import exc as sa_exc -from sqlalchemy import util +from .. import exc as sa_exc +from .. import util def testing_warn(msg, stacklevel=3): """Replaces sqlalchemy.util.warn during tests.""" diff --git a/test/dialect/test_suite.py b/test/dialect/test_suite.py index 9abff42877..4a8a4f67de 100644 --- a/test/dialect/test_suite.py +++ b/test/dialect/test_suite.py @@ -1,5 +1,6 @@ -from sqlalchemy.testing.suite.test_ddl import * -from sqlalchemy.testing.suite.test_sequencing import * +from sqlalchemy.testing.suite import * + + diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index a909803a13..9ed3e78c16 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -1214,45 +1214,6 @@ class SchemaTest(fixtures.TestBase): 'test_schema.email_addresses']) ) -class HasSequenceTest(fixtures.TestBase): - - @testing.requires.sequences - def test_has_sequence(self): - metadata = MetaData() - users = Table('users', metadata, Column('user_id', sa.Integer, - sa.Sequence('user_id_seq'), primary_key=True), - Column('user_name', sa.String(40))) - metadata.create_all(bind=testing.db) - try: - eq_(testing.db.dialect.has_sequence(testing.db, - 'user_id_seq'), True) - finally: - metadata.drop_all(bind=testing.db) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), - False) - - @testing.requires.schemas - @testing.requires.sequences - def test_has_sequence_schema(self): - test_schema = 'test_schema' - s1 = sa.Sequence('user_id_seq', schema=test_schema) - s2 = sa.Sequence('user_id_seq') - testing.db.execute(schema.CreateSequence(s1)) - testing.db.execute(schema.CreateSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', - schema=test_schema), True) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), - True) - testing.db.execute(schema.DropSequence(s1)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', - schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), - True) - testing.db.execute(schema.DropSequence(s2)) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', - schema=test_schema), False) - eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), - False) @@ -1406,284 +1367,6 @@ class CaseSensitiveTest(fixtures.TablesTest): eq_(t2.name, "sOmEtAbLe") -class ComponentReflectionTest(fixtures.TestBase): - - @testing.requires.schemas - def test_get_schema_names(self): - insp = inspect(testing.db) - - self.assert_('test_schema' in insp.get_schema_names()) - - def test_dialect_initialize(self): - engine = engines.testing_engine() - assert not hasattr(engine.dialect, 'default_schema_name') - insp = inspect(engine) - assert hasattr(engine.dialect, 'default_schema_name') - - def test_get_default_schema_name(self): - insp = inspect(testing.db) - eq_(insp.default_schema_name, testing.db.dialect.default_schema_name) - - @testing.provide_metadata - def _test_get_table_names(self, schema=None, table_type='table', - order_by=None): - meta = self.metadata - users, addresses, dingalings = createTables(meta, schema) - meta.create_all() - _create_views(meta.bind, schema) - try: - insp = inspect(meta.bind) - if table_type == 'view': - table_names = insp.get_view_names(schema) - table_names.sort() - answer = ['email_addresses_v', 'users_v'] - else: - table_names = insp.get_table_names(schema, - order_by=order_by) - if order_by == 'foreign_key': - answer = ['dingalings', 'email_addresses', 'users'] - eq_(table_names, answer) - else: - answer = ['dingalings', 'email_addresses', 'users'] - eq_(sorted(table_names), answer) - finally: - _drop_views(meta.bind, schema) - - def test_get_table_names(self): - self._test_get_table_names() - - def test_get_table_names_fks(self): - self._test_get_table_names(order_by='foreign_key') - - @testing.requires.schemas - def test_get_table_names_with_schema(self): - self._test_get_table_names('test_schema') - - @testing.requires.views - def test_get_view_names(self): - self._test_get_table_names(table_type='view') - - @testing.requires.schemas - def test_get_view_names_with_schema(self): - self._test_get_table_names('test_schema', table_type='view') - - def _test_get_columns(self, schema=None, table_type='table'): - meta = MetaData(testing.db) - users, addresses, dingalings = createTables(meta, schema) - table_names = ['users', 'email_addresses'] - meta.create_all() - if table_type == 'view': - _create_views(meta.bind, schema) - table_names = ['users_v', 'email_addresses_v'] - try: - insp = inspect(meta.bind) - for table_name, table in zip(table_names, (users, - addresses)): - schema_name = schema - cols = insp.get_columns(table_name, schema=schema_name) - self.assert_(len(cols) > 0, len(cols)) - - # should be in order - - for i, col in enumerate(table.columns): - eq_(col.name, cols[i]['name']) - ctype = cols[i]['type'].__class__ - ctype_def = col.type - if isinstance(ctype_def, sa.types.TypeEngine): - ctype_def = ctype_def.__class__ - - # Oracle returns Date for DateTime. - - if testing.against('oracle') and ctype_def \ - in (sql_types.Date, sql_types.DateTime): - ctype_def = sql_types.Date - - # assert that the desired type and return type share - # a base within one of the generic types. - - self.assert_(len(set(ctype.__mro__). - intersection(ctype_def.__mro__).intersection([ - sql_types.Integer, - sql_types.Numeric, - sql_types.DateTime, - sql_types.Date, - sql_types.Time, - sql_types.String, - sql_types._Binary, - ])) > 0, '%s(%s), %s(%s)' % (col.name, - col.type, cols[i]['name'], ctype)) - finally: - if table_type == 'view': - _drop_views(meta.bind, schema) - meta.drop_all() - - def test_get_columns(self): - self._test_get_columns() - - @testing.requires.schemas - def test_get_columns_with_schema(self): - self._test_get_columns(schema='test_schema') - - @testing.requires.views - def test_get_view_columns(self): - self._test_get_columns(table_type='view') - - @testing.requires.views - @testing.requires.schemas - def test_get_view_columns_with_schema(self): - self._test_get_columns(schema='test_schema', table_type='view') - - @testing.provide_metadata - def _test_get_pk_constraint(self, schema=None): - meta = self.metadata - users, addresses, _ = createTables(meta, schema) - meta.create_all() - insp = inspect(meta.bind) - - users_cons = insp.get_pk_constraint(users.name, schema=schema) - users_pkeys = users_cons['constrained_columns'] - eq_(users_pkeys, ['user_id']) - - addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) - addr_pkeys = addr_cons['constrained_columns'] - eq_(addr_pkeys, ['address_id']) - - @testing.requires.reflects_pk_names - def go(): - eq_(addr_cons['name'], 'email_ad_pk') - go() - - def test_get_pk_constraint(self): - self._test_get_pk_constraint() - - @testing.fails_on('sqlite', 'no schemas') - def test_get_pk_constraint_with_schema(self): - self._test_get_pk_constraint(schema='test_schema') - - @testing.provide_metadata - def test_deprecated_get_primary_keys(self): - meta = self.metadata - users, _, _ = createTables(meta, schema=None) - meta.create_all() - insp = Inspector(meta.bind) - assert_raises_message( - sa_exc.SADeprecationWarning, - "Call to deprecated method get_primary_keys." - " Use get_pk_constraint instead.", - insp.get_primary_keys, users.name - ) - - @testing.provide_metadata - def _test_get_foreign_keys(self, schema=None): - meta = self.metadata - users, addresses, dingalings = createTables(meta, schema) - meta.create_all() - insp = inspect(meta.bind) - expected_schema = schema - # users - users_fkeys = insp.get_foreign_keys(users.name, - schema=schema) - fkey1 = users_fkeys[0] - - @testing.fails_on('sqlite', 'no support for constraint names') - def go(): - self.assert_(fkey1['name'] is not None) - go() - - eq_(fkey1['referred_schema'], expected_schema) - eq_(fkey1['referred_table'], users.name) - eq_(fkey1['referred_columns'], ['user_id', ]) - eq_(fkey1['constrained_columns'], ['parent_user_id']) - #addresses - addr_fkeys = insp.get_foreign_keys(addresses.name, - schema=schema) - fkey1 = addr_fkeys[0] - @testing.fails_on('sqlite', 'no support for constraint names') - def go(): - self.assert_(fkey1['name'] is not None) - go() - eq_(fkey1['referred_schema'], expected_schema) - eq_(fkey1['referred_table'], users.name) - eq_(fkey1['referred_columns'], ['user_id', ]) - eq_(fkey1['constrained_columns'], ['remote_user_id']) - - def test_get_foreign_keys(self): - self._test_get_foreign_keys() - - @testing.requires.schemas - def test_get_foreign_keys_with_schema(self): - self._test_get_foreign_keys(schema='test_schema') - - @testing.provide_metadata - def _test_get_indexes(self, schema=None): - meta = self.metadata - users, addresses, dingalings = createTables(meta, schema) - meta.create_all() - createIndexes(meta.bind, schema) - # The database may decide to create indexes for foreign keys, etc. - # so there may be more indexes than expected. - insp = inspect(meta.bind) - indexes = insp.get_indexes('users', schema=schema) - expected_indexes = [ - {'unique': False, - 'column_names': ['test1', 'test2'], - 'name': 'users_t_idx'}] - index_names = [d['name'] for d in indexes] - for e_index in expected_indexes: - assert e_index['name'] in index_names - index = indexes[index_names.index(e_index['name'])] - for key in e_index: - eq_(e_index[key], index[key]) - - def test_get_indexes(self): - self._test_get_indexes() - - @testing.requires.schemas - def test_get_indexes_with_schema(self): - self._test_get_indexes(schema='test_schema') - - @testing.provide_metadata - def _test_get_view_definition(self, schema=None): - meta = self.metadata - users, addresses, dingalings = createTables(meta, schema) - meta.create_all() - _create_views(meta.bind, schema) - view_name1 = 'users_v' - view_name2 = 'email_addresses_v' - try: - insp = inspect(meta.bind) - v1 = insp.get_view_definition(view_name1, schema=schema) - self.assert_(v1) - v2 = insp.get_view_definition(view_name2, schema=schema) - self.assert_(v2) - finally: - _drop_views(meta.bind, schema) - - @testing.requires.views - def test_get_view_definition(self): - self._test_get_view_definition() - - @testing.requires.views - @testing.requires.schemas - def test_get_view_definition_with_schema(self): - self._test_get_view_definition(schema='test_schema') - - @testing.only_on("postgresql", "PG specific feature") - @testing.provide_metadata - def _test_get_table_oid(self, table_name, schema=None): - meta = self.metadata - users, addresses, dingalings = createTables(meta, schema) - meta.create_all() - insp = inspect(meta.bind) - oid = insp.get_table_oid(table_name, schema) - self.assert_(isinstance(oid, (int, long))) - - def test_get_table_oid(self): - self._test_get_table_oid('users') - - @testing.requires.schemas - def test_get_table_oid_with_schema(self): - self._test_get_table_oid('users', schema='test_schema') class ColumnEventsTest(fixtures.TestBase): @classmethod diff --git a/test/requirements.py b/test/requirements.py index 59350c8e73..e15c132416 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -5,7 +5,7 @@ from sqlalchemy import util import sys -from sqlalchemy.testing.suite.requirements import SuiteRequirements +from sqlalchemy.testing.requirements import SuiteRequirements from sqlalchemy.testing.exclusions import \ skip, \ skip_if,\ diff --git a/test/sql/test_query.py b/test/sql/test_query.py index d14cafc86a..e2f2544c86 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -8,6 +8,10 @@ from sqlalchemy import exc, sql from sqlalchemy.engine import default, result as _result from sqlalchemy.testing.schema import Table, Column +# ongoing - these are old tests. those which are of general use +# to test a dialect are being slowly migrated to +# sqlalhcemy.testing.suite + class QueryTest(fixtures.TestBase): @classmethod @@ -44,12 +48,9 @@ class QueryTest(fixtures.TestBase): def teardown_class(cls): metadata.drop_all() - def test_insert(self): - users.insert().execute(user_id = 7, user_name = 'jack') - assert users.count().scalar() == 1 - def test_insert_heterogeneous_params(self): - """test that executemany parameters are asserted to match the parameter set of the first.""" + """test that executemany parameters are asserted to match the + parameter set of the first.""" assert_raises_message(exc.StatementError, r"A value is required for bind parameter 'user_name', in " @@ -70,13 +71,6 @@ class QueryTest(fixtures.TestBase): {'user_id':9} ) - def test_update(self): - users.insert().execute(user_id = 7, user_name = 'jack') - assert users.count().scalar() == 1 - - users.update(users.c.user_id == 7).execute(user_name = 'fred') - assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred' - def test_lastrow_accessor(self): """Tests the inserted_primary_key and lastrow_has_id() functions.""" @@ -196,7 +190,8 @@ class QueryTest(fixtures.TestBase): ) t6 = Table("t6", metadata, Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True), - Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('auto_id', Integer, primary_key=True, + test_needs_autoincrement=True), mysql_engine='MyISAM' ) @@ -208,21 +203,6 @@ class QueryTest(fixtures.TestBase): r = t6.insert().values(manual_id=id).execute() eq_(r.inserted_primary_key, [12, 1]) - def test_autoclose_on_insert(self): - if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): - test_engines = [ - engines.testing_engine(options={'implicit_returning':False}), - engines.testing_engine(options={'implicit_returning':True}), - ] - else: - test_engines = [testing.db] - - for engine in test_engines: - - r = engine.execute(users.insert(), - {'user_name':'jack'}, - ) - assert r.closed def test_row_iteration(self): users.insert().execute( @@ -563,16 +543,6 @@ class QueryTest(fixtures.TestBase): ) - def test_delete(self): - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - print repr(users.select().execute().fetchall()) - - users.delete(users.c.user_name == 'fred').execute() - - print repr(users.select().execute().fetchall()) - - @testing.exclude('mysql', '<', (5, 0, 37), 'database bug') def test_scalar_select(self): @@ -840,47 +810,7 @@ class QueryTest(fixtures.TestBase): lambda: r['foo'] ) - @testing.fails_if(lambda: util.pypy, "lastrowid not maintained after " - "connection close") - @testing.requires.dbapi_lastrowid - def test_native_lastrowid(self): - r = testing.db.execute( - users.insert(), - {'user_id':1, 'user_name':'ed'} - ) - - eq_(r.lastrowid, 1) - - def test_returns_rows_flag_insert(self): - r = testing.db.execute( - users.insert(), - {'user_id':1, 'user_name':'ed'} - ) - assert r.is_insert - assert not r.returns_rows - def test_returns_rows_flag_update(self): - r = testing.db.execute( - users.update().values(user_name='fred') - ) - assert not r.is_insert - assert not r.returns_rows - - def test_returns_rows_flag_select(self): - r = testing.db.execute( - users.select() - ) - assert not r.is_insert - assert r.returns_rows - - @testing.requires.returning - def test_returns_rows_flag_insert_returning(self): - r = testing.db.execute( - users.insert().returning(users.c.user_id), - {'user_id':1, 'user_name':'ed'} - ) - assert r.is_insert - assert r.returns_rows def test_graceful_fetch_on_non_rows(self): """test that calling fetchone() etc. on a result that doesn't