sqlite.connect(':memory:', check_same_thread=False)
self.assertEqual(str(cm.exception), 'shared connections not available')
+
+class UninitialisedConnectionTests(unittest.TestCase):
+ def setUp(self):
+ self.cx = sqlite.Connection.__new__(sqlite.Connection)
+
+ def test_uninit_operations(self):
+ funcs = (
+ lambda: self.cx.isolation_level,
+ lambda: self.cx.total_changes,
+ lambda: self.cx.in_transaction,
+ lambda: self.cx.iterdump(),
+ lambda: self.cx.cursor(),
+ lambda: self.cx.close(),
+ )
+ for func in funcs:
+ with self.subTest(func=func):
+ self.assertRaisesRegex(sqlite.ProgrammingError,
+ "Base Connection.__init__ not called",
+ func)
+
+
class CursorTests(unittest.TestCase):
def setUp(self):
self.cx = sqlite.connect(":memory:")
closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check")
+ uninit_con_suite = unittest.makeSuite(UninitialisedConnectionTests)
return unittest.TestSuite((
module_suite, connection_suite, cursor_suite, thread_suite,
constructor_suite, ext_suite, closed_con_suite, closed_cur_suite,
- on_conflict_suite,
+ on_conflict_suite, uninit_con_suite,
))
def test():
database = PyBytes_AsString(database_obj);
- self->initialized = 1;
-
self->begin_statement = NULL;
Py_CLEAR(self->statement_cache);
Py_INCREF(isolation_level);
}
Py_CLEAR(self->isolation_level);
- if (pysqlite_connection_set_isolation_level(self, isolation_level, NULL) < 0) {
+ if (pysqlite_connection_set_isolation_level(self, isolation_level, NULL) != 0) {
Py_DECREF(isolation_level);
return -1;
}
self->ProgrammingError = pysqlite_ProgrammingError;
self->NotSupportedError = pysqlite_NotSupportedError;
+ self->initialized = 1;
+
return 0;
}
return NULL;
}
+ if (!self->initialized) {
+ PyErr_SetString(pysqlite_ProgrammingError,
+ "Base Connection.__init__ not called.");
+ return NULL;
+ }
+
pysqlite_do_all_statements(self, ACTION_FINALIZE, 1);
if (self->db) {
static PyObject* pysqlite_connection_get_isolation_level(pysqlite_Connection* self, void* unused)
{
+ if (!pysqlite_check_connection(self)) {
+ return NULL;
+ }
Py_INCREF(self->isolation_level);
return self->isolation_level;
}
return -1;
}
if (isolation_level == Py_None) {
- PyObject *res = pysqlite_connection_commit(self, NULL);
- if (!res) {
- return -1;
+ /* We might get called during connection init, so we cannot use
+ * pysqlite_connection_commit() here. */
+ if (self->db && !sqlite3_get_autocommit(self->db)) {
+ int rc;
+ Py_BEGIN_ALLOW_THREADS
+ rc = sqlite3_exec(self->db, "COMMIT", NULL, NULL, NULL);
+ Py_END_ALLOW_THREADS
+ if (rc != SQLITE_OK) {
+ return _pysqlite_seterror(self->db, NULL);
+ }
}
- Py_DECREF(res);
self->begin_statement = NULL;
} else {