typedef struct h2_slot h2_slot;
struct h2_slot {
int id;
+ int sticks;
h2_slot *next;
h2_workers *workers;
- int aborted;
- int sticks;
h2_task *task;
apr_thread_t *thread;
apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
};
-static h2_slot *pop_slot(h2_slot **phead)
+static h2_slot *pop_slot(h2_slot *volatile *phead)
{
/* Atomically pop a slot from the list */
for (;;) {
}
}
-static void push_slot(h2_slot **phead, h2_slot *slot)
+static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
{
/* Atomically push a slot to the list */
ap_assert(!slot->next);
apr_status_t status;
slot->workers = workers;
- slot->aborted = 0;
slot->task = NULL;
if (!slot->lock) {
ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
"h2_workers: new thread for slot %d", slot->id);
+
/* thread will either immediately start work or add itself
* to the idle queue */
- apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
- workers->pool);
- if (!slot->thread) {
+ apr_atomic_inc32(&workers->worker_count);
+ status = apr_thread_create(&slot->thread, workers->thread_attr,
+ slot_run, slot, workers->pool);
+ if (status != APR_SUCCESS) {
+ apr_atomic_dec32(&workers->worker_count);
push_slot(&workers->free, slot);
- return APR_ENOMEM;
+ return status;
}
- apr_atomic_inc32(&workers->worker_count);
return APR_SUCCESS;
}
}
}
-static void cleanup_zombies(h2_workers *workers)
+static void join_zombies(h2_workers *workers)
{
h2_slot *slot;
while ((slot = pop_slot(&workers->zombies))) {
- if (slot->thread) {
- apr_status_t status;
- apr_thread_join(&status, slot->thread);
- slot->thread = NULL;
- }
- apr_atomic_dec32(&workers->worker_count);
- slot->next = NULL;
+ apr_status_t status;
+ ap_assert(slot->thread != NULL);
+ apr_thread_join(&status, slot->thread);
+ slot->thread = NULL;
+
push_slot(&workers->free, slot);
}
}
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
*/
-static apr_status_t get_next(h2_slot *slot)
+static int get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
- apr_status_t status;
-
- slot->task = NULL;
- while (!slot->aborted) {
- if (!slot->task) {
- status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
- if (status == APR_EOF) {
- return status;
- }
+
+ while (!workers->aborted) {
+ ap_assert(slot->task == NULL);
+ if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
+ /* The queue is terminated with the MPM child being cleaned up,
+ * just leave.
+ */
+ break;
}
-
if (slot->task) {
- return APR_SUCCESS;
+ return 1;
}
- cleanup_zombies(workers);
+ join_zombies(workers);
apr_thread_mutex_lock(slot->lock);
- push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, slot->lock);
+ if (!workers->aborted) {
+ push_slot(&workers->idle, slot);
+ apr_thread_cond_wait(slot->not_idle, slot->lock);
+ }
apr_thread_mutex_unlock(slot->lock);
}
- return APR_EOF;
+
+ return 0;
}
static void slot_done(h2_slot *slot)
{
- push_slot(&(slot->workers->zombies), slot);
+ h2_workers *workers = slot->workers;
+
+ push_slot(&workers->zombies, slot);
+
+ /* If this worker is the last one exiting and the MPM child is stopping,
+ * unblock workers_pool_cleanup().
+ */
+ if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
+ apr_thread_mutex_lock(workers->lock);
+ apr_thread_cond_signal(workers->all_done);
+ apr_thread_mutex_unlock(workers->lock);
+ }
}
{
h2_slot *slot = wctx;
- while (!slot->aborted) {
-
- /* Get a h2_task from the mplxs queue. */
- get_next(slot);
- while (slot->task) {
-
+ /* Get the h2_task(s) from the ->mplxs queue. */
+ while (get_next(slot)) {
+ ap_assert(slot->task != NULL);
+ do {
h2_task_do(slot->task, thread, slot->id);
/* Report the task as done. If stickyness is left, offer the
* mplx the opportunity to give us back a new task right away.
*/
- if (!slot->aborted && (--slot->sticks > 0)) {
+ if (!slot->workers->aborted && --slot->sticks > 0) {
h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
}
else {
h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
slot->task = NULL;
}
- }
+ } while (slot->task);
}
slot_done(slot);
+
+ apr_thread_exit(thread, APR_SUCCESS);
return NULL;
}
h2_workers *workers = data;
h2_slot *slot;
- if (!workers->aborted) {
- workers->aborted = 1;
- /* abort all idle slots */
- for (;;) {
- slot = pop_slot(&workers->idle);
- if (slot) {
- apr_thread_mutex_lock(slot->lock);
- slot->aborted = 1;
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else {
- break;
- }
- }
+ workers->aborted = 1;
+ h2_fifo_term(workers->mplxs);
- h2_fifo_term(workers->mplxs);
+ /* abort all idle slots */
+ while ((slot = pop_slot(&workers->idle))) {
+ apr_thread_mutex_lock(slot->lock);
+ apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
+ }
- cleanup_zombies(workers);
+ /* wait for all the workers to become zombies and join them */
+ apr_thread_mutex_lock(workers->lock);
+ if (apr_atomic_read32(&workers->worker_count)) {
+ apr_thread_cond_wait(workers->all_done, workers->lock);
}
+ apr_thread_mutex_unlock(workers->lock);
+ join_zombies(workers);
+
return APR_SUCCESS;
}
status = apr_thread_mutex_create(&workers->lock,
APR_THREAD_MUTEX_DEFAULT,
workers->pool);
+ if (status == APR_SUCCESS) {
+ status = apr_thread_cond_create(&workers->all_done, workers->pool);
+ }
if (status == APR_SUCCESS) {
n = workers->nslots = workers->max_workers;
workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));