From: Julian Seward Date: Sat, 20 Apr 2002 13:53:23 +0000 (+0000) Subject: Add fairly complete, and apparently working, support for condition X-Git-Tag: svn/VALGRIND_1_0_3~349 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e39f3f644a462b9c421b8633e089816260bc210b;p=thirdparty%2Fvalgrind.git Add fairly complete, and apparently working, support for condition variables. git-svn-id: svn://svn.valgrind.org/valgrind/trunk@102 --- diff --git a/coregrind/arch/x86-linux/vg_libpthread.c b/coregrind/arch/x86-linux/vg_libpthread.c index 435b7e292a..3ecd6f1145 100644 --- a/coregrind/arch/x86-linux/vg_libpthread.c +++ b/coregrind/arch/x86-linux/vg_libpthread.c @@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller ) static +__attribute__((noreturn)) void barf ( char* str ) { char buf[100]; @@ -96,6 +97,8 @@ void barf ( char* str ) strcat(buf, "\n\n"); write(2, buf, strlen(buf)); myexit(1); + /* We have to persuade gcc into believing this doesn't return. */ + while (1) { }; } @@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return) } +void pthread_exit(void *retval) +{ + int res; + ensure_valgrind("pthread_exit"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_EXIT, + retval, 0, 0, 0); + /* Doesn't return! */ + /* However, we have to fool gcc into knowing that. */ + barf("pthread_exit: still alive after request?!"); +} + static int thread_specific_errno[VG_N_THREADS]; @@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread, return 0; } +int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + int res; + ensure_valgrind("pthread_cond_wait"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_WAIT, + cond, mutex, 0, 0); + return res; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_signal"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_SIGNAL, + cond, 0, 0, 0); + return res; +} + +int pthread_cond_broadcast(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_broadcast"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_BROADCAST, + cond, 0, 0, 0); + return res; +} + /* --------------------------------------------------- CANCELLATION diff --git a/coregrind/vg_include.h b/coregrind/vg_include.h index 1c72ac0d05..bdb60fc478 100644 --- a/coregrind/vg_include.h +++ b/coregrind/vg_include.h @@ -126,7 +126,7 @@ scheduler algorithms is surely O(N^2) in the number of threads, since that's simple, at least. And (in practice) we hope that most programs do not need many threads. */ -#define VG_N_THREADS 10 +#define VG_N_THREADS 20 /* Number of file descriptors that can simultaneously be waited on for I/O to complete. Perhaps this should be the same as VG_N_THREADS @@ -397,9 +397,13 @@ extern Bool VG_(is_empty_arena) ( ArenaId aid ); #define VG_USERREQ__PTHREAD_CREATE 0x3001 #define VG_USERREQ__PTHREAD_JOIN 0x3002 #define VG_USERREQ__PTHREAD_GET_THREADID 0x3003 -#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3005 -#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3006 -#define VG_USERREQ__PTHREAD_CANCEL 0x3008 +#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3004 +#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3005 +#define VG_USERREQ__PTHREAD_CANCEL 0x3006 +#define VG_USERREQ__PTHREAD_EXIT 0x3007 +#define VG_USERREQ__PTHREAD_COND_WAIT 0x3008 +#define VG_USERREQ__PTHREAD_COND_SIGNAL 0x3009 +#define VG_USERREQ__PTHREAD_COND_BROADCAST 0x300A /* Cosmetic ... */ #define VG_USERREQ__GET_PTHREAD_TRACE_LEVEL 0x3101 @@ -445,6 +449,7 @@ typedef VgTs_WaitJoinee, /* waiting for the thread I did join on */ VgTs_WaitFD, /* waiting for I/O completion on a fd */ VgTs_WaitMX, /* waiting on a mutex */ + VgTs_WaitCV, /* waiting on a condition variable */ VgTs_Sleeping /* sleeping for a while */ } ThreadStatus; @@ -467,9 +472,15 @@ typedef VG_INVALID_THREADID if no one asked to join yet. */ ThreadId joiner; - /* When .status == WaitMX, points to the mutex I am waiting - for. */ - void* /* pthread_mutex_t* */ waited_on_mx; + /* When .status == WaitMX, points to the mutex I am waiting for. + When .status == WaitCV, points to the mutex associated with + the condition variable indicated by the .associated_cv field. + In all other cases, should be NULL. */ + void* /* pthread_mutex_t* */ associated_mx; + + /* When .status == WaitCV, points to the condition variable I am + waiting for. In all other cases, should be NULL. */ + void* /* pthread_cond_t* */ associated_cv; /* If VgTs_Sleeping, this is when we should wake up. */ ULong awaken_at; diff --git a/coregrind/vg_libpthread.c b/coregrind/vg_libpthread.c index 435b7e292a..3ecd6f1145 100644 --- a/coregrind/vg_libpthread.c +++ b/coregrind/vg_libpthread.c @@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller ) static +__attribute__((noreturn)) void barf ( char* str ) { char buf[100]; @@ -96,6 +97,8 @@ void barf ( char* str ) strcat(buf, "\n\n"); write(2, buf, strlen(buf)); myexit(1); + /* We have to persuade gcc into believing this doesn't return. */ + while (1) { }; } @@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return) } +void pthread_exit(void *retval) +{ + int res; + ensure_valgrind("pthread_exit"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_EXIT, + retval, 0, 0, 0); + /* Doesn't return! */ + /* However, we have to fool gcc into knowing that. */ + barf("pthread_exit: still alive after request?!"); +} + static int thread_specific_errno[VG_N_THREADS]; @@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread, return 0; } +int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + int res; + ensure_valgrind("pthread_cond_wait"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_WAIT, + cond, mutex, 0, 0); + return res; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_signal"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_SIGNAL, + cond, 0, 0, 0); + return res; +} + +int pthread_cond_broadcast(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_broadcast"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_BROADCAST, + cond, 0, 0, 0); + return res; +} + /* --------------------------------------------------- CANCELLATION diff --git a/coregrind/vg_scheduler.c b/coregrind/vg_scheduler.c index 194c231236..da8143cbee 100644 --- a/coregrind/vg_scheduler.c +++ b/coregrind/vg_scheduler.c @@ -188,9 +188,12 @@ void VG_(pp_sched_status) ( void ) case VgTs_WaitJoinee: VG_(printf)("WaitJoinee"); break; case VgTs_Sleeping: VG_(printf)("Sleeping"); break; case VgTs_WaitMX: VG_(printf)("WaitMX"); break; + case VgTs_WaitCV: VG_(printf)("WaitCV"); break; default: VG_(printf)("???"); break; } - VG_(printf)(", waited_on_mx = %p\n", vg_threads[i].waited_on_mx ); + VG_(printf)(", associated_mx = %p, associated_cv = %p\n", + vg_threads[i].associated_mx, + vg_threads[i].associated_cv ); VG_(pp_ExeContext)( VG_(get_ExeContext)( False, vg_threads[i].m_eip, vg_threads[i].m_ebp )); @@ -513,10 +516,11 @@ void VG_(scheduler_init) ( void ) tid_main = vg_alloc_ThreadState(); vg_assert(tid_main == 1); - vg_threads[tid_main].status = VgTs_Runnable; - vg_threads[tid_main].joiner = VG_INVALID_THREADID; - vg_threads[tid_main].waited_on_mx = NULL; - vg_threads[tid_main].retval = NULL; /* not important */ + vg_threads[tid_main].status = VgTs_Runnable; + vg_threads[tid_main].joiner = VG_INVALID_THREADID; + vg_threads[tid_main].associated_mx = NULL; + vg_threads[tid_main].associated_cv = NULL; + vg_threads[tid_main].retval = NULL; /* not important */ vg_threads[tid_main].stack_highest_word = vg_threads[tid_main].m_esp /* -4 ??? */; @@ -1140,7 +1144,7 @@ VgSchedReturnCode VG_(scheduler) ( void ) aim is not to do too many of Phase 1 since it is expensive. */ if (0) - VG_(printf)("SCHED: tid %d, used %d\n", tid, VG_N_THREADS); + VG_(printf)("SCHED: tid %d\n", tid); /* Figure out how many bbs to ask vg_run_innerloop to do. Note that it decrements the counter before testing it for zero, so @@ -1363,6 +1367,23 @@ void do_pthread_cancel ( ThreadId tid_canceller, } +static +void do_pthread_exit ( ThreadId tid, void* retval ) +{ + Char msg_buf[100]; + /* We want make is appear that this thread has returned to + do_pthread_create_bogusRA with retval as the + return value. So: simple: put retval into %EAX + and &do_pthread_create_bogusRA into %EIP and keep going! */ + if (VG_(clo_trace_sched)) { + VG_(sprintf)(msg_buf, "exiting with %p", retval); + print_sched_event(tid, msg_buf); + } + vg_threads[tid].m_eax = (UInt)retval; + vg_threads[tid].m_eip = (UInt)&VG_(pthreadreturn_bogusRA); + vg_threads[tid].status = VgTs_Runnable; +} + /* Thread tid is exiting, by returning from the function it was created with. Or possibly due to pthread_exit or cancellation. @@ -1581,9 +1602,10 @@ void do_pthread_create ( ThreadId parent_tid, // ***** CHECK *thread is writable *thread = (pthread_t)tid; - vg_threads[tid].waited_on_mx = NULL; - vg_threads[tid].joiner = VG_INVALID_THREADID; - vg_threads[tid].status = VgTs_Runnable; + vg_threads[tid].associated_mx = NULL; + vg_threads[tid].associated_cv = NULL; + vg_threads[tid].joiner = VG_INVALID_THREADID; + vg_threads[tid].status = VgTs_Runnable; /* return zero */ vg_threads[tid].m_edx = 0; /* success */ @@ -1638,6 +1660,47 @@ void do_pthread_create ( ThreadId parent_tid, deals with that for us. */ +/* Helper fns ... */ +static +void release_one_thread_waiting_on_mutex ( pthread_mutex_t* mutex, + Char* caller ) +{ + Int i; + Char msg_buf[100]; + + /* Find some arbitrary thread waiting on this mutex, and make it + runnable. If none are waiting, mark the mutex as not held. */ + for (i = 1; i < VG_N_THREADS; i++) { + if (vg_threads[i].status == VgTs_Empty) + continue; + if (vg_threads[i].status == VgTs_WaitMX + && vg_threads[i].associated_mx == mutex) + break; + } + + vg_assert(i <= VG_N_THREADS); + if (i == VG_N_THREADS) { + /* Nobody else is waiting on it. */ + mutex->__m_count = 0; + mutex->__m_owner = VG_INVALID_THREADID; + } else { + /* Notionally transfer the hold to thread i, whose + pthread_mutex_lock() call now returns with 0 (success). */ + /* The .count is already == 1. */ + vg_assert(vg_threads[i].associated_mx == mutex); + mutex->__m_owner = (_pthread_descr)i; + vg_threads[i].status = VgTs_Runnable; + vg_threads[i].associated_mx = NULL; + vg_threads[i].m_edx = 0; /* pth_lock() success */ + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s mx %p: RESUME", + caller, mutex ); + print_pthread_event(i, msg_buf); + } + } +} + static void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) @@ -1645,7 +1708,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) Char msg_buf[100]; if (VG_(clo_trace_pthread_level) >= 2) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p", mutex ); + VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p ...", mutex ); print_pthread_event(tid, msg_buf); } @@ -1683,7 +1746,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) /* return 0 (success). */ mutex->__m_count++; vg_threads[tid].m_edx = 0; - VG_(printf)("!!!!!! tid %d, mutex %p -> locked %d\n", + VG_(printf)("!!!!!! tid %d, mx %p -> locked %d\n", tid, mutex, mutex->__m_count); return; } else { @@ -1693,11 +1756,11 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) } else { /* Someone else has it; we have to wait. Mark ourselves thusly. */ - vg_threads[tid].status = VgTs_WaitMX; - vg_threads[tid].waited_on_mx = mutex; + vg_threads[tid].status = VgTs_WaitMX; + vg_threads[tid].associated_mx = mutex; /* No assignment to %EDX, since we're blocking. */ if (VG_(clo_trace_pthread_level) >= 1) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: BLOCK", + VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p: BLOCK", mutex ); print_pthread_event(tid, msg_buf); } @@ -1710,7 +1773,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) /* We get it! [for the first time]. */ mutex->__m_count = 1; mutex->__m_owner = (_pthread_descr)tid; - vg_assert(vg_threads[tid].waited_on_mx == NULL); + vg_assert(vg_threads[tid].associated_mx == NULL); /* return 0 (success). */ vg_threads[tid].m_edx = 0; } @@ -1722,11 +1785,10 @@ static void do_pthread_mutex_unlock ( ThreadId tid, pthread_mutex_t *mutex ) { - Int i; - Char msg_buf[100]; + Char msg_buf[100]; if (VG_(clo_trace_pthread_level) >= 2) { - VG_(sprintf)(msg_buf, "pthread_mutex_unlock %p", mutex ); + VG_(sprintf)(msg_buf, "pthread_mutex_unlock mx %p ...", mutex ); print_pthread_event(tid, msg_buf); } @@ -1773,40 +1835,10 @@ void do_pthread_mutex_unlock ( ThreadId tid, vg_assert(mutex->__m_count == 1); vg_assert((ThreadId)mutex->__m_owner == tid); - /* Find some arbitrary thread waiting on this mutex, and make it - runnable. If none are waiting, mark the mutex as not held. */ - for (i = 1; i < VG_N_THREADS; i++) { - if (vg_threads[i].status == VgTs_Empty) - continue; - if (vg_threads[i].status == VgTs_WaitMX - && vg_threads[i].waited_on_mx == mutex) - break; - } - - vg_assert(i <= VG_N_THREADS); - if (i == VG_N_THREADS) { - /* Nobody else is waiting on it. */ - mutex->__m_count = 0; - mutex->__m_owner = VG_INVALID_THREADID; - } else { - /* Notionally transfer the hold to thread i, whose - pthread_mutex_lock() call now returns with 0 (success). */ - /* The .count is already == 1. */ - vg_assert(vg_threads[i].waited_on_mx == mutex); - mutex->__m_owner = (_pthread_descr)i; - vg_threads[i].status = VgTs_Runnable; - vg_threads[i].waited_on_mx = NULL; - vg_threads[i].m_edx = 0; /* pth_lock() success */ - - if (VG_(clo_trace_pthread_level) >= 1) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: RESUME", - mutex ); - print_pthread_event(i, msg_buf); - } - } + /* Release at max one thread waiting on this mutex. */ + release_one_thread_waiting_on_mutex ( mutex, "pthread_mutex_lock" ); - /* In either case, our (tid's) pth_unlock() returns with 0 - (success). */ + /* Our (tid's) pth_unlock() returns with 0 (success). */ vg_threads[tid].m_edx = 0; /* Success. */ } @@ -1833,15 +1865,173 @@ void do_pthread_mutex_unlock ( ThreadId tid, #define PTHREAD_COND_INITIALIZER {__LOCK_INITIALIZER, 0} - We'll just use the __c_waiting field to point to the head of the - list of threads waiting on this condition. Note how the static - initialiser has __c_waiting == 0 == VG_INVALID_THREADID. + We don't use any fields of pthread_cond_t for anything at all. + Only the identity of the CVs is important. Linux pthreads supports no attributes on condition variables, so we - don't need to think too hard there. -*/ + don't need to think too hard there. */ +static +void release_N_threads_waiting_on_cond ( pthread_cond_t* cond, + Int n_to_release, + Char* caller ) +{ + Int i; + Char msg_buf[100]; + pthread_mutex_t* mx; + + while (True) { + if (n_to_release == 0) + return; + + /* Find a thread waiting on this CV. */ + for (i = 1; i < VG_N_THREADS; i++) { + if (vg_threads[i].status == VgTs_Empty) + continue; + if (vg_threads[i].status == VgTs_WaitCV + && vg_threads[i].associated_cv == cond) + break; + } + vg_assert(i <= VG_N_THREADS); + + if (i == VG_N_THREADS) { + /* Nobody else is waiting on it. */ + return; + } + + mx = vg_threads[i].associated_mx; + vg_assert(mx != NULL); + + if (mx->__m_owner == VG_INVALID_THREADID) { + /* Currently unheld; hand it out to thread i. */ + vg_assert(mx->__m_count == 0); + vg_threads[i].status = VgTs_Runnable; + vg_threads[i].associated_cv = NULL; + vg_threads[i].associated_mx = NULL; + mx->__m_owner = (_pthread_descr)i; + mx->__m_count = 1; + vg_threads[i].m_edx = 0; /* pthread_cond_wait returns success */ + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s cv %p: RESUME with mx %p", + caller, cond, mx ); + print_pthread_event(i, msg_buf); + } + + } else { + /* Currently held. Make thread i be blocked on it. */ + vg_threads[i].status = VgTs_WaitMX; + vg_threads[i].associated_cv = NULL; + vg_threads[i].associated_mx = mx; + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s cv %p: BLOCK for mx %p", + caller, cond, mx ); + print_pthread_event(i, msg_buf); + } + + } + + n_to_release--; + } +} + + +static +void do_pthread_cond_wait ( ThreadId tid, + pthread_cond_t *cond, + pthread_mutex_t *mutex ) +{ + Char msg_buf[100]; + + /* pre: mutex should be a valid mutex and owned by tid. */ + if (VG_(clo_trace_pthread_level) >= 2) { + VG_(sprintf)(msg_buf, "pthread_cond_wait cv %p, mx %p ...", + cond, mutex ); + print_pthread_event(tid, msg_buf); + } + + /* Paranoia ... */ + vg_assert(is_valid_tid(tid) + && vg_threads[tid].status == VgTs_Runnable); + + if (mutex == NULL || cond == NULL) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* More paranoia ... */ + switch (mutex->__m_kind) { + case PTHREAD_MUTEX_TIMED_NP: + case PTHREAD_MUTEX_RECURSIVE_NP: + case PTHREAD_MUTEX_ERRORCHECK_NP: + case PTHREAD_MUTEX_ADAPTIVE_NP: + if (mutex->__m_count >= 0) break; + /* else fall thru */ + default: + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* Barf if we don't currently hold the mutex. */ + if (mutex->__m_count == 0 /* nobody holds it */ + || (ThreadId)mutex->__m_owner != tid /* we don't hold it */) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* Queue ourselves on the condition. */ + vg_threads[tid].status = VgTs_WaitCV; + vg_threads[tid].associated_cv = cond; + vg_threads[tid].associated_mx = mutex; + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, + "pthread_cond_wait cv %p, mx %p: BLOCK", + cond, mutex ); + print_pthread_event(tid, msg_buf); + } + + /* Release the mutex. */ + release_one_thread_waiting_on_mutex ( mutex, "pthread_cond_wait " ); +} + + +static +void do_pthread_cond_signal_or_broadcast ( ThreadId tid, + Bool broadcast, + pthread_cond_t *cond ) +{ + Char msg_buf[100]; + Char* caller + = broadcast ? "pthread_cond_broadcast" + : "pthread_cond_signal "; + + if (VG_(clo_trace_pthread_level) >= 2) { + VG_(sprintf)(msg_buf, "%s cv %p ...", + caller, cond ); + print_pthread_event(tid, msg_buf); + } + + /* Paranoia ... */ + vg_assert(is_valid_tid(tid) + && vg_threads[tid].status == VgTs_Runnable); + + if (cond == NULL) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + release_N_threads_waiting_on_cond ( + cond, + broadcast ? VG_N_THREADS : 1, + caller + ); + + vg_threads[tid].m_edx = 0; /* success */ +} + /* --------------------------------------------------------------------- Handle non-trivial client requests. @@ -1882,6 +2072,30 @@ void do_nontrivial_clientreq ( ThreadId tid ) do_pthread_cancel( tid, (pthread_t)(arg[1]) ); break; + case VG_USERREQ__PTHREAD_EXIT: + do_pthread_exit( tid, (void*)(arg[1]) ); + break; + + case VG_USERREQ__PTHREAD_COND_WAIT: + do_pthread_cond_wait( tid, + (pthread_cond_t *)(arg[1]), + (pthread_mutex_t *)(arg[2]) ); + break; + + case VG_USERREQ__PTHREAD_COND_SIGNAL: + do_pthread_cond_signal_or_broadcast( + tid, + False, /* signal, not broadcast */ + (pthread_cond_t *)(arg[1]) ); + break; + + case VG_USERREQ__PTHREAD_COND_BROADCAST: + do_pthread_cond_signal_or_broadcast( + tid, + True, /* broadcast, not signal */ + (pthread_cond_t *)(arg[1]) ); + break; + case VG_USERREQ__MAKE_NOACCESS: case VG_USERREQ__MAKE_WRITABLE: case VG_USERREQ__MAKE_READABLE: @@ -1917,17 +2131,27 @@ void do_nontrivial_clientreq ( ThreadId tid ) static void scheduler_sanity ( void ) { - pthread_mutex_t* mutex; + pthread_mutex_t* mx; + pthread_cond_t* cv; Int i; /* VG_(printf)("scheduler_sanity\n"); */ for (i = 1; i < VG_N_THREADS; i++) { + mx = vg_threads[i].associated_mx; + cv = vg_threads[i].associated_cv; if (vg_threads[i].status == VgTs_WaitMX) { - mutex = vg_threads[i].waited_on_mx; - vg_assert(mutex != NULL); - vg_assert(mutex->__m_count > 0); - vg_assert(is_valid_tid((ThreadId)mutex->__m_owner)); + vg_assert(cv == NULL); + vg_assert(mx != NULL); + vg_assert(mx->__m_count > 0); + vg_assert(is_valid_tid((ThreadId)mx->__m_owner)); + vg_assert(i != (ThreadId)mx->__m_owner); + /* otherwise thread i would be deadlocked. */ + } else + if (vg_threads[i].status == VgTs_WaitCV) { + vg_assert(cv != NULL); + vg_assert(mx != NULL); } else { - vg_assert(vg_threads[i].waited_on_mx == NULL); + vg_assert(cv == NULL); + vg_assert(mx == NULL); } } } diff --git a/tests/pth_cvsimple.c b/tests/pth_cvsimple.c new file mode 100644 index 0000000000..ba1101bf61 --- /dev/null +++ b/tests/pth_cvsimple.c @@ -0,0 +1,84 @@ +/******************************************************** + * An example source module to accompany... + * + * "Using POSIX Threads: Programming with Pthreads" + * by Brad nichols, Dick Buttlar, Jackie Farrell + * O'Reilly & Associates, Inc. + * + ******************************************************** + * + * cvsimple.c + * + * Demonstrates pthread cancellation. + * + */ + +#include +#include + +#define NUM_THREADS 3 +#define TCOUNT 10 +#define COUNT_THRES 12 + +int count = 0; +int thread_ids[3] = {0,1,2}; +pthread_mutex_t count_lock=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t count_hit_threshold=PTHREAD_COND_INITIALIZER; + +void *inc_count(void *idp) +{ + int i=0, save_state, save_type; + int *my_id = idp; + + for (i=0; i +#include +#include +#include +#include + +#include + + +/******************************************************** + * An example source module to accompany... + * + * "Using POSIX Threads: Programming with Pthreads" + * by Brad nichols, Dick Buttlar, Jackie Farrell + * O'Reilly & Associates, Inc. + * + ******************************************************** + * tpool.h -- + * + * Structures for thread pool + */ + +typedef struct tpool_work { + void (*routine)(); + void *arg; + struct tpool_work *next; +} tpool_work_t; + +typedef struct tpool { + /* pool characteristics */ + int num_threads; + int max_queue_size; + int do_not_block_when_full; + /* pool state */ + pthread_t *threads; + int cur_queue_size; + tpool_work_t *queue_head; + tpool_work_t *queue_tail; + int queue_closed; + int shutdown; + /* pool synchronization */ + pthread_mutex_t queue_lock; + pthread_cond_t queue_not_empty; + pthread_cond_t queue_not_full; + pthread_cond_t queue_empty; +} *tpool_t; + +void tpool_init( + tpool_t *tpoolp, + int num_threads, + int max_queue_size, + int do_not_block_when_full); + +int tpool_add_work( + tpool_t tpool, + void (*routine)(), + void *arg); + +int tpool_destroy( + tpool_t tpool, + int finish); + + +/*-- end of tpool.h ----------------------------------*/ + + +void *tpool_thread(void *); + +void tpool_init(tpool_t *tpoolp, + int num_worker_threads, + int max_queue_size, + int do_not_block_when_full) +{ + int i, rtn; + tpool_t tpool; + + /* allocate a pool data structure */ + if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL) + perror("malloc"), exit(1); + + /* initialize th fields */ + tpool->num_threads = num_worker_threads; + tpool->max_queue_size = max_queue_size; + tpool->do_not_block_when_full = do_not_block_when_full; + if ((tpool->threads = + (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) + == NULL) + perror("malloc"), exit(1); + tpool->cur_queue_size = 0; + tpool->queue_head = NULL; + tpool->queue_tail = NULL; + tpool->queue_closed = 0; + tpool->shutdown = 0; + if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0) + fprintf(stderr,"pthread_mutex_init %s\n",strerror(rtn)), exit(1); + if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0) + fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1); + if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0) + fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1); + if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0) + fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1); + + /* create threads */ + for (i = 0; i != num_worker_threads; i++) { + if ((rtn = pthread_create( &(tpool->threads[i]), + NULL, + tpool_thread, + (void *)tpool)) != 0) + fprintf(stderr,"pthread_create %d\n",rtn), exit(1); + } + + *tpoolp = tpool; +} + +int tpool_add_work( + tpool_t tpool, + void (*routine)(), + void *arg) +{ + int rtn; + tpool_work_t *workp; + + if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1); + + /* no space and this caller doesn't want to wait */ + if ((tpool->cur_queue_size == tpool->max_queue_size) && + tpool->do_not_block_when_full) { + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + + return -1; + } + + while( (tpool->cur_queue_size == tpool->max_queue_size) && + (!(tpool->shutdown || tpool->queue_closed)) ) { + + if ((rtn = pthread_cond_wait(&(tpool->queue_not_full), + &(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_cond_waitA %d\n",rtn), exit(1); + + } + + /* the pool is in the process of being destroyed */ + if (tpool->shutdown || tpool->queue_closed) { + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + + return -1; + } + + + /* allocate work structure */ + if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL) + perror("malloc"), exit(1); + workp->routine = routine; + workp->arg = arg; + workp->next = NULL; + + printf("adder: adding an item %d\n", workp->routine); + + if (tpool->cur_queue_size == 0) { + tpool->queue_tail = tpool->queue_head = workp; + + printf("adder: queue == 0, waking all workers\n"); + + if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0) + fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);; + } else { + tpool->queue_tail->next = workp; + tpool->queue_tail = workp; + } + + tpool->cur_queue_size++; + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + return 1; +} + +int tpool_destroy(tpool_t tpool, + int finish) +{ + int i,rtn; + tpool_work_t *cur_nodep; + + + if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1); + + /* Is a shutdown already in progress? */ + if (tpool->queue_closed || tpool->shutdown) { + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + return 0; + } + + tpool->queue_closed = 1; + + /* If the finish flag is set, wait for workers to + drain queue */ + if (finish == 1) { + while (tpool->cur_queue_size != 0) { + if ((rtn = pthread_cond_wait(&(tpool->queue_empty), + &(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_cond_waitB %d\n",rtn), exit(1); + } + } + + tpool->shutdown = 1; + + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + + + /* Wake up any workers so they recheck shutdown flag */ + if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0) + fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1); + if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0) + fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1); + + + /* Wait for workers to exit */ + for(i=0; i < tpool->num_threads; i++) { + if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0) + fprintf(stderr,"pthread_join %d\n",rtn), exit(1); + } + + /* Now free pool structures */ + free(tpool->threads); + while(tpool->queue_head != NULL) { + cur_nodep = tpool->queue_head->next; + tpool->queue_head = tpool->queue_head->next; + free(cur_nodep); + } + free(tpool); +} + +void *tpool_thread(void *arg) +{ + tpool_t tpool = (tpool_t)arg; + int rtn; + tpool_work_t *my_workp; + + for(;;) { + + + + /* Check queue for work */ + if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1); + + while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) { + + + printf("worker %d: I'm sleeping again\n", pthread_self()); + + if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty), + &(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_cond_waitC %d\n",rtn), exit(1); + + } + sleep(1); + + printf("worker %d: I'm awake\n", pthread_self()); + + /* Has a shutdown started while i was sleeping? */ + if (tpool->shutdown == 1) { + + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + pthread_exit(NULL); + } + + + /* Get to work, dequeue the next item */ + my_workp = tpool->queue_head; + tpool->cur_queue_size--; + if (tpool->cur_queue_size == 0) + tpool->queue_head = tpool->queue_tail = NULL; + else + tpool->queue_head = my_workp->next; + + printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next); + + /* Handle waiting add_work threads */ + if ((!tpool->do_not_block_when_full) && + (tpool->cur_queue_size == (tpool->max_queue_size - 1))) + + if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0) + fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1); + + /* Handle waiting destroyer threads */ + if (tpool->cur_queue_size == 0) + + if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0) + fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1); + + if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0) + fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1); + + /* Do this work item */ + (*(my_workp->routine))(my_workp->arg); + free(my_workp); + } + return(NULL); +} + + +/******************************************************** + * An example source module to accompany... + * + * "Using POSIX Threads: Programming with Pthreads" + * by Brad nichols, Dick Buttlar, Jackie Farrell + * O'Reilly & Associates, Inc. + * + ******************************************************** + * tpool.c -- + * + * Example caller for thread pooling library + */ + +char *s1[20]={ "STRING 0", + "STRING 1", + "STRING 2", + "STRING 3", + "STRING 4", + "STRING 5", + "STRING 6", + "STRING 7", + "STRING 8", + "STRING 9", + "STRING 10", + "STRING 11", + "STRING 12", + "STRING 13", + "STRING 14", + "STRING 15", + "STRING 16", + "STRING 17", + "STRING 18", + "STRING 19"}; + +void r1(char * printstring) +{ + int i, x; + + printf("%s START\n", printstring); + + for (i = 0; i < 1000000; i++) { + x = x +i; + } + + printf("%s DONE\n", printstring); +} + +extern int +main(void) +{ + extern char *s1[]; + + pthread_t t1,t2; + int i; + + tpool_t test_pool; + + tpool_init(&test_pool, 10, 20, 0); + + sleep(1); + + for ( i = 0; i < 5; i++) { + printf("tpool_add_work returned %d\n", + tpool_add_work(test_pool, r1, s1[i])); + + } + + printf("main: all work queued\n"); + + tpool_destroy(test_pool, 1); + + return 0; + +} diff --git a/vg_include.h b/vg_include.h index 1c72ac0d05..bdb60fc478 100644 --- a/vg_include.h +++ b/vg_include.h @@ -126,7 +126,7 @@ scheduler algorithms is surely O(N^2) in the number of threads, since that's simple, at least. And (in practice) we hope that most programs do not need many threads. */ -#define VG_N_THREADS 10 +#define VG_N_THREADS 20 /* Number of file descriptors that can simultaneously be waited on for I/O to complete. Perhaps this should be the same as VG_N_THREADS @@ -397,9 +397,13 @@ extern Bool VG_(is_empty_arena) ( ArenaId aid ); #define VG_USERREQ__PTHREAD_CREATE 0x3001 #define VG_USERREQ__PTHREAD_JOIN 0x3002 #define VG_USERREQ__PTHREAD_GET_THREADID 0x3003 -#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3005 -#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3006 -#define VG_USERREQ__PTHREAD_CANCEL 0x3008 +#define VG_USERREQ__PTHREAD_MUTEX_LOCK 0x3004 +#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK 0x3005 +#define VG_USERREQ__PTHREAD_CANCEL 0x3006 +#define VG_USERREQ__PTHREAD_EXIT 0x3007 +#define VG_USERREQ__PTHREAD_COND_WAIT 0x3008 +#define VG_USERREQ__PTHREAD_COND_SIGNAL 0x3009 +#define VG_USERREQ__PTHREAD_COND_BROADCAST 0x300A /* Cosmetic ... */ #define VG_USERREQ__GET_PTHREAD_TRACE_LEVEL 0x3101 @@ -445,6 +449,7 @@ typedef VgTs_WaitJoinee, /* waiting for the thread I did join on */ VgTs_WaitFD, /* waiting for I/O completion on a fd */ VgTs_WaitMX, /* waiting on a mutex */ + VgTs_WaitCV, /* waiting on a condition variable */ VgTs_Sleeping /* sleeping for a while */ } ThreadStatus; @@ -467,9 +472,15 @@ typedef VG_INVALID_THREADID if no one asked to join yet. */ ThreadId joiner; - /* When .status == WaitMX, points to the mutex I am waiting - for. */ - void* /* pthread_mutex_t* */ waited_on_mx; + /* When .status == WaitMX, points to the mutex I am waiting for. + When .status == WaitCV, points to the mutex associated with + the condition variable indicated by the .associated_cv field. + In all other cases, should be NULL. */ + void* /* pthread_mutex_t* */ associated_mx; + + /* When .status == WaitCV, points to the condition variable I am + waiting for. In all other cases, should be NULL. */ + void* /* pthread_cond_t* */ associated_cv; /* If VgTs_Sleeping, this is when we should wake up. */ ULong awaken_at; diff --git a/vg_libpthread.c b/vg_libpthread.c index 435b7e292a..3ecd6f1145 100644 --- a/vg_libpthread.c +++ b/vg_libpthread.c @@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller ) static +__attribute__((noreturn)) void barf ( char* str ) { char buf[100]; @@ -96,6 +97,8 @@ void barf ( char* str ) strcat(buf, "\n\n"); write(2, buf, strlen(buf)); myexit(1); + /* We have to persuade gcc into believing this doesn't return. */ + while (1) { }; } @@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return) } +void pthread_exit(void *retval) +{ + int res; + ensure_valgrind("pthread_exit"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_EXIT, + retval, 0, 0, 0); + /* Doesn't return! */ + /* However, we have to fool gcc into knowing that. */ + barf("pthread_exit: still alive after request?!"); +} + static int thread_specific_errno[VG_N_THREADS]; @@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread, return 0; } +int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + int res; + ensure_valgrind("pthread_cond_wait"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_WAIT, + cond, mutex, 0, 0); + return res; +} + +int pthread_cond_signal(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_signal"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_SIGNAL, + cond, 0, 0, 0); + return res; +} + +int pthread_cond_broadcast(pthread_cond_t *cond) +{ + int res; + ensure_valgrind("pthread_cond_broadcast"); + VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */, + VG_USERREQ__PTHREAD_COND_BROADCAST, + cond, 0, 0, 0); + return res; +} + /* --------------------------------------------------- CANCELLATION diff --git a/vg_scheduler.c b/vg_scheduler.c index 194c231236..da8143cbee 100644 --- a/vg_scheduler.c +++ b/vg_scheduler.c @@ -188,9 +188,12 @@ void VG_(pp_sched_status) ( void ) case VgTs_WaitJoinee: VG_(printf)("WaitJoinee"); break; case VgTs_Sleeping: VG_(printf)("Sleeping"); break; case VgTs_WaitMX: VG_(printf)("WaitMX"); break; + case VgTs_WaitCV: VG_(printf)("WaitCV"); break; default: VG_(printf)("???"); break; } - VG_(printf)(", waited_on_mx = %p\n", vg_threads[i].waited_on_mx ); + VG_(printf)(", associated_mx = %p, associated_cv = %p\n", + vg_threads[i].associated_mx, + vg_threads[i].associated_cv ); VG_(pp_ExeContext)( VG_(get_ExeContext)( False, vg_threads[i].m_eip, vg_threads[i].m_ebp )); @@ -513,10 +516,11 @@ void VG_(scheduler_init) ( void ) tid_main = vg_alloc_ThreadState(); vg_assert(tid_main == 1); - vg_threads[tid_main].status = VgTs_Runnable; - vg_threads[tid_main].joiner = VG_INVALID_THREADID; - vg_threads[tid_main].waited_on_mx = NULL; - vg_threads[tid_main].retval = NULL; /* not important */ + vg_threads[tid_main].status = VgTs_Runnable; + vg_threads[tid_main].joiner = VG_INVALID_THREADID; + vg_threads[tid_main].associated_mx = NULL; + vg_threads[tid_main].associated_cv = NULL; + vg_threads[tid_main].retval = NULL; /* not important */ vg_threads[tid_main].stack_highest_word = vg_threads[tid_main].m_esp /* -4 ??? */; @@ -1140,7 +1144,7 @@ VgSchedReturnCode VG_(scheduler) ( void ) aim is not to do too many of Phase 1 since it is expensive. */ if (0) - VG_(printf)("SCHED: tid %d, used %d\n", tid, VG_N_THREADS); + VG_(printf)("SCHED: tid %d\n", tid); /* Figure out how many bbs to ask vg_run_innerloop to do. Note that it decrements the counter before testing it for zero, so @@ -1363,6 +1367,23 @@ void do_pthread_cancel ( ThreadId tid_canceller, } +static +void do_pthread_exit ( ThreadId tid, void* retval ) +{ + Char msg_buf[100]; + /* We want make is appear that this thread has returned to + do_pthread_create_bogusRA with retval as the + return value. So: simple: put retval into %EAX + and &do_pthread_create_bogusRA into %EIP and keep going! */ + if (VG_(clo_trace_sched)) { + VG_(sprintf)(msg_buf, "exiting with %p", retval); + print_sched_event(tid, msg_buf); + } + vg_threads[tid].m_eax = (UInt)retval; + vg_threads[tid].m_eip = (UInt)&VG_(pthreadreturn_bogusRA); + vg_threads[tid].status = VgTs_Runnable; +} + /* Thread tid is exiting, by returning from the function it was created with. Or possibly due to pthread_exit or cancellation. @@ -1581,9 +1602,10 @@ void do_pthread_create ( ThreadId parent_tid, // ***** CHECK *thread is writable *thread = (pthread_t)tid; - vg_threads[tid].waited_on_mx = NULL; - vg_threads[tid].joiner = VG_INVALID_THREADID; - vg_threads[tid].status = VgTs_Runnable; + vg_threads[tid].associated_mx = NULL; + vg_threads[tid].associated_cv = NULL; + vg_threads[tid].joiner = VG_INVALID_THREADID; + vg_threads[tid].status = VgTs_Runnable; /* return zero */ vg_threads[tid].m_edx = 0; /* success */ @@ -1638,6 +1660,47 @@ void do_pthread_create ( ThreadId parent_tid, deals with that for us. */ +/* Helper fns ... */ +static +void release_one_thread_waiting_on_mutex ( pthread_mutex_t* mutex, + Char* caller ) +{ + Int i; + Char msg_buf[100]; + + /* Find some arbitrary thread waiting on this mutex, and make it + runnable. If none are waiting, mark the mutex as not held. */ + for (i = 1; i < VG_N_THREADS; i++) { + if (vg_threads[i].status == VgTs_Empty) + continue; + if (vg_threads[i].status == VgTs_WaitMX + && vg_threads[i].associated_mx == mutex) + break; + } + + vg_assert(i <= VG_N_THREADS); + if (i == VG_N_THREADS) { + /* Nobody else is waiting on it. */ + mutex->__m_count = 0; + mutex->__m_owner = VG_INVALID_THREADID; + } else { + /* Notionally transfer the hold to thread i, whose + pthread_mutex_lock() call now returns with 0 (success). */ + /* The .count is already == 1. */ + vg_assert(vg_threads[i].associated_mx == mutex); + mutex->__m_owner = (_pthread_descr)i; + vg_threads[i].status = VgTs_Runnable; + vg_threads[i].associated_mx = NULL; + vg_threads[i].m_edx = 0; /* pth_lock() success */ + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s mx %p: RESUME", + caller, mutex ); + print_pthread_event(i, msg_buf); + } + } +} + static void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) @@ -1645,7 +1708,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) Char msg_buf[100]; if (VG_(clo_trace_pthread_level) >= 2) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p", mutex ); + VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p ...", mutex ); print_pthread_event(tid, msg_buf); } @@ -1683,7 +1746,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) /* return 0 (success). */ mutex->__m_count++; vg_threads[tid].m_edx = 0; - VG_(printf)("!!!!!! tid %d, mutex %p -> locked %d\n", + VG_(printf)("!!!!!! tid %d, mx %p -> locked %d\n", tid, mutex, mutex->__m_count); return; } else { @@ -1693,11 +1756,11 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) } else { /* Someone else has it; we have to wait. Mark ourselves thusly. */ - vg_threads[tid].status = VgTs_WaitMX; - vg_threads[tid].waited_on_mx = mutex; + vg_threads[tid].status = VgTs_WaitMX; + vg_threads[tid].associated_mx = mutex; /* No assignment to %EDX, since we're blocking. */ if (VG_(clo_trace_pthread_level) >= 1) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: BLOCK", + VG_(sprintf)(msg_buf, "pthread_mutex_lock mx %p: BLOCK", mutex ); print_pthread_event(tid, msg_buf); } @@ -1710,7 +1773,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex ) /* We get it! [for the first time]. */ mutex->__m_count = 1; mutex->__m_owner = (_pthread_descr)tid; - vg_assert(vg_threads[tid].waited_on_mx == NULL); + vg_assert(vg_threads[tid].associated_mx == NULL); /* return 0 (success). */ vg_threads[tid].m_edx = 0; } @@ -1722,11 +1785,10 @@ static void do_pthread_mutex_unlock ( ThreadId tid, pthread_mutex_t *mutex ) { - Int i; - Char msg_buf[100]; + Char msg_buf[100]; if (VG_(clo_trace_pthread_level) >= 2) { - VG_(sprintf)(msg_buf, "pthread_mutex_unlock %p", mutex ); + VG_(sprintf)(msg_buf, "pthread_mutex_unlock mx %p ...", mutex ); print_pthread_event(tid, msg_buf); } @@ -1773,40 +1835,10 @@ void do_pthread_mutex_unlock ( ThreadId tid, vg_assert(mutex->__m_count == 1); vg_assert((ThreadId)mutex->__m_owner == tid); - /* Find some arbitrary thread waiting on this mutex, and make it - runnable. If none are waiting, mark the mutex as not held. */ - for (i = 1; i < VG_N_THREADS; i++) { - if (vg_threads[i].status == VgTs_Empty) - continue; - if (vg_threads[i].status == VgTs_WaitMX - && vg_threads[i].waited_on_mx == mutex) - break; - } - - vg_assert(i <= VG_N_THREADS); - if (i == VG_N_THREADS) { - /* Nobody else is waiting on it. */ - mutex->__m_count = 0; - mutex->__m_owner = VG_INVALID_THREADID; - } else { - /* Notionally transfer the hold to thread i, whose - pthread_mutex_lock() call now returns with 0 (success). */ - /* The .count is already == 1. */ - vg_assert(vg_threads[i].waited_on_mx == mutex); - mutex->__m_owner = (_pthread_descr)i; - vg_threads[i].status = VgTs_Runnable; - vg_threads[i].waited_on_mx = NULL; - vg_threads[i].m_edx = 0; /* pth_lock() success */ - - if (VG_(clo_trace_pthread_level) >= 1) { - VG_(sprintf)(msg_buf, "pthread_mutex_lock %p: RESUME", - mutex ); - print_pthread_event(i, msg_buf); - } - } + /* Release at max one thread waiting on this mutex. */ + release_one_thread_waiting_on_mutex ( mutex, "pthread_mutex_lock" ); - /* In either case, our (tid's) pth_unlock() returns with 0 - (success). */ + /* Our (tid's) pth_unlock() returns with 0 (success). */ vg_threads[tid].m_edx = 0; /* Success. */ } @@ -1833,15 +1865,173 @@ void do_pthread_mutex_unlock ( ThreadId tid, #define PTHREAD_COND_INITIALIZER {__LOCK_INITIALIZER, 0} - We'll just use the __c_waiting field to point to the head of the - list of threads waiting on this condition. Note how the static - initialiser has __c_waiting == 0 == VG_INVALID_THREADID. + We don't use any fields of pthread_cond_t for anything at all. + Only the identity of the CVs is important. Linux pthreads supports no attributes on condition variables, so we - don't need to think too hard there. -*/ + don't need to think too hard there. */ +static +void release_N_threads_waiting_on_cond ( pthread_cond_t* cond, + Int n_to_release, + Char* caller ) +{ + Int i; + Char msg_buf[100]; + pthread_mutex_t* mx; + + while (True) { + if (n_to_release == 0) + return; + + /* Find a thread waiting on this CV. */ + for (i = 1; i < VG_N_THREADS; i++) { + if (vg_threads[i].status == VgTs_Empty) + continue; + if (vg_threads[i].status == VgTs_WaitCV + && vg_threads[i].associated_cv == cond) + break; + } + vg_assert(i <= VG_N_THREADS); + + if (i == VG_N_THREADS) { + /* Nobody else is waiting on it. */ + return; + } + + mx = vg_threads[i].associated_mx; + vg_assert(mx != NULL); + + if (mx->__m_owner == VG_INVALID_THREADID) { + /* Currently unheld; hand it out to thread i. */ + vg_assert(mx->__m_count == 0); + vg_threads[i].status = VgTs_Runnable; + vg_threads[i].associated_cv = NULL; + vg_threads[i].associated_mx = NULL; + mx->__m_owner = (_pthread_descr)i; + mx->__m_count = 1; + vg_threads[i].m_edx = 0; /* pthread_cond_wait returns success */ + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s cv %p: RESUME with mx %p", + caller, cond, mx ); + print_pthread_event(i, msg_buf); + } + + } else { + /* Currently held. Make thread i be blocked on it. */ + vg_threads[i].status = VgTs_WaitMX; + vg_threads[i].associated_cv = NULL; + vg_threads[i].associated_mx = mx; + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, "%s cv %p: BLOCK for mx %p", + caller, cond, mx ); + print_pthread_event(i, msg_buf); + } + + } + + n_to_release--; + } +} + + +static +void do_pthread_cond_wait ( ThreadId tid, + pthread_cond_t *cond, + pthread_mutex_t *mutex ) +{ + Char msg_buf[100]; + + /* pre: mutex should be a valid mutex and owned by tid. */ + if (VG_(clo_trace_pthread_level) >= 2) { + VG_(sprintf)(msg_buf, "pthread_cond_wait cv %p, mx %p ...", + cond, mutex ); + print_pthread_event(tid, msg_buf); + } + + /* Paranoia ... */ + vg_assert(is_valid_tid(tid) + && vg_threads[tid].status == VgTs_Runnable); + + if (mutex == NULL || cond == NULL) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* More paranoia ... */ + switch (mutex->__m_kind) { + case PTHREAD_MUTEX_TIMED_NP: + case PTHREAD_MUTEX_RECURSIVE_NP: + case PTHREAD_MUTEX_ERRORCHECK_NP: + case PTHREAD_MUTEX_ADAPTIVE_NP: + if (mutex->__m_count >= 0) break; + /* else fall thru */ + default: + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* Barf if we don't currently hold the mutex. */ + if (mutex->__m_count == 0 /* nobody holds it */ + || (ThreadId)mutex->__m_owner != tid /* we don't hold it */) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + /* Queue ourselves on the condition. */ + vg_threads[tid].status = VgTs_WaitCV; + vg_threads[tid].associated_cv = cond; + vg_threads[tid].associated_mx = mutex; + + if (VG_(clo_trace_pthread_level) >= 1) { + VG_(sprintf)(msg_buf, + "pthread_cond_wait cv %p, mx %p: BLOCK", + cond, mutex ); + print_pthread_event(tid, msg_buf); + } + + /* Release the mutex. */ + release_one_thread_waiting_on_mutex ( mutex, "pthread_cond_wait " ); +} + + +static +void do_pthread_cond_signal_or_broadcast ( ThreadId tid, + Bool broadcast, + pthread_cond_t *cond ) +{ + Char msg_buf[100]; + Char* caller + = broadcast ? "pthread_cond_broadcast" + : "pthread_cond_signal "; + + if (VG_(clo_trace_pthread_level) >= 2) { + VG_(sprintf)(msg_buf, "%s cv %p ...", + caller, cond ); + print_pthread_event(tid, msg_buf); + } + + /* Paranoia ... */ + vg_assert(is_valid_tid(tid) + && vg_threads[tid].status == VgTs_Runnable); + + if (cond == NULL) { + vg_threads[tid].m_edx = EINVAL; + return; + } + + release_N_threads_waiting_on_cond ( + cond, + broadcast ? VG_N_THREADS : 1, + caller + ); + + vg_threads[tid].m_edx = 0; /* success */ +} + /* --------------------------------------------------------------------- Handle non-trivial client requests. @@ -1882,6 +2072,30 @@ void do_nontrivial_clientreq ( ThreadId tid ) do_pthread_cancel( tid, (pthread_t)(arg[1]) ); break; + case VG_USERREQ__PTHREAD_EXIT: + do_pthread_exit( tid, (void*)(arg[1]) ); + break; + + case VG_USERREQ__PTHREAD_COND_WAIT: + do_pthread_cond_wait( tid, + (pthread_cond_t *)(arg[1]), + (pthread_mutex_t *)(arg[2]) ); + break; + + case VG_USERREQ__PTHREAD_COND_SIGNAL: + do_pthread_cond_signal_or_broadcast( + tid, + False, /* signal, not broadcast */ + (pthread_cond_t *)(arg[1]) ); + break; + + case VG_USERREQ__PTHREAD_COND_BROADCAST: + do_pthread_cond_signal_or_broadcast( + tid, + True, /* broadcast, not signal */ + (pthread_cond_t *)(arg[1]) ); + break; + case VG_USERREQ__MAKE_NOACCESS: case VG_USERREQ__MAKE_WRITABLE: case VG_USERREQ__MAKE_READABLE: @@ -1917,17 +2131,27 @@ void do_nontrivial_clientreq ( ThreadId tid ) static void scheduler_sanity ( void ) { - pthread_mutex_t* mutex; + pthread_mutex_t* mx; + pthread_cond_t* cv; Int i; /* VG_(printf)("scheduler_sanity\n"); */ for (i = 1; i < VG_N_THREADS; i++) { + mx = vg_threads[i].associated_mx; + cv = vg_threads[i].associated_cv; if (vg_threads[i].status == VgTs_WaitMX) { - mutex = vg_threads[i].waited_on_mx; - vg_assert(mutex != NULL); - vg_assert(mutex->__m_count > 0); - vg_assert(is_valid_tid((ThreadId)mutex->__m_owner)); + vg_assert(cv == NULL); + vg_assert(mx != NULL); + vg_assert(mx->__m_count > 0); + vg_assert(is_valid_tid((ThreadId)mx->__m_owner)); + vg_assert(i != (ThreadId)mx->__m_owner); + /* otherwise thread i would be deadlocked. */ + } else + if (vg_threads[i].status == VgTs_WaitCV) { + vg_assert(cv != NULL); + vg_assert(mx != NULL); } else { - vg_assert(vg_threads[i].waited_on_mx == NULL); + vg_assert(cv == NULL); + vg_assert(mx == NULL); } } }