As in title, expose C `raise` function as `raise_function` in `signal` module. Also drop existing `raise_signal` in `_testcapi` module and replace all usages with new function.
https://bugs.python.org/issue35568
:func:`sigpending`.
+.. function:: raise_signal(signum)
+
+ Sends a signal to the calling process. Returns nothing.
+
+ .. versionadded:: 3.8
+
+
.. function:: pthread_kill(thread_id, signalnum)
Send the signal *signalnum* to the thread *thread_id*, another thread in the
def SIGINT_after_delay():
time.sleep(1)
- _testcapi.raise_signal(signal.SIGINT)
+ signal.raise_signal(signal.SIGINT)
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
l = asyncio.get_event_loop()
@skip_segfault_on_android
def test_sigbus(self):
self.check_fatal_error("""
- import _testcapi
import faulthandler
import signal
faulthandler.enable()
- _testcapi.raise_signal(signal.SIGBUS)
+ signal.raise_signal(signal.SIGBUS)
""",
- 6,
+ 5,
'Bus error')
@unittest.skipIf(_testcapi is None, 'need _testcapi')
@skip_segfault_on_android
def test_sigill(self):
self.check_fatal_error("""
- import _testcapi
import faulthandler
import signal
faulthandler.enable()
- _testcapi.raise_signal(signal.SIGILL)
+ signal.raise_signal(signal.SIGILL)
""",
- 6,
+ 5,
'Illegal instruction')
def test_fatal_error(self):
'need signal.pthread_sigmask()')
def test_setsigmask(self):
code = textwrap.dedent("""\
- import _testcapi, signal
- _testcapi.raise_signal(signal.SIGUSR1)""")
+ import signal
+ signal.raise_signal(signal.SIGUSR1)""")
pid = posix.posix_spawn(
sys.executable,
def test_setsigdef(self):
original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
code = textwrap.dedent("""\
- import _testcapi, signal
- _testcapi.raise_signal(signal.SIGUSR1)""")
+ import signal
+ signal.raise_signal(signal.SIGUSR1)""")
try:
pid = posix.posix_spawn(
sys.executable,
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
TEST_INTERRUPTED = textwrap.dedent("""
- from signal import SIGINT
+ from signal import SIGINT, raise_signal
try:
- from _testcapi import raise_signal
raise_signal(SIGINT)
except ImportError:
import os
+import errno
import os
import random
import signal
signal.set_wakeup_fd(r)
try:
with captured_stderr() as err:
- _testcapi.raise_signal(signal.SIGALRM)
+ signal.raise_signal(signal.SIGALRM)
except ZeroDivisionError:
# An ignored exception should have been printed out on stderr
err = err.getvalue()
def test_signum(self):
self.check_wakeup("""def test():
- import _testcapi
signal.signal(signal.SIGUSR1, handler)
- _testcapi.raise_signal(signal.SIGUSR1)
- _testcapi.raise_signal(signal.SIGALRM)
+ signal.raise_signal(signal.SIGUSR1)
+ signal.raise_signal(signal.SIGALRM)
""", signal.SIGUSR1, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
signal.signal(signum2, handler)
signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
- _testcapi.raise_signal(signum1)
- _testcapi.raise_signal(signum2)
+ signal.raise_signal(signum1)
+ signal.raise_signal(signum2)
# Unblocking the 2 signals calls the C signal handler twice
signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
""", signal.SIGUSR1, signal.SIGUSR2, ordered=False)
write.setblocking(False)
signal.set_wakeup_fd(write.fileno())
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
data = read.recv(1)
if not data:
write.close()
with captured_stderr() as err:
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
err = err.getvalue()
if ('Exception ignored when trying to {action} to the signal wakeup fd'
signal.set_wakeup_fd(write.fileno())
with captured_stderr() as err:
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
err = err.getvalue()
if msg not in err:
signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
with captured_stderr() as err:
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
err = err.getvalue()
if msg not in err:
signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
with captured_stderr() as err:
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
err = err.getvalue()
if err != "":
signal.set_wakeup_fd(write.fileno())
with captured_stderr() as err:
- _testcapi.raise_signal(signum)
+ signal.raise_signal(signum)
err = err.getvalue()
if msg not in err:
# Python handler
self.assertEqual(len(sigs), N, "Some signals were lost")
+class RaiseSignalTest(unittest.TestCase):
+
+ def test_sigint(self):
+ try:
+ signal.raise_signal(signal.SIGINT)
+ self.fail("Expected KeyInterrupt")
+ except KeyboardInterrupt:
+ pass
+
+ @unittest.skipIf(sys.platform != "win32", "Windows specific test")
+ def test_invalid_argument(self):
+ try:
+ SIGHUP = 1 # not supported on win32
+ signal.raise_signal(SIGHUP)
+ self.fail("OSError (Invalid argument) expected")
+ except OSError as e:
+ if e.errno == errno.EINVAL:
+ pass
+ else:
+ raise
+
+ def test_handler(self):
+ is_ok = False
+ def handler(a, b):
+ nonlocal is_ok
+ is_ok = True
+ old_signal = signal.signal(signal.SIGINT, handler)
+ self.addCleanup(signal.signal, signal.SIGINT, old_signal)
+
+ signal.raise_signal(signal.SIGINT)
+ self.assertTrue(is_ok)
+
def tearDownModule():
support.reap_children()
--- /dev/null
+Expose ``raise(signum)`` as `raise_signal`
return res;
}
-static PyObject*
-test_raise_signal(PyObject* self, PyObject *args)
-{
- int signum, err;
-
- if (!PyArg_ParseTuple(args, "i:raise_signal", &signum)) {
- return NULL;
- }
-
- err = raise(signum);
- if (err)
- return PyErr_SetFromErrno(PyExc_OSError);
-
- if (PyErr_CheckSignals() < 0)
- return NULL;
-
- Py_RETURN_NONE;
-}
-
/* marshal */
static PyObject*
{"docstring_with_signature_with_defaults",
(PyCFunction)test_with_docstring, METH_NOARGS,
docstring_with_signature_with_defaults},
- {"raise_signal",
- (PyCFunction)test_raise_signal, METH_VARARGS},
{"call_in_temporary_c_thread", call_in_temporary_c_thread, METH_O,
PyDoc_STR("set_error_class(error_class) -> None")},
{"pymarshal_write_long_to_file",
#endif /* defined(HAVE_PAUSE) */
+PyDoc_STRVAR(signal_raise_signal__doc__,
+"raise_signal($module, signalnum, /)\n"
+"--\n"
+"\n"
+"Send a signal to the executing process.");
+
+#define SIGNAL_RAISE_SIGNAL_METHODDEF \
+ {"raise_signal", (PyCFunction)signal_raise_signal, METH_O, signal_raise_signal__doc__},
+
+static PyObject *
+signal_raise_signal_impl(PyObject *module, int signalnum);
+
+static PyObject *
+signal_raise_signal(PyObject *module, PyObject *arg)
+{
+ PyObject *return_value = NULL;
+ int signalnum;
+
+ if (PyFloat_Check(arg)) {
+ PyErr_SetString(PyExc_TypeError,
+ "integer argument expected, got float" );
+ goto exit;
+ }
+ signalnum = _PyLong_AsInt(arg);
+ if (signalnum == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = signal_raise_signal_impl(module, signalnum);
+
+exit:
+ return return_value;
+}
+
PyDoc_STRVAR(signal_signal__doc__,
"signal($module, signalnum, handler, /)\n"
"--\n"
#ifndef SIGNAL_PTHREAD_KILL_METHODDEF
#define SIGNAL_PTHREAD_KILL_METHODDEF
#endif /* !defined(SIGNAL_PTHREAD_KILL_METHODDEF) */
-/*[clinic end generated code: output=4ed8c36860f9f577 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=365db4e807c26d4e input=a9049054013a1b77]*/
#endif
+/*[clinic input]
+signal.raise_signal
+
+ signalnum: int
+ /
+
+Send a signal to the executing process.
+[clinic start generated code]*/
+
+static PyObject *
+signal_raise_signal_impl(PyObject *module, int signalnum)
+/*[clinic end generated code: output=e2b014220aa6111d input=e90c0f9a42358de6]*/
+{
+ int err;
+ Py_BEGIN_ALLOW_THREADS
+ _Py_BEGIN_SUPPRESS_IPH
+ err = raise(signalnum);
+ _Py_END_SUPPRESS_IPH
+ Py_END_ALLOW_THREADS
+
+ if (err) {
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ Py_RETURN_NONE;
+}
/*[clinic input]
signal.signal
SIGNAL_SETITIMER_METHODDEF
SIGNAL_GETITIMER_METHODDEF
SIGNAL_SIGNAL_METHODDEF
+ SIGNAL_RAISE_SIGNAL_METHODDEF
SIGNAL_STRSIGNAL_METHODDEF
SIGNAL_GETSIGNAL_METHODDEF
{"set_wakeup_fd", (PyCFunction)(void(*)(void))signal_set_wakeup_fd, METH_VARARGS | METH_KEYWORDS, set_wakeup_fd_doc},