sqltypes.CHAR: OracleChar,
}
+
+ischema_names = {
+ 'VARCHAR2' : OracleString,
+ 'DATE' : OracleDateTime,
+ 'DATETIME' : OracleDateTime,
+ 'NUMBER' : OracleNumeric,
+ 'BLOB' : OracleBinary,
+ 'CLOB' : OracleText
+}
+
def engine(*args, **params):
return OracleSQLEngine(*args, **params)
return OracleDefaultRunner(self, proxy)
def reflecttable(self, table):
- raise "not implemented"
+ c = self.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from USER_TAB_COLUMNS where TABLE_NAME = :table_name", {'table_name':table.name})
+
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+
+ (name, coltype, length, precision, scale, nullable, default) = (row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6])
+
+ # INTEGER if the scale is 0 and precision is null
+ # NUMBER if the scale and precision are both null
+ # NUMBER(9,2) if the precision is 9 and the scale is 2
+ # NUMBER(3) if the precision is 3 and scale is 0
+ #length is ignored except for CHAR and VARCHAR2
+ if coltype=='NUMBER' :
+ if precision is None and scale is None:
+ coltype = OracleNumeric
+ elif precision is None and scale == 0 :
+ coltype = OracleInteger
+ else :
+ coltype = OracleNumeric(precision, scale)
+ elif coltype=='CHAR' or coltype=='VARCHAR2':
+ coltype = ischema_names.get(coltype, OracleString)(length)
+ else:
+ coltype = ischema_names.get(coltype)
+
+ colargs = []
+ if default is not None:
+ colargs.append(PassiveDefault(sql.text(default, escape=False)))
+
+ name = name.lower()
+
+ table.append_item (schema.Column(name, coltype, nullable=nullable, *colargs))
+
+
+ c = self.execute("""select UCC.CONSTRAINT_NAME, UCC.COLUMN_NAME, UC.CONSTRAINT_TYPE, UC.SEARCH_CONDITION, UC2.TABLE_NAME as REFERENCES_TABLE
+from USER_CONS_COLUMNS UCC, USER_CONSTRAINTS UC, USER_CONSTRAINTS UC2
+where UCC.CONSTRAINT_NAME = UC.CONSTRAINT_NAME
+and UC.R_CONSTRAINT_NAME = UC2.CONSTRAINT_NAME(+)
+and UCC.TABLE_NAME = :table_name
+order by UCC.CONSTRAINT_NAME""",{'table_name' : table.name})
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+
+ (cons_name, column_name, type, search, referred_table) = row
+ if type=='P' :
+ table.c[column_name.lower()]._set_primary_key()
+ elif type=='R':
+ remotetable = Table(referred_table, referred_table.engine, autoload = True)
+ table.c[column_name.lower()].append_item(schema.ForeignKey(remotetable.primary_key[0]))
def last_inserted_ids(self):
return self.context.last_inserted_ids