]> git.ipfire.org Git - thirdparty/valgrind.git/commitdiff
Add fairly complete, and apparently working, support for condition
authorJulian Seward <jseward@acm.org>
Sat, 20 Apr 2002 13:53:23 +0000 (13:53 +0000)
committerJulian Seward <jseward@acm.org>
Sat, 20 Apr 2002 13:53:23 +0000 (13:53 +0000)
variables.

git-svn-id: svn://svn.valgrind.org/valgrind/trunk@102

coregrind/arch/x86-linux/vg_libpthread.c
coregrind/vg_include.h
coregrind/vg_libpthread.c
coregrind/vg_scheduler.c
tests/pth_cvsimple.c [new file with mode: 0644]
tests/pth_threadpool.c [new file with mode: 0644]
vg_include.h
vg_libpthread.c
vg_scheduler.c

index 435b7e292a505153a5f72819b08c3ef6872ef141..3ecd6f1145b73a3549e19ef31be1aaf6427b41b0 100644 (file)
@@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller )
 
 
 static
+__attribute__((noreturn))
 void barf ( char* str )
 {
    char buf[100];
@@ -96,6 +97,8 @@ void barf ( char* str )
    strcat(buf, "\n\n");
    write(2, buf, strlen(buf));
    myexit(1);
+   /* We have to persuade gcc into believing this doesn't return. */
+   while (1) { };
 }
 
 
@@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return)
 }
 
 
+void pthread_exit(void *retval)
+{
+   int res;
+   ensure_valgrind("pthread_exit");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_EXIT,
+                           retval, 0, 0, 0);
+   /* Doesn't return! */
+   /* However, we have to fool gcc into knowing that. */
+   barf("pthread_exit: still alive after request?!");
+}
+
 
 static int thread_specific_errno[VG_N_THREADS];
 
@@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread,
    return 0;
 }
 
+int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+   int res;
+   ensure_valgrind("pthread_cond_wait");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_WAIT,
+                          cond, mutex, 0, 0);
+   return res;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_signal");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_SIGNAL,
+                          cond, 0, 0, 0);
+   return res;
+}
+
+int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_broadcast");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_BROADCAST,
+                          cond, 0, 0, 0);
+   return res;
+}
+
 
 /* ---------------------------------------------------
    CANCELLATION
index 1c72ac0d0595cdcc196bd4dd6f1a94352ad54992..bdb60fc47801cd4a658f05d330ae39fb9f68f1df 100644 (file)
    scheduler algorithms is surely O(N^2) in the number of threads,
    since that's simple, at least.  And (in practice) we hope that most
    programs do not need many threads. */
-#define VG_N_THREADS 10
+#define VG_N_THREADS 20
 
 /* Number of file descriptors that can simultaneously be waited on for
    I/O to complete.  Perhaps this should be the same as VG_N_THREADS
@@ -397,9 +397,13 @@ extern Bool  VG_(is_empty_arena) ( ArenaId aid );
 #define VG_USERREQ__PTHREAD_CREATE          0x3001
 #define VG_USERREQ__PTHREAD_JOIN            0x3002
 #define VG_USERREQ__PTHREAD_GET_THREADID    0x3003
-#define VG_USERREQ__PTHREAD_MUTEX_LOCK      0x3005
-#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK    0x3006
-#define VG_USERREQ__PTHREAD_CANCEL          0x3008
+#define VG_USERREQ__PTHREAD_MUTEX_LOCK      0x3004
+#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK    0x3005
+#define VG_USERREQ__PTHREAD_CANCEL          0x3006
+#define VG_USERREQ__PTHREAD_EXIT            0x3007
+#define VG_USERREQ__PTHREAD_COND_WAIT       0x3008
+#define VG_USERREQ__PTHREAD_COND_SIGNAL     0x3009
+#define VG_USERREQ__PTHREAD_COND_BROADCAST  0x300A
 
 /* Cosmetic ... */
 #define VG_USERREQ__GET_PTHREAD_TRACE_LEVEL 0x3101
@@ -445,6 +449,7 @@ typedef
       VgTs_WaitJoinee, /* waiting for the thread I did join on */
       VgTs_WaitFD,     /* waiting for I/O completion on a fd */
       VgTs_WaitMX,     /* waiting on a mutex */
+      VgTs_WaitCV,     /* waiting on a condition variable */
       VgTs_Sleeping    /* sleeping for a while */
    }
    ThreadStatus;
@@ -467,9 +472,15 @@ typedef
          VG_INVALID_THREADID if no one asked to join yet. */
       ThreadId joiner;
 
-      /* When .status == WaitMX, points to the mutex I am waiting
-         for. */
-      void* /* pthread_mutex_t* */ waited_on_mx;
+      /* When .status == WaitMX, points to the mutex I am waiting for.
+         When .status == WaitCV, points to the mutex associated with
+         the condition variable indicated by the .associated_cv field.
+         In all other cases, should be NULL. */
+      void* /* pthread_mutex_t* */ associated_mx;
+
+      /* When .status == WaitCV, points to the condition variable I am
+         waiting for.  In all other cases, should be NULL. */
+      void* /* pthread_cond_t* */ associated_cv;
 
       /* If VgTs_Sleeping, this is when we should wake up. */
       ULong awaken_at;
index 435b7e292a505153a5f72819b08c3ef6872ef141..3ecd6f1145b73a3549e19ef31be1aaf6427b41b0 100644 (file)
@@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller )
 
 
 static
+__attribute__((noreturn))
 void barf ( char* str )
 {
    char buf[100];
@@ -96,6 +97,8 @@ void barf ( char* str )
    strcat(buf, "\n\n");
    write(2, buf, strlen(buf));
    myexit(1);
+   /* We have to persuade gcc into believing this doesn't return. */
+   while (1) { };
 }
 
 
@@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return)
 }
 
 
+void pthread_exit(void *retval)
+{
+   int res;
+   ensure_valgrind("pthread_exit");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_EXIT,
+                           retval, 0, 0, 0);
+   /* Doesn't return! */
+   /* However, we have to fool gcc into knowing that. */
+   barf("pthread_exit: still alive after request?!");
+}
+
 
 static int thread_specific_errno[VG_N_THREADS];
 
@@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread,
    return 0;
 }
 
+int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+   int res;
+   ensure_valgrind("pthread_cond_wait");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_WAIT,
+                          cond, mutex, 0, 0);
+   return res;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_signal");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_SIGNAL,
+                          cond, 0, 0, 0);
+   return res;
+}
+
+int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_broadcast");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_BROADCAST,
+                          cond, 0, 0, 0);
+   return res;
+}
+
 
 /* ---------------------------------------------------
    CANCELLATION
index 194c231236736f7c3ed09205683653032c2e666b..da8143cbee13229c0c967af1d7659427e2769edb 100644 (file)
@@ -188,9 +188,12 @@ void VG_(pp_sched_status) ( void )
          case VgTs_WaitJoinee: VG_(printf)("WaitJoinee"); break;
          case VgTs_Sleeping:   VG_(printf)("Sleeping"); break;
          case VgTs_WaitMX:     VG_(printf)("WaitMX"); break;
+         case VgTs_WaitCV:     VG_(printf)("WaitCV"); break;
          default: VG_(printf)("???"); break;
       }
-      VG_(printf)(", waited_on_mx = %p\n", vg_threads[i].waited_on_mx );
+      VG_(printf)(", associated_mx = %p, associated_cv = %p\n", 
+                  vg_threads[i].associated_mx,
+                  vg_threads[i].associated_cv );
       VG_(pp_ExeContext)( 
          VG_(get_ExeContext)( False, vg_threads[i].m_eip, 
                                      vg_threads[i].m_ebp ));
@@ -513,10 +516,11 @@ void VG_(scheduler_init) ( void )
    tid_main = vg_alloc_ThreadState();
    vg_assert(tid_main == 1); 
 
-   vg_threads[tid_main].status       = VgTs_Runnable;
-   vg_threads[tid_main].joiner       = VG_INVALID_THREADID;
-   vg_threads[tid_main].waited_on_mx = NULL;
-   vg_threads[tid_main].retval       = NULL; /* not important */
+   vg_threads[tid_main].status        = VgTs_Runnable;
+   vg_threads[tid_main].joiner        = VG_INVALID_THREADID;
+   vg_threads[tid_main].associated_mx = NULL;
+   vg_threads[tid_main].associated_cv = NULL;
+   vg_threads[tid_main].retval        = NULL; /* not important */
    vg_threads[tid_main].stack_highest_word 
       = vg_threads[tid_main].m_esp /* -4  ??? */;
 
@@ -1140,7 +1144,7 @@ VgSchedReturnCode VG_(scheduler) ( void )
          aim is not to do too many of Phase 1 since it is expensive.  */
 
       if (0)
-         VG_(printf)("SCHED: tid %d, used %d\n", tid, VG_N_THREADS);
+         VG_(printf)("SCHED: tid %d\n", tid);
 
       /* Figure out how many bbs to ask vg_run_innerloop to do.  Note
          that it decrements the counter before testing it for zero, so
@@ -1363,6 +1367,23 @@ void do_pthread_cancel ( ThreadId  tid_canceller,
 }
 
 
+static
+void do_pthread_exit ( ThreadId tid, void* retval )
+{
+   Char msg_buf[100];
+   /* We want make is appear that this thread has returned to
+      do_pthread_create_bogusRA with retval as the
+      return value.  So: simple: put retval into %EAX
+      and &do_pthread_create_bogusRA into %EIP and keep going! */
+   if (VG_(clo_trace_sched)) {
+      VG_(sprintf)(msg_buf, "exiting with %p", retval);
+      print_sched_event(tid, msg_buf);
+   }
+   vg_threads[tid].m_eax  = (UInt)retval;
+   vg_threads[tid].m_eip  = (UInt)&VG_(pthreadreturn_bogusRA);
+   vg_threads[tid].status = VgTs_Runnable;
+}
+
 
 /* Thread tid is exiting, by returning from the function it was
    created with.  Or possibly due to pthread_exit or cancellation.
@@ -1581,9 +1602,10 @@ void do_pthread_create ( ThreadId parent_tid,
    // ***** CHECK *thread is writable
    *thread = (pthread_t)tid;
 
-   vg_threads[tid].waited_on_mx = NULL;
-   vg_threads[tid].joiner       = VG_INVALID_THREADID;
-   vg_threads[tid].status       = VgTs_Runnable;
+   vg_threads[tid].associated_mx = NULL;
+   vg_threads[tid].associated_cv = NULL;
+   vg_threads[tid].joiner        = VG_INVALID_THREADID;
+   vg_threads[tid].status        = VgTs_Runnable;
 
    /* return zero */
    vg_threads[tid].m_edx  = 0; /* success */
@@ -1638,6 +1660,47 @@ void do_pthread_create ( ThreadId parent_tid,
    deals with that for us.  
 */
 
+/* Helper fns ... */
+static
+void release_one_thread_waiting_on_mutex ( pthread_mutex_t* mutex, 
+                                           Char* caller )
+{
+   Int  i;
+   Char msg_buf[100];
+
+   /* Find some arbitrary thread waiting on this mutex, and make it
+      runnable.  If none are waiting, mark the mutex as not held. */
+   for (i = 1; i < VG_N_THREADS; i++) {
+      if (vg_threads[i].status == VgTs_Empty) 
+         continue;
+      if (vg_threads[i].status == VgTs_WaitMX 
+          && vg_threads[i].associated_mx == mutex)
+         break;
+   }
+
+   vg_assert(i <= VG_N_THREADS);
+   if (i == VG_N_THREADS) {
+      /* Nobody else is waiting on it. */
+      mutex->__m_count = 0;
+      mutex->__m_owner = VG_INVALID_THREADID;
+   } else {
+      /* Notionally transfer the hold to thread i, whose
+         pthread_mutex_lock() call now returns with 0 (success). */
+      /* The .count is already == 1. */
+      vg_assert(vg_threads[i].associated_mx == mutex);
+      mutex->__m_owner = (_pthread_descr)i;
+      vg_threads[i].status        = VgTs_Runnable;
+      vg_threads[i].associated_mx = NULL;
+      vg_threads[i].m_edx         = 0; /* pth_lock() success */
+
+      if (VG_(clo_trace_pthread_level) >= 1) {
+         VG_(sprintf)(msg_buf, "%s       mx %p: RESUME", 
+                               caller, mutex );
+         print_pthread_event(i, msg_buf);
+      }
+   }
+}
+
 
 static
 void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
@@ -1645,7 +1708,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
    Char msg_buf[100];
 
    if (VG_(clo_trace_pthread_level) >= 2) {
-      VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p", mutex );
+      VG_(sprintf)(msg_buf, "pthread_mutex_lock       mx %p ...", mutex );
       print_pthread_event(tid, msg_buf);
    }
 
@@ -1683,7 +1746,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
             /* return 0 (success). */
             mutex->__m_count++;
             vg_threads[tid].m_edx = 0;
-           VG_(printf)("!!!!!! tid %d, mutex %p -> locked %d\n", 
+           VG_(printf)("!!!!!! tid %d, mx %p -> locked %d\n", 
                         tid, mutex, mutex->__m_count);
             return;
          } else {
@@ -1693,11 +1756,11 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
       } else {
          /* Someone else has it; we have to wait.  Mark ourselves
             thusly. */
-         vg_threads[tid].status       = VgTs_WaitMX;
-         vg_threads[tid].waited_on_mx = mutex;
+         vg_threads[tid].status        = VgTs_WaitMX;
+         vg_threads[tid].associated_mx = mutex;
          /* No assignment to %EDX, since we're blocking. */
          if (VG_(clo_trace_pthread_level) >= 1) {
-            VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p: BLOCK", 
+            VG_(sprintf)(msg_buf, "pthread_mutex_lock       mx %p: BLOCK", 
                                   mutex );
             print_pthread_event(tid, msg_buf);
          }
@@ -1710,7 +1773,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
       /* We get it! [for the first time]. */
       mutex->__m_count = 1;
       mutex->__m_owner = (_pthread_descr)tid;
-      vg_assert(vg_threads[tid].waited_on_mx == NULL);
+      vg_assert(vg_threads[tid].associated_mx == NULL);
       /* return 0 (success). */
       vg_threads[tid].m_edx = 0;
    }
@@ -1722,11 +1785,10 @@ static
 void do_pthread_mutex_unlock ( ThreadId tid,
                                pthread_mutex_t *mutex )
 {
-   Int      i;
-   Char     msg_buf[100];
+   Char msg_buf[100];
 
    if (VG_(clo_trace_pthread_level) >= 2) {
-      VG_(sprintf)(msg_buf, "pthread_mutex_unlock %p", mutex );
+      VG_(sprintf)(msg_buf, "pthread_mutex_unlock     mx %p ...", mutex );
       print_pthread_event(tid, msg_buf);
    }
 
@@ -1773,40 +1835,10 @@ void do_pthread_mutex_unlock ( ThreadId tid,
    vg_assert(mutex->__m_count == 1);
    vg_assert((ThreadId)mutex->__m_owner == tid);
 
-   /* Find some arbitrary thread waiting on this mutex, and make it
-      runnable.  If none are waiting, mark the mutex as not held. */
-   for (i = 1; i < VG_N_THREADS; i++) {
-      if (vg_threads[i].status == VgTs_Empty) 
-         continue;
-      if (vg_threads[i].status == VgTs_WaitMX 
-          && vg_threads[i].waited_on_mx == mutex)
-         break;
-   }
-
-   vg_assert(i <= VG_N_THREADS);
-   if (i == VG_N_THREADS) {
-      /* Nobody else is waiting on it. */
-      mutex->__m_count = 0;
-      mutex->__m_owner = VG_INVALID_THREADID;
-   } else {
-      /* Notionally transfer the hold to thread i, whose
-         pthread_mutex_lock() call now returns with 0 (success). */
-      /* The .count is already == 1. */
-      vg_assert(vg_threads[i].waited_on_mx == mutex);
-      mutex->__m_owner = (_pthread_descr)i;
-      vg_threads[i].status       = VgTs_Runnable;
-      vg_threads[i].waited_on_mx = NULL;
-      vg_threads[i].m_edx = 0; /* pth_lock() success */
-
-      if (VG_(clo_trace_pthread_level) >= 1) {
-         VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p: RESUME", 
-                               mutex );
-         print_pthread_event(i, msg_buf);
-      }
-   }
+   /* Release at max one thread waiting on this mutex. */
+   release_one_thread_waiting_on_mutex ( mutex, "pthread_mutex_lock" );
 
-   /* In either case, our (tid's) pth_unlock() returns with 0
-      (success). */
+   /* Our (tid's) pth_unlock() returns with 0 (success). */
    vg_threads[tid].m_edx = 0; /* Success. */
 }
 
@@ -1833,15 +1865,173 @@ void do_pthread_mutex_unlock ( ThreadId tid,
 
    #define PTHREAD_COND_INITIALIZER {__LOCK_INITIALIZER, 0}
 
-   We'll just use the __c_waiting field to point to the head of the
-   list of threads waiting on this condition.  Note how the static
-   initialiser has __c_waiting == 0 == VG_INVALID_THREADID.
+   We don't use any fields of pthread_cond_t for anything at all.
+   Only the identity of the CVs is important.
 
    Linux pthreads supports no attributes on condition variables, so we
-   don't need to think too hard there.  
-*/
+   don't need to think too hard there.  */
 
 
+static
+void release_N_threads_waiting_on_cond ( pthread_cond_t* cond, 
+                                         Int n_to_release, 
+                                         Char* caller )
+{
+   Int              i;
+   Char             msg_buf[100];
+   pthread_mutex_t* mx;
+
+   while (True) {
+      if (n_to_release == 0)
+         return;
+
+      /* Find a thread waiting on this CV. */
+      for (i = 1; i < VG_N_THREADS; i++) {
+         if (vg_threads[i].status == VgTs_Empty) 
+            continue;
+         if (vg_threads[i].status == VgTs_WaitCV 
+             && vg_threads[i].associated_cv == cond)
+            break;
+      }
+      vg_assert(i <= VG_N_THREADS);
+
+      if (i == VG_N_THREADS) {
+         /* Nobody else is waiting on it. */
+         return;
+      }
+
+      mx = vg_threads[i].associated_mx;
+      vg_assert(mx != NULL);
+
+      if (mx->__m_owner == VG_INVALID_THREADID) {
+         /* Currently unheld; hand it out to thread i. */
+         vg_assert(mx->__m_count == 0);
+         vg_threads[i].status        = VgTs_Runnable;
+         vg_threads[i].associated_cv = NULL;
+         vg_threads[i].associated_mx = NULL;
+         mx->__m_owner = (_pthread_descr)i;
+         mx->__m_count = 1;
+         vg_threads[i].m_edx = 0; /* pthread_cond_wait returns success */
+
+         if (VG_(clo_trace_pthread_level) >= 1) {
+            VG_(sprintf)(msg_buf, "%s   cv %p: RESUME with mx %p", 
+                                  caller, cond, mx );
+            print_pthread_event(i, msg_buf);
+         }
+
+      } else {
+         /* Currently held.  Make thread i be blocked on it. */
+         vg_threads[i].status        = VgTs_WaitMX;
+         vg_threads[i].associated_cv = NULL;
+         vg_threads[i].associated_mx = mx;
+
+         if (VG_(clo_trace_pthread_level) >= 1) {
+            VG_(sprintf)(msg_buf, "%s   cv %p: BLOCK for mx %p", 
+                                  caller, cond, mx );
+            print_pthread_event(i, msg_buf);
+         }
+
+      }
+
+      n_to_release--;
+   }
+}
+
+
+static
+void do_pthread_cond_wait ( ThreadId tid,
+                            pthread_cond_t *cond, 
+                            pthread_mutex_t *mutex )
+{
+   Char msg_buf[100];
+
+   /* pre: mutex should be a valid mutex and owned by tid. */
+   if (VG_(clo_trace_pthread_level) >= 2) {
+      VG_(sprintf)(msg_buf, "pthread_cond_wait        cv %p, mx %p ...", 
+                            cond, mutex );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Paranoia ... */
+   vg_assert(is_valid_tid(tid) 
+             && vg_threads[tid].status == VgTs_Runnable);
+
+   if (mutex == NULL || cond == NULL) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+
+   /* More paranoia ... */
+   switch (mutex->__m_kind) {
+      case PTHREAD_MUTEX_TIMED_NP:
+      case PTHREAD_MUTEX_RECURSIVE_NP:
+      case PTHREAD_MUTEX_ERRORCHECK_NP:
+      case PTHREAD_MUTEX_ADAPTIVE_NP:
+         if (mutex->__m_count >= 0) break;
+         /* else fall thru */
+      default:
+         vg_threads[tid].m_edx = EINVAL;
+         return;
+   }
+
+   /* Barf if we don't currently hold the mutex. */
+   if (mutex->__m_count == 0 /* nobody holds it */
+       || (ThreadId)mutex->__m_owner != tid /* we don't hold it */) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+
+   /* Queue ourselves on the condition. */
+   vg_threads[tid].status        = VgTs_WaitCV;
+   vg_threads[tid].associated_cv = cond;
+   vg_threads[tid].associated_mx = mutex;
+
+   if (VG_(clo_trace_pthread_level) >= 1) {
+      VG_(sprintf)(msg_buf, 
+                   "pthread_cond_wait        cv %p, mx %p: BLOCK", 
+                   cond, mutex );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Release the mutex. */
+   release_one_thread_waiting_on_mutex ( mutex, "pthread_cond_wait " );
+}
+
+
+static
+void do_pthread_cond_signal_or_broadcast ( ThreadId tid, 
+                                           Bool broadcast,
+                                           pthread_cond_t *cond )
+{
+   Char  msg_buf[100];
+   Char* caller 
+      = broadcast ? "pthread_cond_broadcast" 
+                  : "pthread_cond_signal   ";
+
+   if (VG_(clo_trace_pthread_level) >= 2) {
+      VG_(sprintf)(msg_buf, "%s   cv %p ...", 
+                            caller, cond );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Paranoia ... */
+   vg_assert(is_valid_tid(tid) 
+             && vg_threads[tid].status == VgTs_Runnable);
+
+   if (cond == NULL) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+   
+   release_N_threads_waiting_on_cond ( 
+      cond,
+      broadcast ? VG_N_THREADS : 1, 
+      caller
+   );
+
+   vg_threads[tid].m_edx = 0; /* success */
+}
+
 
 /* ---------------------------------------------------------------------
    Handle non-trivial client requests.
@@ -1882,6 +2072,30 @@ void do_nontrivial_clientreq ( ThreadId tid )
          do_pthread_cancel( tid, (pthread_t)(arg[1]) );
          break;
 
+      case VG_USERREQ__PTHREAD_EXIT:
+         do_pthread_exit( tid, (void*)(arg[1]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_WAIT:
+         do_pthread_cond_wait( tid, 
+                               (pthread_cond_t *)(arg[1]),
+                               (pthread_mutex_t *)(arg[2]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_SIGNAL:
+         do_pthread_cond_signal_or_broadcast( 
+            tid, 
+           False, /* signal, not broadcast */
+            (pthread_cond_t *)(arg[1]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_BROADCAST:
+         do_pthread_cond_signal_or_broadcast( 
+            tid, 
+           True, /* broadcast, not signal */
+            (pthread_cond_t *)(arg[1]) );
+         break;
+
       case VG_USERREQ__MAKE_NOACCESS:
       case VG_USERREQ__MAKE_WRITABLE:
       case VG_USERREQ__MAKE_READABLE:
@@ -1917,17 +2131,27 @@ void do_nontrivial_clientreq ( ThreadId tid )
 static
 void scheduler_sanity ( void )
 {
-   pthread_mutex_t* mutex;
+   pthread_mutex_t* mx;
+   pthread_cond_t*  cv;
    Int              i;
    /* VG_(printf)("scheduler_sanity\n"); */
    for (i = 1; i < VG_N_THREADS; i++) {
+      mx = vg_threads[i].associated_mx;
+      cv = vg_threads[i].associated_cv;
       if (vg_threads[i].status == VgTs_WaitMX) {
-         mutex = vg_threads[i].waited_on_mx;
-         vg_assert(mutex != NULL);
-         vg_assert(mutex->__m_count > 0);
-         vg_assert(is_valid_tid((ThreadId)mutex->__m_owner));
+         vg_assert(cv == NULL);
+         vg_assert(mx != NULL);
+         vg_assert(mx->__m_count > 0);
+         vg_assert(is_valid_tid((ThreadId)mx->__m_owner));
+         vg_assert(i != (ThreadId)mx->__m_owner); 
+            /* otherwise thread i would be deadlocked. */
+      } else 
+      if (vg_threads[i].status == VgTs_WaitCV) {
+         vg_assert(cv != NULL);
+         vg_assert(mx != NULL);
       } else {
-         vg_assert(vg_threads[i].waited_on_mx == NULL);
+         vg_assert(cv == NULL);
+         vg_assert(mx == NULL);
       }
    }
 }
diff --git a/tests/pth_cvsimple.c b/tests/pth_cvsimple.c
new file mode 100644 (file)
index 0000000..ba1101b
--- /dev/null
@@ -0,0 +1,84 @@
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ *     by Brad nichols, Dick Buttlar, Jackie Farrell
+ *     O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ *
+ * cvsimple.c
+ *
+ * Demonstrates pthread cancellation.
+ *
+ */
+
+#include <stdio.h>
+#include <pthread.h>
+
+#define NUM_THREADS  3
+#define TCOUNT 10
+#define COUNT_THRES 12
+
+int     count = 0;
+int     thread_ids[3] = {0,1,2};
+pthread_mutex_t count_lock=PTHREAD_MUTEX_INITIALIZER; 
+pthread_cond_t count_hit_threshold=PTHREAD_COND_INITIALIZER; 
+
+void *inc_count(void *idp)
+{
+  int i=0, save_state, save_type;
+  int *my_id = idp;
+
+  for (i=0; i<TCOUNT; i++) {
+    pthread_mutex_lock(&count_lock);
+    count++;
+    printf("inc_counter(): thread %d, count = %d, unlocking mutex\n", 
+          *my_id, count);
+    if (count == COUNT_THRES) {
+      printf("inc_count(): Thread %d, count %d\n", *my_id, count);
+      pthread_cond_signal(&count_hit_threshold);
+    }
+    pthread_mutex_unlock(&count_lock);
+  }
+  
+  return(NULL);
+}
+
+void *watch_count(void *idp)
+{
+  int i=0, save_state, save_type;
+  int *my_id = idp;
+
+  printf("watch_count(): thread %d\n", *my_id);
+  fflush(stdout);
+  pthread_mutex_lock(&count_lock);
+
+  while (count < COUNT_THRES) {
+    pthread_cond_wait(&count_hit_threshold, &count_lock);
+    printf("watch_count(): thread %d, count %d\n", *my_id, count);
+  }
+
+  pthread_mutex_unlock(&count_lock);
+  
+  return(NULL);
+}
+
+extern int
+main(void)
+{
+  int       i;
+  pthread_t threads[3];
+
+  pthread_create(&threads[0], NULL, inc_count, (void *)&thread_ids[0]);
+  pthread_create(&threads[1], NULL, inc_count, (void *)&thread_ids[1]);
+  pthread_create(&threads[2], NULL, watch_count, (void *)&thread_ids[2]);
+
+  for (i = 0; i < NUM_THREADS; i++) {
+    pthread_join(threads[i], NULL);
+  }
+
+  return 0;
+}
+
+
diff --git a/tests/pth_threadpool.c b/tests/pth_threadpool.c
new file mode 100644 (file)
index 0000000..30f1595
--- /dev/null
@@ -0,0 +1,395 @@
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ *     by Brad nichols, Dick Buttlar, Jackie Farrell
+ *     O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.c -- 
+ * 
+ * Example thread pooling library
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <string.h>
+
+#include <pthread.h>
+
+
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ *     by Brad nichols, Dick Buttlar, Jackie Farrell
+ *     O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.h --
+ *
+ * Structures for thread pool
+ */
+
+typedef struct tpool_work {
+       void               (*routine)();
+       void                *arg;
+       struct tpool_work   *next;
+} tpool_work_t;
+
+typedef struct tpool {
+       /* pool characteristics */
+       int                 num_threads;
+        int                 max_queue_size;
+        int                 do_not_block_when_full;
+        /* pool state */
+       pthread_t           *threads;
+        int                 cur_queue_size;
+       tpool_work_t        *queue_head;
+       tpool_work_t        *queue_tail;
+       int                 queue_closed;
+        int                 shutdown;
+       /* pool synchronization */
+        pthread_mutex_t     queue_lock;
+        pthread_cond_t      queue_not_empty;
+        pthread_cond_t      queue_not_full;
+       pthread_cond_t      queue_empty;
+} *tpool_t;
+
+void tpool_init(
+           tpool_t          *tpoolp,
+           int              num_threads, 
+           int              max_queue_size,
+           int              do_not_block_when_full);
+
+int tpool_add_work(
+           tpool_t          tpool,
+           void             (*routine)(),
+          void             *arg);
+
+int tpool_destroy(
+           tpool_t          tpool,
+           int              finish);
+
+
+/*-- end of tpool.h ----------------------------------*/
+
+
+void *tpool_thread(void *);
+
+void tpool_init(tpool_t   *tpoolp,
+               int       num_worker_threads, 
+               int       max_queue_size,
+               int       do_not_block_when_full)
+{
+  int i, rtn;
+  tpool_t tpool;
+   
+  /* allocate a pool data structure */ 
+  if ((tpool = (tpool_t )malloc(sizeof(struct tpool))) == NULL)
+    perror("malloc"), exit(1);
+
+  /* initialize th fields */
+  tpool->num_threads = num_worker_threads;
+  tpool->max_queue_size = max_queue_size;
+  tpool->do_not_block_when_full = do_not_block_when_full;
+  if ((tpool->threads = 
+       (pthread_t *)malloc(sizeof(pthread_t)*num_worker_threads)) 
+      == NULL)
+    perror("malloc"), exit(1);
+  tpool->cur_queue_size = 0;
+  tpool->queue_head = NULL; 
+  tpool->queue_tail = NULL;
+  tpool->queue_closed = 0;  
+  tpool->shutdown = 0; 
+  if ((rtn = pthread_mutex_init(&(tpool->queue_lock), NULL)) != 0)
+    fprintf(stderr,"pthread_mutex_init %s\n",strerror(rtn)), exit(1);
+  if ((rtn = pthread_cond_init(&(tpool->queue_not_empty), NULL)) != 0)
+    fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+  if ((rtn = pthread_cond_init(&(tpool->queue_not_full), NULL)) != 0)
+    fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+  if ((rtn = pthread_cond_init(&(tpool->queue_empty), NULL)) != 0)
+    fprintf(stderr,"pthread_cond_init %s\n",strerror(rtn)), exit(1);
+
+  /* create threads */
+  for (i = 0; i != num_worker_threads; i++) {
+    if ((rtn = pthread_create( &(tpool->threads[i]),
+                             NULL,
+                             tpool_thread,
+                             (void *)tpool)) != 0)
+      fprintf(stderr,"pthread_create %d\n",rtn), exit(1);
+  }
+
+  *tpoolp = tpool;
+}
+
+int tpool_add_work(
+                  tpool_t          tpool,
+                  void             (*routine)(),
+                  void             *arg)
+{
+  int rtn;
+  tpool_work_t *workp;
+
+  if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+    fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+  /* no space and this caller doesn't want to wait */
+  if ((tpool->cur_queue_size == tpool->max_queue_size) &&
+      tpool->do_not_block_when_full) {
+    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+    return -1;
+  }
+
+  while( (tpool->cur_queue_size == tpool->max_queue_size) &&
+       (!(tpool->shutdown || tpool->queue_closed))  ) {
+
+    if ((rtn = pthread_cond_wait(&(tpool->queue_not_full),
+                                &(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_cond_waitA %d\n",rtn), exit(1);
+
+  }
+
+  /* the pool is in the process of being destroyed */
+  if (tpool->shutdown || tpool->queue_closed) {
+    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+    return -1;
+  }
+
+
+  /* allocate work structure */
+  if ((workp = (tpool_work_t *)malloc(sizeof(tpool_work_t))) == NULL)
+    perror("malloc"), exit(1);
+  workp->routine = routine;
+  workp->arg = arg;
+  workp->next = NULL;
+
+  printf("adder: adding an item %d\n", workp->routine);
+
+  if (tpool->cur_queue_size == 0) {
+    tpool->queue_tail = tpool->queue_head = workp;
+
+     printf("adder: queue == 0, waking all workers\n");
+
+    if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
+      fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);;
+  } else {
+    tpool->queue_tail->next = workp;
+    tpool->queue_tail = workp;
+  }
+
+  tpool->cur_queue_size++; 
+  if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+    fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+  return 1;
+}
+
+int tpool_destroy(tpool_t          tpool,
+                 int              finish)
+{
+  int          i,rtn;
+  tpool_work_t *cur_nodep;
+  
+
+  if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+    fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+  /* Is a shutdown already in progress? */
+  if (tpool->queue_closed || tpool->shutdown) {
+    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+    return 0;
+  }
+
+  tpool->queue_closed = 1;
+
+  /* If the finish flag is set, wait for workers to 
+     drain queue */ 
+  if (finish == 1) {
+    while (tpool->cur_queue_size != 0) {
+      if ((rtn = pthread_cond_wait(&(tpool->queue_empty),
+                                  &(tpool->queue_lock))) != 0)
+       fprintf(stderr,"pthread_cond_waitB %d\n",rtn), exit(1);
+    }
+  }
+
+  tpool->shutdown = 1;
+
+  if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+    fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+
+
+  /* Wake up any workers so they recheck shutdown flag */
+  if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_empty))) != 0)
+    fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+  if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
+    fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+
+
+  /* Wait for workers to exit */
+  for(i=0; i < tpool->num_threads; i++) {
+    if ((rtn = pthread_join(tpool->threads[i],NULL)) != 0)
+      fprintf(stderr,"pthread_join %d\n",rtn), exit(1);
+  }
+
+  /* Now free pool structures */
+  free(tpool->threads);
+  while(tpool->queue_head != NULL) {
+    cur_nodep = tpool->queue_head->next; 
+    tpool->queue_head = tpool->queue_head->next;
+    free(cur_nodep);
+  }
+  free(tpool); 
+}
+
+void *tpool_thread(void *arg)
+{
+  tpool_t tpool = (tpool_t)arg; 
+  int rtn;
+  tpool_work_t *my_workp;
+       
+  for(;;) {
+
+
+
+    /* Check queue for work */ 
+    if ((rtn = pthread_mutex_lock(&(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_mutex_lock %d\n",rtn), exit(1);
+
+    while ((tpool->cur_queue_size == 0) && (!tpool->shutdown)) {
+
+
+      printf("worker %d: I'm sleeping again\n", pthread_self());
+
+      if ((rtn = pthread_cond_wait(&(tpool->queue_not_empty),
+                                  &(tpool->queue_lock))) != 0)
+       fprintf(stderr,"pthread_cond_waitC %d\n",rtn), exit(1);
+
+    }
+    sleep(1); 
+    printf("worker %d: I'm awake\n", pthread_self());
+
+    /* Has a shutdown started while i was sleeping? */
+    if (tpool->shutdown == 1) {
+
+      if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+       fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+      pthread_exit(NULL);
+    }
+
+
+    /* Get to work, dequeue the next item */ 
+    my_workp = tpool->queue_head;
+    tpool->cur_queue_size--;
+    if (tpool->cur_queue_size == 0)
+      tpool->queue_head = tpool->queue_tail = NULL;
+    else
+      tpool->queue_head = my_workp->next;
+    printf("worker %d: dequeing item %d\n", pthread_self(), my_workp->next);
+
+    /* Handle waiting add_work threads */
+    if ((!tpool->do_not_block_when_full) &&
+       (tpool->cur_queue_size ==  (tpool->max_queue_size - 1))) 
+
+      if ((rtn = pthread_cond_broadcast(&(tpool->queue_not_full))) != 0)
+       fprintf(stderr,"pthread_cond_broadcast %d\n",rtn), exit(1);
+
+    /* Handle waiting destroyer threads */
+    if (tpool->cur_queue_size == 0)
+
+      if ((rtn = pthread_cond_signal(&(tpool->queue_empty))) != 0)
+       fprintf(stderr,"pthread_cond_signal %d\n",rtn), exit(1);
+
+    if ((rtn = pthread_mutex_unlock(&(tpool->queue_lock))) != 0)
+      fprintf(stderr,"pthread_mutex_unlock %d\n",rtn), exit(1);
+      
+    /* Do this work item */
+    (*(my_workp->routine))(my_workp->arg);
+    free(my_workp);
+  } 
+  return(NULL);            
+}
+
+
+/********************************************************
+ * An example source module to accompany...
+ *
+ * "Using POSIX Threads: Programming with Pthreads"
+ *     by Brad nichols, Dick Buttlar, Jackie Farrell
+ *     O'Reilly & Associates, Inc.
+ *
+ ********************************************************
+ * tpool.c -- 
+ * 
+ * Example caller for thread pooling library
+ */
+
+char *s1[20]={  "STRING 0",
+               "STRING 1",
+                "STRING 2",
+                "STRING 3",
+                "STRING 4",
+                "STRING 5",
+                "STRING 6",
+                "STRING 7",
+                "STRING 8",
+                "STRING 9",
+               "STRING 10",
+                "STRING 11",
+                "STRING 12",
+                "STRING 13",
+                "STRING 14",
+                "STRING 15",
+                "STRING 16",
+                "STRING 17",
+                "STRING 18",
+                "STRING 19"};
+
+void r1(char * printstring)
+{
+   int i, x;
+
+   printf("%s START\n", printstring);
+
+   for (i = 0; i < 1000000; i++)  {
+       x = x +i;
+   }
+
+   printf("%s DONE\n", printstring);
+}
+
+extern int
+main(void)
+{
+   extern char *s1[];
+
+   pthread_t t1,t2;
+   int i;  
+
+   tpool_t test_pool;
+
+   tpool_init(&test_pool, 10, 20, 0);
+
+   sleep(1);
+
+   for ( i = 0; i < 5; i++) {
+          printf("tpool_add_work returned %d\n",
+               tpool_add_work(test_pool, r1, s1[i]));
+
+   }
+
+   printf("main: all work queued\n"); 
+
+   tpool_destroy(test_pool, 1); 
+
+   return 0;
+   
+}  
index 1c72ac0d0595cdcc196bd4dd6f1a94352ad54992..bdb60fc47801cd4a658f05d330ae39fb9f68f1df 100644 (file)
    scheduler algorithms is surely O(N^2) in the number of threads,
    since that's simple, at least.  And (in practice) we hope that most
    programs do not need many threads. */
-#define VG_N_THREADS 10
+#define VG_N_THREADS 20
 
 /* Number of file descriptors that can simultaneously be waited on for
    I/O to complete.  Perhaps this should be the same as VG_N_THREADS
@@ -397,9 +397,13 @@ extern Bool  VG_(is_empty_arena) ( ArenaId aid );
 #define VG_USERREQ__PTHREAD_CREATE          0x3001
 #define VG_USERREQ__PTHREAD_JOIN            0x3002
 #define VG_USERREQ__PTHREAD_GET_THREADID    0x3003
-#define VG_USERREQ__PTHREAD_MUTEX_LOCK      0x3005
-#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK    0x3006
-#define VG_USERREQ__PTHREAD_CANCEL          0x3008
+#define VG_USERREQ__PTHREAD_MUTEX_LOCK      0x3004
+#define VG_USERREQ__PTHREAD_MUTEX_UNLOCK    0x3005
+#define VG_USERREQ__PTHREAD_CANCEL          0x3006
+#define VG_USERREQ__PTHREAD_EXIT            0x3007
+#define VG_USERREQ__PTHREAD_COND_WAIT       0x3008
+#define VG_USERREQ__PTHREAD_COND_SIGNAL     0x3009
+#define VG_USERREQ__PTHREAD_COND_BROADCAST  0x300A
 
 /* Cosmetic ... */
 #define VG_USERREQ__GET_PTHREAD_TRACE_LEVEL 0x3101
@@ -445,6 +449,7 @@ typedef
       VgTs_WaitJoinee, /* waiting for the thread I did join on */
       VgTs_WaitFD,     /* waiting for I/O completion on a fd */
       VgTs_WaitMX,     /* waiting on a mutex */
+      VgTs_WaitCV,     /* waiting on a condition variable */
       VgTs_Sleeping    /* sleeping for a while */
    }
    ThreadStatus;
@@ -467,9 +472,15 @@ typedef
          VG_INVALID_THREADID if no one asked to join yet. */
       ThreadId joiner;
 
-      /* When .status == WaitMX, points to the mutex I am waiting
-         for. */
-      void* /* pthread_mutex_t* */ waited_on_mx;
+      /* When .status == WaitMX, points to the mutex I am waiting for.
+         When .status == WaitCV, points to the mutex associated with
+         the condition variable indicated by the .associated_cv field.
+         In all other cases, should be NULL. */
+      void* /* pthread_mutex_t* */ associated_mx;
+
+      /* When .status == WaitCV, points to the condition variable I am
+         waiting for.  In all other cases, should be NULL. */
+      void* /* pthread_cond_t* */ associated_cv;
 
       /* If VgTs_Sleeping, this is when we should wake up. */
       ULong awaken_at;
index 435b7e292a505153a5f72819b08c3ef6872ef141..3ecd6f1145b73a3549e19ef31be1aaf6427b41b0 100644 (file)
@@ -87,6 +87,7 @@ void ensure_valgrind ( char* caller )
 
 
 static
+__attribute__((noreturn))
 void barf ( char* str )
 {
    char buf[100];
@@ -96,6 +97,8 @@ void barf ( char* str )
    strcat(buf, "\n\n");
    write(2, buf, strlen(buf));
    myexit(1);
+   /* We have to persuade gcc into believing this doesn't return. */
+   while (1) { };
 }
 
 
@@ -175,6 +178,18 @@ pthread_join (pthread_t __th, void **__thread_return)
 }
 
 
+void pthread_exit(void *retval)
+{
+   int res;
+   ensure_valgrind("pthread_exit");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_EXIT,
+                           retval, 0, 0, 0);
+   /* Doesn't return! */
+   /* However, we have to fool gcc into knowing that. */
+   barf("pthread_exit: still alive after request?!");
+}
+
 
 static int thread_specific_errno[VG_N_THREADS];
 
@@ -324,6 +339,36 @@ int pthread_setschedparam(pthread_t target_thread,
    return 0;
 }
 
+int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+   int res;
+   ensure_valgrind("pthread_cond_wait");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_WAIT,
+                          cond, mutex, 0, 0);
+   return res;
+}
+
+int pthread_cond_signal(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_signal");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_SIGNAL,
+                          cond, 0, 0, 0);
+   return res;
+}
+
+int pthread_cond_broadcast(pthread_cond_t *cond)
+{
+   int res;
+   ensure_valgrind("pthread_cond_broadcast");
+   VALGRIND_MAGIC_SEQUENCE(res, 0 /* default */,
+                           VG_USERREQ__PTHREAD_COND_BROADCAST,
+                          cond, 0, 0, 0);
+   return res;
+}
+
 
 /* ---------------------------------------------------
    CANCELLATION
index 194c231236736f7c3ed09205683653032c2e666b..da8143cbee13229c0c967af1d7659427e2769edb 100644 (file)
@@ -188,9 +188,12 @@ void VG_(pp_sched_status) ( void )
          case VgTs_WaitJoinee: VG_(printf)("WaitJoinee"); break;
          case VgTs_Sleeping:   VG_(printf)("Sleeping"); break;
          case VgTs_WaitMX:     VG_(printf)("WaitMX"); break;
+         case VgTs_WaitCV:     VG_(printf)("WaitCV"); break;
          default: VG_(printf)("???"); break;
       }
-      VG_(printf)(", waited_on_mx = %p\n", vg_threads[i].waited_on_mx );
+      VG_(printf)(", associated_mx = %p, associated_cv = %p\n", 
+                  vg_threads[i].associated_mx,
+                  vg_threads[i].associated_cv );
       VG_(pp_ExeContext)( 
          VG_(get_ExeContext)( False, vg_threads[i].m_eip, 
                                      vg_threads[i].m_ebp ));
@@ -513,10 +516,11 @@ void VG_(scheduler_init) ( void )
    tid_main = vg_alloc_ThreadState();
    vg_assert(tid_main == 1); 
 
-   vg_threads[tid_main].status       = VgTs_Runnable;
-   vg_threads[tid_main].joiner       = VG_INVALID_THREADID;
-   vg_threads[tid_main].waited_on_mx = NULL;
-   vg_threads[tid_main].retval       = NULL; /* not important */
+   vg_threads[tid_main].status        = VgTs_Runnable;
+   vg_threads[tid_main].joiner        = VG_INVALID_THREADID;
+   vg_threads[tid_main].associated_mx = NULL;
+   vg_threads[tid_main].associated_cv = NULL;
+   vg_threads[tid_main].retval        = NULL; /* not important */
    vg_threads[tid_main].stack_highest_word 
       = vg_threads[tid_main].m_esp /* -4  ??? */;
 
@@ -1140,7 +1144,7 @@ VgSchedReturnCode VG_(scheduler) ( void )
          aim is not to do too many of Phase 1 since it is expensive.  */
 
       if (0)
-         VG_(printf)("SCHED: tid %d, used %d\n", tid, VG_N_THREADS);
+         VG_(printf)("SCHED: tid %d\n", tid);
 
       /* Figure out how many bbs to ask vg_run_innerloop to do.  Note
          that it decrements the counter before testing it for zero, so
@@ -1363,6 +1367,23 @@ void do_pthread_cancel ( ThreadId  tid_canceller,
 }
 
 
+static
+void do_pthread_exit ( ThreadId tid, void* retval )
+{
+   Char msg_buf[100];
+   /* We want make is appear that this thread has returned to
+      do_pthread_create_bogusRA with retval as the
+      return value.  So: simple: put retval into %EAX
+      and &do_pthread_create_bogusRA into %EIP and keep going! */
+   if (VG_(clo_trace_sched)) {
+      VG_(sprintf)(msg_buf, "exiting with %p", retval);
+      print_sched_event(tid, msg_buf);
+   }
+   vg_threads[tid].m_eax  = (UInt)retval;
+   vg_threads[tid].m_eip  = (UInt)&VG_(pthreadreturn_bogusRA);
+   vg_threads[tid].status = VgTs_Runnable;
+}
+
 
 /* Thread tid is exiting, by returning from the function it was
    created with.  Or possibly due to pthread_exit or cancellation.
@@ -1581,9 +1602,10 @@ void do_pthread_create ( ThreadId parent_tid,
    // ***** CHECK *thread is writable
    *thread = (pthread_t)tid;
 
-   vg_threads[tid].waited_on_mx = NULL;
-   vg_threads[tid].joiner       = VG_INVALID_THREADID;
-   vg_threads[tid].status       = VgTs_Runnable;
+   vg_threads[tid].associated_mx = NULL;
+   vg_threads[tid].associated_cv = NULL;
+   vg_threads[tid].joiner        = VG_INVALID_THREADID;
+   vg_threads[tid].status        = VgTs_Runnable;
 
    /* return zero */
    vg_threads[tid].m_edx  = 0; /* success */
@@ -1638,6 +1660,47 @@ void do_pthread_create ( ThreadId parent_tid,
    deals with that for us.  
 */
 
+/* Helper fns ... */
+static
+void release_one_thread_waiting_on_mutex ( pthread_mutex_t* mutex, 
+                                           Char* caller )
+{
+   Int  i;
+   Char msg_buf[100];
+
+   /* Find some arbitrary thread waiting on this mutex, and make it
+      runnable.  If none are waiting, mark the mutex as not held. */
+   for (i = 1; i < VG_N_THREADS; i++) {
+      if (vg_threads[i].status == VgTs_Empty) 
+         continue;
+      if (vg_threads[i].status == VgTs_WaitMX 
+          && vg_threads[i].associated_mx == mutex)
+         break;
+   }
+
+   vg_assert(i <= VG_N_THREADS);
+   if (i == VG_N_THREADS) {
+      /* Nobody else is waiting on it. */
+      mutex->__m_count = 0;
+      mutex->__m_owner = VG_INVALID_THREADID;
+   } else {
+      /* Notionally transfer the hold to thread i, whose
+         pthread_mutex_lock() call now returns with 0 (success). */
+      /* The .count is already == 1. */
+      vg_assert(vg_threads[i].associated_mx == mutex);
+      mutex->__m_owner = (_pthread_descr)i;
+      vg_threads[i].status        = VgTs_Runnable;
+      vg_threads[i].associated_mx = NULL;
+      vg_threads[i].m_edx         = 0; /* pth_lock() success */
+
+      if (VG_(clo_trace_pthread_level) >= 1) {
+         VG_(sprintf)(msg_buf, "%s       mx %p: RESUME", 
+                               caller, mutex );
+         print_pthread_event(i, msg_buf);
+      }
+   }
+}
+
 
 static
 void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
@@ -1645,7 +1708,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
    Char msg_buf[100];
 
    if (VG_(clo_trace_pthread_level) >= 2) {
-      VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p", mutex );
+      VG_(sprintf)(msg_buf, "pthread_mutex_lock       mx %p ...", mutex );
       print_pthread_event(tid, msg_buf);
    }
 
@@ -1683,7 +1746,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
             /* return 0 (success). */
             mutex->__m_count++;
             vg_threads[tid].m_edx = 0;
-           VG_(printf)("!!!!!! tid %d, mutex %p -> locked %d\n", 
+           VG_(printf)("!!!!!! tid %d, mx %p -> locked %d\n", 
                         tid, mutex, mutex->__m_count);
             return;
          } else {
@@ -1693,11 +1756,11 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
       } else {
          /* Someone else has it; we have to wait.  Mark ourselves
             thusly. */
-         vg_threads[tid].status       = VgTs_WaitMX;
-         vg_threads[tid].waited_on_mx = mutex;
+         vg_threads[tid].status        = VgTs_WaitMX;
+         vg_threads[tid].associated_mx = mutex;
          /* No assignment to %EDX, since we're blocking. */
          if (VG_(clo_trace_pthread_level) >= 1) {
-            VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p: BLOCK", 
+            VG_(sprintf)(msg_buf, "pthread_mutex_lock       mx %p: BLOCK", 
                                   mutex );
             print_pthread_event(tid, msg_buf);
          }
@@ -1710,7 +1773,7 @@ void do_pthread_mutex_lock( ThreadId tid, pthread_mutex_t *mutex )
       /* We get it! [for the first time]. */
       mutex->__m_count = 1;
       mutex->__m_owner = (_pthread_descr)tid;
-      vg_assert(vg_threads[tid].waited_on_mx == NULL);
+      vg_assert(vg_threads[tid].associated_mx == NULL);
       /* return 0 (success). */
       vg_threads[tid].m_edx = 0;
    }
@@ -1722,11 +1785,10 @@ static
 void do_pthread_mutex_unlock ( ThreadId tid,
                                pthread_mutex_t *mutex )
 {
-   Int      i;
-   Char     msg_buf[100];
+   Char msg_buf[100];
 
    if (VG_(clo_trace_pthread_level) >= 2) {
-      VG_(sprintf)(msg_buf, "pthread_mutex_unlock %p", mutex );
+      VG_(sprintf)(msg_buf, "pthread_mutex_unlock     mx %p ...", mutex );
       print_pthread_event(tid, msg_buf);
    }
 
@@ -1773,40 +1835,10 @@ void do_pthread_mutex_unlock ( ThreadId tid,
    vg_assert(mutex->__m_count == 1);
    vg_assert((ThreadId)mutex->__m_owner == tid);
 
-   /* Find some arbitrary thread waiting on this mutex, and make it
-      runnable.  If none are waiting, mark the mutex as not held. */
-   for (i = 1; i < VG_N_THREADS; i++) {
-      if (vg_threads[i].status == VgTs_Empty) 
-         continue;
-      if (vg_threads[i].status == VgTs_WaitMX 
-          && vg_threads[i].waited_on_mx == mutex)
-         break;
-   }
-
-   vg_assert(i <= VG_N_THREADS);
-   if (i == VG_N_THREADS) {
-      /* Nobody else is waiting on it. */
-      mutex->__m_count = 0;
-      mutex->__m_owner = VG_INVALID_THREADID;
-   } else {
-      /* Notionally transfer the hold to thread i, whose
-         pthread_mutex_lock() call now returns with 0 (success). */
-      /* The .count is already == 1. */
-      vg_assert(vg_threads[i].waited_on_mx == mutex);
-      mutex->__m_owner = (_pthread_descr)i;
-      vg_threads[i].status       = VgTs_Runnable;
-      vg_threads[i].waited_on_mx = NULL;
-      vg_threads[i].m_edx = 0; /* pth_lock() success */
-
-      if (VG_(clo_trace_pthread_level) >= 1) {
-         VG_(sprintf)(msg_buf, "pthread_mutex_lock   %p: RESUME", 
-                               mutex );
-         print_pthread_event(i, msg_buf);
-      }
-   }
+   /* Release at max one thread waiting on this mutex. */
+   release_one_thread_waiting_on_mutex ( mutex, "pthread_mutex_lock" );
 
-   /* In either case, our (tid's) pth_unlock() returns with 0
-      (success). */
+   /* Our (tid's) pth_unlock() returns with 0 (success). */
    vg_threads[tid].m_edx = 0; /* Success. */
 }
 
@@ -1833,15 +1865,173 @@ void do_pthread_mutex_unlock ( ThreadId tid,
 
    #define PTHREAD_COND_INITIALIZER {__LOCK_INITIALIZER, 0}
 
-   We'll just use the __c_waiting field to point to the head of the
-   list of threads waiting on this condition.  Note how the static
-   initialiser has __c_waiting == 0 == VG_INVALID_THREADID.
+   We don't use any fields of pthread_cond_t for anything at all.
+   Only the identity of the CVs is important.
 
    Linux pthreads supports no attributes on condition variables, so we
-   don't need to think too hard there.  
-*/
+   don't need to think too hard there.  */
 
 
+static
+void release_N_threads_waiting_on_cond ( pthread_cond_t* cond, 
+                                         Int n_to_release, 
+                                         Char* caller )
+{
+   Int              i;
+   Char             msg_buf[100];
+   pthread_mutex_t* mx;
+
+   while (True) {
+      if (n_to_release == 0)
+         return;
+
+      /* Find a thread waiting on this CV. */
+      for (i = 1; i < VG_N_THREADS; i++) {
+         if (vg_threads[i].status == VgTs_Empty) 
+            continue;
+         if (vg_threads[i].status == VgTs_WaitCV 
+             && vg_threads[i].associated_cv == cond)
+            break;
+      }
+      vg_assert(i <= VG_N_THREADS);
+
+      if (i == VG_N_THREADS) {
+         /* Nobody else is waiting on it. */
+         return;
+      }
+
+      mx = vg_threads[i].associated_mx;
+      vg_assert(mx != NULL);
+
+      if (mx->__m_owner == VG_INVALID_THREADID) {
+         /* Currently unheld; hand it out to thread i. */
+         vg_assert(mx->__m_count == 0);
+         vg_threads[i].status        = VgTs_Runnable;
+         vg_threads[i].associated_cv = NULL;
+         vg_threads[i].associated_mx = NULL;
+         mx->__m_owner = (_pthread_descr)i;
+         mx->__m_count = 1;
+         vg_threads[i].m_edx = 0; /* pthread_cond_wait returns success */
+
+         if (VG_(clo_trace_pthread_level) >= 1) {
+            VG_(sprintf)(msg_buf, "%s   cv %p: RESUME with mx %p", 
+                                  caller, cond, mx );
+            print_pthread_event(i, msg_buf);
+         }
+
+      } else {
+         /* Currently held.  Make thread i be blocked on it. */
+         vg_threads[i].status        = VgTs_WaitMX;
+         vg_threads[i].associated_cv = NULL;
+         vg_threads[i].associated_mx = mx;
+
+         if (VG_(clo_trace_pthread_level) >= 1) {
+            VG_(sprintf)(msg_buf, "%s   cv %p: BLOCK for mx %p", 
+                                  caller, cond, mx );
+            print_pthread_event(i, msg_buf);
+         }
+
+      }
+
+      n_to_release--;
+   }
+}
+
+
+static
+void do_pthread_cond_wait ( ThreadId tid,
+                            pthread_cond_t *cond, 
+                            pthread_mutex_t *mutex )
+{
+   Char msg_buf[100];
+
+   /* pre: mutex should be a valid mutex and owned by tid. */
+   if (VG_(clo_trace_pthread_level) >= 2) {
+      VG_(sprintf)(msg_buf, "pthread_cond_wait        cv %p, mx %p ...", 
+                            cond, mutex );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Paranoia ... */
+   vg_assert(is_valid_tid(tid) 
+             && vg_threads[tid].status == VgTs_Runnable);
+
+   if (mutex == NULL || cond == NULL) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+
+   /* More paranoia ... */
+   switch (mutex->__m_kind) {
+      case PTHREAD_MUTEX_TIMED_NP:
+      case PTHREAD_MUTEX_RECURSIVE_NP:
+      case PTHREAD_MUTEX_ERRORCHECK_NP:
+      case PTHREAD_MUTEX_ADAPTIVE_NP:
+         if (mutex->__m_count >= 0) break;
+         /* else fall thru */
+      default:
+         vg_threads[tid].m_edx = EINVAL;
+         return;
+   }
+
+   /* Barf if we don't currently hold the mutex. */
+   if (mutex->__m_count == 0 /* nobody holds it */
+       || (ThreadId)mutex->__m_owner != tid /* we don't hold it */) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+
+   /* Queue ourselves on the condition. */
+   vg_threads[tid].status        = VgTs_WaitCV;
+   vg_threads[tid].associated_cv = cond;
+   vg_threads[tid].associated_mx = mutex;
+
+   if (VG_(clo_trace_pthread_level) >= 1) {
+      VG_(sprintf)(msg_buf, 
+                   "pthread_cond_wait        cv %p, mx %p: BLOCK", 
+                   cond, mutex );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Release the mutex. */
+   release_one_thread_waiting_on_mutex ( mutex, "pthread_cond_wait " );
+}
+
+
+static
+void do_pthread_cond_signal_or_broadcast ( ThreadId tid, 
+                                           Bool broadcast,
+                                           pthread_cond_t *cond )
+{
+   Char  msg_buf[100];
+   Char* caller 
+      = broadcast ? "pthread_cond_broadcast" 
+                  : "pthread_cond_signal   ";
+
+   if (VG_(clo_trace_pthread_level) >= 2) {
+      VG_(sprintf)(msg_buf, "%s   cv %p ...", 
+                            caller, cond );
+      print_pthread_event(tid, msg_buf);
+   }
+
+   /* Paranoia ... */
+   vg_assert(is_valid_tid(tid) 
+             && vg_threads[tid].status == VgTs_Runnable);
+
+   if (cond == NULL) {
+      vg_threads[tid].m_edx = EINVAL;
+      return;
+   }
+   
+   release_N_threads_waiting_on_cond ( 
+      cond,
+      broadcast ? VG_N_THREADS : 1, 
+      caller
+   );
+
+   vg_threads[tid].m_edx = 0; /* success */
+}
+
 
 /* ---------------------------------------------------------------------
    Handle non-trivial client requests.
@@ -1882,6 +2072,30 @@ void do_nontrivial_clientreq ( ThreadId tid )
          do_pthread_cancel( tid, (pthread_t)(arg[1]) );
          break;
 
+      case VG_USERREQ__PTHREAD_EXIT:
+         do_pthread_exit( tid, (void*)(arg[1]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_WAIT:
+         do_pthread_cond_wait( tid, 
+                               (pthread_cond_t *)(arg[1]),
+                               (pthread_mutex_t *)(arg[2]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_SIGNAL:
+         do_pthread_cond_signal_or_broadcast( 
+            tid, 
+           False, /* signal, not broadcast */
+            (pthread_cond_t *)(arg[1]) );
+         break;
+
+      case VG_USERREQ__PTHREAD_COND_BROADCAST:
+         do_pthread_cond_signal_or_broadcast( 
+            tid, 
+           True, /* broadcast, not signal */
+            (pthread_cond_t *)(arg[1]) );
+         break;
+
       case VG_USERREQ__MAKE_NOACCESS:
       case VG_USERREQ__MAKE_WRITABLE:
       case VG_USERREQ__MAKE_READABLE:
@@ -1917,17 +2131,27 @@ void do_nontrivial_clientreq ( ThreadId tid )
 static
 void scheduler_sanity ( void )
 {
-   pthread_mutex_t* mutex;
+   pthread_mutex_t* mx;
+   pthread_cond_t*  cv;
    Int              i;
    /* VG_(printf)("scheduler_sanity\n"); */
    for (i = 1; i < VG_N_THREADS; i++) {
+      mx = vg_threads[i].associated_mx;
+      cv = vg_threads[i].associated_cv;
       if (vg_threads[i].status == VgTs_WaitMX) {
-         mutex = vg_threads[i].waited_on_mx;
-         vg_assert(mutex != NULL);
-         vg_assert(mutex->__m_count > 0);
-         vg_assert(is_valid_tid((ThreadId)mutex->__m_owner));
+         vg_assert(cv == NULL);
+         vg_assert(mx != NULL);
+         vg_assert(mx->__m_count > 0);
+         vg_assert(is_valid_tid((ThreadId)mx->__m_owner));
+         vg_assert(i != (ThreadId)mx->__m_owner); 
+            /* otherwise thread i would be deadlocked. */
+      } else 
+      if (vg_threads[i].status == VgTs_WaitCV) {
+         vg_assert(cv != NULL);
+         vg_assert(mx != NULL);
       } else {
-         vg_assert(vg_threads[i].waited_on_mx == NULL);
+         vg_assert(cv == NULL);
+         vg_assert(mx == NULL);
       }
    }
 }