From 6f16b8db6f08cefd68cdf251292316497eb849b3 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Wed, 9 Feb 2011 15:06:32 -0500 Subject: [PATCH] - add connection and cursor to is_disconnect(). We aren't using it yet, but we'd like to. Most DBAPIs don't give us anything we can do with it. Some research was done on psycopg2 and it still seems like they give us no adequate method (tried connection.closed, cursor.closed, connection.status). mxodbc claims their .closed attribute will work (but I am skeptical). - remove beahvior in pool that auto-invalidated a connection when the cursor failed to create. That's not the pool's job. we need the conn for the error logic. Can't get any tests to fail, curious why that behavior was there, guess we'll find out (or not). - add support for psycopg2 version detection. even though we have no use for it yet... - adjust one of the reconnect tests to work with oracle's horrendously slow connect speed --- lib/sqlalchemy/connectors/mxodbc.py | 6 +-- lib/sqlalchemy/connectors/pyodbc.py | 2 +- lib/sqlalchemy/connectors/zxJDBC.py | 2 +- .../dialects/firebird/kinterbasdb.py | 2 +- .../dialects/informix/informixdb.py | 2 +- lib/sqlalchemy/dialects/mssql/adodbapi.py | 2 +- lib/sqlalchemy/dialects/mssql/pymssql.py | 2 +- lib/sqlalchemy/dialects/mysql/base.py | 2 +- .../dialects/mysql/mysqlconnector.py | 2 +- lib/sqlalchemy/dialects/mysql/oursql.py | 2 +- lib/sqlalchemy/dialects/oracle/cx_oracle.py | 2 +- lib/sqlalchemy/dialects/postgresql/pg8000.py | 2 +- .../dialects/postgresql/psycopg2.py | 8 +++- .../dialects/postgresql/pypostgresql.py | 2 +- lib/sqlalchemy/dialects/sqlite/pysqlite.py | 2 +- lib/sqlalchemy/dialects/sybase/pysybase.py | 2 +- lib/sqlalchemy/engine/base.py | 4 +- lib/sqlalchemy/engine/default.py | 2 +- lib/sqlalchemy/pool.py | 6 +-- test/engine/test_reconnect.py | 39 +++++++++++++++++-- 20 files changed, 63 insertions(+), 30 deletions(-) diff --git a/lib/sqlalchemy/connectors/mxodbc.py b/lib/sqlalchemy/connectors/mxodbc.py index f467234ca3..5573dda407 100644 --- a/lib/sqlalchemy/connectors/mxodbc.py +++ b/lib/sqlalchemy/connectors/mxodbc.py @@ -106,9 +106,9 @@ class MxODBCConnector(Connector): opts.pop('database', None) return (args,), opts - def is_disconnect(self, e): - # eGenix recommends checking connection.closed here, - # but how can we get a handle on the current connection? + def is_disconnect(self, e, connection, cursor): + # TODO: eGenix recommends checking connection.closed here + # Does that detect dropped connections ? if isinstance(e, self.dbapi.ProgrammingError): return "connection already closed" in str(e) elif isinstance(e, self.dbapi.Error): diff --git a/lib/sqlalchemy/connectors/pyodbc.py b/lib/sqlalchemy/connectors/pyodbc.py index c66a8a8ae3..3f6d6cb5fa 100644 --- a/lib/sqlalchemy/connectors/pyodbc.py +++ b/lib/sqlalchemy/connectors/pyodbc.py @@ -81,7 +81,7 @@ class PyODBCConnector(Connector): connectors.extend(['%s=%s' % (k,v) for k,v in keys.iteritems()]) return [[";".join (connectors)], connect_args] - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.ProgrammingError): return "The cursor's connection has been closed." in str(e) or \ 'Attempt to use a closed connection.' in str(e) diff --git a/lib/sqlalchemy/connectors/zxJDBC.py b/lib/sqlalchemy/connectors/zxJDBC.py index a9ff5ec95c..20bf9d9cfd 100644 --- a/lib/sqlalchemy/connectors/zxJDBC.py +++ b/lib/sqlalchemy/connectors/zxJDBC.py @@ -46,7 +46,7 @@ class ZxJDBCConnector(Connector): self.jdbc_driver_name], opts] - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if not isinstance(e, self.dbapi.ProgrammingError): return False e = str(e) diff --git a/lib/sqlalchemy/dialects/firebird/kinterbasdb.py b/lib/sqlalchemy/dialects/firebird/kinterbasdb.py index 216fec2703..ebb7805ae6 100644 --- a/lib/sqlalchemy/dialects/firebird/kinterbasdb.py +++ b/lib/sqlalchemy/dialects/firebird/kinterbasdb.py @@ -153,7 +153,7 @@ class FBDialect_kinterbasdb(FBDialect): else: return tuple([int(x) for x in m.group(1, 2, 3)] + ['interbase']) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)): msg = str(e) diff --git a/lib/sqlalchemy/dialects/informix/informixdb.py b/lib/sqlalchemy/dialects/informix/informixdb.py index c819838169..1b6833af7d 100644 --- a/lib/sqlalchemy/dialects/informix/informixdb.py +++ b/lib/sqlalchemy/dialects/informix/informixdb.py @@ -62,7 +62,7 @@ class InformixDialect_informixdb(InformixDialect): v = VERSION_RE.split(connection.connection.dbms_version) return (int(v[1]), int(v[2]), v[3]) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.OperationalError): return 'closed the connection' in str(e) \ or 'connection not open' in str(e) diff --git a/lib/sqlalchemy/dialects/mssql/adodbapi.py b/lib/sqlalchemy/dialects/mssql/adodbapi.py index 355214d892..f2d945de21 100644 --- a/lib/sqlalchemy/dialects/mssql/adodbapi.py +++ b/lib/sqlalchemy/dialects/mssql/adodbapi.py @@ -62,7 +62,7 @@ class MSDialect_adodbapi(MSDialect): connectors.append("Integrated Security=SSPI") return [[";".join (connectors)], {}] - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): return isinstance(e, self.dbapi.adodbapi.DatabaseError) and \ "'connection failure'" in str(e) diff --git a/lib/sqlalchemy/dialects/mssql/pymssql.py b/lib/sqlalchemy/dialects/mssql/pymssql.py index 192e633669..8bc0ad95b7 100644 --- a/lib/sqlalchemy/dialects/mssql/pymssql.py +++ b/lib/sqlalchemy/dialects/mssql/pymssql.py @@ -95,7 +95,7 @@ class MSDialect_pymssql(MSDialect): opts['host'] = "%s:%s" % (opts['host'], port) return [[], opts] - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): for msg in ( "Error 10054", "Not connected to any MS SQL server", diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index b495cc36e7..882e13d2e0 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -1711,7 +1711,7 @@ class MySQLDialect(default.DefaultDialect): resultset = connection.execute("XA RECOVER") return [row['data'][0:row['gtrid_length']] for row in resultset] - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.OperationalError): return self._extract_error_code(e) in \ (2006, 2013, 2014, 2045, 2055) diff --git a/lib/sqlalchemy/dialects/mysql/mysqlconnector.py b/lib/sqlalchemy/dialects/mysql/mysqlconnector.py index d3ec1f5cf4..035ebe4599 100644 --- a/lib/sqlalchemy/dialects/mysql/mysqlconnector.py +++ b/lib/sqlalchemy/dialects/mysql/mysqlconnector.py @@ -118,7 +118,7 @@ class MySQLDialect_mysqlconnector(MySQLDialect): def _extract_error_code(self, exception): return exception.errno - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): errnos = (2006, 2013, 2014, 2045, 2055, 2048) exceptions = (self.dbapi.OperationalError,self.dbapi.InterfaceError) if isinstance(e, exceptions): diff --git a/lib/sqlalchemy/dialects/mysql/oursql.py b/lib/sqlalchemy/dialects/mysql/oursql.py index d3ef839b14..8caa1eaec8 100644 --- a/lib/sqlalchemy/dialects/mysql/oursql.py +++ b/lib/sqlalchemy/dialects/mysql/oursql.py @@ -195,7 +195,7 @@ class MySQLDialect_oursql(MySQLDialect): execution_options(_oursql_plain_query=True), table, charset, full_name) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.ProgrammingError): return e.errno is None and 'cursor' not in e.args[1] and e.args[1].endswith('closed') else: diff --git a/lib/sqlalchemy/dialects/oracle/cx_oracle.py b/lib/sqlalchemy/dialects/oracle/cx_oracle.py index bc1c877037..b00adcd632 100644 --- a/lib/sqlalchemy/dialects/oracle/cx_oracle.py +++ b/lib/sqlalchemy/dialects/oracle/cx_oracle.py @@ -680,7 +680,7 @@ class OracleDialect_cx_oracle(OracleDialect): for x in connection.connection.version.split('.') ) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.InterfaceError): return "not connected" in str(e) else: diff --git a/lib/sqlalchemy/dialects/postgresql/pg8000.py b/lib/sqlalchemy/dialects/postgresql/pg8000.py index d3c2f1d50c..c4f00eabea 100644 --- a/lib/sqlalchemy/dialects/postgresql/pg8000.py +++ b/lib/sqlalchemy/dialects/postgresql/pg8000.py @@ -108,7 +108,7 @@ class PGDialect_pg8000(PGDialect): opts.update(url.query) return ([], opts) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): return "connection is closed" in str(e) dialect = PGDialect_pg8000 diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 50ea9d4371..10d6e02697 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -227,6 +227,7 @@ class PGDialect_psycopg2(PGDialect): execution_ctx_cls = PGExecutionContext_psycopg2 statement_compiler = PGCompiler_psycopg2 preparer = PGIdentifierPreparer_psycopg2 + psycopg2_version = (0, 0) colspecs = util.update_copy( PGDialect.colspecs, @@ -243,6 +244,11 @@ class PGDialect_psycopg2(PGDialect): self.server_side_cursors = server_side_cursors self.use_native_unicode = use_native_unicode self.supports_unicode_binds = use_native_unicode + if self.dbapi and hasattr(self.dbapi, '__version__'): + m = re.match(r'(\d+)\.(\d+)\.(\d+)?', + self.dbapi.__version__) + if m: + self.psycopg2_version = tuple(map(int, m.group(1, 2, 3))) @classmethod def dbapi(cls): @@ -295,7 +301,7 @@ class PGDialect_psycopg2(PGDialect): opts.update(url.query) return ([], opts) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, self.dbapi.OperationalError): # these error messages from libpq: interfaces/libpq/fe-misc.c. # TODO: these are sent through gettext in libpq and we can't diff --git a/lib/sqlalchemy/dialects/postgresql/pypostgresql.py b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py index dd22fcb330..a137a6240f 100644 --- a/lib/sqlalchemy/dialects/postgresql/pypostgresql.py +++ b/lib/sqlalchemy/dialects/postgresql/pypostgresql.py @@ -67,7 +67,7 @@ class PGDialect_pypostgresql(PGDialect): opts.update(url.query) return ([], opts) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): return "connection is closed" in str(e) dialect = PGDialect_pypostgresql diff --git a/lib/sqlalchemy/dialects/sqlite/pysqlite.py b/lib/sqlalchemy/dialects/sqlite/pysqlite.py index 14cfa93d95..646c5b86f1 100644 --- a/lib/sqlalchemy/dialects/sqlite/pysqlite.py +++ b/lib/sqlalchemy/dialects/sqlite/pysqlite.py @@ -238,7 +238,7 @@ class SQLiteDialect_pysqlite(SQLiteDialect): return ([filename], opts) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e) dialect = SQLiteDialect_pysqlite diff --git a/lib/sqlalchemy/dialects/sybase/pysybase.py b/lib/sqlalchemy/dialects/sybase/pysybase.py index fed7928172..e12cf07dd7 100644 --- a/lib/sqlalchemy/dialects/sybase/pysybase.py +++ b/lib/sqlalchemy/dialects/sybase/pysybase.py @@ -87,7 +87,7 @@ class SybaseDialect_pysybase(SybaseDialect): # (12, 5, 0, 0) return (vers / 1000, vers % 1000 / 100, vers % 100 / 10, vers % 10) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): if isinstance(e, (self.dbapi.OperationalError, self.dbapi.ProgrammingError)): msg = str(e) diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index b78a305374..f6c9741365 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -502,7 +502,7 @@ class Dialect(object): raise NotImplementedError() - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): """Return True if the given DB-API error indicates an invalid connection""" @@ -1518,7 +1518,7 @@ class Connection(Connectable): if context: context.handle_dbapi_exception(e) - is_disconnect = self.dialect.is_disconnect(e) + is_disconnect = self.dialect.is_disconnect(e, self.__connection, cursor) if is_disconnect: self.invalidate(e) self.engine.dispose() diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index aa75a28532..e669b305ea 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -324,7 +324,7 @@ class DefaultDialect(base.Dialect): def do_execute(self, cursor, statement, parameters, context=None): cursor.execute(statement, parameters) - def is_disconnect(self, e): + def is_disconnect(self, e, connection, cursor): return False def reset_isolation_level(self, dbapi_conn): diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 5150d282c5..7201bccf32 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -425,11 +425,7 @@ class _ConnectionFairy(object): self._close() def cursor(self, *args, **kwargs): - try: - return self.connection.cursor(*args, **kwargs) - except Exception, e: - self.invalidate(e=e) - raise + return self.connection.cursor(*args, **kwargs) def __getattr__(self, key): return getattr(self.connection, key) diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 31a2b705a9..9b3a1b4db6 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -58,7 +58,7 @@ class MockReconnectTest(TestBase): module=dbapi, _initialize=False) # monkeypatch disconnect checker - db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) + db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect) def test_reconnect(self): """test that an 'is_disconnect' condition will invalidate the @@ -259,6 +259,22 @@ class RealReconnectTest(TestBase): conn.close() + def test_ensure_is_disconnect_gets_connection(self): + def is_disconnect(e, conn, cursor): + # connection is still present + assert conn.connection is not None + # the error usually occurs on connection.cursor(), + # though MySQLdb we get a non-working cursor. + # assert cursor is None + + engine.dialect.is_disconnect = is_disconnect + conn = engine.connect() + engine.test_shutdown() + assert_raises( + tsa.exc.DBAPIError, + conn.execute, select([1]) + ) + def test_invalidate_twice(self): conn = engine.connect() conn.invalidate() @@ -280,7 +296,7 @@ class RealReconnectTest(TestBase): p1 = engine.pool - def is_disconnect(e): + def is_disconnect(e, conn, cursor): return True engine.dialect.is_disconnect = is_disconnect @@ -374,13 +390,28 @@ class RecycleTest(TestBase): def test_basic(self): for threadlocal in False, True: - engine = engines.reconnecting_engine(options={'pool_recycle' - : 1, 'pool_threadlocal': threadlocal}) + engine = engines.reconnecting_engine( + options={'pool_threadlocal': threadlocal}) + conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() + + # set the pool recycle down to 1. + # we aren't doing this inline with the + # engine create since cx_oracle takes way + # too long to create the 1st connection and don't + # want to build a huge delay into this test. + + engine.pool._recycle = 1 + + # kill the DB connection engine.test_shutdown() + + # wait until past the recycle period time.sleep(2) + + # can connect, no exception conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() -- 2.47.2