variables.
git-svn-id: svn://svn.valgrind.org/valgrind/trunk@102
scheduler algorithms is surely O(N^2) in the number of threads,
since that's simple, at least. And (in practice) we hope that most
programs do not need many threads. */
-#define VG_N_THREADS 10
+#define VG_N_THREADS 20
/* Number of file descriptors that can simultaneously be waited on for
I/O to complete. Perhaps this should be the same as VG_N_THREADS
#define VG_USERREQ__PTHREAD_CREATE 0x3001
#define VG_USERREQ__PTHREAD_JOIN 0x3002
#define VG_USERREQ__PTHREAD_GET_THREADID 0x3003
-#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3005
-#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3006
-#define VG_USERREQ__PTHREAD_CANCEL 0x3008
+#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3004
+#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3005
+#define VG_USERREQ__PTHREAD_CANCEL 0x3006
+#define VG_USERREQ__PTHREAD_EXIT 0x3007
+#define VG_USERREQ__PTHREAD_COND_WAIT 0x3008
+#define VG_USERREQ__PTHREAD_COND_SIGNAL 0x3009
+#define VG_USERREQ__PTHREAD_COND_BROADCAST 0x300A
/* Cosmetic ... */
#define VG_USERREQ__GET_PTHREAD_TRACE_LEVEL 0x3101
VgTs_WaitJoinee, /* waiting for the thread I did join on */
VgTs_WaitFD, /* waiting for I/O completion on a fd */
VgTs_WaitMX, /* waiting on a mutex */
+ VgTs_WaitCV, /* waiting on a condition variable */
VgTs_Sleeping /* sleeping for a while */
}
ThreadStatus;
VG_INVALID_THREADID if no one asked to join yet. */
ThreadId joiner;
- /* When .status == WaitMX, points to the mutex I am waiting
- for. */
- void* /* pthread_mutex_t* */ waited_on_mx;
+ /* When .status == WaitMX, points to the mutex I am waiting for.
+ When .status == WaitCV, points to the mutex associated with
+ the condition variable indicated by the .associated_cv field.
+ In all other cases, should be NULL. */
+ void* /* pthread_mutex_t* */ associated_mx;
+
+ /* When .status == WaitCV, points to the condition variable I am
+ waiting for. In all other cases, should be NULL. */
+ void* /* pthread_cond_t* */ associated_cv;
/* If VgTs_Sleeping, this is when we should wake up. */
ULong awaken_at;
static
+__attribute__((noreturn))
void barf ( char* str )
{
char buf[100];
strcat(buf, "\n\n");
write(2, buf, strlen(buf));
myexit(1);
+ /* We have to persuade gcc into believing this doesn't return. */
+ while (1) { };
}
}
+void pthread_exit(void *retval)
+{
+ int res;
+ ensure_valgrind("pthread_exit");
+ VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+ VG_USERREQ__PTHREAD_EXIT,
+ retval, 0, 0, 0);
+ /* Doesn't return! */
+ /* However, we have to fool gcc into knowing that. */
+ barf("pthread_exit: still alive after request?!");
+}
+
static int thread_specific_errno[VG_N_THREADS];
return 0;
}
+int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+ int res;
+ ensure_valgrind("pthread_cond_wait");
+ VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+ VG_USERREQ__PTHREAD_COND_WAIT,
+ cond, mutex, 0, 0);
+ return res;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+ int res;
+ ensure_valgrind("pthread_cond_signal");
+ VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+ VG_USERREQ__PTHREAD_COND_SIGNAL,
+ cond, 0, 0, 0);
+ return res;
+}
+
+int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+ int res;
+ ensure_valgrind("pthread_cond_broadcast");
+ VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+ VG_USERREQ__PTHREAD_COND_BROADCAST,
+ cond, 0, 0, 0);
+ return res;
+}
+
/* ---------------------------------------------------
CANCELLATION
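(Illustrative aside, not part of the patch.) The three wrappers above only forward requests into the scheduler; the usage pattern they are meant to support is the ordinary predicate-loop idiom sketched below. This is a minimal sketch assuming only standard pthread semantics; none of the names in it come from the patch, and the cvsimple.c test added later in this commit exercises the same idiom at larger scale.

/* Sketch: wait inside a loop that re-checks the predicate, and
   signal while holding the same mutex. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mu    = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cv    = PTHREAD_COND_INITIALIZER;
static int             ready = 0;

static void *consumer ( void *unused )
{
   pthread_mutex_lock(&mu);
   while (!ready)                    /* re-check: wakeups may be spurious */
      pthread_cond_wait(&cv, &mu);   /* atomically releases mu and blocks */
   printf("consumer: saw ready\n");
   pthread_mutex_unlock(&mu);
   return NULL;
}

int main ( void )
{
   pthread_t th;
   pthread_create(&th, NULL, consumer, NULL);
   pthread_mutex_lock(&mu);
   ready = 1;
   pthread_cond_signal(&cv);         /* routed through the wrapper above */
   pthread_mutex_unlock(&mu);
   pthread_join(th, NULL);
   return 0;
}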
case VgTs_WaitJoinee: VG_(printf)("WaitJoinee"); break;
case VgTs_Sleeping: VG_(printf)("Sleeping"); break;
case VgTs_WaitMX: VG_(printf)("WaitMX"); break;
+ case VgTs_WaitCV: VG_(printf)("WaitCV"); break;
default: VG_(printf)("???"); break;
}
- VG_(printf)(", waited_on_mx = %p\n", vg_threads[i].waited_on_mx );
+ VG_(printf)(", associated_mx = %p, associated_cv = %p\n",
+ vg_threads[i].associated_mx,
+ vg_threads[i].associated_cv );
VG_(pp_ExeContext)(
VG_(get_ExeContext)( False, vg_threads[i].m_eip,
vg_threads[i].m_ebp ));
tid_main = vg_alloc_ThreadState();
vg_assert(tid_main == 1);
- vg_threads[tid_main].status = VgTs_Runnable;
- vg_threads[tid_main].joiner = VG_INVALID_THREADID;
- vg_threads[tid_main].waited_on_mx = NULL;
- vg_threads[tid_main].retval = NULL; /* not important */
+ vg_threads[tid_main].status = VgTs_Runnable;
+ vg_threads[tid_main].joiner = VG_INVALID_THREADID;
+ vg_threads[tid_main].associated_mx = NULL;
+ vg_threads[tid_main].associated_cv = NULL;
+ vg_threads[tid_main].retval = NULL; /* not important */
vg_threads[tid_main].stack_highest_word
= vg_threads[tid_main].m_esp /* -4 ??? */;
aim is not to do too many of Phase 1 since it is expensive. */
if (0)
- VG_(printf)("SCHED: tid %d, used %d\n", tid, VG_N_THREADS);
+ VG_(printf)("SCHED: tid %d\n", tid);
/* Figure out how many bbs to ask vg_run_innerloop to do. Note
that it decrements the counter before testing it for zero, so
}
+static
+void do_pthread_exit ( ThreadId tid, void* retval )
+{
+ Char msg_buf[100];
+ /* We want to make it appear that this thread has returned to
+ do_pthread_create_bogusRA with retval as the
+ return value. So: simple: put retval into %EAX
+ and &do_pthread_create_bogusRA into %EIP and keep going! */
+ if (VG_(clo_trace_sched)) {
+ VG_(sprintf)(msg_buf, "exiting with %p", retval);
+ print_sched_event(tid, msg_buf);
+ }
+ vg_threads[tid].m_eax = (UInt)retval;
+ vg_threads[tid].m_eip = (UInt)&VG_(pthreadreturn_bogusRA);
+ vg_threads[tid].status = VgTs_Runnable;
+}
+
/* Thread tid is exiting, by returning from the function it was
created with. Or possibly due to pthread_exit or cancellation.
// ***** CHECK *thread is writable
*thread = (pthread_t)tid;
- vg_threads[tid].waited_on_mx = NULL;
- vg_threads[tid].joiner = VG_INVALID_THREADID;
- vg_threads[tid].status = VgTs_Runnable;
+ vg_threads[tid].associated_mx = NULL;
+ vg_threads[tid].associated_cv = NULL;
+ vg_threads[tid].joiner = VG_INVALID_THREADID;
+ vg_threads[tid].status = VgTs_Runnable;
/* return zero */
vg_threads[tid].m_edx = 0; /* success */
deals with that for us.
*/
+/* Helper fns ... */
+static
+void release_one_thread_waiting_on_mutex ( pthread_mutex_t* mutex,
+ Char* caller )
+{
+ Int i;
+ Char msg_buf[100];
+
+ /* Find some arbitrary thread waiting on this mutex, and make it
+ runnable. If none are waiting, mark the mutex as not held. */
+ for (i = 1; i < VG_N_THREADS; i++) {
+ if (vg_threads[i].status == VgTs_Empty)
+ continue;
+ if (vg_threads[i].status == VgTs_WaitMX
+ && vg_threads[i].associated_mx == mutex)
+ break;
+ }
+
+ vg_assert(i <= VG_N_THREADS);
+ if (i == VG_N_THREADS) {
+ /* Nobody else is waiting on it. */
+ mutex->__m_count = 0;
+ mutex->__m_owner = VG_INVALID_THREADID;
+ } else {
+ /* Notionally transfer the hold to thread i, whose
+ pthread_mutex_lock() call now returns with 0 (success). */
+ /* The .count is already == 1. */
+ vg_assert(vg_threads[i].associated_mx == mutex);
+ mutex->__m_owner = (_pthread_descr)i;
+ vg_threads[i].status = VgTs_Runnable;
+ vg_threads[i].associated_mx = NULL;
+ vg_threads[i].m_edx = 0; /* pth_lock() success */
+
+ if (VG_(clo_trace_pthread_level) >= 1) {
+ VG_(sprintf)(msg_buf, "%s mx %p: RESUME",
+ caller, mutex );
+ print_pthread_event(i, msg_buf);
+ }
+ }
+}
+
static
void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
Char msg_buf[100];
if (VG_(clo_trace_pthread_level) >= 2) {
- VG_(sprintf)(msg_buf, "pthread_mutex_lock %p", mutex );
+ VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p ...", mutex );
print_pthread_event(tid, msg_buf);
}
/* return 0 (success). */
mutex->__m_count++;
vg_threads[tid].m_edx = 0;
- VG_(printf)("!!!!!! tid %d, mutex %p -> locked %d\n",
+ VG_(printf)("!!!!!! tid %d, mx %p -> locked %d\n",
tid, mutex, mutex->__m_count);
return;
} else {
} else {
/* Someone else has it; we have to wait. Mark ourselves
thusly. */
- vg_threads[tid].status = VgTs_WaitMX;
- vg_threads[tid].waited_on_mx = mutex;
+ vg_threads[tid].status = VgTs_WaitMX;
+ vg_threads[tid].associated_mx = mutex;
/* No assignment to %EDX, since we're blocking. */
if (VG_(clo_trace_pthread_level) >= 1) {
- VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: BLOCK",
+ VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p: BLOCK",
mutex );
print_pthread_event(tid, msg_buf);
}
/* We get it! [for the first time]. */
mutex->__m_count = 1;
mutex->__m_owner = (_pthread_descr)tid;
- vg_assert(vg_threads[tid].waited_on_mx == NULL);
+ vg_assert(vg_threads[tid].associated_mx == NULL);
/* return 0 (success). */
vg_threads[tid].m_edx = 0;
}
void do_pthread_mutex_unlock ( ThreadId tid,
pthread_mutex_t *mutex )
{
- Int i;
- Char msg_buf[100];
+ Char msg_buf[100];
if (VG_(clo_trace_pthread_level) >= 2) {
- VG_(sprintf)(msg_buf, "pthread_mutex_unlock %p", mutex );
+ VG_(sprintf)(msg_buf, "pthread_mutex_unlock mx %p ...", mutex );
print_pthread_event(tid, msg_buf);
}
vg_assert(mutex->__m_count == 1);
vg_assert((ThreadId)mutex->__m_owner == tid);
- /* Find some arbitrary thread waiting on this mutex, and make it
- runnable. If none are waiting, mark the mutex as not held. */
- for (i = 1; i < VG_N_THREADS; i++) {
- if (vg_threads[i].status == VgTs_Empty)
- continue;
- if (vg_threads[i].status == VgTs_WaitMX
- && vg_threads[i].waited_on_mx == mutex)
- break;
- }
-
- vg_assert(i <= VG_N_THREADS);
- if (i == VG_N_THREADS) {
- /* Nobody else is waiting on it. */
- mutex->__m_count = 0;
- mutex->__m_owner = VG_INVALID_THREADID;
- } else {
- /* Notionally transfer the hold to thread i, whose
- pthread_mutex_lock() call now returns with 0 (success). */
- /* The .count is already == 1. */
- vg_assert(vg_threads[i].waited_on_mx == mutex);
- mutex->__m_owner = (_pthread_descr)i;
- vg_threads[i].status = VgTs_Runnable;
- vg_threads[i].waited_on_mx = NULL;
- vg_threads[i].m_edx = 0; /* pth_lock() success */
-
- if (VG_(clo_trace_pthread_level) >= 1) {
- VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: RESUME",
- mutex );
- print_pthread_event(i, msg_buf);
- }
- }
+ /* Release at most one thread waiting on this mutex. */
+ release_one_thread_waiting_on_mutex ( mutex, "pthread_mutex_lock" );
- /* In either case, our (tid's) pth_unlock() returns with 0
- (success). */
+ /* Our (tid's) pth_unlock() returns with 0 (success). */
vg_threads[tid].m_edx = 0; /* Success. */
}
#define PTHREAD_COND_INITIALIZER {__LOCK_INITIALIZER, 0}
- We'll just use the __c_waiting field to point to the head of the
- list of threads waiting on this condition. Note how the static
- initialiser has __c_waiting == 0 == VG_INVALID_THREADID.
+ We don't use any fields of pthread_cond_t for anything at all.
+ Only the identity of the CVs is important.
Linux pthreads supports no attributes on condition variables, so we
- don't need to think too hard there.
-*/
+ don't need to think too hard there. */
+static
+void release_N_threads_waiting_on_cond ( pthread_cond_t* cond,
+ Int n_to_release,
+ Char* caller )
+{
+ Int i;
+ Char msg_buf[100];
+ pthread_mutex_t* mx;
+
+ while (True) {
+ if (n_to_release == 0)
+ return;
+
+ /* Find a thread waiting on this CV. */
+ for (i = 1; i < VG_N_THREADS; i++) {
+ if (vg_threads[i].status == VgTs_Empty)
+ continue;
+ if (vg_threads[i].status == VgTs_WaitCV
+ && vg_threads[i].associated_cv == cond)
+ break;
+ }
+ vg_assert(i <= VG_N_THREADS);
+
+ if (i == VG_N_THREADS) {
+ /* Nobody else is waiting on it. */
+ return;
+ }
+
+ mx = vg_threads[i].associated_mx;
+ vg_assert(mx != NULL);
+
+ if (mx->__m_owner == VG_INVALID_THREADID) {
+ /* Currently unheld; hand it out to thread i. */
+ vg_assert(mx->__m_count == 0);
+ vg_threads[i].status = VgTs_Runnable;
+ vg_threads[i].associated_cv = NULL;
+ vg_threads[i].associated_mx = NULL;
+ mx->__m_owner = (_pthread_descr)i;
+ mx->__m_count = 1;
+ vg_threads[i].m_edx = 0; /* pthread_cond_wait returns success */
+
+ if (VG_(clo_trace_pthread_level) >= 1) {
+ VG_(sprintf)(msg_buf, "%s cv %p: RESUME with mx %p",
+ caller, cond, mx );
+ print_pthread_event(i, msg_buf);
+ }
+
+ } else {
+ /* Currently held. Make thread i be blocked on it. */
+ vg_threads[i].status = VgTs_WaitMX;
+ vg_threads[i].associated_cv = NULL;
+ vg_threads[i].associated_mx = mx;
+
+ if (VG_(clo_trace_pthread_level) >= 1) {
+ VG_(sprintf)(msg_buf, "%s cv %p: BLOCK for mx %p",
+ caller, cond, mx );
+ print_pthread_event(i, msg_buf);
+ }
+
+ }
+
+ n_to_release--;
+ }
+}
+
+
+static
+void do_pthread_cond_wait ( ThreadId tid,
+ pthread_cond_t *cond,
+ pthread_mutex_t *mutex )
+{
+ Char msg_buf[100];
+
+ /* pre: mutex should be a valid mutex and owned by tid. */
+ if (VG_(clo_trace_pthread_level) >= 2) {
+ VG_(sprintf)(msg_buf, "pthread_cond_wait cv %p, mx %p ...",
+ cond, mutex );
+ print_pthread_event(tid, msg_buf);
+ }
+
+ /* Paranoia ... */
+ vg_assert(is_valid_tid(tid)
+ && vg_threads[tid].status == VgTs_Runnable);
+
+ if (mutex == NULL || cond == NULL) {
+ vg_threads[tid].m_edx = EINVAL;
+ return;
+ }
+
+ /* More paranoia ... */
+ switch (mutex->__m_kind) {
+ case PTHREAD_MUTEX_TIMED_NP:
+ case PTHREAD_MUTEX_RECURSIVE_NP:
+ case PTHREAD_MUTEX_ERRORCHECK_NP:
+ case PTHREAD_MUTEX_ADAPTIVE_NP:
+ if (mutex->__m_count >= 0) break;
+ /* else fall thru */
+ default:
+ vg_threads[tid].m_edx = EINVAL;
+ return;
+ }
+
+ /* Barf if we don't currently hold the mutex. */
+ if (mutex->__m_count == 0 /* nobody holds it */
+ || (ThreadId)mutex->__m_owner != tid /* we don't hold it */) {
+ vg_threads[tid].m_edx = EINVAL;
+ return;
+ }
+
+ /* Queue ourselves on the condition. */
+ vg_threads[tid].status = VgTs_WaitCV;
+ vg_threads[tid].associated_cv = cond;
+ vg_threads[tid].associated_mx = mutex;
+
+ if (VG_(clo_trace_pthread_level) >= 1) {
+ VG_(sprintf)(msg_buf,
+ "pthread_cond_wait cv %p, mx %p: BLOCK",
+ cond, mutex );
+ print_pthread_event(tid, msg_buf);
+ }
+
+ /* Release the mutex. */
+ release_one_thread_waiting_on_mutex ( mutex, "pthread_cond_wait " );
+}
+
+
+static
+void do_pthread_cond_signal_or_broadcast ( ThreadId tid,
+ Bool broadcast,
+ pthread_cond_t *cond )
+{
+ Char msg_buf[100];
+ Char* caller
+ = broadcast ? "pthread_cond_broadcast"
+ : "pthread_cond_signal ";
+
+ if (VG_(clo_trace_pthread_level) >= 2) {
+ VG_(sprintf)(msg_buf, "%s cv %p ...",
+ caller, cond );
+ print_pthread_event(tid, msg_buf);
+ }
+
+ /* Paranoia ... */
+ vg_assert(is_valid_tid(tid)
+ && vg_threads[tid].status == VgTs_Runnable);
+
+ if (cond == NULL) {
+ vg_threads[tid].m_edx = EINVAL;
+ return;
+ }
+
+ release_N_threads_waiting_on_cond (
+ cond,
+ broadcast ? VG_N_THREADS : 1,
+ caller
+ );
+
+ vg_threads[tid].m_edx = 0; /* success */
+}
+
/* ---------------------------------------------------------------------
Handle non-trivial client requests.
do_pthread_cancel( tid, (pthread_t)(arg[1]) );
break;
+ case VG_USERREQ__PTHREAD_EXIT:
+ do_pthread_exit( tid, (void*)(arg[1]) );
+ break;
+
+ case VG_USERREQ__PTHREAD_COND_WAIT:
+ do_pthread_cond_wait( tid,
+ (pthread_cond_t *)(arg[1]),
+ (pthread_mutex_t *)(arg[2]) );
+ break;
+
+ case VG_USERREQ__PTHREAD_COND_SIGNAL:
+ do_pthread_cond_signal_or_broadcast(
+ tid,
+ False, /* signal, not broadcast */
+ (pthread_cond_t *)(arg[1]) );
+ break;
+
+ case VG_USERREQ__PTHREAD_COND_BROADCAST:
+ do_pthread_cond_signal_or_broadcast(
+ tid,
+ True, /* broadcast, not signal */
+ (pthread_cond_t *)(arg[1]) );
+ break;
+
case VG_USERREQ__MAKE_NOACCESS:
case VG_USERREQ__MAKE_WRITABLE:
case VG_USERREQ__MAKE_READABLE:
static
void scheduler_sanity ( void )
{
- pthread_mutex_t* mutex;
+ pthread_mutex_t* mx;
+ pthread_cond_t* cv;
Int i;
/* VG_(printf)("scheduler_sanity\n"); */
for (i = 1; i < VG_N_THREADS; i++) {
+ mx = vg_threads[i].associated_mx;
+ cv = vg_threads[i].associated_cv;
if (vg_threads[i].status == VgTs_WaitMX) {
- mutex = vg_threads[i].waited_on_mx;
- vg_assert(mutex != NULL);
- vg_assert(mutex->__m_count > 0);
- vg_assert(is_valid_tid((ThreadId)mutex->__m_owner));
+ vg_assert(cv == NULL);
+ vg_assert(mx != NULL);
+ vg_assert(mx->__m_count > 0);
+ vg_assert(is_valid_tid((ThreadId)mx->__m_owner));
+ vg_assert(i != (ThreadId)mx->__m_owner);
+ /* otherwise thread i would be deadlocked. */
+ } else
+ if (vg_threads[i].status == VgTs_WaitCV) {
+ vg_assert(cv != NULL);
+ vg_assert(mx != NULL);
} else {
- vg_assert(vg_threads[i].waited_on_mx == NULL);
+ vg_assert(cv == NULL);
+ vg_assert(mx == NULL);
}
}
}
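(Illustrative aside, not part of the patch.) To make the control flow of the pieces above concrete: a wrapper in vg_libpthread.c issues a VG_USERREQ__* request, the switch in vg_scheduler.c does the work and deposits the result in the client thread's %EDX, and that value comes back as the wrapper's return value. The following is a toy, self-contained model of that round trip, not Valgrind code; only the request number matches the definitions above, everything else is illustrative.

/* Toy model of the request round trip; illustrative only. */
#include <stdio.h>
#include <errno.h>

#define MODEL_REQ_COND_SIGNAL 0x3009  /* matches VG_USERREQ__PTHREAD_COND_SIGNAL */

typedef struct { unsigned int m_edx; } ModelThreadState;

/* Stand-in for the VG_USERREQ__ switch in vg_scheduler.c: do the work
   and leave the result in the client's %EDX. */
static void model_handle_request ( ModelThreadState *tst,
                                   unsigned int req, void *arg1 )
{
   switch (req) {
      case MODEL_REQ_COND_SIGNAL:
         tst->m_edx = (arg1 == NULL) ? EINVAL : 0;
         break;
      default:
         tst->m_edx = EINVAL;
         break;
   }
}

/* Stand-in for VALGRIND_MAGIC_SEQUENCE plus the trap into the scheduler:
   the wrapper's 'res' is whatever the handler left in %EDX. */
static int model_cond_signal ( void *cond )
{
   ModelThreadState tst;
   model_handle_request(&tst, MODEL_REQ_COND_SIGNAL, cond);
   return (int)tst.m_edx;
}

int main ( void )
{
   int not_really_a_cond;
   printf("signal(valid cond) -> %d\n", model_cond_signal(&not_really_a_cond));
   printf("signal(NULL)       -> %d (EINVAL)\n", model_cond_signal(NULL));
   return 0;
}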
--- /dev/null
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ * by Brad Nichols, Dick Buttlar, Jackie Farrell
+ * O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ *
+ * cvsimple.c
+ *
+ * Demonstrates pthread condition variables.
+ *
+ */
+
+#include <stdio.h>
+#include <pthread.h>
+
+#define NUM_THREADS 3
+#define TCOUNT 10
+#define COUNT_THRES 12
+
+int count = 0;
+int thread_ids[3] = {0,1,2};
+pthread_mutex_t count_lock=PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t count_hit_threshold=PTHREAD_COND_INITIALIZER;
+
+void *inc_count(void *idp)
+{
+ int i=0, save_state, save_type;
+ int *my_id = idp;
+
+ for (i=0; i<TCOUNT; i++) {
+ pthread_mutex_lock(&count_lock);
+ count++;
+ printf("inc_counter(): thread %d, count = %d, unlocking mutex\n",
+ *my_id, count);
+ if (count == COUNT_THRES) {
+ printf("inc_count(): Thread %d, count %d\n", *my_id, count);
+ pthread_cond_signal(&count_hit_threshold);
+ }
+ pthread_mutex_unlock(&count_lock);
+ }
+
+ return(NULL);
+}
+
+void *watch_count(void *idp)
+{
+ int i=0, save_state, save_type;
+ int *my_id = idp;
+
+ printf("watch_count(): thread %d\n", *my_id);
+ fflush(stdout);
+ pthread_mutex_lock(&count_lock);
+
+ while (count < COUNT_THRES) {
+ pthread_cond_wait(&count_hit_threshold, &count_lock);
+ printf("watch_count(): thread %d, count %d\n", *my_id, count);
+ }
+
+ pthread_mutex_unlock(&count_lock);
+
+ return(NULL);
+}
+
+extern int
+main(void)
+{
+ int i;
+ pthread_t threads[3];
+
+ pthread_create(&threads[0], NULL, inc_count, (void *)&thread_ids[0]);
+ pthread_create(&threads[1], NULL, inc_count, (void *)&thread_ids[1]);
+ pthread_create(&threads[2], NULL, watch_count, (void *)&thread_ids[2]);
+
+ for (i = 0; i < NUM_THREADS; i++) {
+ pthread_join(threads[i], NULL);
+ }
+
+ return 0;
+}
+
+
--- /dev/null
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ * by Brad Nichols, Dick Buttlar, Jackie Farrell
+ * O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.c --
+ *
+ * Example thread pooling library
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <string.h>
+
+#include <pthread.h>
+
+
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ * by Brad Nichols, Dick Buttlar, Jackie Farrell
+ * O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.h --
+ *
+ * Structures for thread pool
+ */
+
+typedef struct tpool_work {
+ void (*routine)();
+ void *arg;
+ struct tpool_work *next;
+} tpool_work_t;
+
+typedef struct tpool {
+ /* pool characteristics */
+ int num_threads;
+ int max_queue_size;
+ int do_not_block_when_full;
+ /* pool state */
+ pthread_t *threads;
+ int cur_queue_size;
+ tpool_work_t *queue_head;
+ tpool_work_t *queue_tail;
+ int queue_closed;
+ int shutdown;
+ /* pool synchronization */
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_not_empty;
+ pthread_cond_t queue_not_full;
+ pthread_cond_t queue_empty;
+} *tpool_t;
+
+void tpool_init(
+ tpool_t *tpoolp,
+ int num_threads,
+ int max_queue_size,
+ int do_not_block_when_full);
+
+int tpool_add_work(
+ tpool_t tpool,
+ void (*routine)(),
+ void *arg);
+
+int tpool_destroy(
+ tpool_t tpool,
+ int finish);
+
+
+/*-- end of tpool.h ----------------------------------*/
+
+
+void *tpool_thread(void *);
+
+void tpool_init(tpool_t *tpoolp,
+ int num_worker_threads,
+ int max_queue_size,
+ int do_not_block_when_full)
+{
+ int i, rtn;
+ tpool_t tpool;
+
+ /* allocate a pool data structure */
+ if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
+ perror("malloc"), exit(1);
+
+ /* initialize the fields */
+ tpool->num_threads = num_worker_threads;
+ tpool->max_queue_size = max_queue_size;
+ tpool->do_not_block_when_full = do_not_block_when_full;
+ if ((tpool->threads =
+ (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads))
+ == NULL)
+ perror("malloc"), exit(1);
+ tpool->cur_queue_size = 0;
+ tpool->queue_head = NULL;
+ tpool->queue_tail = NULL;
+ tpool->queue_closed = 0;
+ tpool->shutdown = 0;
+ if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
+ fprintf(stderr,"pthread_mutex_init %s\n",strerror(rtn)), exit(1);
+ if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+ if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+ if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
+ fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+
+ /* create threads */
+ for (i = 0; i != num_worker_threads; i++) {
+ if ((rtn = pthread_create( &(tpool->threads[i]),
+ NULL,
+ tpool_thread,
+ (void *)tpool)) != 0)
+ fprintf(stderr,"pthread_create %d\n",rtn), exit(1);
+ }
+
+ *tpoolp = tpool;
+}
+
+int tpool_add_work(
+ tpool_t tpool,
+ void (*routine)(),
+ void *arg)
+{
+ int rtn;
+ tpool_work_t *workp;
+
+ if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+ /* no space and this caller doesn't want to wait */
+ if ((tpool->cur_queue_size == tpool->max_queue_size) &&
+ tpool->do_not_block_when_full) {
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+ return -1;
+ }
+
+ while( (tpool->cur_queue_size == tpool->max_queue_size) &&
+ (!(tpool->shutdown || tpool->queue_closed)) ) {
+
+ if ((rtn = pthread_cond_wait(&(tpool->queue_not_full),
+ &(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_cond_waitA %d\n",rtn), exit(1);
+
+ }
+
+ /* the pool is in the process of being destroyed */
+ if (tpool->shutdown || tpool->queue_closed) {
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+ return -1;
+ }
+
+
+ /* allocate work structure */
+ if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
+ perror("malloc"), exit(1);
+ workp->routine = routine;
+ workp->arg = arg;
+ workp->next = NULL;
+
+ printf("adder: adding an item %d\n", workp->routine);
+
+ if (tpool->cur_queue_size == 0) {
+ tpool->queue_tail = tpool->queue_head = workp;
+
+ printf("adder: queue == 0, waking all workers\n");
+
+ if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
+ fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);;
+ } else {
+ tpool->queue_tail->next = workp;
+ tpool->queue_tail = workp;
+ }
+
+ tpool->cur_queue_size++;
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+ return 1;
+}
+
+int tpool_destroy(tpool_t tpool,
+ int finish)
+{
+ int i,rtn;
+ tpool_work_t *cur_nodep;
+
+
+ if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+ /* Is a shutdown already in progress? */
+ if (tpool->queue_closed || tpool->shutdown) {
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+ return 0;
+ }
+
+ tpool->queue_closed = 1;
+
+ /* If the finish flag is set, wait for workers to
+ drain queue */
+ if (finish == 1) {
+ while (tpool->cur_queue_size != 0) {
+ if ((rtn = pthread_cond_wait(&(tpool->queue_empty),
+ &(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_cond_waitB %d\n",rtn), exit(1);
+ }
+ }
+
+ tpool->shutdown = 1;
+
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+
+ /* Wake up any workers so they recheck shutdown flag */
+ if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
+ fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+ if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
+ fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+
+
+ /* Wait for workers to exit */
+ for(i=0; i < tpool->num_threads; i++) {
+ if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
+ fprintf(stderr,"pthread_join %d\n",rtn), exit(1);
+ }
+
+ /* Now free pool structures */
+ free(tpool->threads);
+ while(tpool->queue_head != NULL) {
+ cur_nodep = tpool->queue_head;
+ tpool->queue_head = tpool->queue_head->next;
+ free(cur_nodep); /* free the node just unlinked */
+ }
+ free(tpool);
+ return 0;
+}
+
+void *tpool_thread(void *arg)
+{
+ tpool_t tpool = (tpool_t)arg;
+ int rtn;
+ tpool_work_t *my_workp;
+
+ for(;;) {
+
+
+
+ /* Check queue for work */
+ if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+ while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) {
+
+
+ printf("worker %d: I'm sleeping again\n", pthread_self());
+
+ if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty),
+ &(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_cond_waitC %d\n",rtn), exit(1);
+
+ }
+ sleep(1);
+
+ printf("worker %d: I'm awake\n", pthread_self());
+
+ /* Has a shutdown started while i was sleeping? */
+ if (tpool->shutdown == 1) {
+
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+ pthread_exit(NULL);
+ }
+
+
+ /* Get to work, dequeue the next item */
+ my_workp = tpool->queue_head;
+ tpool->cur_queue_size--;
+ if (tpool->cur_queue_size == 0)
+ tpool->queue_head = tpool->queue_tail = NULL;
+ else
+ tpool->queue_head = my_workp->next;
+
+ printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next);
+
+ /* Handle waiting add_work threads */
+ if ((!tpool->do_not_block_when_full) &&
+ (tpool->cur_queue_size == (tpool->max_queue_size - 1)))
+
+ if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
+ fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+
+ /* Handle waiting destroyer threads */
+ if (tpool->cur_queue_size == 0)
+
+ if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)
+ fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);
+
+ if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+ fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+ /* Do this work item */
+ (*(my_workp->routine))(my_workp->arg);
+ free(my_workp);
+ }
+ return(NULL);
+}
+
+
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ * by Brad Nichols, Dick Buttlar, Jackie Farrell
+ * O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.c --
+ *
+ * Example caller for thread pooling library
+ */
+
+char *s1[20]={ "STRING 0",
+ "STRING 1",
+ "STRING 2",
+ "STRING 3",
+ "STRING 4",
+ "STRING 5",
+ "STRING 6",
+ "STRING 7",
+ "STRING 8",
+ "STRING 9",
+ "STRING 10",
+ "STRING 11",
+ "STRING 12",
+ "STRING 13",
+ "STRING 14",
+ "STRING 15",
+ "STRING 16",
+ "STRING 17",
+ "STRING 18",
+ "STRING 19"};
+
+void r1(char * printstring)
+{
+ int i, x = 0;
+
+ printf("%s START\n", printstring);
+
+ for (i = 0; i < 1000000; i++) {
+ x = x +i;
+ }
+
+ printf("%s DONE\n", printstring);
+}
+
+extern int
+main(void)
+{
+ extern char *s1[];
+
+ pthread_t t1,t2;
+ int i;
+
+ tpool_t test_pool;
+
+ tpool_init(&test_pool, 10, 20, 0);
+
+ sleep(1);
+
+ for ( i = 0; i < 5; i++) {
+ printf("tpool_add_work returned %d\n",
+ tpool_add_work(test_pool, r1, s1[i]));
+
+ }
+
+ printf("main: all work queued\n");
+
+ tpool_destroy(test_pool, 1);
+
+ return 0;
+
+}