]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- [bug] Adjusted column default reflection code to
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 23 Sep 2012 15:30:07 +0000 (11:30 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 23 Sep 2012 15:30:07 +0000 (11:30 -0400)
convert non-string values to string, to accommodate
old SQLite versions that don't deliver
default info as a string.  [ticket:2265]
- factor sqlite column reflection to be like we did for postgresql,
in a separate method.

CHANGES
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/engine/reflection.py
test/dialect/test_sqlite.py

diff --git a/CHANGES b/CHANGES
index e16f995e3c12c9bfdd2f025e9f71e71bc842dd6b..590ac62a9ddbc954826a3dfcf12238d6c8ed7c86 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -156,6 +156,11 @@ CHANGES
     its name, as *part* of its name (i.e. """mytable""").
     [ticket:2568]
 
+  - [bug] Adjusted column default reflection code to
+    convert non-string values to string, to accommodate
+    old SQLite versions that don't deliver
+    default info as a string.  [ticket:2265]
+
 - mysql
   - [bug] Updated mysqlconnector interface to use
     updated "client flag" and "charset" APIs,
index 1c6f0d88ab36356e227f61d919ab0459673cc5f4..fb0f78619c163552788c187c4976675b4e49c359 100644 (file)
@@ -672,42 +672,50 @@ class SQLiteDialect(default.DefaultDialect):
         c = _pragma_cursor(
                     connection.execute("%stable_info(%s)" %
                     (pragma, qtable)))
-        found_table = False
+
+        rows = c.fetchall()
         columns = []
-        while True:
-            row = c.fetchone()
-            if row is None:
-                break
-            (name, type_, nullable, default, has_default, primary_key) = \
+        for row in rows:
+            (name, type_, nullable, default, primary_key) = \
                 (row[1], row[2].upper(), not row[3],
-                row[4], row[4] is not None, row[5])
-            match = re.match(r'(\w+)(\(.*?\))?', type_)
-            if match:
-                coltype = match.group(1)
-                args = match.group(2)
-            else:
-                coltype = "VARCHAR"
-                args = ''
-            try:
-                coltype = self.ischema_names[coltype]
-                if args is not None:
-                    args = re.findall(r'(\d+)', args)
-                    coltype = coltype(*[int(a) for a in args])
-            except KeyError:
-                util.warn("Did not recognize type '%s' of column '%s'" %
-                          (coltype, name))
-                coltype = sqltypes.NullType()
-
-            columns.append({
-                'name' : name,
-                'type' : coltype,
-                'nullable' : nullable,
-                'default' : default,
-                'autoincrement':default is None,
-                'primary_key': primary_key
-            })
+                row[4], row[5])
+
+            columns.append(self._get_column_info(name, type_, nullable,
+                                    default, primary_key))
         return columns
 
+    def _get_column_info(self, name, type_, nullable,
+                                    default, primary_key):
+
+        match = re.match(r'(\w+)(\(.*?\))?', type_)
+        if match:
+            coltype = match.group(1)
+            args = match.group(2)
+        else:
+            coltype = "VARCHAR"
+            args = ''
+        try:
+            coltype = self.ischema_names[coltype]
+            if args is not None:
+                args = re.findall(r'(\d+)', args)
+                coltype = coltype(*[int(a) for a in args])
+        except KeyError:
+            util.warn("Did not recognize type '%s' of column '%s'" %
+                      (coltype, name))
+            coltype = sqltypes.NullType()
+
+        if default is not None:
+            default = unicode(default)
+
+        return {
+            'name': name,
+            'type': coltype,
+            'nullable': nullable,
+            'default': default,
+            'autoincrement': default is None,
+            'primary_key': primary_key
+        }
+
     @reflection.cache
     def get_primary_keys(self, connection, table_name, schema=None, **kw):
         cols = self.get_columns(connection, table_name, schema, **kw)
@@ -747,7 +755,7 @@ class SQLiteDialect(default.DefaultDialect):
                 fk = fks[numerical_id]
             except KeyError:
                 fk = {
-                    'name' : None,
+                    'name': None,
                     'constrained_columns' : [],
                     'referred_schema' : None,
                     'referred_table' : rtbl,
@@ -803,4 +811,5 @@ def _pragma_cursor(cursor):
 
     if cursor.closed:
         cursor.fetchone = lambda: None
+        cursor.fetchall = lambda: []
     return cursor
index 76cb5bdaa4328f1096ba3b737ba8bc4d3d374d4c..23c2e726571f791bc89471a30124056477d969ec 100644 (file)
@@ -379,7 +379,7 @@ class Inspector(object):
 
             coltype = col_d['type']
             col_kw = {
-                'nullable':col_d['nullable'],
+                'nullable': col_d['nullable'],
             }
             for k in ('autoincrement', 'quote', 'info', 'key'):
                 if k in col_d:
index a4c9b4e7c786d8f600b6cab2a75cec6d613eb4d4..f84499067d31773c53ab85d89af0d31bd954a5c4 100644 (file)
@@ -261,7 +261,7 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
 
     @testing.provide_metadata
     def test_boolean_default(self):
-        t= Table("t", self.metadata,
+        t = Table("t", self.metadata,
                 Column("x", Boolean, server_default=sql.false()))
         t.create(testing.db)
         testing.db.execute(t.insert())
@@ -271,6 +271,19 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
             [(False,), (True,)]
         )
 
+    def test_old_style_default(self):
+        """test non-quoted integer value on older sqlite pragma"""
+
+        dialect = sqlite.dialect()
+        eq_(
+            dialect._get_column_info("foo", "INTEGER", False, 3, False),
+            {'primary_key': False, 'nullable': False,
+                'default': '3', 'autoincrement': False,
+                'type': INTEGER, 'name': 'foo'}
+        )
+
+
+
 
 class DialectTest(fixtures.TestBase, AssertsExecutionResults):