]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- rework tests for attached databases into individual tests,
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 29 Sep 2014 22:09:25 +0000 (18:09 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 29 Sep 2014 22:10:32 +0000 (18:10 -0400)
test both memory and file-based
- When selecting from a UNION using an attached database file,
the pysqlite driver reports column names in cursor.description
as 'dbname.tablename.colname', instead of 'tablename.colname' as
it normally does for a UNION (note that it's supposed to just be
'colname' for both, but we work around it).  The column translation
logic here has been adjusted to retrieve the rightmost token, rather
than the second token, so it works in both cases.   Workaround
courtesy Tony Roberts.
fixes #3211

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/dialects/sqlite/base.py
test/dialect/test_sqlite.py

index e5d6703e3a9c905f124031495587d44a60c0fec2..e3d9175cb32e109f38622aee491adc0eb2f01df1 100644 (file)
 .. changelog::
     :version: 0.9.8
 
+    .. change::
+        :tags: bug, sqlite
+        :versions: 1.0.0
+        :tickets: 3211
+
+        When selecting from a UNION using an attached database file,
+        the pysqlite driver reports column names in cursor.description
+        as 'dbname.tablename.colname', instead of 'tablename.colname' as
+        it normally does for a UNION (note that it's supposed to just be
+        'colname' for both, but we work around it).  The column translation
+        logic here has been adjusted to retrieve the rightmost token, rather
+        than the second token, so it works in both cases.   Workaround
+        courtesy Tony Roberts.
+
     .. change::
         :tags: bug, postgresql
         :versions: 1.0.0
index cc9b78b053214d2a7faa274e48c60f37eb295e73..cf02991d270f7002ad4ce9d4da04e4c547fca5b9 100644 (file)
@@ -713,10 +713,12 @@ class SQLiteExecutionContext(default.DefaultExecutionContext):
         return self.execution_options.get("sqlite_raw_colnames", False)
 
     def _translate_colname(self, colname):
-        # adjust for dotted column names. SQLite in the case of UNION may
-        # store col names as "tablename.colname" in cursor.description
+        # adjust for dotted column names.  SQLite
+        # in the case of UNION may store col names as
+        # "tablename.colname", or if using an attached database,
+        # "database.tablename.colname", in cursor.description
         if not self._preserve_raw_colnames and "." in colname:
-            return colname.split(".")[1], colname
+            return colname.split(".")[-1], colname
         else:
             return colname, None
 
index e77a039806b3274ddf5152ea5cb6e96985ca7363..d5c0410f6a301cfe055c27eb4d4b8abec333479b 100644 (file)
@@ -11,7 +11,7 @@ from sqlalchemy import Table, select, bindparam, Column,\
     UniqueConstraint
 from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time
 from sqlalchemy import types as sqltypes
-from sqlalchemy import event
+from sqlalchemy import event, inspect
 from sqlalchemy.util import u, ue
 from sqlalchemy import exc, sql, schema, pool, util
 from sqlalchemy.dialects.sqlite import base as sqlite, \
@@ -480,40 +480,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         assert u('méil') in result.keys()
         assert ue('\u6e2c\u8a66') in result.keys()
 
-    def test_attached_as_schema(self):
-        cx = testing.db.connect()
-        try:
-            cx.execute('ATTACH DATABASE ":memory:" AS  test_schema')
-            dialect = cx.dialect
-            assert dialect.get_table_names(cx, 'test_schema') == []
-            meta = MetaData(cx)
-            Table('created', meta, Column('id', Integer),
-                  schema='test_schema')
-            alt_master = Table('sqlite_master', meta, autoload=True,
-                               schema='test_schema')
-            meta.create_all(cx)
-            eq_(dialect.get_table_names(cx, 'test_schema'), ['created'])
-            assert len(alt_master.c) > 0
-            meta.clear()
-            reflected = Table('created', meta, autoload=True,
-                              schema='test_schema')
-            assert len(reflected.c) == 1
-            cx.execute(reflected.insert(), dict(id=1))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == [(1, )]
-            cx.execute(reflected.update(), dict(id=2))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == [(2, )]
-            cx.execute(reflected.delete(reflected.c.id == 2))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == []
-
-            # note that sqlite_master is cleared, above
-
-            meta.drop_all()
-            assert dialect.get_table_names(cx, 'test_schema') == []
-        finally:
-            cx.execute('DETACH DATABASE test_schema')
 
     @testing.exclude('sqlite', '<', (2, 6), 'no database support')
     def test_temp_table_reflection(self):
@@ -549,7 +515,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         e = create_engine('sqlite+pysqlite:///foo.db')
         assert e.pool.__class__ is pool.NullPool
 
-
     def test_dont_reflect_autoindex(self):
         meta = MetaData(testing.db)
         t = Table('foo', meta, Column('bar', String, primary_key=True))
@@ -576,6 +541,107 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
             meta.drop_all()
 
 
+class AttachedMemoryDBTest(fixtures.TestBase):
+    __only_on__ = 'sqlite'
+
+    dbname = None
+
+    def setUp(self):
+        self.conn = conn = testing.db.connect()
+        if self.dbname is None:
+            dbname = ':memory:'
+        else:
+            dbname = self.dbname
+        conn.execute('ATTACH DATABASE "%s" AS  test_schema' % dbname)
+        self.metadata = MetaData()
+
+    def tearDown(self):
+        self.metadata.drop_all(self.conn)
+        self.conn.execute('DETACH DATABASE test_schema')
+        if self.dbname:
+            os.remove(self.dbname)
+
+    def _fixture(self):
+        meta = self.metadata
+        ct = Table(
+            'created', meta,
+            Column('id', Integer),
+            Column('name', String),
+            schema='test_schema')
+
+        meta.create_all(self.conn)
+        return ct
+
+    def test_no_tables(self):
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), [])
+
+    def test_table_names_present(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), ["created"])
+
+    def test_table_names_system(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), ["created"])
+
+    def test_reflect_system_table(self):
+        meta = MetaData(self.conn)
+        alt_master = Table(
+            'sqlite_master', meta, autoload=True,
+            autoload_with=self.conn,
+            schema='test_schema')
+        assert len(alt_master.c) > 0
+
+    def test_reflect_user_table(self):
+        self._fixture()
+
+        m2 = MetaData()
+        c2 = Table('created', m2, autoload=True, autoload_with=self.conn)
+        eq_(len(c2.c), 2)
+
+    def test_crud(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            [(1, 'foo')]
+        )
+
+        self.conn.execute(ct.update(), {'id': 2, 'name': 'bar'})
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            [(2, 'bar')]
+        )
+        self.conn.execute(ct.delete())
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            []
+        )
+
+    def test_col_targeting(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        row = self.conn.execute(ct.select()).first()
+        eq_(row['id'], 1)
+        eq_(row['name'], 'foo')
+
+    def test_col_targeting_union(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        row = self.conn.execute(ct.select().union(ct.select())).first()
+        eq_(row['id'], 1)
+        eq_(row['name'], 'foo')
+
+
+class AttachedFileDBTest(AttachedMemoryDBTest):
+    dbname = 'attached_db.db'
+
+
 class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
 
     """Tests SQLite-dialect specific compilation."""