import datetime
import warnings
-from sqlalchemy import util, schema, exceptions, pool
-from sqlalchemy.sql import compiler
-from sqlalchemy.engine import default, base
-from sqlalchemy import types as sqltypes
+from sqlalchemy import exceptions, pool, schema, types as sqltypes, util
+from sqlalchemy.engine import base, default
+from sqlalchemy.sql import compiler, text
_initialized_kb = False
def has_table(self, connection, table_name, schema=None):
tblqry = """
- SELECT count(*)
- FROM rdb$relations r
- WHERE r.rdb$relation_name=?
+ SELECT 1 FROM rdb$database
+ WHERE EXISTS (SELECT rdb$relation_name
+ FROM rdb$relations
+ WHERE rdb$relation_name=?)
"""
c = connection.execute(tblqry, [self._denormalize_name(table_name)])
row = c.fetchone()
- if row[0] > 0:
+ if row is not None:
+ return True
+ else:
+ return False
+
+ def has_sequence(self, connection, sequence_name):
+ genqry = """
+ SELECT 1 FROM rdb$database
+ WHERE EXISTS (SELECT rdb$generator_name
+ FROM rdb$generators
+ WHERE rdb$generator_name=?)
+ """
+ c = connection.execute(genqry, [self._denormalize_name(sequence_name)])
+ row = c.fetchone()
+ if row is not None:
return True
else:
return False
f.rdb$field_sub_type AS stype,
f.rdb$field_length AS flen,
f.rdb$field_precision AS fprec,
- f.rdb$field_scale AS fscale
+ f.rdb$field_scale AS fscale,
+ COALESCE(r.rdb$default_source, f.rdb$default_source) AS fdefault
FROM rdb$relation_fields r
JOIN rdb$fields f ON r.rdb$field_source=f.rdb$field_name
JOIN rdb$types t ON t.rdb$type=f.rdb$field_type AND t.rdb$field_name='RDB$FIELD_TYPE'
# is it a primary key?
kw['primary_key'] = name in pkfields
- # is it nullable ?
+ # is it nullable?
kw['nullable'] = not bool(row['null_flag'])
+ # does it have a default value?
+ if row['fdefault'] is not None:
+ # the value comes down as "DEFAULT 'value'"
+ defvalue = row['fdefault'][8:]
+ args.append(schema.PassiveDefault(text(defvalue)))
+
table.append_column(schema.Column(*args, **kw))
if not found_table:
fks[cname] = fk = ([], [])
rname = self._normalize_name(row['targetrname'])
schema.Table(rname, table.metadata, autoload=True, autoload_with=connection)
- fname = self._normalize_name(row['fieldname'])
+ fname = self._normalize_name(row['fname'])
refspec = rname + '.' + self._normalize_name(row['targetfname'])
fk[0].append(fname)
fk[1].append(refspec)
if not "attempt to store duplicate value" in str(e):
raise e
con.execute('''CREATE TABLE testtable (question int_domain,
- answer str_domain,
+ answer str_domain DEFAULT 'no answer',
remark rem_domain,
photo img_domain,
d date,
set(['question', 'answer', 'remark', 'photo', 'd', 't', 'dt']),
"Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.question.type.__class__, firebird.FBInteger)
+ self.assertEquals(table.c.question.default.arg.text, "42")
self.assertEquals(table.c.answer.type.__class__, firebird.FBString)
+ self.assertEquals(table.c.answer.default.arg.text, "'no answer'")
self.assertEquals(table.c.remark.type.__class__, firebird.FBText)
self.assertEquals(table.c.photo.type.__class__, firebird.FBBinary)
# The following assume a Dialect 3 database