self.pool = pool
self.url = url
self.dialect = dialect
- self.pool._dialect = dialect
if logging_name:
self.logging_name = logging_name
self.echo = echo
"""
self.url = url
+ def handle_dialect_kwargs(self, dialect_cls, dialect_args):
+ """parse and modify dialect kwargs"""
+
+ def handle_pool_kwargs(self, pool_cls, pool_args):
+ """parse and modify pool kwargs"""
+
def engine_created(self, engine):
"""Receive the :class:`.Engine` object when it is fully constructed.
dialect_args['dbapi'] = dbapi
+ for plugin in plugins:
+ plugin.handle_dialect_kwargs(dialect_cls, dialect_args)
+
# create dialect
dialect = dialect_cls(**dialect_args)
poolclass = pop_kwarg('poolclass', None)
if poolclass is None:
poolclass = dialect_cls.get_pool_class(u)
- pool_args = {}
+ pool_args = {
+ 'dialect': dialect
+ }
# consume pool arguments from kwargs, translating a few of
# the arguments
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = pop_kwarg(tk)
+
+ for plugin in plugins:
+ plugin.handle_pool_kwargs(poolclass, pool_args)
+
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
else:
pool = pool
+ pool._dialect = dialect
+
# create engine.
engineclass = self.engine_cls
engine_args = {}
reset_on_return=True,
listeners=None,
events=None,
- _dispatch=None,
- _dialect=None):
+ dialect=None,
+ _dispatch=None):
"""
Construct a Pool.
pool. This has been superseded by
:func:`~sqlalchemy.event.listen`.
+ :param dialect: a :class:`.Dialect` that will handle the job
+ of calling rollback(), close(), or commit() on DBAPI connections.
+ If omitted, a built-in "stub" dialect is used. Applications that
+ make use of :func:`~.create_engine` should not use this parameter
+ as it is handled by the engine creation strategy.
+
+ .. versionadded:: 1.1 - ``dialect`` is now a public parameter
+ to the :class:`.Pool`.
+
"""
if logging_name:
self.logging_name = self._orig_logging_name = logging_name
if _dispatch:
self.dispatch._update(_dispatch, only_propagate=False)
- if _dialect:
- self._dialect = _dialect
+ if dialect:
+ self._dialect = dialect
if events:
for fn, target in events:
event.listen(self, target, fn)
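
# create_engine() passes the dialect through pool_args as shown earlier; a
# pool can also be built directly with the now-public parameter.  A minimal
# sketch, using an arbitrary in-memory SQLite connection:
import sqlite3
from sqlalchemy.dialects import sqlite
from sqlalchemy.pool import QueuePool

p = QueuePool(lambda: sqlite3.connect(":memory:"),
              dialect=sqlite.dialect())
conn = p.connect()   # rollback-on-return is now routed through the dialect
conn.close()
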
"""
- def __init__(self, pool):
+ def __init__(self, pool, connect=True):
self.__pool = pool
- self.__connect(first_connect_check=True)
+ if connect:
+ self.__connect(first_connect_check=True)
self.finalize_callback = deque()
+ fairy_ref = None
+
+ starttime = None
+
connection = None
"""A reference to the actual DBAPI connection being tracked.
This dictionary is shared among the :attr:`._ConnectionFairy.info`
and :attr:`.Connection.info` accessors.
+ .. note::
+
+ The lifespan of this dictionary is linked to the
+ DBAPI connection itself, meaning that it is **discarded** each time
+ the DBAPI connection is closed and/or invalidated. The
+ :attr:`._ConnectionRecord.record_info` dictionary remains
+ persistent throughout the lifespan of the
+ :class:`._ConnectionRecord` container.
+
+ """
+ return {}
+
+ @util.memoized_property
+ def record_info(self):
+ """An "info' dictionary associated with the connection record
+ itself.
+
+ Unlike the :attr:`._ConnectionRecord.info` dictionary, which is linked
+ to the lifespan of the DBAPI connection, this dictionary is linked
+ to the lifespan of the :class:`._ConnectionRecord` container itself
+ and will remain persistent throughout the life of the
+ :class:`._ConnectionRecord`.
+
+ .. versionadded:: 1.1
+
"""
return {}
pool.dispatch.checkin(connection, self)
pool._return_conn(self)
+ @property
+ def in_use(self):
+ return self.fairy_ref is not None
+
+ @property
+ def last_connect_time(self):
+ return self.starttime
+
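
# A sketch of consulting the new accessors from a pool event; the
# "checkout_count" key and the SQLite engine are illustrative only:
import time
from sqlalchemy import create_engine, event

eng = create_engine("sqlite://")

@event.listens_for(eng, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy):
    # record_info survives invalidation/reconnect of the DBAPI connection,
    # unlike connection_record.info
    count = connection_record.record_info.get("checkout_count", 0)
    connection_record.record_info["checkout_count"] = count + 1
    # last_connect_time reports when the current DBAPI connection was made
    age = time.time() - connection_record.last_connect_time
    connection_record.record_info["last_checkout_age"] = age
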
def close(self):
if self.connection is not None:
self.__close()
if self.__pool.dispatch.close:
self.__pool.dispatch.close(self.connection, self)
self.__pool._close_connection(self.connection)
+ self.connection = None
def __connect(self, first_connect_check=False):
pool = self.__pool
with the :attr:`._ConnectionRecord.info` and :attr:`.Connection.info`
accessors.
+ The dictionary associated with a particular DBAPI connection is
+ discarded when the connection itself is discarded.
+
"""
return self._connection_record.info
+ @property
+ def record_info(self):
+ """Info dictionary associated with the :class:`._ConnectionRecord
+ container referred to by this :class:`.ConnectionFairy`.
+
+ Unlike the :attr:`._ConnectionFairy.info` dictionary, this dictionary
+ persists across DBAPI connections that are disconnected and/or
+ invalidated within the lifespan of a :class:`._ConnectionRecord`.
+
+ .. versionadded:: 1.1
+
+ """
+ if self._connection_record:
+ return self._connection_record.record_info
+ else:
+ return None
+
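
# The same per-record dictionary is reachable from application code through
# the Connection's raw connection (the _ConnectionFairy); the engine URL and
# the key name here are illustrative only:
from sqlalchemy import create_engine

eng = create_engine("sqlite://")
with eng.connect() as conn:
    conn.connection.record_info.setdefault("tag", "reporting")
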
def invalidate(self, e=None, soft=False):
"""Mark this connection as invalidated.
use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
- _dialect=self._dialect)
+ dialect=self._dialect)
def dispose(self):
"""Dispose of this pool."""
use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
- _dialect=self._dialect)
+ dialect=self._dialect)
def dispose(self):
while True:
use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
- _dialect=self._dialect)
+ dialect=self._dialect)
def dispose(self):
pass
echo=self.echo,
logging_name=self._orig_logging_name,
_dispatch=self.dispatch,
- _dialect=self._dialect)
+ dialect=self._dialect)
def _create_connection(self):
return self._conn
return self.__class__(self._creator, echo=self.echo,
logging_name=self._orig_logging_name,
_dispatch=self.dispatch,
- _dialect=self._dialect)
+ dialect=self._dialect)
def _do_get(self):
if self._checked_out:
test_schema = None
test_schema_2 = None
_current = None
-_skip_test_exception = None
+
+try:
+ from unittest import SkipTest as _skip_test_exception
+except ImportError:
+ _skip_test_exception = None
class Config(object):
def skip_test(msg):
raise _skip_test_exception(msg)
+
else:
all_fails._expect_success(config._current)
- def _do(self, config, fn, *args, **kw):
+ def _do(self, cfg, fn, *args, **kw):
for skip in self.skips:
- if skip(config):
+ if skip(cfg):
msg = "'%s' : %s" % (
fn.__name__,
- skip._as_string(config)
+ skip._as_string(cfg)
)
config.skip_test(msg)
try:
return_value = fn(*args, **kw)
except Exception as ex:
- self._expect_failure(config, ex, name=fn.__name__)
+ self._expect_failure(cfg, ex, name=fn.__name__)
else:
- self._expect_success(config, name=fn.__name__)
+ self._expect_success(cfg, name=fn.__name__)
return return_value
def _expect_failure(self, config, ex, name='block'):
if negate:
bool_ = not negate
return self.description % {
- "driver": config.db.url.get_driver_name(),
- "database": config.db.url.get_backend_name(),
+ "driver": config.db.url.get_driver_name()
+ if config else "<no driver>",
+ "database": config.db.url.get_backend_name()
+ if config else "<no database>",
"doesnt_support": "doesn't support" if bool_ else "does support",
"does_support": "does support" if bool_ else "doesn't support"
}
if not db_urls:
db_urls.append(file_config.get('db', 'default'))
+ config._current = None
for db_url in db_urls:
cfg = provision.setup_config(
db_url, options, file_config, provision.FOLLOWER_IDENT)
"""
+import sys
+
from . import exclusions
from .. import util
"Stability issues with coverage + py3k"
)
+ @property
+ def python2(self):
+ return exclusions.skip_if(
+ lambda: sys.version_info >= (3,),
+ "Python version 2.xx is required."
+ )
+
+ @property
+ def python3(self):
+ return exclusions.skip_if(
+ lambda: sys.version_info < (3,),
+ "Python version 3.xx is required."
+ )
+
+ @property
+ def cpython(self):
+ return exclusions.only_if(
+ lambda: util.cpython,
+ "cPython interpreter needed"
+ )
+
+ @property
+ def non_broken_pickle(self):
+ from sqlalchemy.util import pickle
+ return exclusions.only_if(
+ lambda: not util.pypy and pickle.__name__ == 'cPickle'
+ or sys.version_info >= (3, 2),
+ "Needs cPickle+cPython or newer Python 3 pickle"
+ )
+
+ @property
+ def predictable_gc(self):
+ """target platform must remove all cycles unconditionally when
+ gc.collect() is called, as well as clean out unreferenced subclasses.
+
+ """
+ return self.cpython
+
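
# These properties are consumed as decorators through the testing
# requirements system; a sketch of typical use in a test module (the class
# and test names are illustrative):
from sqlalchemy import testing
from sqlalchemy.testing import fixtures


class PickleRoundTripTest(fixtures.TestBase):
    __requires__ = ('non_broken_pickle',)

    @testing.requires.cpython
    def test_cpython_only(self):
        pass

    @testing.requires.python3
    def test_py3k_only(self):
        pass
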
@property
def no_coverage(self):
"""Test should be skipped if coverage is enabled.
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing.mock import Mock, MagicMock, call
+from sqlalchemy.testing import mock
from sqlalchemy.dialects import registry
from sqlalchemy.dialects import plugins
MyEnginePlugin.mock_calls,
[
call(e.url, {}),
+ call.handle_dialect_kwargs(sqlite.dialect, mock.ANY),
+ call.handle_pool_kwargs(mock.ANY, {"dialect": e.dialect}),
call.engine_created(e)
]
)
assert not c2.info
assert 'foo2' in c.info
+ def test_rec_info(self):
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
+
+ c = p.connect()
+ self.assert_(not c.record_info)
+ self.assert_(c.record_info is c._connection_record.record_info)
+
+ c.record_info['foo'] = 'bar'
+ c.close()
+ del c
+
+ c = p.connect()
+ self.assert_('foo' in c.record_info)
+
+ c.invalidate()
+ c = p.connect()
+ self.assert_('foo' in c.record_info)
+
+ c.record_info['foo2'] = 'bar2'
+ c.detach()
+ is_(c.record_info, None)
+ is_(c._connection_record, None)
+
+ c2 = p.connect()
+
+ assert c2.record_info
+ assert 'foo2' in c2.record_info
+
+ def test_rec_unconnected(self):
+ # test production of a _ConnectionRecord with an
+ # initially unconnected state.
+
+ dbapi = MockDBAPI()
+ p1 = pool.Pool(
+ creator=lambda: dbapi.connect('foo.db')
+ )
+
+ r1 = pool._ConnectionRecord(p1, connect=False)
+
+ assert not r1.connection
+ c1 = r1.get_connection()
+ is_(c1, r1.connection)
+
+ def test_rec_close_reopen(self):
+ # test that _ConnectionRecord.close() allows
+ # the record to be reused
+ dbapi = MockDBAPI()
+ p1 = pool.Pool(
+ creator=lambda: dbapi.connect('foo.db')
+ )
+
+ r1 = pool._ConnectionRecord(p1)
+
+ c1 = r1.connection
+ c2 = r1.get_connection()
+ is_(c1, c2)
+
+ r1.close()
+
+ assert not r1.connection
+ eq_(
+ c1.mock_calls,
+ [call.close()]
+ )
+
+ c2 = r1.get_connection()
+
+ is_not_(c1, c2)
+ is_(c2, r1.connection)
+
+ eq_(
+ c2.mock_calls,
+ []
+ )
+
class PoolDialectTest(PoolTestBase):
def _dialect(self):
def duplicate_key_raises_integrity_error(self):
return fails_on("postgresql+pg8000")
- @property
- def python2(self):
- return skip_if(
- lambda: sys.version_info >= (3,),
- "Python version 2.xx is required."
- )
-
- @property
- def python3(self):
- return skip_if(
- lambda: sys.version_info < (3,),
- "Python version 3.xx is required."
- )
-
- @property
- def cpython(self):
- return only_if(lambda: util.cpython,
- "cPython interpreter needed"
- )
-
-
- @property
- def non_broken_pickle(self):
- from sqlalchemy.util import pickle
- return only_if(
- lambda: not util.pypy and pickle.__name__ == 'cPickle'
- or sys.version_info >= (3, 2),
- "Needs cPickle+cPython or newer Python 3 pickle"
- )
-
-
- @property
- def predictable_gc(self):
- """target platform must remove all cycles unconditionally when
- gc.collect() is called, as well as clean out unreferenced subclasses.
-
- """
- return self.cpython
-
@property
def hstore(self):
def check_hstore(config):