]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- more tests, move some tests out of test_reflection, test_query
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 27 Sep 2012 20:11:32 +0000 (16:11 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 27 Sep 2012 20:11:32 +0000 (16:11 -0400)
21 files changed:
CHANGES
lib/sqlalchemy/testing/assertsql.py
lib/sqlalchemy/testing/engines.py
lib/sqlalchemy/testing/exclusions.py
lib/sqlalchemy/testing/plugin/config.py
lib/sqlalchemy/testing/profiling.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/schema.py
lib/sqlalchemy/testing/suite/__init__.py
lib/sqlalchemy/testing/suite/requirements.py [deleted file]
lib/sqlalchemy/testing/suite/test_ddl.py
lib/sqlalchemy/testing/suite/test_insert.py [new file with mode: 0644]
lib/sqlalchemy/testing/suite/test_reflection.py
lib/sqlalchemy/testing/suite/test_sequencing.py [deleted file]
lib/sqlalchemy/testing/suite/test_update_delete.py [new file with mode: 0644]
lib/sqlalchemy/testing/util.py
lib/sqlalchemy/testing/warnings.py
test/dialect/test_suite.py
test/engine/test_reflection.py
test/requirements.py
test/sql/test_query.py

diff --git a/CHANGES b/CHANGES
index a727501a76d9df65ba9d632ca82740102cf447c9..e3a893f34b2580a9870af6d6d48bf8ccb01ec39f 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -349,6 +349,21 @@ underneath "0.7.xx".
     phrases that invoke separately when RETURNING
     is not used with INSERT.  [ticket:2459]
 
+  - [feature] The libraries used by the test suite
+    have been moved around a bit so that they are
+    part of the SQLAlchemy install again.  In addition,
+    a new suite of tests is present in the
+    new sqlalchemy.testing.suite package.  This is
+    an under-development system that hopes to provide
+    a universal testing suite for external dialects.
+    Dialects which are maintained outside of SQLAlchemy
+    can use the new test fixture as the framework
+    for their own tests, and will get for free a
+    "compliance" suite of dialect-focused tests,
+    including an improved "requirements" system
+    where specific capabilities and features can
+    be enabled or disabled for testing.
+
   - [bug] Fixed bug whereby if a database restart
     affected multiple connections, each
     connection would individually invoke a new
index 897f4b3b1d2df6a07bacce27f81e5f9ef51df150..08ee55d57194f6192d3183fb8bb3ecd6786f9d1a 100644 (file)
@@ -1,8 +1,6 @@
 
-from sqlalchemy.interfaces import ConnectionProxy
-from sqlalchemy.engine.default import DefaultDialect
-from sqlalchemy.engine.base import Connection
-from sqlalchemy import util
+from ..engine.default import DefaultDialect
+from .. import util
 import re
 
 class AssertRule(object):
@@ -262,16 +260,16 @@ def _process_assertion_statement(query, context):
     paramstyle = context.dialect.paramstyle
     if paramstyle == 'named':
         pass
-    elif paramstyle =='pyformat':
+    elif paramstyle == 'pyformat':
         query = re.sub(r':([\w_]+)', r"%(\1)s", query)
     else:
         # positional params
         repl = None
-        if paramstyle=='qmark':
+        if paramstyle == 'qmark':
             repl = "?"
-        elif paramstyle=='format':
+        elif paramstyle == 'format':
             repl = r"%s"
-        elif paramstyle=='numeric':
+        elif paramstyle == 'numeric':
             repl = None
         query = re.sub(r':([\w_]+)', repl, query)
 
index f7401550ec49b3c671070a209a71a2ce3d85fccb..74e22adf1e1fce8527d86c5695458e3c20fed124 100644 (file)
@@ -5,7 +5,7 @@ import weakref
 from collections import deque
 from . import config
 from .util import decorator
-from sqlalchemy import event, pool
+from .. import event, pool
 import re
 import warnings
 
index ba2eebe4f76ea7007be7be6737f057780b07868d..96dd0d69342d66548584973afc9df3d36e7a6d64 100644 (file)
@@ -1,49 +1,63 @@
 import operator
 from nose import SkipTest
-from sqlalchemy.util import decorator
+from ..util import decorator
 from . import config
-from sqlalchemy import util
+from .. import util
 
 
-def fails_if(predicate, reason=None):
-    predicate = _as_predicate(predicate)
-
-    @decorator
-    def decorate(fn, *args, **kw):
-        if not predicate():
-            return fn(*args, **kw)
-        else:
-            try:
-                fn(*args, **kw)
-            except Exception, ex:
-                print ("'%s' failed as expected (%s): %s " % (
-                    fn.__name__, predicate, str(ex)))
-                return True
-            else:
-                raise AssertionError(
-                    "Unexpected success for '%s' (%s)" %
-                    (fn.__name__, predicate))
-    return decorate
+class fails_if(object):
+    def __init__(self, predicate, reason=None):
+        self.predicate = _as_predicate(predicate)
+        self.reason = reason
 
-def skip_if(predicate, reason=None):
-    predicate = _as_predicate(predicate)
+    @property
+    def enabled(self):
+        return not self.predicate()
 
-    @decorator
-    def decorate(fn, *args, **kw):
-        if predicate():
-            if reason:
-                msg = "'%s' : %s" % (
-                        fn.__name__,
-                        reason
-                    )
+    def __call__(self, fn):
+        @decorator
+        def decorate(fn, *args, **kw):
+            if not self.predicate():
+                return fn(*args, **kw)
             else:
-                msg = "'%s': %s" % (
-                        fn.__name__, predicate
-                    )
-            raise SkipTest(msg)
-        else:
-            return fn(*args, **kw)
-    return decorate
+                try:
+                    fn(*args, **kw)
+                except Exception, ex:
+                    print ("'%s' failed as expected (%s): %s " % (
+                        fn.__name__, self.predicate, str(ex)))
+                    return True
+                else:
+                    raise AssertionError(
+                        "Unexpected success for '%s' (%s)" %
+                        (fn.__name__, self.predicate))
+        return decorate(fn)
+
+class skip_if(object):
+    def __init__(self, predicate, reason=None):
+        self.predicate = _as_predicate(predicate)
+        self.reason = reason
+
+    @property
+    def enabled(self):
+        return not self.predicate()
+
+    def __call__(self, fn):
+        @decorator
+        def decorate(fn, *args, **kw):
+            if self.predicate():
+                if self.reason:
+                    msg = "'%s' : %s" % (
+                            fn.__name__,
+                            self.reason
+                        )
+                else:
+                    msg = "'%s': %s" % (
+                            fn.__name__, self.predicate
+                        )
+                raise SkipTest(msg)
+            else:
+                return fn(*args, **kw)
+        return decorate(fn)
 
 def only_if(predicate, reason=None):
     predicate = _as_predicate(predicate)
@@ -69,6 +83,23 @@ class Predicate(object):
         else:
             assert False, "unknown predicate type: %s" % predicate
 
+class BooleanPredicate(Predicate):
+    def __init__(self, value, description=None):
+        self.value = value
+        self.description = description
+
+    def __call__(self):
+        return self.value
+
+    def _as_string(self, negate=False):
+        if negate:
+            return "not " + self.description
+        else:
+            return self.description
+
+    def __str__(self):
+        return self._as_string()
+
 class SpecPredicate(Predicate):
     def __init__(self, db, op=None, spec=None, description=None):
         self.db = db
@@ -232,8 +263,11 @@ def db_spec(*dbs):
             Predicate.as_predicate(db) for db in dbs
         )
 
-def open(fn):
-    return fn
+def open():
+    return skip_if(BooleanPredicate(False))
+
+def closed():
+    return skip_if(BooleanPredicate(True))
 
 @decorator
 def future(fn, *args, **kw):
index 946a856ade56b608ef992741f44fe2279f573e63..6c9292864388de8874704c6f168b9469873fe5d2 100644 (file)
@@ -44,19 +44,26 @@ def _engine_strategy(options, opt_str, value, parser):
 
 pre_configure = []
 post_configure = []
-
+def pre(fn):
+    pre_configure.append(fn)
+    return fn
+def post(fn):
+    post_configure.append(fn)
+    return fn
+
+@pre
 def _setup_options(opt, file_config):
     global options
     options = opt
-pre_configure.append(_setup_options)
 
+@pre
 def _monkeypatch_cdecimal(options, file_config):
     if options.cdecimal:
         import sys
         import cdecimal
         sys.modules['decimal'] = cdecimal
-pre_configure.append(_monkeypatch_cdecimal)
 
+@post
 def _engine_uri(options, file_config):
     global db_label, db_url
 
@@ -73,8 +80,8 @@ def _engine_uri(options, file_config):
                 "Unknown URI specifier '%s'.  Specify --dbs for known uris."
                         % db_label)
         db_url = file_config.get('db', db_label)
-post_configure.append(_engine_uri)
 
+@post
 def _require(options, file_config):
     if not(options.require or
            (file_config.has_section('require') and
@@ -99,14 +106,14 @@ def _require(options, file_config):
             if seen:
                 continue
             pkg_resources.require(requirement)
-post_configure.append(_require)
 
+@post
 def _engine_pool(options, file_config):
     if options.mockpool:
         from sqlalchemy import pool
         db_opts['poolclass'] = pool.AssertionPool
-post_configure.append(_engine_pool)
 
+@post
 def _create_testing_engine(options, file_config):
     from sqlalchemy.testing import engines, config
     from sqlalchemy import testing
@@ -115,8 +122,8 @@ def _create_testing_engine(options, file_config):
     config.db_opts = db_opts
     config.db_url = db_url
 
-post_configure.append(_create_testing_engine)
 
+@post
 def _prep_testing_database(options, file_config):
     from sqlalchemy.testing import engines
     from sqlalchemy import schema
@@ -137,8 +144,8 @@ def _prep_testing_database(options, file_config):
             md.drop_all()
         e.dispose()
 
-post_configure.append(_prep_testing_database)
 
+@post
 def _set_table_options(options, file_config):
     from sqlalchemy.testing import schema
 
@@ -149,8 +156,8 @@ def _set_table_options(options, file_config):
 
     if options.mysql_engine:
         table_options['mysql_engine'] = options.mysql_engine
-post_configure.append(_set_table_options)
 
+@post
 def _reverse_topological(options, file_config):
     if options.reversetop:
         from sqlalchemy.orm import unitofwork, session, mapper, dependency
@@ -158,8 +165,8 @@ def _reverse_topological(options, file_config):
         from sqlalchemy.testing.util import RandomSet
         topological.set = unitofwork.set = session.set = mapper.set = \
                 dependency.set = RandomSet
-post_configure.append(_reverse_topological)
 
+@post
 def _requirements(options, file_config):
     from sqlalchemy.testing import config
     from sqlalchemy import testing
@@ -175,17 +182,15 @@ def _requirements(options, file_config):
     req_cls = getattr(mod, clsname)
     config.requirements = testing.requires = req_cls(db, config)
 
-post_configure.append(_requirements)
 
+@post
 def _post_setup_options(opt, file_config):
     from sqlalchemy.testing import config
     config.options = options
-post_configure.append(_post_setup_options)
 
+@post
 def _setup_profiling(options, file_config):
     from sqlalchemy.testing import profiling
     profiling._profile_stats = profiling.ProfileStatsFile(
                 file_config.get('sqla_testing', 'profile_file'))
 
-post_configure.append(_setup_profiling)
-
index be32b1d1da881fdef260b2100de44fb748ca64eb..a22e83cbc74b9323989979e0afd11455ca9b9eee 100644 (file)
@@ -13,25 +13,24 @@ from nose import SkipTest
 import pstats
 import time
 import collections
-from sqlalchemy import util
+from .. import util
 try:
     import cProfile
 except ImportError:
     cProfile = None
-from sqlalchemy.util.compat import jython, pypy, win32
+from ..util.compat import jython, pypy, win32
 
 _current_test = None
 
 def profiled(target=None, **target_opts):
     """Function profiling.
 
-    @profiled('label')
+    @profiled()
     or
-    @profiled('label', report=True, sort=('calls',), limit=20)
+    @profiled(report=True, sort=('calls',), limit=20)
+
+    Outputs profiling info for a decorated function.
 
-    Enables profiling for a function when 'label' is targetted for
-    profiling.  Report options can be supplied, and override the global
-    configuration and command-line options.
     """
 
     profile_config = {'targets': set(),
index eca883d4e1a98f063cd499ebf1c33c4bc684ad9e..90385c391e27e8ee5911ce401dce4619176daff5 100644 (file)
@@ -3,32 +3,12 @@
 Provides decorators to mark tests requiring specific feature support from the
 target database.
 
-"""
-
-from .exclusions import \
-     skip, \
-     skip_if,\
-     only_if,\
-     only_on,\
-     fails_on,\
-     fails_on_everything_except,\
-     fails_if,\
-     SpecPredicate,\
-     against
-
-def no_support(db, reason):
-    return SpecPredicate(db, description=reason)
-
-def exclude(db, op, spec, description=None):
-    return SpecPredicate(db, op, spec, description=description)
+External dialect test suites should subclass SuiteRequirements
+to provide specific inclusion/exclusions.
 
+"""
 
-def _chain_decorators_on(*decorators):
-    def decorate(fn):
-        for decorator in reversed(decorators):
-            fn = decorator(fn)
-        return fn
-    return decorate
+from . import exclusions
 
 class Requirements(object):
     def __init__(self, db, config):
@@ -36,3 +16,56 @@ class Requirements(object):
         self.config = config
 
 
+class SuiteRequirements(Requirements):
+
+    @property
+    def create_table(self):
+        """target platform can emit basic CreateTable DDL."""
+
+        return exclusions.open()
+
+    @property
+    def drop_table(self):
+        """target platform can emit basic DropTable DDL."""
+
+        return exclusions.open()
+
+    @property
+    def autoincrement_insert(self):
+        """target platform generates new surrogate integer primary key values
+        when insert() is executed, excluding the pk column."""
+
+        return exclusions.open()
+
+    @property
+    def returning(self):
+        """target platform supports RETURNING."""
+
+        return exclusions.closed()
+
+    @property
+    def dbapi_lastrowid(self):
+        """"target platform includes a 'lastrowid' accessor on the DBAPI
+        cursor object.
+
+        """
+        return exclusions.closed()
+
+    @property
+    def views(self):
+        """Target database must support VIEWs."""
+
+        return exclusions.closed()
+
+    @property
+    def schemas(self):
+        """Target database must support external schemas, and have one
+        named 'test_schema'."""
+
+        return exclusions.closed()
+
+    @property
+    def sequences(self):
+        """Target database must support SEQUENCEs."""
+
+        return self.config.db.dialect.supports_sequences
index 03da78c6469fdfd7da8f5115bad7ef0be9fd3b08..805c8e56774429a554ccc16de940e9decfaa6274 100644 (file)
@@ -1,9 +1,6 @@
-"""Enhanced versions of schema.Table and schema.Column which establish
-desired state for different backends.
-"""
 
 from . import exclusions
-from sqlalchemy import schema, event
+from .. import schema, event
 from . import config
 
 __all__ = 'Table', 'Column',
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..a92ecb46990d6fbb7c9e20b94a8e5b0b1445ff7e 100644 (file)
@@ -0,0 +1,4 @@
+from .test_ddl import *
+from .test_insert import *
+from .test_update_delete import *
+from .test_reflection import *
diff --git a/lib/sqlalchemy/testing/suite/requirements.py b/lib/sqlalchemy/testing/suite/requirements.py
deleted file mode 100644 (file)
index 3ea72ad..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-"""Requirement definitions used by the generic dialect suite.
-
-External dialect test suites should subclass SuiteRequirements
-to provide specific inclusion/exlusions.
-
-"""
-from ..requirements import Requirements
-from .. import exclusions
-
-
-class SuiteRequirements(Requirements):
-
-    @property
-    def create_table(self):
-        """target platform can emit basic CreateTable DDL."""
-
-        return exclusions.open
-
-    @property
-    def drop_table(self):
-        """target platform can emit basic DropTable DDL."""
-
-        return exclusions.open
-
-    @property
-    def autoincrement_insert(self):
-        """target platform generates new surrogate integer primary key values
-        when insert() is executed, excluding the pk column."""
-
-        return exclusions.open
index 1285c41963f88202fcd705ea70e96f6777250b3f..c9637cd70e034146dc402df7770a1d9a72b8137a 100644 (file)
@@ -45,4 +45,5 @@ class TableDDLTest(fixtures.TestBase):
             config.db, checkfirst=False
         )
 
-__all__ = ('TableDDLTest',)
\ No newline at end of file
+
+__all__ = ('TableDDLTest', )
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
new file mode 100644 (file)
index 0000000..53a70e0
--- /dev/null
@@ -0,0 +1,111 @@
+from .. import fixtures, config
+from ..config import requirements
+from .. import exclusions
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select, util
+
+from ..schema import Table, Column
+
+class InsertSequencingTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('autoinc_pk', metadata,
+                Column('id', Integer, primary_key=True,
+                                    test_needs_autoincrement=True),
+                Column('data', String(50))
+            )
+
+        Table('manual_pk', metadata,
+                Column('id', Integer, primary_key=True, autoincrement=False),
+                Column('data', String(50))
+            )
+
+    def _assert_round_trip(self, table):
+        row = config.db.execute(table.select()).first()
+        eq_(
+            row,
+            (1, "some data")
+        )
+
+    @requirements.autoincrement_insert
+    def test_autoincrement_on_insert(self):
+
+        config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        self._assert_round_trip(self.tables.autoinc_pk)
+
+    @requirements.autoincrement_insert
+    def test_last_inserted_id(self):
+
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        eq_(
+            r.inserted_primary_key,
+            [pk]
+        )
+
+    @exclusions.fails_if(lambda: util.pypy, "lastrowid not maintained after "
+                            "connection close")
+    @requirements.dbapi_lastrowid
+    def test_native_lastrowid_autoinc(self):
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        lastrowid = r.lastrowid
+        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        eq_(
+            lastrowid, pk
+        )
+
+
+class InsertBehaviorTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('autoinc_pk', metadata,
+                Column('id', Integer, primary_key=True, \
+                                test_needs_autoincrement=True),
+                Column('data', String(50))
+            )
+
+    def test_autoclose_on_insert(self):
+        if requirements.returning.enabled:
+            engine = engines.testing_engine(
+                            options={'implicit_returning': False})
+        else:
+            engine = config.db
+
+
+        r = engine.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        assert r.closed
+        assert r.is_insert
+        assert not r.returns_rows
+
+    @requirements.returning
+    def test_autoclose_on_insert_implicit_returning(self):
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        assert r.closed
+        assert r.is_insert
+        assert r.returns_rows
+
+
+__all__ = ('InsertSequencingTest', 'InsertBehaviorTest')
+
+
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..f816895a46f160b28a1bcbe1effdf265b875291c 100644 (file)
@@ -0,0 +1,424 @@
+import sqlalchemy as sa
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import types as sql_types
+from sqlalchemy import schema
+from sqlalchemy import inspect
+from sqlalchemy import MetaData, Integer, String
+from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.schema import Table, Column
+from sqlalchemy.testing import eq_, assert_raises_message
+from sqlalchemy import testing
+from .. import config
+
+metadata, users = None, None
+
+
+class HasTableTest(fixtures.TablesTest):
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('test_table', metadata,
+                Column('id', Integer, primary_key=True),
+                Column('data', String(50))
+            )
+
+    def test_has_table(self):
+        with config.db.begin() as conn:
+            assert config.db.dialect.has_table(conn, "test_table")
+            assert not config.db.dialect.has_table(conn, "nonexistent_table")
+
+class HasSequenceTest(fixtures.TestBase):
+    __requires__ = 'sequences',
+
+    def test_has_sequence(self):
+        metadata = MetaData()
+        Table('users', metadata, Column('user_id', sa.Integer,
+                      sa.Sequence('user_id_seq'), primary_key=True),
+                      Column('user_name', sa.String(40)))
+        metadata.create_all(bind=testing.db)
+        try:
+            eq_(testing.db.dialect.has_sequence(testing.db,
+                'user_id_seq'), True)
+        finally:
+            metadata.drop_all(bind=testing.db)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            False)
+
+    @testing.requires.schemas
+    def test_has_sequence_schema(self):
+        test_schema = 'test_schema'
+        s1 = sa.Sequence('user_id_seq', schema=test_schema)
+        s2 = sa.Sequence('user_id_seq')
+        testing.db.execute(schema.CreateSequence(s1))
+        testing.db.execute(schema.CreateSequence(s2))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), True)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            True)
+        testing.db.execute(schema.DropSequence(s1))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), False)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            True)
+        testing.db.execute(schema.DropSequence(s2))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), False)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            False)
+
+
+def createTables(meta, schema=None):
+    if schema:
+        schema_prefix = schema + "."
+    else:
+        schema_prefix = ""
+
+    users = Table('users', meta,
+        Column('user_id', sa.INT, primary_key=True),
+        Column('user_name', sa.VARCHAR(20), nullable=False),
+        Column('test1', sa.CHAR(5), nullable=False),
+        Column('test2', sa.Float(5), nullable=False),
+        Column('test3', sa.Text),
+        Column('test4', sa.Numeric(10, 2), nullable=False),
+        Column('test5', sa.Date),
+        Column('test5_1', sa.TIMESTAMP),
+        Column('parent_user_id', sa.Integer,
+                    sa.ForeignKey('%susers.user_id' % schema_prefix)),
+        Column('test6', sa.Date, nullable=False),
+        Column('test7', sa.Text),
+        Column('test8', sa.LargeBinary),
+        Column('test_passivedefault2', sa.Integer, server_default='5'),
+        Column('test9', sa.LargeBinary(100)),
+        Column('test10', sa.Numeric(10, 2)),
+        schema=schema,
+        test_needs_fk=True,
+    )
+    dingalings = Table("dingalings", meta,
+              Column('dingaling_id', sa.Integer, primary_key=True),
+              Column('address_id', sa.Integer,
+                    sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+              Column('data', sa.String(30)),
+              schema=schema,
+              test_needs_fk=True,
+        )
+    addresses = Table('email_addresses', meta,
+        Column('address_id', sa.Integer),
+        Column('remote_user_id', sa.Integer,
+               sa.ForeignKey(users.c.user_id)),
+        Column('email_address', sa.String(20)),
+        sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
+        schema=schema,
+        test_needs_fk=True,
+    )
+
+    return (users, addresses, dingalings)
+
+def createIndexes(con, schema=None):
+    fullname = 'users'
+    if schema:
+        fullname = "%s.%s" % (schema, 'users')
+    query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
+    con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _create_views(con, schema=None):
+    for table_name in ('users', 'email_addresses'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+                                                                   fullname)
+        con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _drop_views(con, schema=None):
+    for table_name in ('email_addresses', 'users'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "DROP VIEW %s" % view_name
+        con.execute(sa.sql.text(query))
+
+class ComponentReflectionTest(fixtures.TestBase):
+
+    @testing.requires.schemas
+    def test_get_schema_names(self):
+        insp = inspect(testing.db)
+
+        self.assert_('test_schema' in insp.get_schema_names())
+
+    def test_dialect_initialize(self):
+        engine = engines.testing_engine()
+        assert not hasattr(engine.dialect, 'default_schema_name')
+        inspect(engine)
+        assert hasattr(engine.dialect, 'default_schema_name')
+
+    def test_get_default_schema_name(self):
+        insp = inspect(testing.db)
+        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
+
+    @testing.provide_metadata
+    def _test_get_table_names(self, schema=None, table_type='table',
+                              order_by=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        _create_views(meta.bind, schema)
+        try:
+            insp = inspect(meta.bind)
+            if table_type == 'view':
+                table_names = insp.get_view_names(schema)
+                table_names.sort()
+                answer = ['email_addresses_v', 'users_v']
+            else:
+                table_names = insp.get_table_names(schema,
+                                                   order_by=order_by)
+                if order_by == 'foreign_key':
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(table_names, answer)
+                else:
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(sorted(table_names), answer)
+        finally:
+            _drop_views(meta.bind, schema)
+
+    def test_get_table_names(self):
+        self._test_get_table_names()
+
+    def test_get_table_names_fks(self):
+        self._test_get_table_names(order_by='foreign_key')
+
+    @testing.requires.schemas
+    def test_get_table_names_with_schema(self):
+        self._test_get_table_names('test_schema')
+
+    @testing.requires.views
+    def test_get_view_names(self):
+        self._test_get_table_names(table_type='view')
+
+    @testing.requires.schemas
+    def test_get_view_names_with_schema(self):
+        self._test_get_table_names('test_schema', table_type='view')
+
+    def _test_get_columns(self, schema=None, table_type='table'):
+        meta = MetaData(testing.db)
+        users, addresses, dingalings = createTables(meta, schema)
+        table_names = ['users', 'email_addresses']
+        meta.create_all()
+        if table_type == 'view':
+            _create_views(meta.bind, schema)
+            table_names = ['users_v', 'email_addresses_v']
+        try:
+            insp = inspect(meta.bind)
+            for table_name, table in zip(table_names, (users,
+                    addresses)):
+                schema_name = schema
+                cols = insp.get_columns(table_name, schema=schema_name)
+                self.assert_(len(cols) > 0, len(cols))
+
+                # should be in order
+
+                for i, col in enumerate(table.columns):
+                    eq_(col.name, cols[i]['name'])
+                    ctype = cols[i]['type'].__class__
+                    ctype_def = col.type
+                    if isinstance(ctype_def, sa.types.TypeEngine):
+                        ctype_def = ctype_def.__class__
+
+                    # Oracle returns Date for DateTime.
+
+                    if testing.against('oracle') and ctype_def \
+                        in (sql_types.Date, sql_types.DateTime):
+                        ctype_def = sql_types.Date
+
+                    # assert that the desired type and return type share
+                    # a base within one of the generic types.
+
+                    self.assert_(len(set(ctype.__mro__).
+                        intersection(ctype_def.__mro__).intersection([
+                        sql_types.Integer,
+                        sql_types.Numeric,
+                        sql_types.DateTime,
+                        sql_types.Date,
+                        sql_types.Time,
+                        sql_types.String,
+                        sql_types._Binary,
+                        ])) > 0, '%s(%s), %s(%s)' % (col.name,
+                                col.type, cols[i]['name'], ctype))
+        finally:
+            if table_type == 'view':
+                _drop_views(meta.bind, schema)
+            meta.drop_all()
+
+    def test_get_columns(self):
+        self._test_get_columns()
+
+    @testing.requires.schemas
+    def test_get_columns_with_schema(self):
+        self._test_get_columns(schema='test_schema')
+
+    @testing.requires.views
+    def test_get_view_columns(self):
+        self._test_get_columns(table_type='view')
+
+    @testing.requires.views
+    @testing.requires.schemas
+    def test_get_view_columns_with_schema(self):
+        self._test_get_columns(schema='test_schema', table_type='view')
+
+    @testing.provide_metadata
+    def _test_get_pk_constraint(self, schema=None):
+        meta = self.metadata
+        users, addresses, _ = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+
+        users_cons = insp.get_pk_constraint(users.name, schema=schema)
+        users_pkeys = users_cons['constrained_columns']
+        eq_(users_pkeys,  ['user_id'])
+
+        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
+        addr_pkeys = addr_cons['constrained_columns']
+        eq_(addr_pkeys,  ['address_id'])
+
+        @testing.requires.reflects_pk_names
+        def go():
+            eq_(addr_cons['name'], 'email_ad_pk')
+        go()
+
+    def test_get_pk_constraint(self):
+        self._test_get_pk_constraint()
+
+    @testing.fails_on('sqlite', 'no schemas')
+    def test_get_pk_constraint_with_schema(self):
+        self._test_get_pk_constraint(schema='test_schema')
+
+    @testing.provide_metadata
+    def test_deprecated_get_primary_keys(self):
+        meta = self.metadata
+        users, _, _ = createTables(meta, schema=None)
+        meta.create_all()
+        insp = Inspector(meta.bind)
+        assert_raises_message(
+            sa_exc.SADeprecationWarning,
+            "Call to deprecated method get_primary_keys."
+            "  Use get_pk_constraint instead.",
+            insp.get_primary_keys, users.name
+        )
+
+    @testing.provide_metadata
+    def _test_get_foreign_keys(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+        expected_schema = schema
+        # users
+        users_fkeys = insp.get_foreign_keys(users.name,
+                                            schema=schema)
+        fkey1 = users_fkeys[0]
+
+        @testing.fails_on('sqlite', 'no support for constraint names')
+        def go():
+            self.assert_(fkey1['name'] is not None)
+        go()
+
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['parent_user_id'])
+        #addresses
+        addr_fkeys = insp.get_foreign_keys(addresses.name,
+                                           schema=schema)
+        fkey1 = addr_fkeys[0]
+        @testing.fails_on('sqlite', 'no support for constraint names')
+        def go():
+            self.assert_(fkey1['name'] is not None)
+        go()
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['remote_user_id'])
+
+    def test_get_foreign_keys(self):
+        self._test_get_foreign_keys()
+
+    @testing.requires.schemas
+    def test_get_foreign_keys_with_schema(self):
+        self._test_get_foreign_keys(schema='test_schema')
+
+    @testing.provide_metadata
+    def _test_get_indexes(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        createIndexes(meta.bind, schema)
+        # The database may decide to create indexes for foreign keys, etc.
+        # so there may be more indexes than expected.
+        insp = inspect(meta.bind)
+        indexes = insp.get_indexes('users', schema=schema)
+        expected_indexes = [
+            {'unique': False,
+             'column_names': ['test1', 'test2'],
+             'name': 'users_t_idx'}]
+        index_names = [d['name'] for d in indexes]
+        for e_index in expected_indexes:
+            assert e_index['name'] in index_names
+            index = indexes[index_names.index(e_index['name'])]
+            for key in e_index:
+                eq_(e_index[key], index[key])
+
+    def test_get_indexes(self):
+        self._test_get_indexes()
+
+    @testing.requires.schemas
+    def test_get_indexes_with_schema(self):
+        self._test_get_indexes(schema='test_schema')
+
+    @testing.provide_metadata
+    def _test_get_view_definition(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        _create_views(meta.bind, schema)
+        view_name1 = 'users_v'
+        view_name2 = 'email_addresses_v'
+        try:
+            insp = inspect(meta.bind)
+            v1 = insp.get_view_definition(view_name1, schema=schema)
+            self.assert_(v1)
+            v2 = insp.get_view_definition(view_name2, schema=schema)
+            self.assert_(v2)
+        finally:
+            _drop_views(meta.bind, schema)
+
+    @testing.requires.views
+    def test_get_view_definition(self):
+        self._test_get_view_definition()
+
+    @testing.requires.views
+    @testing.requires.schemas
+    def test_get_view_definition_with_schema(self):
+        self._test_get_view_definition(schema='test_schema')
+
+    @testing.only_on("postgresql", "PG specific feature")
+    @testing.provide_metadata
+    def _test_get_table_oid(self, table_name, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+        oid = insp.get_table_oid(table_name, schema)
+        self.assert_(isinstance(oid, (int, long)))
+
+    def test_get_table_oid(self):
+        self._test_get_table_oid('users')
+
+    @testing.requires.schemas
+    def test_get_table_oid_with_schema(self):
+        self._test_get_table_oid('users', schema='test_schema')
+
+
+__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_sequencing.py b/lib/sqlalchemy/testing/suite/test_sequencing.py
deleted file mode 100644 (file)
index 7b09ecb..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-from .. import fixtures, config, util
-from ..config import requirements
-from ..assertions import eq_
-
-from sqlalchemy import Table, Column, Integer, String
-
-
-class InsertSequencingTest(fixtures.TablesTest):
-    run_deletes = 'each'
-
-    @classmethod
-    def define_tables(cls, metadata):
-        Table('plain_pk', metadata,
-                Column('id', Integer, primary_key=True),
-                Column('data', String(50))
-            )
-
-    def _assert_round_trip(self, table):
-        row = config.db.execute(table.select()).first()
-        eq_(
-            row,
-            (1, "some data")
-        )
-
-    @requirements.autoincrement_insert
-    def test_autoincrement_on_insert(self):
-
-        config.db.execute(
-            self.tables.plain_pk.insert(),
-            data="some data"
-        )
-        self._assert_round_trip(self.tables.plain_pk)
-
-
-
-__all__ = ('InsertSequencingTest',)
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_update_delete.py b/lib/sqlalchemy/testing/suite/test_update_delete.py
new file mode 100644 (file)
index 0000000..e73b054
--- /dev/null
@@ -0,0 +1,64 @@
+from .. import fixtures, config
+from ..config import requirements
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select
+from ..schema import Table, Column
+
+
+class SimpleUpdateDeleteTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('plain_pk', metadata,
+                Column('id', Integer, primary_key=True),
+                Column('data', String(50))
+            )
+
+    @classmethod
+    def insert_data(cls):
+        config.db.execute(
+            cls.tables.plain_pk.insert(),
+            [
+                {"id":1, "data":"d1"},
+                {"id":2, "data":"d2"},
+                {"id":3, "data":"d3"},
+            ]
+        )
+
+    def test_update(self):
+        t = self.tables.plain_pk
+        r = config.db.execute(
+            t.update().where(t.c.id == 2),
+            data="d2_new"
+        )
+        assert not r.is_insert
+        assert not r.returns_rows
+
+        eq_(
+            config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+            [
+                (1, "d1"),
+                (2, "d2_new"),
+                (3, "d3")
+            ]
+        )
+
+    def test_delete(self):
+        t = self.tables.plain_pk
+        r = config.db.execute(
+            t.delete().where(t.c.id == 2)
+        )
+        assert not r.is_insert
+        assert not r.returns_rows
+        eq_(
+            config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+            [
+                (1, "d1"),
+                (3, "d3")
+            ]
+        )
+
+__all__ = ('SimpleUpdateDeleteTest', )
\ No newline at end of file
index 625b9e6a5ffb572749d32528f9cfae3dc816ce30..a02053dfb505e5fb7d98d7e96bd36eb3efe70608 100644 (file)
@@ -1,5 +1,5 @@
-from sqlalchemy.util import jython, pypy, defaultdict, decorator
-from sqlalchemy.util.compat import decimal
+from ..util import jython, pypy, defaultdict, decorator
+from ..util.compat import decimal
 
 import gc
 import time
index 799fca128d7102fc1dfa5c5d058096fe7564ef1b..7afcc63c5add7c85b3e74049984263369fe63915 100644 (file)
@@ -1,8 +1,8 @@
 from __future__ import absolute_import
 
 import warnings
-from sqlalchemy import exc as sa_exc
-from sqlalchemy import util
+from .. import exc as sa_exc
+from .. import util
 
 def testing_warn(msg, stacklevel=3):
     """Replaces sqlalchemy.util.warn during tests."""
index 9abff4287752988e2d7ae86032196277b3ae353c..4a8a4f67de30508fa01103989482138dfbe093e5 100644 (file)
@@ -1,5 +1,6 @@
-from sqlalchemy.testing.suite.test_ddl import *
-from sqlalchemy.testing.suite.test_sequencing import *
+from sqlalchemy.testing.suite import *
+
+
 
 
 
index a909803a1336f4080af64228ac5c550f1e5ed183..9ed3e78c1672cbe42ddc034abe8808b74c8738b4 100644 (file)
@@ -1214,45 +1214,6 @@ class SchemaTest(fixtures.TestBase):
                 'test_schema.email_addresses'])
         )
 
-class HasSequenceTest(fixtures.TestBase):
-
-    @testing.requires.sequences
-    def test_has_sequence(self):
-        metadata = MetaData()
-        users = Table('users', metadata, Column('user_id', sa.Integer,
-                      sa.Sequence('user_id_seq'), primary_key=True),
-                      Column('user_name', sa.String(40)))
-        metadata.create_all(bind=testing.db)
-        try:
-            eq_(testing.db.dialect.has_sequence(testing.db,
-                'user_id_seq'), True)
-        finally:
-            metadata.drop_all(bind=testing.db)
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
-            False)
-
-    @testing.requires.schemas
-    @testing.requires.sequences
-    def test_has_sequence_schema(self):
-        test_schema = 'test_schema'
-        s1 = sa.Sequence('user_id_seq', schema=test_schema)
-        s2 = sa.Sequence('user_id_seq')
-        testing.db.execute(schema.CreateSequence(s1))
-        testing.db.execute(schema.CreateSequence(s2))
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
-            schema=test_schema), True)
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
-            True)
-        testing.db.execute(schema.DropSequence(s1))
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
-            schema=test_schema), False)
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
-            True)
-        testing.db.execute(schema.DropSequence(s2))
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
-            schema=test_schema), False)
-        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
-            False)
 
 
 
@@ -1406,284 +1367,6 @@ class CaseSensitiveTest(fixtures.TablesTest):
         eq_(t2.name, "sOmEtAbLe")
 
 
-class ComponentReflectionTest(fixtures.TestBase):
-
-    @testing.requires.schemas
-    def test_get_schema_names(self):
-        insp = inspect(testing.db)
-
-        self.assert_('test_schema' in insp.get_schema_names())
-
-    def test_dialect_initialize(self):
-        engine = engines.testing_engine()
-        assert not hasattr(engine.dialect, 'default_schema_name')
-        insp = inspect(engine)
-        assert hasattr(engine.dialect, 'default_schema_name')
-
-    def test_get_default_schema_name(self):
-        insp = inspect(testing.db)
-        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
-
-    @testing.provide_metadata
-    def _test_get_table_names(self, schema=None, table_type='table',
-                              order_by=None):
-        meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        _create_views(meta.bind, schema)
-        try:
-            insp = inspect(meta.bind)
-            if table_type == 'view':
-                table_names = insp.get_view_names(schema)
-                table_names.sort()
-                answer = ['email_addresses_v', 'users_v']
-            else:
-                table_names = insp.get_table_names(schema,
-                                                   order_by=order_by)
-                if order_by == 'foreign_key':
-                    answer = ['dingalings', 'email_addresses', 'users']
-                    eq_(table_names, answer)
-                else:
-                    answer = ['dingalings', 'email_addresses', 'users']
-                    eq_(sorted(table_names), answer)
-        finally:
-            _drop_views(meta.bind, schema)
-
-    def test_get_table_names(self):
-        self._test_get_table_names()
-
-    def test_get_table_names_fks(self):
-        self._test_get_table_names(order_by='foreign_key')
-
-    @testing.requires.schemas
-    def test_get_table_names_with_schema(self):
-        self._test_get_table_names('test_schema')
-
-    @testing.requires.views
-    def test_get_view_names(self):
-        self._test_get_table_names(table_type='view')
-
-    @testing.requires.schemas
-    def test_get_view_names_with_schema(self):
-        self._test_get_table_names('test_schema', table_type='view')
-
-    def _test_get_columns(self, schema=None, table_type='table'):
-        meta = MetaData(testing.db)
-        users, addresses, dingalings = createTables(meta, schema)
-        table_names = ['users', 'email_addresses']
-        meta.create_all()
-        if table_type == 'view':
-            _create_views(meta.bind, schema)
-            table_names = ['users_v', 'email_addresses_v']
-        try:
-            insp = inspect(meta.bind)
-            for table_name, table in zip(table_names, (users,
-                    addresses)):
-                schema_name = schema
-                cols = insp.get_columns(table_name, schema=schema_name)
-                self.assert_(len(cols) > 0, len(cols))
-
-                # should be in order
-
-                for i, col in enumerate(table.columns):
-                    eq_(col.name, cols[i]['name'])
-                    ctype = cols[i]['type'].__class__
-                    ctype_def = col.type
-                    if isinstance(ctype_def, sa.types.TypeEngine):
-                        ctype_def = ctype_def.__class__
-
-                    # Oracle returns Date for DateTime.
-
-                    if testing.against('oracle') and ctype_def \
-                        in (sql_types.Date, sql_types.DateTime):
-                        ctype_def = sql_types.Date
-
-                    # assert that the desired type and return type share
-                    # a base within one of the generic types.
-
-                    self.assert_(len(set(ctype.__mro__).
-                        intersection(ctype_def.__mro__).intersection([
-                        sql_types.Integer,
-                        sql_types.Numeric,
-                        sql_types.DateTime,
-                        sql_types.Date,
-                        sql_types.Time,
-                        sql_types.String,
-                        sql_types._Binary,
-                        ])) > 0, '%s(%s), %s(%s)' % (col.name,
-                                col.type, cols[i]['name'], ctype))
-        finally:
-            if table_type == 'view':
-                _drop_views(meta.bind, schema)
-            meta.drop_all()
-
-    def test_get_columns(self):
-        self._test_get_columns()
-
-    @testing.requires.schemas
-    def test_get_columns_with_schema(self):
-        self._test_get_columns(schema='test_schema')
-
-    @testing.requires.views
-    def test_get_view_columns(self):
-        self._test_get_columns(table_type='view')
-
-    @testing.requires.views
-    @testing.requires.schemas
-    def test_get_view_columns_with_schema(self):
-        self._test_get_columns(schema='test_schema', table_type='view')
-
-    @testing.provide_metadata
-    def _test_get_pk_constraint(self, schema=None):
-        meta = self.metadata
-        users, addresses, _ = createTables(meta, schema)
-        meta.create_all()
-        insp = inspect(meta.bind)
-
-        users_cons = insp.get_pk_constraint(users.name, schema=schema)
-        users_pkeys = users_cons['constrained_columns']
-        eq_(users_pkeys,  ['user_id'])
-
-        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
-        addr_pkeys = addr_cons['constrained_columns']
-        eq_(addr_pkeys,  ['address_id'])
-
-        @testing.requires.reflects_pk_names
-        def go():
-            eq_(addr_cons['name'], 'email_ad_pk')
-        go()
-
-    def test_get_pk_constraint(self):
-        self._test_get_pk_constraint()
-
-    @testing.fails_on('sqlite', 'no schemas')
-    def test_get_pk_constraint_with_schema(self):
-        self._test_get_pk_constraint(schema='test_schema')
-
-    @testing.provide_metadata
-    def test_deprecated_get_primary_keys(self):
-        meta = self.metadata
-        users, _, _ = createTables(meta, schema=None)
-        meta.create_all()
-        insp = Inspector(meta.bind)
-        assert_raises_message(
-            sa_exc.SADeprecationWarning,
-            "Call to deprecated method get_primary_keys."
-            "  Use get_pk_constraint instead.",
-            insp.get_primary_keys, users.name
-        )
-
-    @testing.provide_metadata
-    def _test_get_foreign_keys(self, schema=None):
-        meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        insp = inspect(meta.bind)
-        expected_schema = schema
-        # users
-        users_fkeys = insp.get_foreign_keys(users.name,
-                                            schema=schema)
-        fkey1 = users_fkeys[0]
-
-        @testing.fails_on('sqlite', 'no support for constraint names')
-        def go():
-            self.assert_(fkey1['name'] is not None)
-        go()
-
-        eq_(fkey1['referred_schema'], expected_schema)
-        eq_(fkey1['referred_table'], users.name)
-        eq_(fkey1['referred_columns'], ['user_id', ])
-        eq_(fkey1['constrained_columns'], ['parent_user_id'])
-        #addresses
-        addr_fkeys = insp.get_foreign_keys(addresses.name,
-                                           schema=schema)
-        fkey1 = addr_fkeys[0]
-        @testing.fails_on('sqlite', 'no support for constraint names')
-        def go():
-            self.assert_(fkey1['name'] is not None)
-        go()
-        eq_(fkey1['referred_schema'], expected_schema)
-        eq_(fkey1['referred_table'], users.name)
-        eq_(fkey1['referred_columns'], ['user_id', ])
-        eq_(fkey1['constrained_columns'], ['remote_user_id'])
-
-    def test_get_foreign_keys(self):
-        self._test_get_foreign_keys()
-
-    @testing.requires.schemas
-    def test_get_foreign_keys_with_schema(self):
-        self._test_get_foreign_keys(schema='test_schema')
-
-    @testing.provide_metadata
-    def _test_get_indexes(self, schema=None):
-        meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        createIndexes(meta.bind, schema)
-        # The database may decide to create indexes for foreign keys, etc.
-        # so there may be more indexes than expected.
-        insp = inspect(meta.bind)
-        indexes = insp.get_indexes('users', schema=schema)
-        expected_indexes = [
-            {'unique': False,
-             'column_names': ['test1', 'test2'],
-             'name': 'users_t_idx'}]
-        index_names = [d['name'] for d in indexes]
-        for e_index in expected_indexes:
-            assert e_index['name'] in index_names
-            index = indexes[index_names.index(e_index['name'])]
-            for key in e_index:
-                eq_(e_index[key], index[key])
-
-    def test_get_indexes(self):
-        self._test_get_indexes()
-
-    @testing.requires.schemas
-    def test_get_indexes_with_schema(self):
-        self._test_get_indexes(schema='test_schema')
-
-    @testing.provide_metadata
-    def _test_get_view_definition(self, schema=None):
-        meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        _create_views(meta.bind, schema)
-        view_name1 = 'users_v'
-        view_name2 = 'email_addresses_v'
-        try:
-            insp = inspect(meta.bind)
-            v1 = insp.get_view_definition(view_name1, schema=schema)
-            self.assert_(v1)
-            v2 = insp.get_view_definition(view_name2, schema=schema)
-            self.assert_(v2)
-        finally:
-            _drop_views(meta.bind, schema)
-
-    @testing.requires.views
-    def test_get_view_definition(self):
-        self._test_get_view_definition()
-
-    @testing.requires.views
-    @testing.requires.schemas
-    def test_get_view_definition_with_schema(self):
-        self._test_get_view_definition(schema='test_schema')
-
-    @testing.only_on("postgresql", "PG specific feature")
-    @testing.provide_metadata
-    def _test_get_table_oid(self, table_name, schema=None):
-        meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        insp = inspect(meta.bind)
-        oid = insp.get_table_oid(table_name, schema)
-        self.assert_(isinstance(oid, (int, long)))
-
-    def test_get_table_oid(self):
-        self._test_get_table_oid('users')
-
-    @testing.requires.schemas
-    def test_get_table_oid_with_schema(self):
-        self._test_get_table_oid('users', schema='test_schema')
 
 class ColumnEventsTest(fixtures.TestBase):
     @classmethod
index 59350c8e73a9e144fec1692f73851da5feed3119..e15c132416ccabe41eb17a0ec43c89ccf3a02c1b 100644 (file)
@@ -5,7 +5,7 @@
 
 from sqlalchemy import util
 import sys
-from sqlalchemy.testing.suite.requirements import SuiteRequirements
+from sqlalchemy.testing.requirements import SuiteRequirements
 from sqlalchemy.testing.exclusions import \
      skip, \
      skip_if,\
index d14cafc86a9333418b7e279518bc85eeaa89366d..e2f2544c863cdf36a2f2f1afac4700283661bbc0 100644 (file)
@@ -8,6 +8,10 @@ from sqlalchemy import exc, sql
 from sqlalchemy.engine import default, result as _result
 from sqlalchemy.testing.schema import Table, Column
 
+# ongoing - these are old tests.  those which are of general use
+# to test a dialect are being slowly migrated to
+# sqlalchemy.testing.suite
+
 class QueryTest(fixtures.TestBase):
 
     @classmethod
@@ -44,12 +48,9 @@ class QueryTest(fixtures.TestBase):
     def teardown_class(cls):
         metadata.drop_all()
 
-    def test_insert(self):
-        users.insert().execute(user_id = 7, user_name = 'jack')
-        assert users.count().scalar() == 1
-
     def test_insert_heterogeneous_params(self):
-        """test that executemany parameters are asserted to match the parameter set of the first."""
+        """test that executemany parameters are asserted to match the
+        parameter set of the first."""
 
         assert_raises_message(exc.StatementError,
             r"A value is required for bind parameter 'user_name', in "
@@ -70,13 +71,6 @@ class QueryTest(fixtures.TestBase):
             {'user_id':9}
         )
 
-    def test_update(self):
-        users.insert().execute(user_id = 7, user_name = 'jack')
-        assert users.count().scalar() == 1
-
-        users.update(users.c.user_id == 7).execute(user_name = 'fred')
-        assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
-
     def test_lastrow_accessor(self):
         """Tests the inserted_primary_key and lastrow_has_id() functions."""
 
@@ -196,7 +190,8 @@ class QueryTest(fixtures.TestBase):
         )
         t6 = Table("t6", metadata,
             Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
-            Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True),
+            Column('auto_id', Integer, primary_key=True,
+                                    test_needs_autoincrement=True),
             mysql_engine='MyISAM'
         )
 
@@ -208,21 +203,6 @@ class QueryTest(fixtures.TestBase):
         r = t6.insert().values(manual_id=id).execute()
         eq_(r.inserted_primary_key, [12, 1])
 
-    def test_autoclose_on_insert(self):
-        if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
-            test_engines = [
-                engines.testing_engine(options={'implicit_returning':False}),
-                engines.testing_engine(options={'implicit_returning':True}),
-            ]
-        else:
-            test_engines = [testing.db]
-
-        for engine in test_engines:
-
-            r = engine.execute(users.insert(),
-                {'user_name':'jack'},
-            )
-            assert r.closed
 
     def test_row_iteration(self):
         users.insert().execute(
@@ -563,16 +543,6 @@ class QueryTest(fixtures.TestBase):
         )
 
 
-    def test_delete(self):
-        users.insert().execute(user_id = 7, user_name = 'jack')
-        users.insert().execute(user_id = 8, user_name = 'fred')
-        print repr(users.select().execute().fetchall())
-
-        users.delete(users.c.user_name == 'fred').execute()
-
-        print repr(users.select().execute().fetchall())
-
-
 
     @testing.exclude('mysql', '<', (5, 0, 37), 'database bug')
     def test_scalar_select(self):
@@ -840,47 +810,7 @@ class QueryTest(fixtures.TestBase):
             lambda: r['foo']
         )
 
-    @testing.fails_if(lambda: util.pypy, "lastrowid not maintained after "
-                            "connection close")
-    @testing.requires.dbapi_lastrowid
-    def test_native_lastrowid(self):
-        r = testing.db.execute(
-            users.insert(),
-            {'user_id':1, 'user_name':'ed'}
-        )
-
-        eq_(r.lastrowid, 1)
-
-    def test_returns_rows_flag_insert(self):
-        r = testing.db.execute(
-            users.insert(),
-            {'user_id':1, 'user_name':'ed'}
-        )
-        assert r.is_insert
-        assert not r.returns_rows
 
-    def test_returns_rows_flag_update(self):
-        r = testing.db.execute(
-            users.update().values(user_name='fred')
-        )
-        assert not r.is_insert
-        assert not r.returns_rows
-
-    def test_returns_rows_flag_select(self):
-        r = testing.db.execute(
-            users.select()
-        )
-        assert not r.is_insert
-        assert r.returns_rows
-
-    @testing.requires.returning
-    def test_returns_rows_flag_insert_returning(self):
-        r = testing.db.execute(
-            users.insert().returning(users.c.user_id),
-            {'user_id':1, 'user_name':'ed'}
-        )
-        assert r.is_insert
-        assert r.returns_rows
 
     def test_graceful_fetch_on_non_rows(self):
         """test that calling fetchone() etc. on a result that doesn't