lock.release()
self.assertFalse(lock.locked())
+ def test_locked_with_2threads(self):
+ # see gh-134323: check that a rlock which
+ # is acquired in a different thread,
+ # is still locked in the main thread.
+ result = []
+ rlock = self.locktype()
+ self.assertFalse(rlock.locked())
+ def acquire():
+ result.append(rlock.locked())
+ rlock.acquire()
+ result.append(rlock.locked())
+
+ with Bunch(acquire, 1):
+ pass
+ self.assertTrue(rlock.locked())
+ self.assertFalse(result[0])
+ self.assertTrue(result[1])
+
def test_release_save_unacquired(self):
# Cannot _release_save an unacquired lock
lock = self.locktype()
except KeyError:
pass
return "<%s %s.%s object owner=%r count=%d at %s>" % (
- "locked" if self._block.locked() else "unlocked",
+ "locked" if self.locked() else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
owner,
def locked(self):
"""Return whether this object is locked."""
- return self._count > 0
+ return self._block.locked()
# Internal methods used by condition variables
--- /dev/null
+Fix the :meth:`threading.RLock.locked` method.\r
return 0;
}
+static int
+rlock_locked_impl(rlockobject *self)
+{
+ return PyMutex_IsLocked(&self->lock.mutex);
+}
static void
rlock_dealloc(PyObject *self)
rlock_locked(PyObject *op, PyObject *Py_UNUSED(ignored))
{
rlockobject *self = rlockobject_CAST(op);
- int is_locked = _PyRecursiveMutex_IsLockedByCurrentThread(&self->lock);
+ int is_locked = rlock_locked_impl(self);
return PyBool_FromLong(is_locked);
}
{
rlockobject *self = rlockobject_CAST(op);
PyThread_ident_t owner = self->lock.thread;
+ int locked = rlock_locked_impl(self);
size_t count = self->lock.level + 1;
return PyUnicode_FromFormat(
"<%s %s object owner=%" PY_FORMAT_THREAD_IDENT_T " count=%zu at %p>",
- owner ? "locked" : "unlocked",
+ locked ? "locked" : "unlocked",
Py_TYPE(self)->tp_name, owner,
count, self);
}