- oracle
- datetime fixes: got subsecond TIMESTAMP to work [ticket:604],
added OracleDate which supports types.Date with only year/month/day
+ - added dialect flag "auto_convert_lobs", defaults to True; will cause any
+ LOB objects detected in a result set to be forced into OracleBinary
+ so that the LOB is read() automatically, if no typemap was present
+ (i.e., if a textual execute() was issued).
- sqlite
- sqlite better handles datetime/date/time objects mixed and matched
with various Date/Time/DateTime columns
def get_result_proxy(self):
if self.cursor.description is not None:
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ if self.dialect.auto_convert_lobs and self.typemap is None:
+ typemap = {}
+ binary = False
+ for column in self.cursor.description:
+ type_code = column[1]
+ if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ binary = True
+ typemap[column[0].lower()] = OracleBinary()
+ self.typemap = typemap
+ if binary:
return base.BufferedColumnResultProxy(self)
+ else:
+ for column in self.cursor.description:
+ type_code = column[1]
+ if type_code in self.dialect.ORACLE_BINARY_TYPES:
+ return base.BufferedColumnResultProxy(self)
return base.ResultProxy(self)
class OracleDialect(ansisql.ANSIDialect):
- def __init__(self, use_ansi=True, auto_setinputsizes=True, threaded=True, **kwargs):
+ def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, **kwargs):
ansisql.ANSIDialect.__init__(self, default_paramstyle='named', **kwargs)
self.use_ansi = use_ansi
self.threaded = threaded
self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
self.auto_setinputsizes = auto_setinputsizes
+ self.auto_convert_lobs = auto_convert_lobs
if self.dbapi is not None:
self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"] if hasattr(self.dbapi, k)]
else:
Column('pickled', PickleType)
)
binary_table.create()
+
+ def tearDown(self):
+ binary_table.delete().execute()
+
def tearDownAll(self):
binary_table.drop()
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
- l = binary_table.select().execute().fetchall()
+ l = binary_table.select(order_by=binary_table.c.primary_id).execute().fetchall()
+ print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assert_(list(stream1) == list(l[0]['data']))
self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
f = os.path.join(os.path.dirname(testbase.__file__), name)
# put a number less than the typical MySQL default BLOB size
return file(f).read(len)
-
+
+ @testbase.supported('oracle')
+ def test_oracle_autobinary(self):
+ stream1 =self.load_stream('binary_data_one.dat')
+ stream2 =self.load_stream('binary_data_two.dat')
+ binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100])
+ binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99])
+ binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+ result = testbase.db.connect().execute("select primary_id, misc, data, data_slice from binary_table")
+ l = result.fetchall()
+ l[0]['data']
+ self.assert_(list(stream1) == list(l[0]['data']))
+ self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
+ self.assert_(list(stream2) == list(l[1]['data']))
+
+
class DateTest(AssertMixin):
def setUpAll(self):
global users_with_date, insert_data