]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
[Bug 2954] Version 4.2.8p4 crashes on startup with sig fault
authorJuergen Perlinger <perlinger@ntp.org>
Sat, 31 Oct 2015 18:04:18 +0000 (18:04 +0000)
committerJuergen Perlinger <perlinger@ntp.org>
Sat, 31 Oct 2015 18:04:18 +0000 (18:04 +0000)
 - fixed race conditions between worker and main thread in DNS worker

bk: 563502a27rHB3d5_eb7CW5hDHEv3eQ

ChangeLog
include/ntp_worker.h
libntp/work_thread.c

index 5d8346b310b070472942fed74a146b9c4972df08..c8ac4434458dc9716643f0d9d169c887de8ea081 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,7 @@
 ---
+* [Bug 2954] Version 4.2.8p4 crashes on startup with sig fault
+  - fixed data race conditions in threaded DNS worker. perlinger@ntp.org
+---
 (4.2.8p4) 2015/10/21 Released by Harlan Stenn <stenn@ntp.org>
 (4.2.8p4-RC1) 2015/10/06 Released by Harlan Stenn <stenn@ntp.org>
 
index f7e8d5be70865d6451af9c2f60b4f06dad204a3c..50616b3df7dd7916d2bc3b504246861e89adef49 100644 (file)
@@ -43,19 +43,22 @@ typedef struct blocking_pipe_header_tag {
 } blocking_pipe_header;
 
 # ifdef WORK_THREAD
-#  ifdef WORK_PIPE
-typedef pthread_t *    thr_ref;
-typedef sem_t *                sem_ref;
+#  ifdef SYS_WINNT
+typedef struct { HANDLE thnd; } thread_type;
+typedef struct { HANDLE shnd; } sema_type;
 #  else
-typedef HANDLE         thr_ref;
-typedef HANDLE         sem_ref;
+typedef pthread_t      thread_type;
+typedef sem_t          sema_type;
 #  endif
+typedef thread_type    *thr_ref;
+typedef sema_type      *sem_ref;
 # endif
 
 /*
  *
  */
-#ifdef WORK_FORK
+#if defined(WORK_FORK)
+
 typedef struct blocking_child_tag {
        int     reusable;
        int     pid;
@@ -66,38 +69,59 @@ typedef struct blocking_child_tag {
        int     resp_write_pipe;
        int     ispipe;
 } blocking_child;
+
 #elif defined(WORK_THREAD)
+
 typedef struct blocking_child_tag {
 /*
  * blocking workitems and blocking_responses are dynamically-sized
  * one-dimensional arrays of pointers to blocking worker requests and
  * responses.
+ *
+ * IMPORTANT: This structure is shared between threads, and all access
+ * that is not atomic (especially queue operations) must hold the
+ * 'accesslock' semaphore to avoid data races.
+ *
+ * The resource management (thread/semaphore creation/destruction)
+ * functions and functions just testing a handle are safe because these
+ * are only changed by the main thread when no worker is running on the
+ * same data structure.
  */
        int                     reusable;
-       thr_ref                 thread_ref;
-       u_int                   thread_id;
-       blocking_pipe_header * volatile * volatile 
+       sem_ref                 accesslock;     /* shared access lock */
+       thr_ref                 thread_ref;     /* thread 'handle' */
+
+       /* the reuest queue */
+       blocking_pipe_header ** volatile
                                workitems;
        volatile size_t         workitems_alloc;
-       size_t                  next_workitem;   /* parent */
-       size_t                  next_workeritem; /* child */
-       blocking_pipe_header * volatile * volatile 
+       size_t                  head_workitem;          /* parent */
+       size_t                  tail_workitem;          /* child */
+       sem_ref                 workitems_pending;      /* signalling */
+
+       /* the response queue */
+       blocking_pipe_header ** volatile
                                responses;
        volatile size_t         responses_alloc;
-       size_t                  next_response;  /* child */
-       size_t                  next_workresp;  /* parent */
+       size_t                  head_response;          /* child */
+       size_t                  tail_response;          /* parent */
+
        /* event handles / sem_t pointers */
-       /* sem_ref              child_is_blocking; */
-       sem_ref                 blocking_req_ready;
        sem_ref                 wake_scheduled_sleep;
+
+       /* some systems use a pipe for notification, others a semaphore.
+        * Both employ the queue above for the actual data transfer.
+        */
 #ifdef WORK_PIPE
-       int                     resp_read_pipe; /* parent */
-       int                     resp_write_pipe;/* child */
+       int                     resp_read_pipe;         /* parent */
+       int                     resp_write_pipe;        /* child */
        int                     ispipe;
-       void *                  resp_read_ctx;  /* child */
+       void *                  resp_read_ctx;          /* child */
 #else
-       sem_ref                 blocking_response_ready;
+       sem_ref                 responses_pending;      /* signalling */
 #endif
+       sema_type               sem_table[4];
+       thread_type             thr_table[1];
 } blocking_child;
 
 #endif /* WORK_THREAD */
@@ -111,7 +135,7 @@ extern      u_int   available_blocking_child_slot(void);
 extern int     queue_blocking_request(blocking_work_req, void *,
                                       size_t, blocking_work_callback,
                                       void *);
-extern int     queue_blocking_response(blocking_child *, 
+extern int     queue_blocking_response(blocking_child *,
                                        blocking_pipe_header *, size_t,
                                        const blocking_pipe_header *);
 extern void    process_blocking_resp(blocking_child *);
index 38d87470f01a7b7ad877311791de0fe6ae8689dd..49e90c16c3266dd70b984035b428b37d4f9df25c 100644 (file)
 #define THREAD_MINSTACKSIZE    (64U * 1024)
 #endif
 
-#ifndef DEVOLATILE
-#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
-#endif
-
 #ifdef SYS_WINNT
+
 # define thread_exit(c)        _endthreadex(c)
-# define tickle_sem    SetEvent
+# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
+u_int  WINAPI  blocking_thread(void *);
+static BOOL    same_os_sema(const sem_ref obj, void * osobj);
+
 #else
+
 # define thread_exit(c)        pthread_exit((void*)(size_t)(c))
 # define tickle_sem    sem_post
+void *         blocking_thread(void *);
+static void    block_thread_signals(sigset_t *);
+
 #endif
 
 #ifdef WORK_PIPE
@@ -54,18 +58,10 @@ static      void    start_blocking_thread(blocking_child *);
 static void    start_blocking_thread_internal(blocking_child *);
 static void    prepare_child_sems(blocking_child *);
 static int     wait_for_sem(sem_ref, struct timespec *);
-static void    ensure_workitems_empty_slot(blocking_child *);
-static void    ensure_workresp_empty_slot(blocking_child *);
+static int     ensure_workitems_empty_slot(blocking_child *);
+static int     ensure_workresp_empty_slot(blocking_child *);
 static int     queue_req_pointer(blocking_child *, blocking_pipe_header *);
 static void    cleanup_after_child(blocking_child *);
-#ifdef SYS_WINNT
-u_int  WINAPI  blocking_thread(void *);
-#else
-void *         blocking_thread(void *);
-#endif
-#ifndef SYS_WINNT
-static void    block_thread_signals(sigset_t *);
-#endif
 
 
 void
@@ -76,7 +72,9 @@ exit_worker(
        thread_exit(exitcode);  /* see #define thread_exit */
 }
 
-
+/* --------------------------------------------------------------------
+ * sleep for a given time or until the wakup semaphore is tickled.
+ */
 int
 worker_sleep(
        blocking_child *        c,
@@ -98,9 +96,7 @@ worker_sleep(
        }
 # endif
        until.tv_sec += seconds;
-       do {
-               rc = wait_for_sem(c->wake_scheduled_sleep, &until);
-       } while (-1 == rc && EINTR == errno);
+       rc = wait_for_sem(c->wake_scheduled_sleep, &until);
        if (0 == rc)
                return -1;
        if (-1 == rc && ETIMEDOUT == errno)
@@ -110,6 +106,9 @@ worker_sleep(
 }
 
 
+/* --------------------------------------------------------------------
+ * Wake up a worker that takes a nap.
+ */
 void
 interrupt_worker_sleep(void)
 {
@@ -124,65 +123,79 @@ interrupt_worker_sleep(void)
        }
 }
 
-
-static void
+/* --------------------------------------------------------------------
+ * Make sure there is an empty slot at the head of the request
+ * queue. Tell if the queue is currently empty.
+ */
+static int
 ensure_workitems_empty_slot(
        blocking_child *c
        )
 {
-       const size_t    each = sizeof(blocking_children[0]->workitems[0]);
-       size_t          new_alloc;
-       size_t          old_octets;
-       size_t          new_octets;
-       void *          nonvol_workitems;
-
-
-       if (c->workitems != NULL &&
-           NULL == c->workitems[c->next_workitem])
-               return;
-
-       new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
-       old_octets = c->workitems_alloc * each;
-       new_octets = new_alloc * each;
-       nonvol_workitems = DEVOLATILE(void *, c->workitems);
-       c->workitems = erealloc_zero(nonvol_workitems, new_octets,
-                                    old_octets);
-       if (0 == c->next_workitem)
-               c->next_workitem = c->workitems_alloc;
-       c->workitems_alloc = new_alloc;
+       /*
+       ** !!! PRECONDITION: caller holds access lock!
+       **
+       ** This simply tries to increase the size of the buffer if it
+       ** becomes full. The resize operation does *not* maintain the
+       ** order of requests, but that should be irrelevant since the
+       ** processing is considered asynchronous anyway.
+       **
+       ** Return if the buffer is currently empty.
+       */
+       
+       static const size_t each =
+           sizeof(blocking_children[0]->workitems[0]);
+
+       size_t  new_alloc;
+       size_t  slots_used;
+
+       slots_used = c->head_workitem - c->tail_workitem;
+       if (slots_used >= c->workitems_alloc) {
+               new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
+               c->workitems = erealloc(c->workitems, new_alloc * each);
+               c->tail_workitem   = 0;
+               c->head_workitem   = c->workitems_alloc;
+               c->workitems_alloc = new_alloc;
+       }
+       return (0 == slots_used);
 }
 
-
-static void
+/* --------------------------------------------------------------------
+ * Make sure there is an empty slot at the head of the response
+ * queue. Tell if the queue is currently empty.
+ */
+static int
 ensure_workresp_empty_slot(
        blocking_child *c
        )
 {
-       const size_t    each = sizeof(blocking_children[0]->responses[0]);
-       size_t          new_alloc;
-       size_t          old_octets;
-       size_t          new_octets;
-       void *          nonvol_responses;
-
-       if (c->responses != NULL &&
-           NULL == c->responses[c->next_response])
-               return;
-
-       new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
-       old_octets = c->responses_alloc * each;
-       new_octets = new_alloc * each;
-       nonvol_responses = DEVOLATILE(void *, c->responses);
-       c->responses = erealloc_zero(nonvol_responses, new_octets,
-                                    old_octets);
-       if (0 == c->next_response)
-               c->next_response = c->responses_alloc;
-       c->responses_alloc = new_alloc;
+       /*
+       ** !!! PRECONDITION: caller holds access lock!
+       **
+       ** Works like the companion function above.
+       */
+       
+       static const size_t each =
+           sizeof(blocking_children[0]->responses[0]);
+
+       size_t  new_alloc;
+       size_t  slots_used;
+
+       slots_used = c->head_response - c->tail_response;
+       if (slots_used >= c->responses_alloc) {
+               new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
+               c->responses = erealloc(c->responses, new_alloc * each);
+               c->tail_response   = 0;
+               c->head_response   = c->responses_alloc;
+               c->responses_alloc = new_alloc;
+       }
+       return (0 == slots_used);
 }
 
 
-/*
+/* --------------------------------------------------------------------
  * queue_req_pointer() - append a work item or idle exit request to
- *                      blocking_workitems[].
+ *                      blocking_workitems[]. Employ proper locking.
  */
 static int
 queue_req_pointer(
@@ -190,21 +203,28 @@ queue_req_pointer(
        blocking_pipe_header *  hdr
        )
 {
-       c->workitems[c->next_workitem] = hdr;
-       c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
+       size_t qhead;
+       
+       /* >>>> ACCESS LOCKING STARTS >>>> */
+       wait_for_sem(c->accesslock, NULL);
+       ensure_workitems_empty_slot(c);
+       qhead = c->head_workitem;
+       c->workitems[qhead % c->workitems_alloc] = hdr;
+       c->head_workitem = 1 + qhead;
+       tickle_sem(c->accesslock);
+       /* <<<< ACCESS LOCKING ENDS <<<< */
 
-       /*
-        * We only want to signal the wakeup event if the child is
-        * blocking on it, which is indicated by setting the blocking
-        * event.  Wait with zero timeout to test.
-        */
-       /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
-               tickle_sem(c->blocking_req_ready);
+       /* queue consumer wake-up notification */
+       tickle_sem(c->workitems_pending);
 
        return 0;
 }
 
-
+/* --------------------------------------------------------------------
+ * API function to make sure a worker is running, a proper private copy
+ * of the data is made, the data eneterd into the queue and the worker
+ * is signalled.
+ */
 int
 send_blocking_req_internal(
        blocking_child *        c,
@@ -223,12 +243,8 @@ send_blocking_req_internal(
                return 1;       /* failure */
        payload_octets = hdr->octets - sizeof(*hdr);
 
-       ensure_workitems_empty_slot(c);
-       if (NULL == c->thread_ref) {
-               ensure_workresp_empty_slot(c);
+       if (NULL == c->thread_ref)
                start_blocking_thread(c);
-       }
-
        threadcopy = emalloc(hdr->octets);
        memcpy(threadcopy, hdr, sizeof(*hdr));
        memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
@@ -236,43 +252,41 @@ send_blocking_req_internal(
        return queue_req_pointer(c, threadcopy);
 }
 
-
+/* --------------------------------------------------------------------
+ * Wait for the 'incoming queue no longer empty' signal, lock the shared
+ * structure and dequeue an item.
+ */
 blocking_pipe_header *
 receive_blocking_req_internal(
        blocking_child *        c
        )
 {
        blocking_pipe_header *  req;
-       int                     rc;
+       size_t                  qhead, qtail;
 
-       /*
-        * Child blocks here when idle.  SysV semaphores maintain a
-        * count and release from sem_wait() only when it reaches 0.
-        * Windows auto-reset events are simpler, and multiple SetEvent
-        * calls before any thread waits result in a single wakeup.
-        * On Windows, the child drains all workitems each wakeup, while
-        * with SysV semaphores wait_sem() is used before each item.
-        */
-#ifdef SYS_WINNT
-       while (NULL == c->workitems[c->next_workeritem]) {
-               /* !!!! SetEvent(c->child_is_blocking); */
-               rc = wait_for_sem(c->blocking_req_ready, NULL);
-               INSIST(0 == rc);
-               /* !!!! ResetEvent(c->child_is_blocking); */
-       }
-#else
+       req = NULL;
        do {
-               rc = wait_for_sem(c->blocking_req_ready, NULL);
-       } while (-1 == rc && EINTR == errno);
-       INSIST(0 == rc);
-#endif
+               /* wait for tickle from the producer side */
+               wait_for_sem(c->workitems_pending, NULL);
+
+               /* >>>> ACCESS LOCKING STARTS >>>> */
+               wait_for_sem(c->accesslock, NULL);
+               qhead = c->head_workitem;
+               do {
+                       qtail = c->tail_workitem;
+                       if (qhead == qtail)
+                               break;
+                       c->tail_workitem = qtail + 1;
+                       qtail %= c->workitems_alloc;
+                       req = c->workitems[qtail];
+                       c->workitems[qtail] = NULL;
+               } while (NULL == req);
+               tickle_sem(c->accesslock);
+               /* <<<< ACCESS LOCKING ENDS <<<< */
+
+       } while (NULL == req);
 
-       req = c->workitems[c->next_workeritem];
        INSIST(NULL != req);
-       c->workitems[c->next_workeritem] = NULL;
-       c->next_workeritem = (1 + c->next_workeritem) %
-                               c->workitems_alloc;
-
        if (CHILD_EXIT_REQ == req) {    /* idled out */
                send_blocking_resp_internal(c, CHILD_GONE_RESP);
                req = NULL;
@@ -281,44 +295,74 @@ receive_blocking_req_internal(
        return req;
 }
 
-
+/* --------------------------------------------------------------------
+ * Push a response into the return queue and eventually tickle the
+ * receiver.
+ */
 int
 send_blocking_resp_internal(
        blocking_child *        c,
        blocking_pipe_header *  resp
        )
 {
-       ensure_workresp_empty_slot(c);
-
-       c->responses[c->next_response] = resp;
-       c->next_response = (1 + c->next_response) % c->responses_alloc;
-
-#ifdef WORK_PIPE
-       write(c->resp_write_pipe, "", 1);
-#else
-       tickle_sem(c->blocking_response_ready);
-#endif
-
+       size_t  qhead;
+       int     empty;
+       
+       /* >>>> ACCESS LOCKING STARTS >>>> */
+       wait_for_sem(c->accesslock, NULL);
+       empty = ensure_workresp_empty_slot(c);
+       qhead = c->head_response;
+       c->responses[qhead % c->responses_alloc] = resp;
+       c->head_response = 1 + qhead;
+       tickle_sem(c->accesslock);
+       /* <<<< ACCESS LOCKING ENDS <<<< */
+
+       /* queue consumer wake-up notification */
+       if (empty)
+       {
+#          ifdef WORK_PIPE
+               write(c->resp_write_pipe, "", 1);
+#          else
+               tickle_sem(c->responses_pending);
+#          endif
+       }
        return 0;
 }
 
 
 #ifndef WORK_PIPE
+
+/* --------------------------------------------------------------------
+ * Check if a (Windows-)hanndle to a semaphore is actually the same we
+ * are using inside the sema wrapper.
+ */
+static BOOL
+same_os_sema(
+       const sem_ref   obj,
+       void*           osh
+       )
+{
+       return obj && osh && (obj->shnd == (HANDLE)osh);
+}
+
+/* --------------------------------------------------------------------
+ * Find the shared context that associates to an OS handle and make sure
+ * the data is dequeued and processed.
+ */
 void
 handle_blocking_resp_sem(
        void *  context
        )
 {
-       HANDLE                  ready;
        blocking_child *        c;
        u_int                   idx;
 
-       ready = (HANDLE)context;
        c = NULL;
        for (idx = 0; idx < blocking_children_alloc; idx++) {
                c = blocking_children[idx];
-               if (c != NULL && c->thread_ref != NULL &&
-                   ready == c->blocking_response_ready)
+               if (c != NULL &&
+                       c->thread_ref != NULL &&
+                       same_os_sema(c->responses_pending, context))
                        break;
        }
        if (idx < blocking_children_alloc)
@@ -326,26 +370,41 @@ handle_blocking_resp_sem(
 }
 #endif /* !WORK_PIPE */
 
-
+/* --------------------------------------------------------------------
+ * Fetch the next response from the return queue. In case of signalling
+ * via pipe, make sure the pipe is flushed, too.
+ */
 blocking_pipe_header *
 receive_blocking_resp_internal(
        blocking_child *        c
        )
 {
        blocking_pipe_header *  removed;
+       size_t                  qhead, qtail, slot;
+
 #ifdef WORK_PIPE
        int                     rc;
        char                    scratch[32];
 
-       do {
+       do
                rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
-       while (-1 == rc && EINTR == errno);
+       while (-1 == rc && EINTR == errno);
 #endif
-       removed = c->responses[c->next_workresp];
+
+       /* >>>> ACCESS LOCKING STARTS >>>> */
+       wait_for_sem(c->accesslock, NULL);
+       qhead = c->head_response;
+       qtail = c->tail_response;
+       for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
+               slot = qtail % c->responses_alloc;
+               removed = c->responses[slot];
+               c->responses[slot] = NULL;
+       }
+       c->tail_response = qtail;
+       tickle_sem(c->accesslock);
+       /* <<<< ACCESS LOCKING ENDS <<<< */
+
        if (NULL != removed) {
-               c->responses[c->next_workresp] = NULL;
-               c->next_workresp = (1 + c->next_workresp) %
-                                  c->responses_alloc;
                DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
                             BLOCKING_RESP_MAGIC == removed->magic_sig);
        }
@@ -357,7 +416,9 @@ receive_blocking_resp_internal(
        return removed;
 }
 
-
+/* --------------------------------------------------------------------
+ * Light up a new worker.
+ */
 static void
 start_blocking_thread(
        blocking_child *        c
@@ -370,40 +431,45 @@ start_blocking_thread(
        start_blocking_thread_internal(c);
 }
 
-
+/* --------------------------------------------------------------------
+ * Create a worker thread. There are several differences between POSIX
+ * and Windows, of course -- most notably the Windows thread is no
+ * detached thread, and we keep the handle around until we want to get
+ * rid of the thread. The notification scheme also differs: Windows
+ * makes use of semaphores in both directions, POSIX uses a pipe for
+ * integration with 'select()' or alike.
+ */
 static void
 start_blocking_thread_internal(
        blocking_child *        c
        )
 #ifdef SYS_WINNT
 {
-       thr_ref blocking_child_thread;
-       u_int   blocking_thread_id;
        BOOL    resumed;
 
-       (*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
-       blocking_child_thread =
+       c->thread_ref = NULL;
+       (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
+       c->thr_table[0].thnd =
                (HANDLE)_beginthreadex(
                        NULL,
                        0,
                        &blocking_thread,
                        c,
                        CREATE_SUSPENDED,
-                       &blocking_thread_id);
+                       NULL);
 
-       if (NULL == blocking_child_thread) {
+       if (NULL == c->thr_table[0].thnd) {
                msyslog(LOG_ERR, "start blocking thread failed: %m");
                exit(-1);
        }
-       c->thread_id = blocking_thread_id;
-       c->thread_ref = blocking_child_thread;
        /* remember the thread priority is only within the process class */
-       if (!SetThreadPriority(blocking_child_thread,
+       if (!SetThreadPriority(c->thr_table[0].thnd,
                               THREAD_PRIORITY_BELOW_NORMAL))
                msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
 
-       resumed = ResumeThread(blocking_child_thread);
+       resumed = ResumeThread(c->thr_table[0].thnd);
        DEBUG_INSIST(resumed);
+       c->thread_ref = &c->thr_table[0];
 }
 #else  /* pthreads start_blocking_thread_internal() follows */
 {
@@ -419,6 +485,8 @@ start_blocking_thread_internal(
        size_t          stacksize;
        sigset_t        saved_sig_mask;
 
+       c->thread_ref = NULL;
+
 # ifdef NEED_PTHREAD_INIT
        /*
         * from lib/isc/unix/app.c:
@@ -475,7 +543,7 @@ start_blocking_thread_internal(
 #endif
        c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
        block_thread_signals(&saved_sig_mask);
-       rc = pthread_create(c->thread_ref, &thr_attr,
+       rc = pthread_create(&c->thr_table[0], &thr_attr,
                            &blocking_thread, c);
        saved_errno = errno;
        pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
@@ -485,11 +553,11 @@ start_blocking_thread_internal(
                msyslog(LOG_ERR, "pthread_create() blocking child: %m");
                exit(1);
        }
+       c->thread_ref = &c->thr_table[0];
 }
 #endif
 
-
-/*
+/* --------------------------------------------------------------------
  * block_thread_signals()
  *
  * Temporarily block signals used by ntpd main thread, so that signal
@@ -538,61 +606,101 @@ block_thread_signals(
 #endif /* !SYS_WINNT */
 
 
-/*
+/* --------------------------------------------------------------------
+ * Create & destroy semaphores. This is sufficiently different between
+ * POSIX and Windows to warrant wrapper functions and close enough to
+ * use the concept of synchronization via semaphore for all platforms.
+ */
+static sem_ref
+create_sema(
+       sema_type*      semptr,
+       u_int           inival,
+       u_int           maxval)
+{
+#ifdef SYS_WINNT
+       
+       long svini, svmax;
+       if (NULL != semptr) {
+               svini = (inival < LONG_MAX)
+                   ? (long)inival : LONG_MAX;
+               svmax = (maxval < LONG_MAX && maxval > 0)
+                   ? (long)maxval : LONG_MAX;
+               semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
+               if (NULL == semptr->shnd)
+                       semptr = NULL;
+       }
+       
+#else
+       
+       (void)maxval;
+       if (semptr && sem_init(semptr, FALSE, inival))
+               semptr = NULL;
+       
+#endif
+
+       return semptr;
+}
+
+/* ------------------------------------------------------------------ */
+static sem_ref
+delete_sema(
+       sem_ref obj)
+{
+               
+#   ifdef SYS_WINNT
+               
+       if (obj) {
+               if (obj->shnd)
+                       CloseHandle(obj->shnd);
+               obj->shnd = NULL;
+       }
+       
+#   else
+               
+       if (obj)
+               sem_destroy(obj);
+               
+#   endif
+
+       return NULL;
+}
+
+/* --------------------------------------------------------------------
  * prepare_child_sems()
  *
- * create sync events (semaphores)
- * child_is_blocking initially unset
- * blocking_req_ready initially unset
+ * create sync & access semaphores
  *
- * Child waits for blocking_req_ready to be set after
- * setting child_is_blocking.  blocking_req_ready and
- * blocking_response_ready are auto-reset, so wake one
- * waiter and become unset (unsignalled) in one operation.
+ * All semaphores are cleared, only the access semaphore has 1 unit.
+ * Childs wait on 'workitems_pending', then grabs 'sema_access'
+ * and dequeues jobs. When done, 'sema_access' is given one unit back.
+ *
+ * The producer grabs 'sema_access', manages the queue, restores
+ * 'sema_access' and puts one unit into 'workitems_pending'.
+ *
+ * The story goes the same for the response queue.
  */
 static void
 prepare_child_sems(
        blocking_child *c
        )
-#ifdef SYS_WINNT
-{
-       if (NULL == c->blocking_req_ready) {
-               /* manual reset using ResetEvent() */
-               /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
-               /* auto reset - one thread released from wait each set */
-               c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
-               c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
-               c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
-       } else {
-               /* !!!! ResetEvent(c->child_is_blocking); */
-               /* ResetEvent(c->blocking_req_ready); */
-               /* ResetEvent(c->blocking_response_ready); */
-               /* ResetEvent(c->wake_scheduled_sleep); */
-       }
-}
-#else  /* pthreads prepare_child_sems() follows */
 {
-       size_t  octets;
-
-       if (NULL == c->blocking_req_ready) {
-               octets = sizeof(*c->blocking_req_ready);
-               octets += sizeof(*c->wake_scheduled_sleep);
-               /* !!!! octets += sizeof(*c->child_is_blocking); */
-               c->blocking_req_ready = emalloc_zero(octets);;
-               c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
-               /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
-       } else {
-               sem_destroy(c->blocking_req_ready);
-               sem_destroy(c->wake_scheduled_sleep);
-               /* !!!! sem_destroy(c->child_is_blocking); */
-       }
-       sem_init(c->blocking_req_ready, FALSE, 0);
-       sem_init(c->wake_scheduled_sleep, FALSE, 0);
-       /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
+       c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
+       c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
+       c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
+#   ifndef WORK_PIPE
+       c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
+#   endif
 }
-#endif
-
 
+/* --------------------------------------------------------------------
+ * wait for semaphore. Where the wait can be interrupted, it will
+ * internally resume -- When this function returns, there is either no
+ * semaphore at all, a timeout occurred, or the caller could
+ * successfully take a token from the semaphore.
+ *
+ * For untimed wait, not checking the result of this function at all is
+ * definitely an option.
+ */
 static int
 wait_for_sem(
        sem_ref                 sem,
@@ -605,6 +713,11 @@ wait_for_sem(
        DWORD           msec;
        DWORD           rc;
 
+       if (!(sem && sem->shnd)) {
+               errno = EINVAL;
+               return -1;
+       }
+       
        if (NULL == timeout) {
                msec = INFINITE;
        } else {
@@ -619,7 +732,7 @@ wait_for_sem(
                        msec += delta.tv_nsec / (1000 * 1000);
                }
        }
-       rc = WaitForSingleObject(sem, msec);
+       rc = WaitForSingleObject(sem->shnd, msec);
        if (WAIT_OBJECT_0 == rc)
                return 0;
        if (WAIT_TIMEOUT == rc) {
@@ -632,24 +745,28 @@ wait_for_sem(
 }
 #else  /* pthreads wait_for_sem() follows */
 {
-       int rc;
-
-       if (NULL == timeout)
-               rc = sem_wait(sem);
+       int rc = -1;
+
+       if (sem) do {
+                       if (NULL == timeout)
+                               rc = sem_wait(sem);
+                       else
+                               rc = sem_timedwait(sem, timeout);
+               } while (rc == -1 && errno == EINTR);
        else
-               rc = sem_timedwait(sem, timeout);
-
+               errno = EINVAL;
+               
        return rc;
 }
 #endif
 
-
-/*
- * blocking_thread - thread functions have WINAPI calling convention
+/* --------------------------------------------------------------------
+ * blocking_thread - thread functions have WINAPI (aka 'stdcall')
+ * calling conventions under Windows and POSIX-defined signature
+ * otherwise.
  */
 #ifdef SYS_WINNT
-u_int
-WINAPI
+u_int WINAPI
 #else
 void *
 #endif
@@ -666,20 +783,28 @@ blocking_thread(
        return 0;
 }
 
-
-/*
+/* --------------------------------------------------------------------
  * req_child_exit() runs in the parent.
+ *
+ * This function is called from from the idle timer, too, and possibly
+ * without a thread being there any longer. Since we have folded up our
+ * tent in that case and all the semaphores are already gone, we simply
+ * ignore this request in this case.
+ *
+ * Since the existence of the semaphores is controlled exclusively by
+ * the parent, there's no risk of data race here.
  */
 int
 req_child_exit(
        blocking_child *c
        )
 {
-       return queue_req_pointer(c, CHILD_EXIT_REQ);
+       return (c->accesslock)
+           ? queue_req_pointer(c, CHILD_EXIT_REQ)
+           : 0;
 }
 
-
-/*
+/* --------------------------------------------------------------------
  * cleanup_after_child() runs in parent.
  */
 static void
@@ -687,17 +812,27 @@ cleanup_after_child(
        blocking_child *        c
        )
 {
-       u_int   idx;
-
        DEBUG_INSIST(!c->reusable);
-#ifdef SYS_WINNT
-       INSIST(CloseHandle(c->thread_ref));
-#else
-       free(c->thread_ref);
-#endif
+       
+#   ifdef SYS_WINNT
+       /* The thread was not created in detached state, so we better
+        * clean up.
+        */
+       if (c->thread_ref && c->thread_ref->thnd) {
+               WaitForSingleObject(c->thread_ref->thnd, INFINITE);
+               INSIST(CloseHandle(c->thread_ref->thnd));
+               c->thread_ref->thnd = NULL;
+       }
+#   endif
        c->thread_ref = NULL;
-       c->thread_id = 0;
-#ifdef WORK_PIPE
+
+       /* remove semaphores and (if signalling vi IO) pipes */
+       
+       c->accesslock           = delete_sema(c->accesslock);
+       c->workitems_pending    = delete_sema(c->workitems_pending);
+       c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
+
+#   ifdef WORK_PIPE
        DEBUG_INSIST(-1 != c->resp_read_pipe);
        DEBUG_INSIST(-1 != c->resp_write_pipe);
        (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
@@ -705,18 +840,22 @@ cleanup_after_child(
        close(c->resp_read_pipe);
        c->resp_write_pipe = -1;
        c->resp_read_pipe = -1;
-#else
-       DEBUG_INSIST(NULL != c->blocking_response_ready);
-       (*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
-#endif
-       for (idx = 0; idx < c->workitems_alloc; idx++)
-               c->workitems[idx] = NULL;
-       c->next_workitem = 0;
-       c->next_workeritem = 0;
-       for (idx = 0; idx < c->responses_alloc; idx++)
-               c->responses[idx] = NULL;
-       c->next_response = 0;
-       c->next_workresp = 0;
+#   else
+       DEBUG_INSIST(NULL != c->responses_pending);
+       (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
+       c->responses_pending = delete_sema(c->responses_pending);
+#   endif
+
+       /* Is it necessary to check if there are pending requests and
+        * responses? If so, and if there are, what to do with them?
+        */
+       
+       /* re-init buffer index sequencers */
+       c->head_workitem = 0;
+       c->tail_workitem = 0;
+       c->head_response = 0;
+       c->tail_response = 0;
+
        c->reusable = TRUE;
 }