self.con.close()
def test_func_error_on_create(self):
- with self.assertRaises(sqlite.OperationalError):
+ with self.assertRaisesRegex(sqlite.ProgrammingError, "not -100"):
self.con.create_function("bla", -100, lambda x: 2*x)
def test_func_too_many_args(self):
self.assertEqual(self.cur.fetchall(), self.expected)
def test_win_error_on_create(self):
- self.assertRaises(sqlite.ProgrammingError,
- self.con.create_window_function,
- "shouldfail", -100, WindowSumInt)
+ with self.assertRaisesRegex(sqlite.ProgrammingError, "not -100"):
+ self.con.create_window_function("shouldfail", -100, WindowSumInt)
@with_tracebacks(BadWindow)
def test_win_exception_in_method(self):
self.con.close()
def test_aggr_error_on_create(self):
- with self.assertRaises(sqlite.OperationalError):
+ with self.assertRaisesRegex(sqlite.ProgrammingError, "not -100"):
self.con.create_function("bla", -100, AggrSum)
@with_tracebacks(AttributeError, msg_regex="AggrNoStep")
}
}
+static int
+check_num_params(pysqlite_Connection *self, const int n, const char *name)
+{
+ int limit = sqlite3_limit(self->db, SQLITE_LIMIT_FUNCTION_ARG, -1);
+ assert(limit >= 0);
+ if (n < -1 || n > limit) {
+ PyErr_Format(self->ProgrammingError,
+ "'%s' must be between -1 and %d, not %d",
+ name, limit, n);
+ return -1;
+ }
+ return 0;
+}
+
/*[clinic input]
_sqlite3.Connection.create_function as pysqlite_connection_create_function
if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) {
return NULL;
}
+ if (check_num_params(self, narg, "narg") < 0) {
+ return NULL;
+ }
if (deterministic) {
flags |= SQLITE_DETERMINISTIC;
"SQLite 3.25.0 or higher");
return NULL;
}
-
if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) {
return NULL;
}
+ if (check_num_params(self, num_params, "num_params") < 0) {
+ return NULL;
+ }
int flags = SQLITE_UTF8;
int rc;
if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) {
return NULL;
}
+ if (check_num_params(self, n_arg, "n_arg") < 0) {
+ return NULL;
+ }
callback_context *ctx = create_callback_context(cls, aggregate_class);
if (ctx == NULL) {