From: Mike Bayer Date: Sat, 24 Oct 2009 16:38:07 +0000 (+0000) Subject: - ResultProxy internals have been overhauled to greatly reduce X-Git-Tag: rel_0_6beta1~229 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=52b1ace6768a7f00b411d5f8b86ad8d1f84666b8;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - ResultProxy internals have been overhauled to greatly reduce method call counts when fetching columns that have no type-level processing applied. Provides a 100% speed improvement when fetching large result sets with no unicode conversion. Many thanks to Elixir's Gaëtan de Menten for this dramatic improvement ! [ticket:1586] --- diff --git a/CHANGES b/CHANGES index a5b759163e..1dd1c886b4 100644 --- a/CHANGES +++ b/CHANGES @@ -239,6 +239,13 @@ CHANGES - RowProxy objects are now pickleable, i.e. the object returned by result.fetchone(), result.fetchall() etc. + - ResultProxy internals have been overhauled to greatly reduce + method call counts when fetching columns that have no + type-level processing applied. Provides a 100% speed + improvement when fetching large result sets with no unicode + conversion. Many thanks to Elixir's Gaëtan de Menten + for this dramatic improvement ! [ticket:1586] + - setting echo=False on create_engine() now sets the loglevel to WARN instead of NOTSET. This so that logging can be disabled for a particular engine even if logging diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 643faa982b..e541399e8e 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -19,7 +19,7 @@ __all__ = [ 'RowProxy', 'SchemaIterator', 'StringIO', 'Transaction', 'TwoPhaseTransaction', 'connection_memoize'] -import inspect, StringIO, sys +import inspect, StringIO, sys, operator from sqlalchemy import exc, schema, util, types, log from sqlalchemy.sql import expression @@ -1485,13 +1485,14 @@ class RowProxy(object): results that correspond to constructed SQL expressions). 
""" - __slots__ = ['__parent', '__row'] + __slots__ = ['__parent', '__row', '__colfuncs'] def __init__(self, parent, row): """RowProxy objects are constructed by ResultProxy objects.""" self.__parent = parent self.__row = row + self.__colfuncs = parent._colfuncs if self.__parent._echo: self.__parent.context.engine.logger.debug("Row %r", row) @@ -1508,24 +1509,24 @@ class RowProxy(object): def __getstate__(self): return { - '__row':[self.__parent._get_col(self.__row, i) for i in xrange(len(self.__row))], + '__row':[self.__colfuncs[i][0](self.__row) for i in xrange(len(self.__row))], '__parent':PickledResultProxy(self.__parent) } def __setstate__(self, d): self.__row = d['__row'] self.__parent = d['__parent'] + self.__colfuncs = self.__parent._colfuncs def __iter__(self): - for i in xrange(len(self.__row)): - yield self.__parent._get_col(self.__row, i) + row = self.__row + for func in self.__parent._colfunc_list: + yield func(row) __hash__ = None def __eq__(self, other): - return ((other is self) or - (other == tuple(self.__parent._get_col(self.__row, key) - for key in xrange(len(self.__row))))) + return other is self or other == tuple(self) def __ne__(self, other): return not self.__eq__(other) @@ -1539,18 +1540,35 @@ class RowProxy(object): return self.__parent._has_key(self.__row, key) def __getitem__(self, key): - return self.__parent._get_col(self.__row, key) + # the fallback and slices are only useful for __getitem__ anyway + try: + return self.__colfuncs[key][0](self.__row) + except KeyError: + k = self.__parent._key_fallback(key) + if k is None: + raise exc.NoSuchColumnError( + "Could not locate column in row for column '%s'" % key) + else: + # save on KeyError + _key_fallback() lookup next time around + self.__colfuncs[key] = k + return k[0](self.__row) + except TypeError: + if isinstance(key, slice): + return tuple(func(self.__row) for func in self.__parent._colfunc_list[key]) + else: + raise def __getattr__(self, name): try: - return 
self.__parent._get_col(self.__row, name) + # TODO: no test coverage here + return self[name] except KeyError, e: raise AttributeError(e.args[0]) def items(self): """Return a list of tuples, each tuple containing a key/value pair.""" - - return [(key, getattr(self, key)) for key in self.iterkeys()] + # TODO: no coverage here + return [(key, self[key]) for key in self.iterkeys()] def keys(self): """Return the list of keys as strings represented by this RowProxy.""" @@ -1574,26 +1592,43 @@ class PickledResultProxy(object): _echo = False def __init__(self, resultproxy): - self._props = dict( - (k, resultproxy._props[k][2]) for k in resultproxy._props - if isinstance(k, (basestring, int)) - ) + self._pickled_colfuncs = \ + dict( + (key, (i, type_)) + for key, (fn, i, type_) in resultproxy._colfuncs.iteritems() + if isinstance(key, (basestring, int)) + ) self._keys = resultproxy.keys + + @util.memoized_property + def _colfuncs(self): + d = {} + for key, (index, type_) in self._pickled_colfuncs.iteritems(): + if type_ == 'ambiguous': + d[key] = (ResultProxy._ambiguous_processor(key), index, type_) + else: + d[key] = (operator.itemgetter(index), index, "itemgetter") + return d + + @util.memoized_property + def _colfunc_list(self): + funcs = self._colfuncs + return [funcs[i][0] for i in xrange(len(self.keys))] - def _fallback_key(self, key): - if key in self._props: - return self._props[key] + def _key_fallback(self, key): + if key in self._colfuncs: + return self._colfuncs[key] if isinstance(key, basestring): key = key.lower() - if key in self._props: - return self._props[key] + if key in self._colfuncs: + return self._colfuncs[key] if isinstance(key, expression.ColumnElement): - if key._label and key._label.lower() in self._props: - return self._props[key._label.lower()] - elif hasattr(key, 'name') and key.name.lower() in self._props: - return self._props[key.name.lower()] + if key._label and key._label.lower() in self._colfuncs: + return 
self._colfuncs[key._label.lower()] + elif hasattr(key, 'name') and key.name.lower() in self._colfuncs: + return self._colfuncs[key.name.lower()] return None @@ -1601,25 +1636,13 @@ class PickledResultProxy(object): pass def _has_key(self, row, key): - return self._fallback_key(key) is not None - - def _get_col(self, row, orig_key): - key = self._fallback_key(orig_key) - if key is None: - raise exc.NoSuchColumnError("Could not locate column in row for column '%s'" % orig_key) - return row[key] + return self._key_fallback(key) is not None @property def keys(self): return self._keys -class BufferedColumnRow(RowProxy): - def __init__(self, parent, row): - row = [ResultProxy._get_col(parent, row, i) for i in xrange(len(row))] - super(BufferedColumnRow, self).__init__(parent, row) - - class ResultProxy(object): """Wraps a DB-API cursor object to provide easier access to row columns. @@ -1716,8 +1739,7 @@ class ResultProxy(object): if metadata is None: return - self._props = util.populate_column_dict(None) - self._props.creator = self.__key_fallback() + self._colfuncs = colfuncs = {} self.keys = [] typemap = self.dialect.dbapi_type_map @@ -1728,7 +1750,8 @@ class ResultProxy(object): colname = colname.decode(self.dialect.description_encoding) if '.' 
in colname: - # sqlite will in some circumstances prepend table name to colnames, so strip + # sqlite will in some circumstances prepend table name to + # colnames, so strip origname = colname colname = colname.split('.')[-1] else: @@ -1736,61 +1759,82 @@ class ResultProxy(object): if self.context.result_map: try: - (name, obj, type_) = self.context.result_map[colname.lower()] + name, obj, type_ = self.context.result_map[colname.lower()] except KeyError: - (name, obj, type_) = (colname, None, typemap.get(coltype, types.NULLTYPE)) + name, obj, type_ = \ + colname, None, typemap.get(coltype, types.NULLTYPE) else: - (name, obj, type_) = (colname, None, typemap.get(coltype, types.NULLTYPE)) + name, obj, type_ = (colname, None, typemap.get(coltype, types.NULLTYPE)) - rec = (type_, type_.dialect_impl(self.dialect).result_processor(self.dialect), i) - - if self._props.setdefault(name.lower(), rec) is not rec: - self._props[name.lower()] = (type_, self.__ambiguous_processor(name), 0) + processor = type_.dialect_impl(self.dialect).\ + result_processor(self.dialect) + + if processor: + def make_colfunc(processor, index): + def getcol(row): + return processor(row[index]) + return getcol + rec = (make_colfunc(processor, i), i, "colfunc") + else: + rec = (operator.itemgetter(i), i, "itemgetter") + # indexes as keys + colfuncs[i] = rec + + # Column names as keys + if colfuncs.setdefault(name.lower(), rec) is not rec: + #XXX: why not raise directly? 
because several columns colliding + #by name is not a problem as long as the user doesn't use them (i.e. + #use the more precise ColumnElement) + colfuncs[name.lower()] = (self._ambiguous_processor(name), i, "ambiguous") + # store the "origname" if we truncated (sqlite only) - if origname: - if self._props.setdefault(origname.lower(), rec) is not rec: - self._props[origname.lower()] = (type_, self.__ambiguous_processor(origname), 0) + if origname and \ + colfuncs.setdefault(origname.lower(), rec) is not rec: + colfuncs[name.lower()] = (self._ambiguous_processor(origname), i, "ambiguous") if self.dialect.requires_name_normalize: colname = self.dialect.normalize_name(colname) self.keys.append(colname) - self._props[i] = rec if obj: for o in obj: - self._props[o] = rec + colfuncs[o] = rec if self._echo: self.context.engine.logger.debug( "Col %r", tuple(x[0] for x in metadata)) - def __key_fallback(self): - # create a closure without 'self' to avoid circular references - props = self._props - - def fallback(key): - if isinstance(key, basestring): - key = key.lower() - if key in props: - return props[key] - - # fallback for targeting a ColumnElement to a textual expression - # this is a rare use case which only occurs when matching text() - # constructs to ColumnElements - if isinstance(key, expression.ColumnElement): - if key._label and key._label.lower() in props: - return props[key._label.lower()] - elif hasattr(key, 'name') and key.name.lower() in props: - return props[key.name.lower()] - - raise exc.NoSuchColumnError("Could not locate column in row for column '%s'" % key) - return fallback - - def __ambiguous_processor(self, colname): + @util.memoized_property + def _colfunc_list(self): + funcs = self._colfuncs + return [funcs[i][0] for i in xrange(len(self._metadata))] + + def _key_fallback(self, key): + funcs = self._colfuncs + + if isinstance(key, basestring): + key = key.lower() + if key in funcs: + return funcs[key] + + # fallback for targeting a ColumnElement to a 
textual expression + # this is a rare use case which only occurs when matching text() + # constructs to ColumnElements + if isinstance(key, expression.ColumnElement): + if key._label and key._label.lower() in funcs: + return funcs[key._label.lower()] + elif hasattr(key, 'name') and key.name.lower() in funcs: + return funcs[key.name.lower()] + + return None + + @classmethod + def _ambiguous_processor(cls, colname): def process(value): - raise exc.InvalidRequestError("Ambiguous column name '%s' in result set! " - "try 'use_labels' option on select statement." % colname) + raise exc.InvalidRequestError( + "Ambiguous column name '%s' in result set! " + "try 'use_labels' option on select statement." % colname) return process def close(self): @@ -1818,13 +1862,11 @@ class ResultProxy(object): self.connection.close() def _has_key(self, row, key): - try: - # _key_cache uses __missing__ in 2.5, so not much alternative - # to catching KeyError - self._props[key] + if key in self._colfuncs: return True - except KeyError: - return False + else: + key = self._key_fallback(key) + return key is not None def __iter__(self): while True: @@ -1900,23 +1942,6 @@ class ResultProxy(object): return self.dialect.supports_sane_multi_rowcount - def _get_col(self, row, key): - try: - type_, processor, index = self._props[key] - except TypeError: - # the 'slice' use case is very infrequent, - # so we use an exception catch to reduce conditionals in _get_col - if isinstance(key, slice): - indices = key.indices(len(row)) - return tuple(self._get_col(row, i) for i in xrange(*indices)) - else: - raise - - if processor: - return processor(row[index]) - else: - return row[index] - def _fetchone_impl(self): return self.cursor.fetchone() @@ -2102,6 +2127,11 @@ class FullyBufferedResultProxy(ResultProxy): self.__rowbuffer = [] return ret +class BufferedColumnRow(RowProxy): + def __init__(self, parent, row): + row = [parent._orig_colfuncs[i][0](row) for i in xrange(len(row))] + super(BufferedColumnRow, 
self).__init__(parent, row) + class BufferedColumnResultProxy(ResultProxy): """A ResultProxy with column buffering behavior. @@ -2109,28 +2139,30 @@ class BufferedColumnResultProxy(ResultProxy): fetchone() is called. If fetchmany() or fetchall() are called, the full grid of results is fetched. This is to operate with databases where result rows contain "live" results that fall out - of scope unless explicitly fetched. Currently this includes just - cx_Oracle LOB objects, but this behavior is known to exist in - other DB-APIs as well (Pygresql, currently unsupported). + of scope unless explicitly fetched. Currently this includes + cx_Oracle LOB objects. """ _process_row = BufferedColumnRow - def _get_col(self, row, key): - try: - rec = self._props[key] - return row[rec[2]] - except TypeError: - # the 'slice' use case is very infrequent, - # so we use an exception catch to reduce conditionals in _get_col - if isinstance(key, slice): - indices = key.indices(len(row)) - return tuple(self._get_col(row, i) for i in xrange(*indices)) + def _init_metadata(self): + super(BufferedColumnResultProxy, self)._init_metadata() + self._orig_colfuncs = self._colfuncs + self._colfuncs = colfuncs = {} + # replace the parent's _colfuncs dict, replacing + # column processors with straight itemgetters. + # the original _colfuncs dict is used when each row + # is constructed. + for k, (colfunc, index, type_) in self._orig_colfuncs.iteritems(): + if type_ == "colfunc": + colfuncs[k] = (operator.itemgetter(index), index, "itemgetter") else: - raise + colfuncs[k] = (colfunc, index, type_) def fetchall(self): + # can't call cursor.fetchall(), since rows must be + # fully processed before requesting more from the DBAPI. l = [] while True: row = self.fetchone() @@ -2140,6 +2172,8 @@ class BufferedColumnResultProxy(ResultProxy): return l def fetchmany(self, size=None): + # can't call cursor.fetchmany(), since rows must be + # fully processed before requesting more from the DBAPI. 
if size is None: return self.fetchall() l = [] @@ -2150,9 +2184,6 @@ class BufferedColumnResultProxy(ResultProxy): l.append(row) return l - - - def connection_memoize(key): """Decorator, memoize a function in a connection.info stash. diff --git a/test/aaa_profiling/test_resultset.py b/test/aaa_profiling/test_resultset.py new file mode 100644 index 0000000000..d67f9560c9 --- /dev/null +++ b/test/aaa_profiling/test_resultset.py @@ -0,0 +1,39 @@ +from sqlalchemy import * +from sqlalchemy.test import * + +NUM_FIELDS = 10 +NUM_RECORDS = 1000 + +class ResultSetTest(TestBase, AssertsExecutionResults): + __only_on__ = 'sqlite' + + @classmethod + def setup_class(cls): + global t, t2, metadata + + metadata = MetaData(testing.db) + t = Table('table', metadata, *[Column("field%d" % fnum, String) for fnum in range(NUM_FIELDS)]) + t2 = Table('table2', metadata, *[Column("field%d" % fnum, Unicode) for fnum in range(NUM_FIELDS)]) + + def setup(self): + metadata.create_all() + t.insert().execute( + [dict(("field%d" % fnum, u"value%d" % fnum) + for fnum in range(NUM_FIELDS)) for r_num in range(NUM_RECORDS)] + ) + t2.insert().execute( + [dict(("field%d" % fnum, u"value%d" % fnum) + for fnum in range(NUM_FIELDS)) for r_num in range(NUM_RECORDS)] + ) + + def teardown(self): + metadata.drop_all() + + @profiling.function_call_count(14416) + def test_string(self): + [tuple(row) for row in t.select().execute().fetchall()] + + @profiling.function_call_count(44406) + def test_unicode(self): + [tuple(row) for row in t2.select().execute().fetchall()] + diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 05c61ee47f..0e1787e9e5 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -2,7 +2,7 @@ from sqlalchemy.test.testing import eq_ import datetime from sqlalchemy import * from sqlalchemy import exc, sql, util -from sqlalchemy.engine import default +from sqlalchemy.engine import default, base from sqlalchemy.test import * from sqlalchemy.test.testing import eq_, 
assert_raises_message, assert_raises from sqlalchemy.test.schema import Table, Column @@ -614,6 +614,23 @@ class QueryTest(TestBase): lambda: r['user_id'] ) + r = util.pickle.loads(util.pickle.dumps(r)) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) + + result = users.outerjoin(addresses).select().execute() + result = base.BufferedColumnResultProxy(result.context) + r = result.first() + assert isinstance(r, base.BufferedColumnRow) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: r['user_id'] + ) + @testing.requires.subqueries def test_column_label_targeting(self): users.insert().execute(user_id=7, user_name='ed')