import functools
from test import support
+from unittest.mock import patch
+
+
class RegressionTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
+class RecursiveUseOfCursors(unittest.TestCase):
+ # GH-80254: sqlite3 should not segfault for recursive use of cursors.
+ msg = "Recursive use of cursors not allowed"
+
+ def setUp(self):
+ self.con = sqlite.connect(":memory:",
+ detect_types=sqlite.PARSE_COLNAMES)
+ self.cur = self.con.cursor()
+ self.cur.execute("create table test(x foo)")
+ self.cur.executemany("insert into test(x) values (?)",
+ [("foo",), ("bar",)])
+
+ def tearDown(self):
+ self.cur.close()
+ self.con.close()
+ del self.cur
+ del self.con
+
+ def test_recursive_cursor_init(self):
+ conv = lambda x: self.cur.__init__(self.con)
+ with patch.dict(sqlite.converters, {"INIT": conv}):
+ with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
+ self.cur.execute(f'select x as "x [INIT]", x from test')
+
+ def test_recursive_cursor_close(self):
+ conv = lambda x: self.cur.close()
+ with patch.dict(sqlite.converters, {"CLOSE": conv}):
+ with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
+ self.cur.execute(f'select x as "x [CLOSE]", x from test')
+
+ def test_recursive_cursor_fetch(self):
+ conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
+ with patch.dict(sqlite.converters, {"ITER": conv}):
+ self.cur.execute(f'select x as "x [ITER]", x from test')
+ with self.assertRaisesRegex(sqlite.ProgrammingError, self.msg):
+ self.cur.fetchall()
+
+
def suite():
regression_suite = unittest.makeSuite(RegressionTests, "Check")
+ recursive_cursor = unittest.makeSuite(RecursiveUseOfCursors)
return unittest.TestSuite((
regression_suite,
+ recursive_cursor,
))
def test():
--- /dev/null
+Raise :exc:`~sqlite3.ProgrammingError` instead of segfaulting on recursive
+usage of cursors in :mod:`sqlite3` converters. Patch by Sergey Fedoseev.
PyObject* pysqlite_cursor_iternext(pysqlite_Cursor* self);
+static inline int
+check_cursor_locked(pysqlite_Cursor *cur)
+{
+ if (cur->locked) {
+ PyErr_SetString(pysqlite_ProgrammingError,
+ "Recursive use of cursors not allowed.");
+ return 0;
+ }
+ return 1;
+}
+
static const char errmsg_fetch_across_rollback[] = "Cursor needed to be reset because of commit/rollback and can no longer be fetched from.";
static int pysqlite_cursor_init(pysqlite_Cursor* self, PyObject* args, PyObject* kwargs)
{
+ if (!check_cursor_locked(self)) {
+ return -1;
+ }
+
pysqlite_Connection* connection;
if (!PyArg_ParseTuple(args, "O!", &pysqlite_ConnectionType, &connection))
return 0;
}
- if (cur->locked) {
- PyErr_SetString(pysqlite_ProgrammingError, "Recursive use of cursors not allowed.");
- return 0;
- }
-
- return pysqlite_check_thread(cur->connection) && pysqlite_check_connection(cur->connection);
+ return (pysqlite_check_thread(cur->connection)
+ && pysqlite_check_connection(cur->connection)
+ && check_cursor_locked(cur));
}
static PyObject *
if (self->statement) {
rc = pysqlite_step(self->statement->st, self->connection);
if (PyErr_Occurred()) {
- (void)pysqlite_statement_reset(self->statement);
- Py_DECREF(next_row);
- return NULL;
+ goto error;
}
if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
- (void)pysqlite_statement_reset(self->statement);
- Py_DECREF(next_row);
_pysqlite_seterror(self->connection->db, NULL);
- return NULL;
+ goto error;
}
if (rc == SQLITE_ROW) {
+ self->locked = 1; // GH-80254: Prevent recursive use of cursors.
self->next_row = _pysqlite_fetch_one_row(self);
+ self->locked = 0;
if (self->next_row == NULL) {
- (void)pysqlite_statement_reset(self->statement);
- return NULL;
+ goto error;
}
}
}
return next_row;
+
+error:
+ (void)pysqlite_statement_reset(self->statement);
+ Py_DECREF(next_row);
+ return NULL;
}
PyObject* pysqlite_cursor_fetchone(pysqlite_Cursor* self, PyObject* args)
PyObject* pysqlite_cursor_close(pysqlite_Cursor* self, PyObject* args)
{
+ if (!check_cursor_locked(self)) {
+ return NULL;
+ }
+
if (!self->connection) {
PyErr_SetString(pysqlite_ProgrammingError,
"Base Cursor.__init__ not called.");