from unittest import mock
from asyncio import tasks
from test.test_asyncio import utils as test_utils
+import test.support
+from test.support.script_helper import assert_python_ok
MOCK_ANY = mock.ANY
class CEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase):
Task = getattr(tasks, '_CTask', None)
+ def test_issue105987(self):
+ code = """if 1:
+ from _asyncio import _swap_current_task
+
+ class DummyTask:
+ pass
+
+ class DummyLoop:
+ pass
+
+ l = DummyLoop()
+ _swap_current_task(l, DummyTask())
+ t = _swap_current_task(l, None)
+ """
+
+ _, out, err = assert_python_ok("-c", code)
+ self.assertFalse(err)
class AsyncTaskCounter:
def __init__(self, loop, *, task_class, eager):
}
prev_task = Py_None;
}
+ Py_INCREF(prev_task);
if (task == Py_None) {
if (_PyDict_DelItem_KnownHash(state->current_tasks, loop, hash) == -1) {
- return NULL;
+ goto error;
}
} else {
if (_PyDict_SetItem_KnownHash(state->current_tasks, loop, task, hash) == -1) {
- return NULL;
+ goto error;
}
}
- Py_INCREF(prev_task);
-
return prev_task;
+
+error:
+ Py_DECREF(prev_task);
+ return NULL;
}
/* ----- Task */