From: Mike Bayer Date: Mon, 29 Sep 2014 22:09:25 +0000 (-0400) Subject: - rework tests for attached databases into individual tests, X-Git-Tag: rel_1_0_0b1~70^2~51 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4da020dae324cb871074e302f4840e8731988be0;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - rework tests for attached databases into individual tests, test both memory and file-based - When selecting from a UNION using an attached database file, the pysqlite driver reports column names in cursor.description as 'dbname.tablename.colname', instead of 'tablename.colname' as it normally does for a UNION (note that it's supposed to just be 'colname' for both, but we work around it). The column translation logic here has been adjusted to retrieve the rightmost token, rather than the second token, so it works in both cases. Workaround courtesy Tony Roberts. fixes #3211 --- diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index e5d6703e3a..e3d9175cb3 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -13,6 +13,20 @@ .. changelog:: :version: 0.9.8 + .. change:: + :tags: bug, sqlite + :versions: 1.0.0 + :tickets: 3211 + + When selecting from a UNION using an attached database file, + the pysqlite driver reports column names in cursor.description + as 'dbname.tablename.colname', instead of 'tablename.colname' as + it normally does for a UNION (note that it's supposed to just be + 'colname' for both, but we work around it). The column translation + logic here has been adjusted to retrieve the rightmost token, rather + than the second token, so it works in both cases. Workaround + courtesy Tony Roberts. + .. change:: :tags: bug, postgresql :versions: 1.0.0 diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 817834b7d5..335b35c947 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -713,10 +713,12 @@ class SQLiteExecutionContext(default.DefaultExecutionContext): return self.execution_options.get("sqlite_raw_colnames", False) def _translate_colname(self, colname): - # adjust for dotted column names. SQLite in the case of UNION may - # store col names as "tablename.colname" in cursor.description + # adjust for dotted column names. SQLite + # in the case of UNION may store col names as + # "tablename.colname", or if using an attached database, + # "database.tablename.colname", in cursor.description if not self._preserve_raw_colnames and "." in colname: - return colname.split(".")[1], colname + return colname.split(".")[-1], colname else: return colname, None diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index ae72122451..124208dbe7 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -11,7 +11,7 @@ from sqlalchemy import Table, select, bindparam, Column,\ UniqueConstraint from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time from sqlalchemy import types as sqltypes -from sqlalchemy import event +from sqlalchemy import event, inspect from sqlalchemy.util import u, ue from sqlalchemy import exc, sql, schema, pool, util from sqlalchemy.dialects.sqlite import base as sqlite, \ @@ -480,40 +480,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): assert u('méil') in result.keys() assert ue('\u6e2c\u8a66') in result.keys() - def test_attached_as_schema(self): - cx = testing.db.connect() - try: - cx.execute('ATTACH DATABASE ":memory:" AS test_schema') - dialect = cx.dialect - assert dialect.get_table_names(cx, 'test_schema') == [] - meta = MetaData(cx) - Table('created', meta, Column('id', Integer), - schema='test_schema') - alt_master = Table('sqlite_master', meta, autoload=True, - schema='test_schema') - meta.create_all(cx) - eq_(dialect.get_table_names(cx, 'test_schema'), ['created']) - assert len(alt_master.c) > 0 - meta.clear() - reflected = Table('created', meta, autoload=True, - schema='test_schema') - assert len(reflected.c) == 1 - cx.execute(reflected.insert(), dict(id=1)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [(1, )] - cx.execute(reflected.update(), dict(id=2)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [(2, )] - cx.execute(reflected.delete(reflected.c.id == 2)) - r = cx.execute(reflected.select()).fetchall() - assert list(r) == [] - - # note that sqlite_master is cleared, above - - meta.drop_all() - assert dialect.get_table_names(cx, 'test_schema') == [] - finally: - cx.execute('DETACH DATABASE test_schema') def test_file_path_is_absolute(self): d = pysqlite_dialect.dialect() @@ -532,7 +498,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): e = create_engine('sqlite+pysqlite:///foo.db') assert e.pool.__class__ is pool.NullPool - def test_dont_reflect_autoindex(self): meta = MetaData(testing.db) t = Table('foo', meta, Column('bar', String, primary_key=True)) @@ -577,6 +542,107 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): meta.drop_all() +class AttachedMemoryDBTest(fixtures.TestBase): + __only_on__ = 'sqlite' + + dbname = None + + def setUp(self): + self.conn = conn = testing.db.connect() + if self.dbname is None: + dbname = ':memory:' + else: + dbname = self.dbname + conn.execute('ATTACH DATABASE "%s" AS test_schema' % dbname) + self.metadata = MetaData() + + def tearDown(self): + self.metadata.drop_all(self.conn) + self.conn.execute('DETACH DATABASE test_schema') + if self.dbname: + os.remove(self.dbname) + + def _fixture(self): + meta = self.metadata + ct = Table( + 'created', meta, + Column('id', Integer), + Column('name', String), + schema='test_schema') + + meta.create_all(self.conn) + return ct + + def test_no_tables(self): + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), []) + + def test_table_names_present(self): + self._fixture() + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), ["created"]) + + def test_table_names_system(self): + self._fixture() + insp = inspect(self.conn) + eq_(insp.get_table_names("test_schema"), ["created"]) + + def test_reflect_system_table(self): + meta = MetaData(self.conn) + alt_master = Table( + 'sqlite_master', meta, autoload=True, + autoload_with=self.conn, + schema='test_schema') + assert len(alt_master.c) > 0 + + def test_reflect_user_table(self): + self._fixture() + + m2 = MetaData() + c2 = Table('created', m2, autoload=True, autoload_with=self.conn) + eq_(len(c2.c), 2) + + def test_crud(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + eq_( + self.conn.execute(ct.select()).fetchall(), + [(1, 'foo')] + ) + + self.conn.execute(ct.update(), {'id': 2, 'name': 'bar'}) + eq_( + self.conn.execute(ct.select()).fetchall(), + [(2, 'bar')] + ) + self.conn.execute(ct.delete()) + eq_( + self.conn.execute(ct.select()).fetchall(), + [] + ) + + def test_col_targeting(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + row = self.conn.execute(ct.select()).first() + eq_(row['id'], 1) + eq_(row['name'], 'foo') + + def test_col_targeting_union(self): + ct = self._fixture() + + self.conn.execute(ct.insert(), {'id': 1, 'name': 'foo'}) + row = self.conn.execute(ct.select().union(ct.select())).first() + eq_(row['id'], 1) + eq_(row['name'], 'foo') + + +class AttachedFileDBTest(AttachedMemoryDBTest): + dbname = 'attached_db.db' + + class SQLTest(fixtures.TestBase, AssertsCompiledSQL): """Tests SQLite-dialect specific compilation."""