incompatible change for any application that may have been
using this feature, however the feature has never been
documented.
-
+
+- engine/pool
+ - Implemented recreate() for StaticPool.
+
+
0.5.4p2
=======
def dispose(self):
pass
-
+
+
class StaticPool(Pool):
"""A Pool of exactly one connection, used for all requests.
-
+
Reconnect-related functions such as ``recycle`` and connection
invalidation (which is also used to support auto-reconnect) are not
currently supported by this Pool implementation but may be implemented
in a future release.
-
+
"""
def __init__(self, creator, **params):
self._conn.close()
self._conn = None
+ def recreate(self):
+ self.log("Pool recreating")
+ return self.__class__(creator=self._creator,
+ recycle=self._recycle,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ echo=self.echo,
+ listeners=self.listeners)
+
def create_connection(self):
return self._conn
c1 = p.connect()
assert c1.connection.id != c_id
-
-
-
+
+
+class StaticPoolTest(PoolTestBase):
+ def test_recreate(self):
+ dbapi = MockDBAPI()
+ creator = lambda: dbapi.connect('foo.db')
+ p = pool.StaticPool(creator)
+ p2 = p.recreate()
+ assert p._creator is p2._creator