From 49f4d98c04f2224903e54e0eb3e3f901a27d1e38 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 29 Sep 2014 18:09:25 -0400 Subject: [PATCH] - rework tests for attached databases into individual tests, test both memory and file-based - When selecting from a UNION using an attached database file, the pysqlite driver reports column names in cursor.description as 'dbname.tablename.colname', instead of 'tablename.colname' as it normally does for a UNION (note that it's supposed to just be 'colname' for both, but we work around it). The column translation logic here has been adjusted to retrieve the rightmost token, rather than the second token, so it works in both cases. Workaround courtesy Tony Roberts. fixes #3211 --- doc/build/changelog/changelog_09.rst | 14 +++ lib/sqlalchemy/dialects/sqlite/base.py | 8 +- test/dialect/test_sqlite.py | 138 ++++++++++++++++++------- 3 files changed, 121 insertions(+), 39 deletions(-) diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index e5d6703e3a..e3d9175cb3 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -13,6 +13,20 @@ .. changelog:: :version: 0.9.8 + .. change:: + :tags: bug, sqlite + :versions: 1.0.0 + :tickets: 3211 + + When selecting from a UNION using an attached database file, + the pysqlite driver reports column names in cursor.description + as 'dbname.tablename.colname', instead of 'tablename.colname' as + it normally does for a UNION (note that it's supposed to just be + 'colname' for both, but we work around it). The column translation + logic here has been adjusted to retrieve the rightmost token, rather + than the second token, so it works in both cases. Workaround + courtesy Tony Roberts. + .. change:: :tags: bug, postgresql :versions: 1.0.0 diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index cc9b78b053..cf02991d27 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -713,10 +713,12 @@ class SQLiteExecutionContext(default.DefaultExecutionContext): return self.execution_options.get("sqlite_raw_colnames", False) def _translate_colname(self, colname): - # adjust for dotted column names. SQLite in the case of UNION may - # store col names as "tablename.colname" in cursor.description + # adjust for dotted column names. SQLite + # in the case of UNION may store col names as + # "tablename.colname", or if using an attached database, + # "database.tablename.colname", in cursor.description if not self._preserve_raw_colnames and "." in colname: - return colname.split(".")[1], colname + return colname.split(".")[-1], colname else: return colname, None diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index e77a039806..d5c0410f6a 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -11,7 +11,7 @@ from sqlalchemy import Table, select, bindparam, Column,\ UniqueConstraint from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time from sqlalchemy import types as sqltypes -from sqlalchemy import event +from sqlalchemy import event, inspect from sqlalchemy.util import u, ue from sqlalchemy import exc, sql, schema, pool, util from sqlalchemy.dialects.sqlite import base as sqlite, \ @@ -480,40 +480,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): assert u('méil') in result.keys() assert ue('\u6e2c\u8a66') in result.keys() - def test_attached_as_schema(self): - cx = testing.db.connect() - try: - cx.execute('ATTACH DATABASE ":memory:" AS test_schema') - dialect = cx.dialect - assert dialect.get_table_names(cx, 'test_schema') == [] - meta = MetaData(cx) - Table('created', meta, Column('id', Integer), - schema='test_schema') - alt_master = Table('sqlite_master', meta, autoload=True, - schema='test_schema') - meta.create_all(cx) - eq_(dialect.get_table_names(cx, 'test_schema'), ['created']) - assert len(alt_master.c) > 0 - meta.clear() - reflected = Table('created', meta, autoload=True, - schema='test_schema') - assert len(reflected.c) == 1 - cx.execute(reflected.insert(), dict(id=1)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [(1, )] - cx.execute(reflected.update(), dict(id=2)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [(2, )] - cx.execute(reflected.delete(reflected.c.id == 2)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [] - - # note that sqlite_master is cleared, above - - meta.drop_all() - assert dialect.get_table_names(cx, 'test_schema') == [] - finally: - cx.execute('DETACH DATABASE test_schema') @testing.exclude('sqlite', '<', (2, 6), 'no database support') def test_temp_table_reflection(self): @@ -549,7 +515,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): e = create_engine('sqlite+pysqlite:///foo.db') assert e.pool.__class__ is pool.NullPool - def test_dont_reflect_autoindex(self): meta = MetaData(testing.db) t = Table('foo', meta, Column('bar', String, primary_key=True)) @@ -576,6 +541,107 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): meta.drop_all() +class AttachedMemoryDBTest(fixtures.TestBase): + __only_on__ = 'sqlite' + + dbname = None + + def setUp(self): + self.conn = conn = testing.db.connect() + if self.dbname is None: + dbname = ':memory:' + else: + dbname = self.dbname + conn.execute('ATTACH DATABASE "%s" AS test_schema' % dbname) + self.metadata = MetaData() + + def tearDown(self): + self.metadata.drop_all(self.conn) + self.conn.execute('DETACH DATABASE test_schema') + if self.dbname: + os.remove(self.dbname) + + def _fixture(self): + meta = self.metadata + ct = Table( + 'created', meta, + Column('id', Integer), + Column('name', String), + schema='test_schema') + + meta.create_all(self.conn) + return ct + + def test_no_tables(self): + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), []) + + def test_table_names_present(self): + self._fixture() + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), ["created"]) + + def test_table_names_system(self): + self._fixture() + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), ["created"]) + + def test_reflect_system_table(self): + meta = MetaData(self.conn) + alt_master = Table( + 'sqlite_master', meta, autoload=True, + autoload_with=self.conn, + schema='test_schema') + assert len(alt_master.c) > 0 + + def test_reflect_user_table(self): + self._fixture() + + m2 = MetaData() + c2 = Table('created', m2, autoload=True, autoload_with=self.conn) + eq_(len(c2.c), 2) + + def test_crud(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + eq_( + self.conn.execute(ct.select()).fetchall(), + [(1, 'foo')] + ) + + self.conn.execute(ct.update(), {'id': 2, 'name': 'bar'}) + eq_( + self.conn.execute(ct.select()).fetchall(), + [(2, 'bar')] + ) + self.conn.execute(ct.delete()) + eq_( + self.conn.execute(ct.select()).fetchall(), + [] + ) + + def test_col_targeting(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + row = self.conn.execute(ct.select()).first() + eq_(row['id'], 1) + eq_(row['name'], 'foo') + + def test_col_targeting_union(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + row = self.conn.execute(ct.select().union(ct.select())).first() + eq_(row['id'], 1) + eq_(row['name'], 'foo') + + +class AttachedFileDBTest(AttachedMemoryDBTest): + dbname = 'attached_db.db' + + class SQLTest(fixtures.TestBase, AssertsCompiledSQL): """Tests SQLite-dialect specific compilation.""" -- 2.47.3