From b81a0132a84a54559b7dbce075988dec62ed7da7 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Tue, 17 Apr 2007 00:21:25 +0000 Subject: [PATCH] added "recreate()" argument to connection pool classes this method is called when the invalidate() occurs for a disconnect condition, so that the entire pool is recreated, thereby avoiding repeat errors on remaining connections in the pool. dispose() called as well (also fixed up) but cant guarantee all connections closed. --- CHANGES | 7 +++++-- lib/sqlalchemy/engine/base.py | 2 ++ lib/sqlalchemy/engine/default.py | 5 ++--- lib/sqlalchemy/pool.py | 35 +++++++++++++++++++++++++++----- test/engine/pool.py | 8 ++++++++ 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/CHANGES b/CHANGES index d8875df085..42c109d99e 100644 --- a/CHANGES +++ b/CHANGES @@ -12,8 +12,11 @@ - server side cursor support fully functional in postgres [ticket:514]. - improved framework for auto-invalidation of connections that have - lost their underlying database - the error catching/invalidate - step is totally moved to the connection pool. #516 + lost their underlying database, via dialect-specific detection + of exceptions corresponding to that database's disconnect + related error messages. Additionally, when a "connection no + longer open" condition is detected, the entire connection pool + is discarded and replaced with a new instance. #516 - sql: - preliminary support for unicode table names, column names and SQL statements added, for databases which can support them. diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 6f0ff029a0..e7a3f8feb1 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -577,6 +577,7 @@ class Connection(Connectable): except Exception, e: if self.dialect.is_disconnect(e): self.__connection.invalidate(e=e) + self.engine.connection_provider.dispose() self._autorollback() if self.__close_with_result: self.close() @@ -588,6 +589,7 @@ class Connection(Connectable): except Exception, e: if self.dialect.is_disconnect(e): self.__connection.invalidate(e=e) + self.engine.connection_provider.dispose() self._autorollback() if self.__close_with_result: self.close() diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 969bde8d9d..f1858acdc0 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -20,9 +20,8 @@ class PoolConnectionProvider(base.ConnectionProvider): def dispose(self): self._pool.dispose() - if hasattr(self, '_dbproxy'): - self._dbproxy.dispose() - + self._pool = self._pool.recreate() + class DefaultDialect(base.Dialect): """Default implementation of Dialect""" diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index a617f8fecd..9a2cdad0e2 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -142,7 +142,20 @@ class Pool(object): def create_connection(self): return _ConnectionRecord(self) + + def recreate(self): + """return a new instance of this Pool's class with identical creation arguments.""" + raise NotImplementedError() + def dispose(self): + """dispose of this pool. + + this method leaves the possibility of checked-out connections remaining opened, + so it is advised to not reuse the pool once dispose() is called, and to instead + use a new pool constructed by the recreate() method. + """ + raise NotImplementedError() + def connect(self): if not self._use_threadlocal: return _ConnectionFairy(self).checkout() @@ -172,17 +185,15 @@ class Pool(object): def log(self, msg): self.logger.info(msg) - def dispose(self): - raise NotImplementedError() - class _ConnectionRecord(object): def __init__(self, pool): self.__pool = pool self.connection = self.__connect() def close(self): - self.__pool.log("Closing connection %s" % repr(self.connection)) - self.connection.close() + if self.connection is not None: + self.__pool.log("Closing connection %s" % repr(self.connection)) + self.connection.close() def invalidate(self, e=None): if e is not None: @@ -348,7 +359,17 @@ class SingletonThreadPool(Pool): self._conns = {} self.size = pool_size + def recreate(self): + self.log("Pool recreating") + return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self.echo, use_threadlocal=self._use_threadlocal, auto_close_cursors=self.auto_close_cursors, disallow_open_cursors=self.disallow_open_cursors) + def dispose(self): + """dispose of this pool. + + this method leaves the possibility of checked-out connections remaining opened, + so it is advised to not reuse the pool once dispose() is called, and to instead + use a new pool constructed by the recreate() method. + """ for key, conn in self._conns.items(): try: conn.close() @@ -426,6 +447,10 @@ class QueuePool(Pool): self._max_overflow = max_overflow self._timeout = timeout + def recreate(self): + self.log("Pool recreating") + return QueuePool(self._creator, pool_size=self._pool.maxsize, max_overflow=self._max_overflow, timeout=self._timeout, recycle=self._recycle, echo=self.echo, use_threadlocal=self._use_threadlocal, auto_close_cursors=self.auto_close_cursors, disallow_open_cursors=self.disallow_open_cursors) + def do_return_conn(self, conn): try: self._pool.put(conn, False) diff --git a/test/engine/pool.py b/test/engine/pool.py index db97ea6f8d..a2f7f9f35a 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -180,6 +180,14 @@ class PoolTest(PersistTest): c1 = p.connect() assert c1.connection.id != c_id + def test_recreate(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p2 = p.recreate() + assert p2.size() == 1 + assert p2._use_threadlocal is False + assert p2._max_overflow == 0 + def test_reconnect(self): dbapi = MockDBAPI() p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) -- 2.47.2