regarding "is subquery" and "correlation" pushed to SQL generation phase.
select() elements are now *never* mutated by their enclosing containers
or by any dialect's compilation process [ticket:52] [ticket:569]
+ - result sets make a better attempt at matching the DBAPI types present
+   in cursor.description to the TypeEngine objects defined by the dialect,
+   which are then used for result processing. Note that this only takes
+   effect for textual SQL; constructed SQL statements always have an
+   explicit type map.
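
  For illustration, a minimal sketch of the two paths, reusing the
  binary_table and testbase.db objects from the tests later in this
  changeset (the column selection is invented for the sketch):

      # textual SQL: result types are matched from the type_codes in
      # cursor.description against the dialect's DBAPI type map
      r1 = testbase.db.execute("select data from binary_table")

      # constructed SQL: types come straight from the Column definitions;
      # the DBAPI type map is never consulted
      r2 = binary_table.select().execute()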
- result sets from CRUD operations close their underlying cursor immediately,
  and will also autoclose the connection if it was opened for the operation;
  this allows more efficient use of connections across successive CRUD
  operations.
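
  A hedged sketch of what this buys for implicit (connectionless)
  execution, again borrowing binary_table from the tests below; the
  row values are invented:

      # each implicit execution checks a connection out of the pool; the
      # cursor is closed as soon as the CRUD result is constructed, and the
      # connection is returned immediately rather than being held by the
      # result object
      for i in range(3):
          binary_table.insert().execute(primary_id=i, misc='row%d' % i)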
def get_result_proxy(self):
if self.cursor.description is not None:
- if self.dialect.auto_convert_lobs and self.typemap is None:
- typemap = {}
- binary = False
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect.ORACLE_BINARY_TYPES:
- binary = True
- typemap[column[0].lower()] = OracleBinary()
- self.typemap = typemap
- if binary:
+    for column in self.cursor.description:
+        type_code = column[1]
+        # a LOB column anywhere in the result requires the buffered proxy,
+        # since cx_Oracle LOB objects go stale as more rows are fetched
+        if type_code in self.dialect.ORACLE_BINARY_TYPES:
            return base.BufferedColumnResultProxy(self)
- else:
- for column in self.cursor.description:
- type_code = column[1]
- if type_code in self.dialect.ORACLE_BINARY_TYPES:
- return base.BufferedColumnResultProxy(self)
return base.ResultProxy(self)
self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP')
self.auto_setinputsizes = auto_setinputsizes
self.auto_convert_lobs = auto_convert_lobs
+
if self.dbapi is not None:
self.ORACLE_BINARY_TYPES = [
    getattr(self.dbapi, k)
    for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"]
    if hasattr(self.dbapi, k)
]
else:
    self.ORACLE_BINARY_TYPES = []
util.coerce_kw_type(opts, 'use_ansi', bool)
return ([], opts)
+ def dbapi_type_map(self):
+ if self.auto_convert_lobs:
+ return super(OracleDialect, self).dbapi_type_map()
+ else:
+ return {}
+
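  As a usage sketch (the DSN and credentials are hypothetical), the flag
  is passed through create_engine() to the dialect:

      from sqlalchemy import create_engine

      # with auto_convert_lobs=False, dbapi_type_map() returns {} and
      # textual result sets no longer coerce cx_Oracle LOB columns
      engine = create_engine('oracle://scott:tiger@dsn', auto_convert_lobs=False)
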
def type_descriptor(self, typeobj):
return sqltypes.adapt_type(typeobj, colspecs)
raise NotImplementedError()
+ def dbapi_type_map(self):
+     """Return a mapping of DBAPI type objects present in this Dialect's
+     DBAPI to the TypeEngine implementations used by the dialect.
+
+     This is used to apply types to result sets based on the DBAPI types
+     present in cursor.description; it only takes effect for result sets
+     from textual statements where no explicit typemap was given.
+     Constructed SQL statements always have type information explicitly
+     embedded.
+     """
+
+ raise NotImplementedError()
+
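  A hypothetical override, assuming a DBAPI that exposes a BINARY type
  object and a dialect type MyBinary (both names invented for the sketch):

      def dbapi_type_map(self):
          # map the DBAPI's BINARY type object to this dialect's binary type
          return {self.dbapi.BINARY: MyBinary()}
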
def type_descriptor(self, typeobj):
"""Transform the given [sqlalchemy.types#TypeEngine] instance from generic to database-specific.
metadata = self.cursor.description
if metadata is not None:
+ typemap = self.dialect.dbapi_type_map()
+
for i, item in enumerate(metadata):
# SQLite may prepend the table name to column names, so strip it
colname = item[0].split('.')[-1]
if self.context.typemap is not None:
- type = self.context.typemap.get(colname.lower(), types.NULLTYPE)
+ type = self.context.typemap.get(colname.lower(), typemap.get(item[1], types.NULLTYPE))
else:
- type = types.NULLTYPE
+ type = typemap.get(item[1], types.NULLTYPE)
+
rec = (type, type.dialect_impl(self.dialect), i)
if rec[0] is None:
self._ischema = None
self.dbapi = dbapi
self._figure_paramstyle(paramstyle=paramstyle, default=default_paramstyle)
-
+ self._generate_dbapi_type_map()
+
+ def _generate_dbapi_type_map(self):
+     """locate all TypeEngine classes in the dialect's module and map them
+     against the DBAPI type objects they correspond to.
+
+     TODO: dialects should export this mapping explicitly, instead of
+     relying upon module searching.
+     """
+     map = {}
+     if self.dbapi is not None:
+         dialect_module = sys.modules[self.__class__.__module__]
+         for obj in dialect_module.__dict__.values():
+             # find TypeEngine subclasses; instantiate each to ask which
+             # DBAPI type object it corresponds to
+             if isinstance(obj, type) and issubclass(obj, types.TypeEngine):
+                 dbapi_type = obj().get_dbapi_type(self.dbapi)
+                 if dbapi_type is not None:
+                     map[dbapi_type] = obj()
+     self._dbapi_type_map = map
+
+ def dbapi_type_map(self):
+ return self._dbapi_type_map
+
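  To make the module scan concrete, a hedged sketch of a dialect-module
  type it would pick up (MyBinary is invented for the sketch):

      from sqlalchemy import types

      class MyBinary(types.Binary):
          def get_dbapi_type(self, dbapi):
              return dbapi.BINARY

      # the scan would then produce {dbapi.BINARY: MyBinary()}, which result
      # processing consults via the type_code (item[1]) in cursor.description
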
def create_execution_context(self, **kwargs):
return DefaultExecutionContext(self, **kwargs)
binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1)
binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
- l = binary_table.select(order_by=binary_table.c.primary_id).execute().fetchall()
- print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
- print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
- self.assert_(list(stream1) == list(l[0]['data']))
- self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
- self.assert_(list(stream2) == list(l[1]['data']))
- self.assert_(testobj1 == l[0]['pickled'])
- self.assert_(testobj2 == l[1]['pickled'])
+
+ for stmt in (
+ binary_table.select(order_by=binary_table.c.primary_id),
+ text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType}, engine=testbase.db)
+ ):
+ l = stmt.execute().fetchall()
+ print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
+ print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
+ self.assert_(list(stream1) == list(l[0]['data']))
+ self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
+ self.assert_(list(stream2) == list(l[1]['data']))
+ self.assert_(testobj1 == l[0]['pickled'])
+ self.assert_(testobj2 == l[1]['pickled'])
def load_stream(self, name, length=12579):
    # default length stays below the typical MySQL default BLOB size
    f = os.path.join(os.path.dirname(testbase.__file__), name)
    return file(f).read(length)
- @testbase.supported('oracle')
- def test_oracle_autobinary(self):
- stream1 =self.load_stream('binary_data_one.dat')
- stream2 =self.load_stream('binary_data_two.dat')
- binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100])
- binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99])
- binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
- result = testbase.db.connect().execute("select primary_id, misc, data, data_slice from binary_table")
- l = result.fetchall()
- l[0]['data']
- self.assert_(list(stream1) == list(l[0]['data']))
- self.assert_(list(stream1[0:100]) == list(l[0]['data_slice']))
- self.assert_(list(stream2) == list(l[1]['data']))
-
class DateTest(AssertMixin):
def setUpAll(self):