import cx_Oracle
except:
cx_Oracle = None
+
+ORACLE_BINARY_TYPES = [getattr(cx_Oracle, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"] if hasattr(cx_Oracle, k)]
+
class OracleNumeric(sqltypes.Numeric):
def get_col_spec(self):
if cursor and cursor.description:
for column in cursor.description:
type_code = column[1]
- if type_code in (cx_Oracle.BFILE, cx_Oracle.CLOB, cx_Oracle.NCLOB,
- cx_Oracle.BLOB, cx_Oracle.LONG_BINARY, cx_Oracle.LONG_STRING):
+ if type_code in ORACLE_BINARY_TYPES:
args['should_prefetch'] = True
break
return args
stream2 =self.load_stream('binary_data_two.dat')
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
- if db.name == 'oracle':
- res = binary_table.select().execute()
- l = []
- row = res.fetchone()
- l.append(dict([(k, row[k]) for k in row.keys()]))
- row = res.fetchone()
- l.append(dict([(k, row[k]) for k in row.keys()]))
- else:
- l = binary_table.select().execute().fetchall()
+ binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+ l = binary_table.select().execute().fetchall()
print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assert_(list(stream1) == list(l[0]['data']))
self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))