import heapq
from enum import Enum
-from threading import Thread, Barrier, Lock
+from threading import Barrier, Lock
from random import shuffle, randint
from test.support import threading_helper
+from test.support.threading_helper import run_concurrently
from test import test_heapq
heap = list(range(OBJECT_COUNT))
shuffle(heap)
- self.run_concurrently(
- worker_func=heapq.heapify, args=(heap,), nthreads=NTHREADS
+ run_concurrently(
+ worker_func=heapq.heapify, nthreads=NTHREADS, args=(heap,)
)
self.test_heapq.check_invariant(heap)
for item in reversed(range(OBJECT_COUNT)):
heapq.heappush(heap, item)
- self.run_concurrently(
- worker_func=heappush_func, args=(heap,), nthreads=NTHREADS
+ run_concurrently(
+ worker_func=heappush_func, nthreads=NTHREADS, args=(heap,)
)
self.test_heapq.check_invariant(heap)
# Each local list should be sorted
self.assertTrue(self.is_sorted_ascending(local_list))
- self.run_concurrently(
+ run_concurrently(
worker_func=heappop_func,
- args=(heap, per_thread_pop_count),
nthreads=NTHREADS,
+ args=(heap, per_thread_pop_count),
)
self.assertEqual(len(heap), 0)
popped_item = heapq.heappushpop(heap, item)
self.assertTrue(popped_item <= item)
- self.run_concurrently(
+ run_concurrently(
worker_func=heappushpop_func,
- args=(heap, pushpop_items),
nthreads=NTHREADS,
+ args=(heap, pushpop_items),
)
self.assertEqual(len(heap), OBJECT_COUNT)
self.test_heapq.check_invariant(heap)
for item in replace_items:
heapq.heapreplace(heap, item)
- self.run_concurrently(
+ run_concurrently(
worker_func=heapreplace_func,
- args=(heap, replace_items),
nthreads=NTHREADS,
+ args=(heap, replace_items),
)
self.assertEqual(len(heap), OBJECT_COUNT)
self.test_heapq.check_invariant(heap)
max_heap = list(range(OBJECT_COUNT))
shuffle(max_heap)
- self.run_concurrently(
- worker_func=heapq.heapify_max, args=(max_heap,), nthreads=NTHREADS
+ run_concurrently(
+ worker_func=heapq.heapify_max, nthreads=NTHREADS, args=(max_heap,)
)
self.test_heapq.check_max_invariant(max_heap)
for item in range(OBJECT_COUNT):
heapq.heappush_max(max_heap, item)
- self.run_concurrently(
- worker_func=heappush_max_func, args=(max_heap,), nthreads=NTHREADS
+ run_concurrently(
+ worker_func=heappush_max_func, nthreads=NTHREADS, args=(max_heap,)
)
self.test_heapq.check_max_invariant(max_heap)
# Each local list should be sorted
self.assertTrue(self.is_sorted_descending(local_list))
- self.run_concurrently(
+ run_concurrently(
worker_func=heappop_max_func,
- args=(max_heap, per_thread_pop_count),
nthreads=NTHREADS,
+ args=(max_heap, per_thread_pop_count),
)
self.assertEqual(len(max_heap), 0)
popped_item = heapq.heappushpop_max(max_heap, item)
self.assertTrue(popped_item >= item)
- self.run_concurrently(
+ run_concurrently(
worker_func=heappushpop_max_func,
- args=(max_heap, pushpop_items),
nthreads=NTHREADS,
+ args=(max_heap, pushpop_items),
)
self.assertEqual(len(max_heap), OBJECT_COUNT)
self.test_heapq.check_max_invariant(max_heap)
for item in replace_items:
heapq.heapreplace_max(max_heap, item)
- self.run_concurrently(
+ run_concurrently(
worker_func=heapreplace_max_func,
- args=(max_heap, replace_items),
nthreads=NTHREADS,
+ args=(max_heap, replace_items),
)
self.assertEqual(len(max_heap), OBJECT_COUNT)
self.test_heapq.check_max_invariant(max_heap)
except IndexError:
pass
- self.run_concurrently(worker, (), n_threads * 2)
+ run_concurrently(worker, n_threads * 2)
@staticmethod
def is_sorted_ascending(lst):
"""
return [randint(-a, b) for _ in range(size)]
- def run_concurrently(self, worker_func, args, nthreads):
- """
- Run the worker function concurrently in multiple threads.
- """
- barrier = Barrier(nthreads)
-
- def wrapper_func(*args):
- # Wait for all threads to reach this point before proceeding.
- barrier.wait()
- worker_func(*args)
-
- with threading_helper.catch_threading_exception() as cm:
- workers = (
- Thread(target=wrapper_func, args=args) for _ in range(nthreads)
- )
- with threading_helper.start_threads(workers):
- pass
-
- # Worker threads should not raise any exceptions
- self.assertIsNone(cm.exc_value)
-
if __name__ == "__main__":
unittest.main()
static struct PyModuleDef grpmodule;
+/* Mutex to protect calls to getgrgid(), getgrnam(), and getgrent().
+ * These functions return a pointer to a static data structure, which
+ * may be overwritten by subsequent calls. */
+static PyMutex group_db_mutex = {0};
+
#define DEFAULT_BUFFER_SIZE 1024
static PyObject *
Py_END_ALLOW_THREADS
#else
+ PyMutex_Lock(&group_db_mutex);
+ // The getgrgid() function need not be thread-safe.
+ // https://pubs.opengroup.org/onlinepubs/9699919799/functions/getgrgid.html
p = getgrgid(gid);
#endif
if (p == NULL) {
+#ifndef HAVE_GETGRGID_R
+ PyMutex_Unlock(&group_db_mutex);
+#endif
PyMem_RawFree(buf);
if (nomem == 1) {
return PyErr_NoMemory();
retval = mkgrent(module, p);
#ifdef HAVE_GETGRGID_R
PyMem_RawFree(buf);
+#else
+ PyMutex_Unlock(&group_db_mutex);
#endif
return retval;
}
Py_END_ALLOW_THREADS
#else
+ PyMutex_Lock(&group_db_mutex);
+ // The getgrnam() function need not be thread-safe.
+ // https://pubs.opengroup.org/onlinepubs/9699919799/functions/getgrnam.html
p = getgrnam(name_chars);
#endif
if (p == NULL) {
+#ifndef HAVE_GETGRNAM_R
+ PyMutex_Unlock(&group_db_mutex);
+#endif
if (nomem == 1) {
PyErr_NoMemory();
}
goto out;
}
retval = mkgrent(module, p);
+#ifndef HAVE_GETGRNAM_R
+ PyMutex_Unlock(&group_db_mutex);
+#endif
out:
PyMem_RawFree(buf);
Py_DECREF(bytes);
return NULL;
}
- static PyMutex getgrall_mutex = {0};
- PyMutex_Lock(&getgrall_mutex);
+ PyMutex_Lock(&group_db_mutex);
setgrent();
struct group *p;
done:
endgrent();
- PyMutex_Unlock(&getgrall_mutex);
+ PyMutex_Unlock(&group_db_mutex);
return d;
}