is assembled into a first_connect/connect pool listener by the
connection strategy if non-None. Provides a simpler interface
for dialects.
-
+
+ - StaticPool now initializes, disposes and recreates without
+ opening a new connection - the connection is only opened when
+ first requested. dispose() also works on AssertionPool now.
+ [ticket:1728]
+
- metadata
- Added the ability to strip schema information when using
"tometadata" by passing "schema=None" as an argument. If schema
"""
- def __init__(self, creator, **params):
- """
- Construct a StaticPool.
-
- :param creator: a callable function that returns a DB-API
- connection object. The function will be called with
- parameters.
-
- :param echo: If True, connections being pulled and retrieved
- from the pool will be logged to the standard output, as well
- as pool sizing information. Echoing can also be achieved by
- enabling logging for the "sqlalchemy.pool"
- namespace. Defaults to False.
-
- :param reset_on_return: If true, reset the database state of
- connections returned to the pool. This is typically a
- ROLLBACK to release locks and transaction resources.
- Disable at your own peril. Defaults to True.
-
- :param listeners: A list of
- :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
- dictionaries of callables that receive events when DB-API
- connections are created, checked out and checked in to the
- pool.
+ @memoized_property
+ def _conn(self):
+ return self._creator()
- """
- Pool.__init__(self, creator, **params)
- self._conn = creator()
-
@memoized_property
def connection(self):
return _ConnectionRecord(self)
return "StaticPool"
def dispose(self):
- self._conn.close()
- self._conn = None
+ if '_conn' in self.__dict__:
+ self._conn.close()
+ self._conn = None
def recreate(self):
self.logger.info("Pool recreating")
def dispose(self):
self._checked_out = False
- self._conn.close()
+ if self._conn:
+ self._conn.close()
def recreate(self):
self.logger.info("Pool recreating")
expected = [(1,)]
for row in cursor:
eq_(row, expected.pop(0))
+
+ def test_no_connect_on_recreate(self):
+ def creator():
+ raise Exception("no creates allowed")
+
+ for cls in (pool.SingletonThreadPool, pool.StaticPool,
+ pool.QueuePool, pool.NullPool, pool.AssertionPool):
+ p = cls(creator=creator)
+ p.dispose()
+ p.recreate()
+ mock_dbapi = MockDBAPI()
+ p = cls(creator=mock_dbapi.connect)
+ conn = p.connect()
+ conn.close()
+ mock_dbapi.throw_error = True
+ p.dispose()
+ p.recreate()
+
+
def testthreadlocal_del(self):
self._do_testthreadlocal(useclose=False)