From: Mike Bayer Date: Sun, 1 Jul 2007 19:19:56 +0000 (+0000) Subject: moved Oracle's "auto_convert_lobs" logic into a generic dialect X-Git-Tag: rel_0_4_6~142 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b0b5e9f2e39cc5bc593749e3eebd146d4e55e36e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git moved Oracle's "auto_convert_lobs" logic into a generic dialect function which attempts to map DBAPI types to TypeEngine instances at result time. This only occurs for statements that have no typemaps (i.e. textual statements). --- diff --git a/CHANGES b/CHANGES index a4a02eb323..135f35bd43 100644 --- a/CHANGES +++ b/CHANGES @@ -85,6 +85,10 @@ regarding "is subquery" and "correlation" pushed to SQL generation phase. select() elements are now *never* mutated by their enclosing containers or by any dialect's compilation process [ticket:52] [ticket:569] + - result sets make a better attempt at matching the DBAPI types present + in cursor.description to the TypeEngine objects defined by the dialect, + which are then used for result-processing. Note this only takes effect + for textual SQL; constructed SQL statements always have an explicit type map. - result sets from CRUD operations close their underlying cursor immediately. 
will also autoclose the connection if defined for the operation; this allows more efficient usage of connections for successive CRUD operations diff --git a/lib/sqlalchemy/databases/oracle.py b/lib/sqlalchemy/databases/oracle.py index 43bd3a1731..4add288ee4 100644 --- a/lib/sqlalchemy/databases/oracle.py +++ b/lib/sqlalchemy/databases/oracle.py @@ -163,22 +163,10 @@ class OracleExecutionContext(default.DefaultExecutionContext): def get_result_proxy(self): if self.cursor.description is not None: - if self.dialect.auto_convert_lobs and self.typemap is None: - typemap = {} - binary = False - for column in self.cursor.description: - type_code = column[1] - if type_code in self.dialect.ORACLE_BINARY_TYPES: - binary = True - typemap[column[0].lower()] = OracleBinary() - self.typemap = typemap - if binary: + for column in self.cursor.description: + type_code = column[1] + if type_code in self.dialect.ORACLE_BINARY_TYPES: return base.BufferedColumnResultProxy(self) - else: - for column in self.cursor.description: - type_code = column[1] - if type_code in self.dialect.ORACLE_BINARY_TYPES: - return base.BufferedColumnResultProxy(self) return base.ResultProxy(self) @@ -190,6 +178,7 @@ class OracleDialect(ansisql.ANSIDialect): self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' ) self.auto_setinputsizes = auto_setinputsizes self.auto_convert_lobs = auto_convert_lobs + if self.dbapi is not None: self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB", "LONG_BINARY", "LONG_STRING"] if hasattr(self.dbapi, k)] else: @@ -222,6 +211,12 @@ class OracleDialect(ansisql.ANSIDialect): util.coerce_kw_type(opts, 'use_ansi', bool) return ([], opts) + def dbapi_type_map(self): + if self.auto_convert_lobs: + return super(OracleDialect, self).dbapi_type_map() + else: + return {} + def type_descriptor(self, typeobj): return sqltypes.adapt_type(typeobj, colspecs) diff --git a/lib/sqlalchemy/engine/base.py 
b/lib/sqlalchemy/engine/base.py index 5de64f9144..467aed55df 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -56,6 +56,18 @@ class Dialect(sql.AbstractDialect): raise NotImplementedError() + def dbapi_type_map(self): + """return a mapping of DBAPI type objects present in this Dialect's DBAPI + mapped to TypeEngine implementations used by the dialect. + + This is used to apply types to result sets based on the DBAPI types + present in cursor.description; it only takes effect for result sets against + textual statements where no explicit typemap was present. Constructed SQL statements + always have type information explicitly embedded. + """ + + raise NotImplementedError() + def type_descriptor(self, typeobj): """Transform the given [sqlalchemy.types#TypeEngine] instance from generic to database-specific. @@ -945,13 +957,16 @@ class ResultProxy(object): metadata = self.cursor.description if metadata is not None: + typemap = self.dialect.dbapi_type_map() + for i, item in enumerate(metadata): # sqlite possibly prepending table name to colnames so strip colname = item[0].split('.')[-1] if self.context.typemap is not None: - type = self.context.typemap.get(colname.lower(), types.NULLTYPE) + type = self.context.typemap.get(colname.lower(), typemap.get(item[1], types.NULLTYPE)) else: - type = types.NULLTYPE + type = typemap.get(item[1], types.NULLTYPE) + rec = (type, type.dialect_impl(self.dialect), i) if rec[0] is None: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 3633473cd5..25cfad11ee 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -22,7 +22,25 @@ class DefaultDialect(base.Dialect): self._ischema = None self.dbapi = dbapi self._figure_paramstyle(paramstyle=paramstyle, default=default_paramstyle) - + self._generate_dbapi_type_map() + + def _generate_dbapi_type_map(self): + """locate all TypeEngine objects in the dialect's module and map them against the 
DBAPI + type they represent. + + TODO: dialects should export this mapping explicitly, instead of relying upon + module searching. + """ + dialect_module = sys.modules[self.__class__.__module__] + map = {} + for obj in dialect_module.__dict__.values(): + if isinstance(obj, types.TypeEngine): + map[obj().get_dbapi_type(self.dialect)] = obj + self._dbapi_type_map = map + + def dbapi_type_map(self): + return self._dbapi_type_map + def create_execution_context(self, **kwargs): return DefaultExecutionContext(self, **kwargs) diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index bb2e31a158..cd10bed812 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -244,34 +244,25 @@ class BinaryTest(AssertMixin): binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1) binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2) binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None) - l = binary_table.select(order_by=binary_table.c.primary_id).execute().fetchall() - print type(stream1), type(l[0]['data']), type(l[0]['data_slice']) - print len(stream1), len(l[0]['data']), len(l[0]['data_slice']) - self.assert_(list(stream1) == list(l[0]['data'])) - self.assert_(list(stream1[0:100]) == list(l[0]['data_slice'])) - self.assert_(list(stream2) == list(l[1]['data'])) - self.assert_(testobj1 == l[0]['pickled']) - self.assert_(testobj2 == l[1]['pickled']) + + for stmt in ( + binary_table.select(order_by=binary_table.c.primary_id), + text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType}, engine=testbase.db) + ): + l = stmt.execute().fetchall() + print type(stream1), type(l[0]['data']), type(l[0]['data_slice']) + print len(stream1), len(l[0]['data']), len(l[0]['data_slice']) + self.assert_(list(stream1) == 
list(l[0]['data'])) + self.assert_(list(stream1[0:100]) == list(l[0]['data_slice'])) + self.assert_(list(stream2) == list(l[1]['data'])) + self.assert_(testobj1 == l[0]['pickled']) + self.assert_(testobj2 == l[1]['pickled']) def load_stream(self, name, len=12579): f = os.path.join(os.path.dirname(testbase.__file__), name) # put a number less than the typical MySQL default BLOB size return file(f).read(len) - @testbase.supported('oracle') - def test_oracle_autobinary(self): - stream1 =self.load_stream('binary_data_one.dat') - stream2 =self.load_stream('binary_data_two.dat') - binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100]) - binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99]) - binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None) - result = testbase.db.connect().execute("select primary_id, misc, data, data_slice from binary_table") - l = result.fetchall() - l[0]['data'] - self.assert_(list(stream1) == list(l[0]['data'])) - self.assert_(list(stream1[0:100]) == list(l[0]['data_slice'])) - self.assert_(list(stream2) == list(l[1]['data'])) - class DateTest(AssertMixin): def setUpAll(self):