From e249e56cb9430668dcbe6b30c62457b8086fbc66 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 8 Dec 2006 18:47:20 +0000 Subject: [PATCH] - fix to connection pool _close() to properly clean up, fixes MySQL synchronization errors [ticket:387] --- CHANGES | 2 ++ lib/sqlalchemy/engine/base.py | 2 +- lib/sqlalchemy/pool.py | 30 +++++++++++--------- test/engine/pool.py | 3 +- test/perf/wsgi.py | 53 +++++++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 test/perf/wsgi.py diff --git a/CHANGES b/CHANGES index 9d3e5e8ed2..4cfcb56bf7 100644 --- a/CHANGES +++ b/CHANGES @@ -6,6 +6,8 @@ could be unnecessarily cascaded on the save/update cascade - MySQL detects errors 2006 (server has gone away) and 2014 (commands out of sync) and invalidates the connection on which it occured. +- fix to connection pool _close() to properly clean up, fixes +MySQL synchronization errors [ticket:387] - added keywords for EXCEPT, INTERSECT, EXCEPT ALL, INTERSECT ALL [ticket:247] - added label() function to Select class, when scalar=True is used diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 0d9509eaf5..e696950b95 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -594,7 +594,7 @@ class ResultProxy(object): self.cursor.close() if self.connection.should_close_with_result and self.dialect.supports_autoclose_results: self.connection.close() - + def _convert_key(self, key): """given a key, which could be a ColumnElement, string, etc., matches it to the appropriate key we got from the result set's metadata; then cache it locally for quick re-access.""" diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 8e74f0343a..68ab0be47b 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -10,7 +10,7 @@ on a thread local basis. Also provides a DBAPI2 transparency layer so that pool be managed automatically, based on module type and connect arguments, simply by calling regular DBAPI connect() methods.""" -import weakref, string, time, sys +import weakref, string, time, sys, traceback try: import cPickle as pickle except: @@ -187,7 +187,7 @@ class _ConnectionFairy(object): """proxies a DBAPI connection object and provides return-on-dereference support""" def __init__(self, pool): self._threadfairy = _ThreadFairy(self) - self.cursors = {} + self._cursors = {} self.__pool = pool self.__counter = 0 try: @@ -204,7 +204,7 @@ class _ConnectionFairy(object): raise exceptions.InvalidRequestError("This connection is closed") self._connection_record.invalidate() self.connection = None - self.cursors = None + self._cursors = None self._close() def cursor(self, *args, **kwargs): try: @@ -220,8 +220,8 @@ class _ConnectionFairy(object): self.__counter +=1 return self def close_open_cursors(self): - if self.cursors is not None: - for c in list(self.cursors): + if self._cursors is not None: + for c in list(self._cursors): c.close() def close(self): self.__counter -=1 @@ -230,37 +230,39 @@ class _ConnectionFairy(object): def __del__(self): self._close() def _close(self): - if self.cursors is not None: + if self._cursors is not None: # cursors should be closed before connection is returned to the pool. some dbapis like # mysql have real issues if they are not. if self.__pool.auto_close_cursors: self.close_open_cursors() elif self.__pool.disallow_open_cursors: - if len(self.cursors): - raise exceptions.InvalidRequestError("This connection still has %d open cursors" % len(self.cursors)) + if len(self._cursors): + raise exceptions.InvalidRequestError("This connection still has %d open cursors" % len(self._cursors)) if self.connection is not None: try: self.connection.rollback() except: - # damn mysql -- (todo look for NotSupportedError) - pass + if self._connection_record is not None: + self._connection_record.invalidate() if self._connection_record is not None: if self.__pool.echo: self.__pool.log("Connection %s being returned to pool" % repr(self.connection)) self.__pool.return_conn(self) + self.connection = None self._connection_record = None self._threadfairy = None - + self._cursors = None + class _CursorFairy(object): def __init__(self, parent, cursor): self.__parent = parent - self.__parent.cursors[self]=True + self.__parent._cursors[self]=True self.cursor = cursor def invalidate(self): self.__parent.invalidate() def close(self): - if self in self.__parent.cursors: - del self.__parent.cursors[self] + if self in self.__parent._cursors: + del self.__parent._cursors[self] self.cursor.close() def __getattr__(self, key): return getattr(self.cursor, key) diff --git a/test/engine/pool.py b/test/engine/pool.py index 59096c3641..08df106ce2 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -20,6 +20,8 @@ class MockConnection(object): mcid += 1 def close(self): pass + def rollback(self): + pass def cursor(self): return MockCursor() class MockCursor(object): @@ -170,7 +172,6 @@ class PoolTest(PersistTest): c1 = p.connect() c_id = c1.connection.id c1.close(); c1=None - c1 = p.connect() assert c1.connection.id == c_id c1.invalidate() diff --git a/test/perf/wsgi.py b/test/perf/wsgi.py new file mode 100644 index 0000000000..7068de1fde --- /dev/null +++ b/test/perf/wsgi.py @@ -0,0 +1,53 @@ +#!/usr/bin/python + +from sqlalchemy import * +import sqlalchemy.pool as pool +import thread +from sqlalchemy import exceptions + +import logging +logging.basicConfig() +logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO) + +threadids = set() +#meta = BoundMetaData('postgres://scott:tiger@127.0.0.1/test') + +#meta = BoundMetaData('mysql://scott:tiger@localhost/test', poolclass=pool.SingletonThreadPool) +meta = BoundMetaData('mysql://scott:tiger@localhost/test') +foo = Table('foo', meta, + Column('id', Integer, primary_key=True), + Column('data', String(30))) + +meta.drop_all() +meta.create_all() + +data = [] +for x in range(1,500): + data.append({'id':x,'data':"this is x value %d" % x}) +foo.insert().execute(data) + +class Foo(object): + pass + +mapper(Foo, foo) + +root = './' +port = 8000 + +def serve(environ, start_response): + sess = create_session() + l = sess.query(Foo).select() + + start_response("200 OK", [('Content-type','text/plain')]) + threadids.add(thread.get_ident()) + print "sending response on thread", thread.get_ident(), " total threads ", len(threadids) + return ["\n".join([x.data for x in l])] + + +if __name__ == '__main__': + from wsgiutils import wsgiServer + server = wsgiServer.WSGIServer (('localhost', port), {'/': serve}) + print "Server listening on port %d" % port + server.serve_forever() + + -- 2.47.2