def has_table(self, connection, table_name):
cursor = connection.execute("PRAGMA table_info(" + table_name + ")", {})
row = cursor.fetchone()
+
+ # consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884
+ while cursor.fetchone() is not None:pass
+
return (row is not None)
def reflecttable(self, connection, table):
def log(self, msg):
self._logger.write(msg)
+ def dispose(self):
+ raise NotImplementedError()
+
+ def __del__(self):
+ self.dispose()
+
class ConnectionFairy(object):
def __init__(self, pool, connection=None):
self.pool = pool
self._creator = creator
def dispose(self):
- pass
+ for key, conn in self._conns.items():
+ try:
+ conn.close()
+ except:
+ # sqlite won't even let you close a conn from a thread that didn't create it
+ pass
+ del self._conns[key]
+
def status(self):
return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
conn.close()
except Queue.Empty:
break
- def __del__(self):
- self.dispose()
def status(self):
tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
assert names == [uname]
finally:
module_metadata.drop_all(module_engine)
+ module_engine.get_engine().dispose()
except Exception, e:
import traceback
traceback.print_exc()
queue.put(False)
return test
- # NOTE: I'm not sure how to give the test runner the option to
- # override these uris, or how to safely clear them after test runs
a = Thread(target=run('sqlite:///threadtesta.db', 'jim', qa))
b = Thread(target=run('sqlite:///threadtestb.db', 'joe', qb))
a.start()
b.start()
-
+
# block and wait for the threads to push their results
- res = qa.get(True)
+ res = qa.get()
if res != False:
raise res
- res = qb.get(True)
+ res = qb.get()
if res != False:
raise res