]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- rework tests for attached databases into individual tests,
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 29 Sep 2014 22:09:25 +0000 (18:09 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 29 Sep 2014 22:09:25 +0000 (18:09 -0400)
test both memory and file-based
- When selecting from a UNION using an attached database file,
the pysqlite driver reports column names in cursor.description
as 'dbname.tablename.colname', instead of 'tablename.colname' as
it normally does for a UNION (note that it's supposed to just be
'colname' for both, but we work around it).  The column translation
logic here has been adjusted to retrieve the rightmost token, rather
than the second token, so it works in both cases.   Workaround
courtesy Tony Roberts.
fixes #3211

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/dialects/sqlite/base.py
test/dialect/test_sqlite.py

index e5d6703e3a9c905f124031495587d44a60c0fec2..e3d9175cb32e109f38622aee491adc0eb2f01df1 100644 (file)
 .. changelog::
     :version: 0.9.8
 
+    .. change::
+        :tags: bug, sqlite
+        :versions: 1.0.0
+        :tickets: 3211
+
+        When selecting from a UNION using an attached database file,
+        the pysqlite driver reports column names in cursor.description
+        as 'dbname.tablename.colname', instead of 'tablename.colname' as
+        it normally does for a UNION (note that it's supposed to just be
+        'colname' for both, but we work around it).  The column translation
+        logic here has been adjusted to retrieve the rightmost token, rather
+        than the second token, so it works in both cases.   Workaround
+        courtesy Tony Roberts.
+
     .. change::
         :tags: bug, postgresql
         :versions: 1.0.0
index 817834b7d56076448137e3c2b2c983e8468c3cb7..335b35c94746afaf7900f71bbc43467bdf673af6 100644 (file)
@@ -713,10 +713,12 @@ class SQLiteExecutionContext(default.DefaultExecutionContext):
         return self.execution_options.get("sqlite_raw_colnames", False)
 
     def _translate_colname(self, colname):
-        # adjust for dotted column names. SQLite in the case of UNION may
-        # store col names as "tablename.colname" in cursor.description
+        # adjust for dotted column names.  SQLite
+        # in the case of UNION may store col names as
+        # "tablename.colname", or if using an attached database,
+        # "database.tablename.colname", in cursor.description
         if not self._preserve_raw_colnames and "." in colname:
-            return colname.split(".")[1], colname
+            return colname.split(".")[-1], colname
         else:
             return colname, None
 
index ae721224511ee8ea61732917334f2ad86e8530c9..124208dbe7b3c8430891806f57c3a4a9d0082da4 100644 (file)
@@ -11,7 +11,7 @@ from sqlalchemy import Table, select, bindparam, Column,\
     UniqueConstraint
 from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time
 from sqlalchemy import types as sqltypes
-from sqlalchemy import event
+from sqlalchemy import event, inspect
 from sqlalchemy.util import u, ue
 from sqlalchemy import exc, sql, schema, pool, util
 from sqlalchemy.dialects.sqlite import base as sqlite, \
@@ -480,40 +480,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         assert u('méil') in result.keys()
         assert ue('\u6e2c\u8a66') in result.keys()
 
-    def test_attached_as_schema(self):
-        cx = testing.db.connect()
-        try:
-            cx.execute('ATTACH DATABASE ":memory:" AS  test_schema')
-            dialect = cx.dialect
-            assert dialect.get_table_names(cx, 'test_schema') == []
-            meta = MetaData(cx)
-            Table('created', meta, Column('id', Integer),
-                  schema='test_schema')
-            alt_master = Table('sqlite_master', meta, autoload=True,
-                               schema='test_schema')
-            meta.create_all(cx)
-            eq_(dialect.get_table_names(cx, 'test_schema'), ['created'])
-            assert len(alt_master.c) > 0
-            meta.clear()
-            reflected = Table('created', meta, autoload=True,
-                              schema='test_schema')
-            assert len(reflected.c) == 1
-            cx.execute(reflected.insert(), dict(id=1))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == [(1, )]
-            cx.execute(reflected.update(), dict(id=2))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == [(2, )]
-            cx.execute(reflected.delete(reflected.c.id == 2))
-            r = cx.execute(reflected.select()).fetchall()
-            assert list(r) == []
-
-            # note that sqlite_master is cleared, above
-
-            meta.drop_all()
-            assert dialect.get_table_names(cx, 'test_schema') == []
-        finally:
-            cx.execute('DETACH DATABASE test_schema')
 
     def test_file_path_is_absolute(self):
         d = pysqlite_dialect.dialect()
@@ -532,7 +498,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         e = create_engine('sqlite+pysqlite:///foo.db')
         assert e.pool.__class__ is pool.NullPool
 
-
     def test_dont_reflect_autoindex(self):
         meta = MetaData(testing.db)
         t = Table('foo', meta, Column('bar', String, primary_key=True))
@@ -577,6 +542,107 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
             meta.drop_all()
 
 
+class AttachedMemoryDBTest(fixtures.TestBase):
+    __only_on__ = 'sqlite'
+
+    dbname = None
+
+    def setUp(self):
+        self.conn = conn = testing.db.connect()
+        if self.dbname is None:
+            dbname = ':memory:'
+        else:
+            dbname = self.dbname
+        conn.execute('ATTACH DATABASE "%s" AS  test_schema' % dbname)
+        self.metadata = MetaData()
+
+    def tearDown(self):
+        self.metadata.drop_all(self.conn)
+        self.conn.execute('DETACH DATABASE test_schema')
+        if self.dbname:
+            os.remove(self.dbname)
+
+    def _fixture(self):
+        meta = self.metadata
+        ct = Table(
+            'created', meta,
+            Column('id', Integer),
+            Column('name', String),
+            schema='test_schema')
+
+        meta.create_all(self.conn)
+        return ct
+
+    def test_no_tables(self):
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), [])
+
+    def test_table_names_present(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), ["created"])
+
+    def test_table_names_system(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(insp.get_table_names("test_schema"), ["created"])
+
+    def test_reflect_system_table(self):
+        meta = MetaData(self.conn)
+        alt_master = Table(
+            'sqlite_master', meta, autoload=True,
+            autoload_with=self.conn,
+            schema='test_schema')
+        assert len(alt_master.c) > 0
+
+    def test_reflect_user_table(self):
+        self._fixture()
+
+        m2 = MetaData()
+        c2 = Table('created', m2, autoload=True, autoload_with=self.conn)
+        eq_(len(c2.c), 2)
+
+    def test_crud(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            [(1, 'foo')]
+        )
+
+        self.conn.execute(ct.update(), {'id': 2, 'name': 'bar'})
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            [(2, 'bar')]
+        )
+        self.conn.execute(ct.delete())
+        eq_(
+            self.conn.execute(ct.select()).fetchall(),
+            []
+        )
+
+    def test_col_targeting(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        row = self.conn.execute(ct.select()).first()
+        eq_(row['id'], 1)
+        eq_(row['name'], 'foo')
+
+    def test_col_targeting_union(self):
+        ct = self._fixture()
+
+        self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'})
+        row = self.conn.execute(ct.select().union(ct.select())).first()
+        eq_(row['id'], 1)
+        eq_(row['name'], 'foo')
+
+
+class AttachedFileDBTest(AttachedMemoryDBTest):
+    dbname = 'attached_db.db'
+
+
 class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
 
     """Tests SQLite-dialect specific compilation."""