]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Reflect Firebird PassiveDefaults
authorLele Gaifax <lele@metapensiero.it>
Wed, 12 Dec 2007 16:08:07 +0000 (16:08 +0000)
committerLele Gaifax <lele@metapensiero.it>
Wed, 12 Dec 2007 16:08:07 +0000 (16:08 +0000)
 - column's default values are properly reflected (also those coming from DOMAINs)
 - implemented .has_sequence()
 - fix type on FK reflection

lib/sqlalchemy/databases/firebird.py
test/dialect/firebird.py

index fac0f875478e705e52878cb1db71fce6d033a86c..319e460626f65d469873733cac9fe8b9a7c28c04 100644 (file)
@@ -8,10 +8,9 @@
 import datetime
 import warnings
 
-from sqlalchemy import util, schema, exceptions, pool
-from sqlalchemy.sql import compiler
-from sqlalchemy.engine import default, base
-from sqlalchemy import types as sqltypes
+from sqlalchemy import exceptions, pool, schema, types as sqltypes, util
+from sqlalchemy.engine import base, default
+from sqlalchemy.sql import compiler, text
 
 
 _initialized_kb = False
@@ -224,13 +223,28 @@ class FBDialect(default.DefaultDialect):
 
     def has_table(self, connection, table_name, schema=None):
         tblqry = """
-        SELECT count(*)
-        FROM rdb$relations r
-        WHERE r.rdb$relation_name=?
+        SELECT 1 FROM rdb$database
+        WHERE EXISTS (SELECT rdb$relation_name
+                      FROM rdb$relations
+                      WHERE rdb$relation_name=?)
         """
         c = connection.execute(tblqry, [self._denormalize_name(table_name)])
         row = c.fetchone()
-        if row[0] > 0:
+        if row is not None:
+            return True
+        else:
+            return False
+
+    def has_sequence(self, connection, sequence_name):
+        genqry = """
+        SELECT 1 FROM rdb$database
+        WHERE EXISTS (SELECT rdb$generator_name
+                      FROM rdb$generators
+                      WHERE rdb$generator_name=?)
+        """
+        c = connection.execute(genqry, [self._denormalize_name(sequence_name)])
+        row = c.fetchone()
+        if row is not None:
             return True
         else:
             return False
@@ -250,7 +264,8 @@ class FBDialect(default.DefaultDialect):
                         f.rdb$field_sub_type AS stype,
                         f.rdb$field_length AS flen,
                         f.rdb$field_precision AS fprec,
-                        f.rdb$field_scale AS fscale
+                        f.rdb$field_scale AS fscale,
+                        COALESCE(r.rdb$default_source, f.rdb$default_source) AS fdefault
         FROM rdb$relation_fields r
              JOIN rdb$fields f ON r.rdb$field_source=f.rdb$field_name
              JOIN rdb$types t ON t.rdb$type=f.rdb$field_type AND t.rdb$field_name='RDB$FIELD_TYPE'
@@ -313,9 +328,15 @@ class FBDialect(default.DefaultDialect):
             # is it a primary key?
             kw['primary_key'] = name in pkfields
 
-            # is it nullable ?
+            # is it nullable?
             kw['nullable'] = not bool(row['null_flag'])
 
+            # does it have a default value?
+            if row['fdefault'] is not None:
+                # the value comes down as "DEFAULT 'value'"
+                defvalue = row['fdefault'][8:]
+                args.append(schema.PassiveDefault(text(defvalue)))
+
             table.append_column(schema.Column(*args, **kw))
 
         if not found_table:
@@ -335,7 +356,7 @@ class FBDialect(default.DefaultDialect):
                 fks[cname] = fk = ([], [])
             rname = self._normalize_name(row['targetrname'])
             schema.Table(rname, table.metadata, autoload=True, autoload_with=connection)
-            fname = self._normalize_name(row['fieldname'])
+            fname = self._normalize_name(row['fname'])
             refspec = rname + '.' + self._normalize_name(row['targetfname'])
             fk[0].append(fname)
             fk[1].append(refspec)
index f3cd57aa7334d9a543f9eed1e5d29c8f4c5af01d..2c527d37294826ffecf9feeb0d9c6ce66a7ff5cc 100644 (file)
@@ -27,7 +27,7 @@ class DomainReflectionTest(AssertMixin):
             if not "attempt to store duplicate value" in str(e):
                 raise e
         con.execute('''CREATE TABLE testtable (question int_domain,
-                                               answer str_domain,
+                                               answer str_domain DEFAULT 'no answer',
                                                remark rem_domain,
                                                photo img_domain,
                                                d date,
@@ -51,7 +51,9 @@ class DomainReflectionTest(AssertMixin):
                           set(['question', 'answer', 'remark', 'photo', 'd', 't', 'dt']),
                           "Columns of reflected table didn't equal expected columns")
         self.assertEquals(table.c.question.type.__class__, firebird.FBInteger)
+        self.assertEquals(table.c.question.default.arg.text, "42")
         self.assertEquals(table.c.answer.type.__class__, firebird.FBString)
+        self.assertEquals(table.c.answer.default.arg.text, "'no answer'")
         self.assertEquals(table.c.remark.type.__class__, firebird.FBText)
         self.assertEquals(table.c.photo.type.__class__, firebird.FBBinary)
         # The following assume a Dialect 3 database