self._creator,
pool_size=self._pool.maxsize,
max_overflow=self._max_overflow,
+ pre_ping=self._pre_ping,
+ use_lifo=self._pool.use_lifo,
timeout=self._timeout,
recycle=self._recycle,
echo=self.echo,
echo=self.echo,
logging_name=self._orig_logging_name,
reset_on_return=self._reset_on_return,
+ pre_ping=self._pre_ping,
_dispatch=self.dispatch,
dialect=self._dialect,
)
pool_size=self.size,
recycle=self._recycle,
echo=self.echo,
+ pre_ping=self._pre_ping,
logging_name=self._orig_logging_name,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
creator=self._creator,
recycle=self._recycle,
reset_on_return=self._reset_on_return,
+ pre_ping=self._pre_ping,
echo=self.echo,
logging_name=self._orig_logging_name,
_dispatch=self.dispatch,
return self.__class__(
self._creator,
echo=self.echo,
+ pre_ping=self._pre_ping,
+ recycle=self._recycle,
+ reset_on_return=self._reset_on_return,
logging_name=self._orig_logging_name,
_dispatch=self.dispatch,
dialect=self._dialect,
from sqlalchemy import pool
from sqlalchemy import select
from sqlalchemy import testing
+from sqlalchemy.engine import default
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_not
from sqlalchemy.testing import is_true
+from sqlalchemy.testing import mock
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import ANY
from sqlalchemy.testing.mock import call
from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
-
join_timeout = 10
eq_(c2.mock_calls, [])
+ @testing.combinations(
+ (
+ pool.QueuePool,
+ dict(pool_size=8, max_overflow=10, timeout=25, use_lifo=True),
+ ),
+ (pool.QueuePool, {}),
+ (pool.NullPool, {}),
+ (pool.SingletonThreadPool, {}),
+ (pool.StaticPool, {}),
+ (pool.AssertionPool, {}),
+ )
+ def test_recreate_state(self, pool_cls, pool_args):
+ creator = object()
+ pool_args["pre_ping"] = True
+ pool_args["reset_on_return"] = "commit"
+ pool_args["recycle"] = 35
+ pool_args["logging_name"] = "somepool"
+ pool_args["dialect"] = default.DefaultDialect()
+ pool_args["echo"] = "debug"
+
+ p1 = pool_cls(creator=creator, **pool_args)
+
+ cls_keys = dir(pool_cls)
+
+ d1 = dict(p1.__dict__)
+
+ p2 = p1.recreate()
+
+ d2 = dict(p2.__dict__)
+
+ for k in cls_keys:
+ d1.pop(k, None)
+ d2.pop(k, None)
+
+ for k in (
+ "_threadconns",
+ "_invoke_creator",
+ "_pool",
+ "_overflow_lock",
+ "_fairy",
+ "_conn",
+ "logger",
+ ):
+ if k in d2:
+ d2[k] = mock.ANY
+
+ eq_(d1, d2)
+
+ eq_(p1.echo, p2.echo)
+ is_(p1._dialect, p2._dialect)
+
+ if "use_lifo" in pool_args:
+ eq_(p1._pool.use_lifo, p2._pool.use_lifo)
+
class PoolDialectTest(PoolTestBase):
def _dialect(self):