import sqlalchemy.ansisql as ansisql
import sqlalchemy.types as sqltypes
import sqlalchemy.exceptions as exceptions
+
try:
import kinterbasdb
except:
kinterbasdb = None
-
+
dbmodule = kinterbasdb
-
-_initialized_kb = False
+
+_initialized_kb = False
class FBNumeric(sqltypes.Numeric):
def get_col_spec(self):
- return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}
+ if self.precision is None:
+ return "NUMERIC"
+ else:
+ return "NUMERIC(%(precision)s, %(length)s)" % { 'precision': self.precision,
+ 'length' : self.length }
+
+
class FBInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+
+
class FBSmallInteger(sqltypes.Smallinteger):
def get_col_spec(self):
return "SMALLINT"
+
+
class FBDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
+
+
class FBDate(sqltypes.DateTime):
def get_col_spec(self):
return "DATE"
+
+
class FBText(sqltypes.TEXT):
def get_col_spec(self):
return "BLOB SUB_TYPE 2"
+
+
class FBString(sqltypes.String):
def get_col_spec(self):
return "VARCHAR(%(length)s)" % {'length' : self.length}
+
+
class FBChar(sqltypes.CHAR):
def get_col_spec(self):
return "CHAR(%(length)s)" % {'length' : self.length}
+
+
class FBBinary(sqltypes.Binary):
def get_col_spec(self):
return "BLOB SUB_TYPE 1"
+
+
class FBBoolean(sqltypes.Boolean):
def get_col_spec(self):
return "SMALLINT"
-
+
+
colspecs = {
sqltypes.Integer : FBInteger,
sqltypes.Smallinteger : FBSmallInteger,
sqltypes.CHAR: FBChar,
}
+
def descriptor():
return {'name':'firebird',
'description':'Firebird',
('user', 'Username', None),
('password', 'Password', None)
]}
-
-class FireBirdExecutionContext(default.DefaultExecutionContext):
+
+class FBExecutionContext(default.DefaultExecutionContext):
def supports_sane_rowcount(self):
return True
-
-class FireBirdDialect(ansisql.ANSIDialect):
+
+class FBDialect(ansisql.ANSIDialect):
def __init__(self, module = None, **params):
global _initialized_kb
self.module = module or dbmodule
self.opts = {}
-
+
if not _initialized_kb:
_initialized_kb = True
type_conv = params.get('type_conv', 200) or 200
if isinstance(type_conv, types.StringTypes):
type_conv = int(type_conv)
-
+
concurrency_level = params.get('concurrency_level', 1) or 1
if isinstance(concurrency_level, types.StringTypes):
concurrency_level = int(concurrency_level)
-
+
if kinterbasdb is not None:
kinterbasdb.init(type_conv=type_conv, concurrency_level=concurrency_level)
ansisql.ANSIDialect.__init__(self, **params)
def create_connect_args(self, url):
-# self.opts = url.translate_connect_args(['host', 'database', 'user', 'password'])
opts = url.translate_connect_args(['host', 'database', 'user', 'password', 'port'])
if opts.get('port'):
opts['host'] = "%s/%s" % (opts['host'], opts['port'])
opts.pop('type_conv', None)
opts.pop('concurrency_level', None)
self.opts = opts
-
+
return ([], self.opts)
def create_execution_context(self):
- return FireBirdExecutionContext(self)
+ return FBExecutionContext(self)
def type_descriptor(self, typeobj):
return sqltypes.adapt_type(typeobj, colspecs)
return FBIdentifierPreparer(self)
def has_table(self, connection, table_name):
- tblqry = """\
+ tblqry = """
SELECT count(*)
- FROM RDB$RELATIONS R
+ FROM RDB$RELATIONS R
WHERE R.RDB$RELATION_NAME=?"""
-
+
c = connection.execute(tblqry, [table_name.upper()])
row = c.fetchone()
if row[0] > 0:
13 : lambda r: sqltypes.Time(), # TIME
16 : lambda r: sqltypes.Numeric(precision=r['FPREC'], length=r['FSCALE'] * -1) #INT64
}
- tblqry = """\
+ tblqry = """
SELECT DISTINCT R.RDB$FIELD_NAME AS FNAME,
R.RDB$NULL_FLAG AS NULL_FLAG,
R.RDB$FIELD_POSITION,
F.RDB$FIELD_LENGTH AS FLEN,
F.RDB$FIELD_PRECISION AS FPREC,
F.RDB$FIELD_SCALE AS FSCALE
- FROM RDB$RELATION_FIELDS R
+ FROM RDB$RELATION_FIELDS R
JOIN RDB$FIELDS F ON R.RDB$FIELD_SOURCE=F.RDB$FIELD_NAME
WHERE F.RDB$SYSTEM_FLAG=0 and R.RDB$RELATION_NAME=?
ORDER BY R.RDB$FIELD_POSITION"""
- keyqry = """\
+ keyqry = """
SELECT SE.RDB$FIELD_NAME SENAME
FROM RDB$RELATION_CONSTRAINTS RC
JOIN RDB$INDEX_SEGMENTS SE
ON RC.RDB$INDEX_NAME=SE.RDB$INDEX_NAME
WHERE RC.RDB$CONSTRAINT_TYPE=? AND RC.RDB$RELATION_NAME=?"""
- fkqry = """\
+ fkqry = """
SELECT RC.RDB$CONSTRAINT_NAME CNAME,
CSE.RDB$FIELD_NAME FNAME,
IX2.RDB$RELATION_NAME RNAME,
JOIN RDB$INDEX_SEGMENTS CSE
ON CSE.RDB$INDEX_NAME=IX1.RDB$INDEX_NAME
JOIN RDB$INDEX_SEGMENTS SE
- ON SE.RDB$INDEX_NAME=IX2.RDB$INDEX_NAME
+ ON SE.RDB$INDEX_NAME=IX2.RDB$INDEX_NAME AND SE.RDB$FIELD_POSITION=CSE.RDB$FIELD_POSITION
WHERE RC.RDB$CONSTRAINT_TYPE=? AND RC.RDB$RELATION_NAME=?
ORDER BY SE.RDB$INDEX_NAME, SE.RDB$FIELD_POSITION"""
while row:
name = row['FNAME']
args = [lower_if_possible(name)]
-
+
kw = {}
# get the data types and lengths
args.append(column_func[row['FTYPE']](row))
for name,value in fks.iteritems():
table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], name=name))
-
def last_inserted_ids(self):
return self.context.last_inserted_ids
-
def do_execute(self, cursor, statement, parameters, **kwargs):
cursor.execute(statement, parameters or [])
connection.commit(True)
def connection(self):
- """returns a managed DBAPI connection from this SQLEngine's connection pool."""
+ """Returns a managed DBAPI connection from this SQLEngine's connection pool."""
c = self._pool.connect()
c.supportsTransactions = 0
return c
-
+
def dbapi(self):
return self.module
class FBCompiler(ansisql.ANSICompiler):
- """firebird compiler modifies the lexical structure of Select statements to work under
- non-ANSI configured Firebird databases, if the use_ansi flag is False."""
-
- def __init__(self, dialect, statement, parameters, **kwargs):
- self._outertable = None
- super(FBCompiler, self).__init__(dialect, statement, parameters, **kwargs)
-
-
+ """Firebird specific idiosincrasies"""
+
def visit_alias(self, alias):
# Override to not use the AS keyword which FB 1.5 does not like
self.froms[alias] = self.get_from_text(alias.original) + " " + self.preparer.format_alias(alias)
self.strings[alias] = self.get_str(alias.original)
-
- def visit_column(self, column):
- return ansisql.ANSICompiler.visit_column(self, column)
-
-
def visit_function(self, func):
if len(func.clauses):
super(FBCompiler, self).visit_function(func)
else:
self.strings[func] = func.name
-
+
def visit_insert(self, insert):
- """inserts are required to have the primary keys be explicitly present.
+ """Inserts are required to have the primary keys be explicitly present.
mapper will by default not put them in the insert statement to comply
- with autoincrement fields that require they not be present. so,
+ with autoincrement fields that require they not be present. So,
put them all in for all primary key columns."""
for c in insert.table.primary_key:
if not self.parameters.has_key(c.key):
self.parameters[c.key] = None
return ansisql.ANSICompiler.visit_insert(self, insert)
-
+
def visit_select_precolumns(self, select):
- """ called when building a SELECT statment, position is just before column list
- Firebird puts the limit and offset right after the select...thanks for adding the
- visit_select_precolumns!!!"""
+ """Called when building a SELECT statement, position is just before column list
+ Firebird puts the limit and offset right after the select..."""
result = ""
if select.limit:
result += " FIRST %d " % select.limit
"""Already taken care of in the visit_select_precolumns method."""
return ""
+
class FBSchemaGenerator(ansisql.ANSISchemaGenerator):
def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column)
+ colspec = self.preparer.format_column(column)
colspec += " " + column.type.engine_impl(self.engine).get_col_spec()
+
default = self.get_column_default_string(column)
if default is not None:
colspec += " DEFAULT " + default
- if not column.nullable:
+ if not column.nullable or column.primary_key:
colspec += " NOT NULL"
+
return colspec
def visit_sequence(self, sequence):
self.append("CREATE GENERATOR %s" % sequence.name)
self.execute()
+
class FBSchemaDropper(ansisql.ANSISchemaDropper):
def visit_sequence(self, sequence):
self.append("DROP GENERATOR %s" % sequence.name)
self.execute()
+
class FBDefaultRunner(ansisql.ANSIDefaultRunner):
def exec_default_sql(self, default):
c = sql.select([default.arg], from_obj=["rdb$database"], engine=self.engine).compile()
return self.proxy(str(c), c.get_params()).fetchone()[0]
-
+
def visit_sequence(self, seq):
return self.proxy("SELECT gen_id(" + seq.name + ", 1) FROM rdb$database").fetchone()[0]
+
RESERVED_WORDS = util.Set(
["action", "active", "add", "admin", "after", "all", "alter", "and", "any",
"as", "asc", "ascending", "at", "auto", "autoddl", "avg", "based", "basename",
"variable", "varying", "version", "view", "wait", "wait_time", "weekday", "when",
"whenever", "where", "while", "with", "work", "write", "year", "yearday" ])
+
class FBIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
def __init__(self, dialect):
super(FBIdentifierPreparer,self).__init__(dialect, omit_schema=True)
def _reserved_words(self):
return RESERVED_WORDS
-dialect = FireBirdDialect
+
+dialect = FBDialect