import functools
from test import support
+from unittest.mock import patch
from test.test_sqlite3.test_dbapi import memory_database, managed_connect, cx_limit
self.assertEqual(steps, values)
+class RecursiveUseOfCursors(unittest.TestCase):
+ # GH-80254: sqlite3 should not segfault for recursive use of cursors.
+ msg = "Recursive use of cursors not allowed"
+
+ def setUp(self):
+ self.con = sqlite.connect(":memory:",
+ detect_types=sqlite.PARSE_COLNAMES)
+ self.cur = self.con.cursor()
+ self.cur.execute("create table test(x foo)")
+ self.cur.executemany("insert into test(x) values (?)",
+ [("foo",), ("bar",)])
+
+ def tearDown(self):
+ self.cur.close()
+ self.con.close()
+
+ def test_recursive_cursor_init(self):
+ conv = lambda x: self.cur.__init__(self.con)
+ with patch.dict(sqlite.converters, {"INIT": conv}):
+ self.cur.execute(f'select x as "x [INIT]", x from test')
+ self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
+ self.cur.fetchall)
+
+ def test_recursive_cursor_close(self):
+ conv = lambda x: self.cur.close()
+ with patch.dict(sqlite.converters, {"CLOSE": conv}):
+ self.cur.execute(f'select x as "x [CLOSE]", x from test')
+ self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
+ self.cur.fetchall)
+
+ def test_recursive_cursor_iter(self):
+ conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
+ with patch.dict(sqlite.converters, {"ITER": conv}):
+ self.cur.execute(f'select x as "x [ITER]", x from test')
+ self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
+ self.cur.fetchall)
+
+
if __name__ == "__main__":
unittest.main()
#include "clinic/cursor.c.h"
#undef clinic_state
+static inline int
+check_cursor_locked(pysqlite_Cursor *cur)
+{
+ if (cur->locked) {
+ PyErr_SetString(cur->connection->ProgrammingError,
+ "Recursive use of cursors not allowed.");
+ return 0;
+ }
+ return 1;
+}
+
/*[clinic input]
module _sqlite3
class _sqlite3.Cursor "pysqlite_Cursor *" "clinic_state()->CursorType"
pysqlite_Connection *connection)
/*[clinic end generated code: output=ac59dce49a809ca8 input=23d4265b534989fb]*/
{
+ if (!check_cursor_locked(self)) {
+ return -1;
+ }
+
Py_INCREF(connection);
Py_XSETREF(self->connection, connection);
Py_CLEAR(self->statement);
return 0;
}
- if (cur->locked) {
- PyErr_SetString(cur->connection->state->ProgrammingError,
- "Recursive use of cursors not allowed.");
- return 0;
- }
-
- return pysqlite_check_thread(cur->connection) && pysqlite_check_connection(cur->connection);
+ return (pysqlite_check_thread(cur->connection)
+ && pysqlite_check_connection(cur->connection)
+ && check_cursor_locked(cur));
}
static int
return NULL;
}
+ self->locked = 1; // GH-80254: Prevent recursive use of cursors.
PyObject *row = _pysqlite_fetch_one_row(self);
+ self->locked = 0;
if (row == NULL) {
return NULL;
}
pysqlite_cursor_close_impl(pysqlite_Cursor *self)
/*[clinic end generated code: output=b6055e4ec6fe63b6 input=08b36552dbb9a986]*/
{
+ if (!check_cursor_locked(self)) {
+ return NULL;
+ }
+
if (!self->connection) {
PyTypeObject *tp = Py_TYPE(self);
pysqlite_state *state = pysqlite_get_state_by_type(tp);