- The value of a bindparam() can be a callable, in which case
it's evaluated at statement execution time to get the value.
+ - added exception wrapping/reconnect support to result set
+ fetching. Reconnect works for those databases that
+ raise a catchable data error during results
+ (i.e. doesn't work on MySQL) [ticket:978]
+
- orm
- any(), has(), contains(), attribute level == and != now
work properly with self-referential relations - the clause
if isinstance(e, self.dbapi.OperationalError):
return 'closed the connection' in str(e) or 'connection not open' in str(e)
elif isinstance(e, self.dbapi.InterfaceError):
- return 'connection already closed' in str(e)
+ return 'connection already closed' in str(e) or 'cursor already closed' in str(e)
elif isinstance(e, self.dbapi.ProgrammingError):
# yes, it really says "losed", not "closed"
return "losed the connection unexpectedly" in str(e)
def fetchall(self):
"""Fetch all rows, just like DB-API ``cursor.fetchall()``."""
- l = [self._process_row(self, row) for row in self._fetchall_impl()]
- self.close()
- return l
+ try:
+ l = [self._process_row(self, row) for row in self._fetchall_impl()]
+ self.close()
+ return l
+ except Exception, e:
+ self.connection._handle_dbapi_exception(e, None, None, self.cursor)
+ raise
def fetchmany(self, size=None):
"""Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``."""
- l = [self._process_row(self, row) for row in self._fetchmany_impl(size)]
- if len(l) == 0:
- self.close()
- return l
+ try:
+ l = [self._process_row(self, row) for row in self._fetchmany_impl(size)]
+ if len(l) == 0:
+ self.close()
+ return l
+ except Exception, e:
+ self.connection._handle_dbapi_exception(e, None, None, self.cursor)
+ raise
def fetchone(self):
"""Fetch one row, just like DB-API ``cursor.fetchone()``."""
- row = self._fetchone_impl()
- if row is not None:
- return self._process_row(self, row)
- else:
- self.close()
- return None
+ try:
+ row = self._fetchone_impl()
+ if row is not None:
+ return self._process_row(self, row)
+ else:
+ self.close()
+ return None
+ except Exception, e:
+ self.connection._handle_dbapi_exception(e, None, None, self.cursor)
+ raise
def scalar(self):
"""Fetch the first column of the first row, and close the result set."""
- row = self._fetchone_impl()
+ try:
+ row = self._fetchone_impl()
+ except Exception, e:
+ self.connection._handle_dbapi_exception(e, None, None, self.cursor)
+ raise
+
try:
if row is not None:
return self._process_row(self, row)[0]
import testenv; testenv.configure_for_tests()
import sys, weakref
-from sqlalchemy import create_engine, exceptions, select
+from sqlalchemy import create_engine, exceptions, select, MetaData, Table, Column, Integer, String
from testlib import *
assert not conn.invalidated
conn.close()
-
+
def test_close(self):
conn = engine.connect()
self.assertEquals(conn.execute(select([1])).scalar(), 1)
self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
+class InvalidateDuringResultTest(TestBase):
+ def setUp(self):
+ global meta, table, engine
+ engine = engines.reconnecting_engine()
+ meta = MetaData(engine)
+ table = Table('sometable', meta,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(50)))
+ meta.create_all()
+ table.insert().execute(
+ [{'id':i, 'name':'row %d' % i} for i in range(1, 100)]
+ )
+
+ def tearDown(self):
+ meta.drop_all()
+ engine.dispose()
+
+ @testing.fails_on('mysql')
+ def test_invalidate_on_results(self):
+ conn = engine.connect()
+
+ result = conn.execute("select * from sometable")
+ for x in xrange(20):
+ result.fetchone()
+
+ engine.test_shutdown()
+ try:
+ result.fetchone()
+ assert False
+ except exceptions.DBAPIError, e:
+ if not e.connection_invalidated:
+ raise
+ assert conn.invalidated
+
if __name__ == '__main__':
testenv.main()