"buffered" result sets used for different purposes.
- server side cursor support fully functional in postgres
[ticket:514].
+ - improved framework for auto-invalidation of connections that have
+ lost their underlying database - the error catching/invalidate
+ step is totally moved to the connection pool. #516
- sql:
- the Unicode type is now a direct subclass of String, which now
contains all the "convert_unicode" logic. This helps the variety
def do_rollback(self, connection):
# pymssql throws an error on repeated rollbacks. Ignore it.
+ # TODO: this is normal behavior for most DBs. are we sure we want to ignore it ?
try:
connection.rollback()
except:
del keys['port']
return [[], keys]
+ def get_disconnect_checker(self):
+ def disconnect_checker(e):
+ return isinstance(e, self.dbapi.DatabaseError) and "Error 10054" in str(e)
+ return disconnect_checker
+
## This code is leftover from the initial implementation, for reference
## def do_begin(self, connection):
connectors.append ("TrustedConnection=Yes")
return [[";".join (connectors)], {}]
+ def get_disconnect_checker(self):
+ def disconnect_checker(e):
+ return isinstance(e, self.dbapi.Error) and '[08S01]' in e.args[1]
+ return disconnect_checker
+
class MSSQLDialect_adodbapi(MSSQLDialect):
def import_dbapi(cls):
connectors.append("Integrated Security=SSPI")
return [[";".join (connectors)], {}]
+ def get_disconnect_checker(self):
+ def disconnect_checker(e):
+ return isinstance(e, self.dbapi.adodbapi.DatabaseError) and "'connection failure'" in str(e)
+ return disconnect_checker
+
dialect_mapping = {
'pymssql': MSSQLDialect_pymssql,
'pyodbc': MSSQLDialect_pyodbc,
return MySQLIdentifierPreparer(self)
def do_executemany(self, cursor, statement, parameters, context=None, **kwargs):
- try:
- rowcount = cursor.executemany(statement, parameters)
- if context is not None:
- context._rowcount = rowcount
- except self.dbapi.OperationalError, o:
- if o.args[0] == 2006 or o.args[0] == 2014:
- cursor.invalidate()
- raise o
+ rowcount = cursor.executemany(statement, parameters)
+ if context is not None:
+ context._rowcount = rowcount
+
def do_execute(self, cursor, statement, parameters, **kwargs):
- try:
- cursor.execute(statement, parameters)
- except self.dbapi.OperationalError, o:
- if o.args[0] == 2006 or o.args[0] == 2014:
- cursor.invalidate()
- raise o
+ cursor.execute(statement, parameters)
def do_rollback(self, connection):
# MySQL without InnoDB doesnt support rollback()
except:
pass
+ def get_disconnect_checker(self):
+ def disconnect_checker(e):
+ return isinstance(e, self.dbapi.OperationalError) and e.args[0] in (2006, 2014)
+ return disconnect_checker
+
+
def get_default_schema_name(self):
if not hasattr(self, '_default_schema_name'):
self._default_schema_name = text("select database()", self).scalar()
cursor = connection.execute('''SELECT relname FROM pg_class WHERE relkind = 'S' AND relnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname NOT LIKE 'pg_%%' AND nspname != 'information_schema' AND relname = %(seqname)s);''', {'seqname': sequence_name})
return bool(not not cursor.rowcount)
+ def get_disconnect_checker(self):
+ def disconnect_checker(e):
+ if isinstance(e, self.dbapi.OperationalError):
+ return 'closed the connection' in str(e) or 'connection not open' in str(e)
+ elif isinstance(e, self.dbapi.InterfaceError):
+ return 'connection already closed' in str(e)
+ elif isinstance(e, self.dbapi.ProgrammingError):
+ # yes, it really says "losed", not "closed"
+ return "losed the connection unexpectedly" in str(e)
+ else:
+ return False
+ return disconnect_checker
+
def reflecttable(self, connection, table):
if self.version == 2:
ischema_names = pg2_ischema_names
return clauseelement.compile(dialect=self, parameters=parameters)
+ def get_disconnect_checker(self):
+ """Return a callable that determines if an SQLError is caused by a database disconnection."""
+
+ return lambda x: False
+
class ExecutionContext(object):
"""A messenger object for a Dialect that corresponds to a single execution.
return self.__transaction is not None
def _begin_impl(self):
- self.__engine.logger.info("BEGIN")
- self.__engine.dialect.do_begin(self.connection)
+ if self.__connection.is_valid:
+ self.__engine.logger.info("BEGIN")
+ try:
+ self.__engine.dialect.do_begin(self.connection)
+ except Exception, e:
+ raise exceptions.SQLError(None, None, e)
def _rollback_impl(self):
- self.__engine.logger.info("ROLLBACK")
- self.__engine.dialect.do_rollback(self.connection)
- self.__connection.close_open_cursors()
+ if self.__connection.is_valid:
+ self.__engine.logger.info("ROLLBACK")
+ try:
+ self.__engine.dialect.do_rollback(self.connection)
+ except Exception, e:
+ raise exceptions.SQLError(None, None, e)
+ self.__connection.close_open_cursors()
self.__transaction = None
def _commit_impl(self):
- self.__engine.logger.info("COMMIT")
- self.__engine.dialect.do_commit(self.connection)
+ if self.__connection.is_valid:
+ self.__engine.logger.info("COMMIT")
+ try:
+ self.__engine.dialect.do_commit(self.connection)
+ except Exception, e:
+ raise exceptions.SQLError(None, None, e)
self.__transaction = None
def _autocommit(self, statement):
context.dialect.do_execute(context.cursor, context.statement, context.parameters, context=context)
except Exception, e:
self._autorollback()
- #self._rollback_impl()
if self.__close_with_result:
self.close()
raise exceptions.SQLError(context.statement, context.parameters, e)
context.dialect.do_executemany(context.cursor, context.statement, context.parameters, context=context)
except Exception, e:
self._autorollback()
- #self._rollback_impl()
if self.__close_with_result:
self.close()
raise exceptions.SQLError(context.statement, context.parameters, e)
if tk in kwargs:
pool_args[k] = kwargs.pop(tk)
pool_args['use_threadlocal'] = self.pool_threadlocal()
- pool = poolclass(creator, **pool_args)
+ pool = poolclass(creator, disconnect_checker=dialect.get_disconnect_checker(), **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
False, then no cursor processing occurs upon checkin.
"""
- def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, auto_close_cursors=True, disallow_open_cursors=False):
+ def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, auto_close_cursors=True,
+ disallow_open_cursors=False, disconnect_checker=None):
self.logger = logging.instance_logger(self)
self._threadconns = weakref.WeakValueDictionary()
self._creator = creator
self.auto_close_cursors = auto_close_cursors
self.disallow_open_cursors = disallow_open_cursors
self.echo = echo
+ if disconnect_checker:
+ self.disconnect_checker = disconnect_checker
+ else:
+ self.disconnect_checker = lambda x: False
echo = logging.echo_property()
def unique_connection(self):
self.__pool.log("Closing connection %s" % repr(self.connection))
self.connection.close()
- def invalidate(self):
- self.__pool.log("Invalidate connection %s" % repr(self.connection))
+ def invalidate(self, e=None):
+ if e is not None:
+ self.__pool.log("Invalidate connection %s (reason: %s:%s)" % (repr(self.connection), e.__class__.__name__, str(e)))
+ else:
+ self.__pool.log("Invalidate connection %s" % repr(self.connection))
self.__close()
self.connection = None
def __init__(self, pool):
self._threadfairy = _ThreadFairy(self)
self._cursors = weakref.WeakKeyDictionary()
- self.__pool = pool
+ self._pool = pool
self.__counter = 0
try:
self._connection_record = pool.get()
self.connection = None # helps with endless __getattr__ loops later on
self._connection_record = None
raise
- if self.__pool.echo:
- self.__pool.log("Connection %s checked out from pool" % repr(self.connection))
+ if self._pool.echo:
+ self._pool.log("Connection %s checked out from pool" % repr(self.connection))
+
+ _logger = property(lambda self: self._pool.logger)
- _logger = property(lambda self: self.__pool.logger)
-
- def invalidate(self):
+ is_valid = property(lambda self:self.connection is not None)
+
+ def invalidate(self, e=None):
if self.connection is None:
raise exceptions.InvalidRequestError("This connection is closed")
- self._connection_record.invalidate()
+ self._connection_record.invalidate(e=e)
self.connection = None
self._cursors = None
self._close()
c = self.connection.cursor(*args, **kwargs)
return _CursorFairy(self, c)
except Exception, e:
- self.invalidate()
+ self.invalidate(e=e)
raise
def __getattr__(self, key):
if self._cursors is not None:
# cursors should be closed before connection is returned to the pool. some dbapis like
# mysql have real issues if they are not.
- if self.__pool.auto_close_cursors:
+ if self._pool.auto_close_cursors:
self.close_open_cursors()
- elif self.__pool.disallow_open_cursors:
+ elif self._pool.disallow_open_cursors:
if len(self._cursors):
raise exceptions.InvalidRequestError("This connection still has %d open cursors" % len(self._cursors))
if self.connection is not None:
try:
self.connection.rollback()
- except:
+ except Exception, e:
if self._connection_record is not None:
- self._connection_record.invalidate()
+ self._connection_record.invalidate(e=e)
if self._connection_record is not None:
- if self.__pool.echo:
- self.__pool.log("Connection %s being returned to pool" % repr(self.connection))
- self.__pool.return_conn(self)
+ if self._pool.echo:
+ self._pool.log("Connection %s being returned to pool" % repr(self.connection))
+ self._pool.return_conn(self)
self.connection = None
self._connection_record = None
self._threadfairy = None
class _CursorFairy(object):
def __init__(self, parent, cursor):
self.__parent = parent
- self.__parent._cursors[self]=True
+ self.__parent._cursors[self] = True
self.cursor = cursor
- def invalidate(self):
- self.__parent.invalidate()
+ def execute(self, *args, **kwargs):
+ try:
+ self.cursor.execute(*args, **kwargs)
+ except Exception, e:
+ if self.__parent._pool.disconnect_checker(e):
+ self.invalidate(e=e)
+ raise
+
+ def executemany(self, *args, **kwargs):
+ try:
+ self.cursor.executemany(*args, **kwargs)
+ except Exception, e:
+ if self.__parent._pool.disconnect_checker(e):
+ self.invalidate(e=e)
+ raise
+
+ def invalidate(self, e=None):
+ self.__parent.invalidate(e=e)
def close(self):
if self in self.__parent._cursors: