if (enter_mutex(m, &acquired) == APR_SUCCESS) {
if (task) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): task(%s) done", m->id, task->id);
- /* clean our references and report request as done. Signal
- * that we want another unless we have been aborted */
- /* TODO: this will keep a worker attached to this h2_mplx as
- * long as it has requests to handle. Might no be fair to
- * other mplx's. Perhaps leave after n requests? */
-
- if (task->c) {
- apr_pool_destroy(task->c->pool);
+ if (task->frozen) {
+ /* this task was handed over to an engine for processing */
+ h2_task_thaw(task);
+ /* TODO: can we signal an engine that it can now start on this? */
}
- task = NULL;
- if (io) {
- io->processing_done = 1;
- h2_mplx_out_close(m, io->id, NULL);
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
+ else {
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ /* clean our references and report request as done. Signal
+ * that we want another unless we have been aborted */
+ /* TODO: this will keep a worker attached to this h2_mplx as
+ * long as it has requests to handle. Might no be fair to
+ * other mplx's. Perhaps leave after n requests? */
+
+ if (task->c) {
+ apr_pool_destroy(task->c->pool);
}
- else {
- /* hang around until the stream deregisteres */
+ task = NULL;
+ if (io) {
+ io->processing_done = 1;
+ h2_mplx_out_close(m, io->id, NULL);
+ if (io->orphaned) {
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else {
+ /* hang around until the stream deregisteres */
+ }
}
+ apr_thread_cond_broadcast(m->request_done);
}
- apr_thread_cond_broadcast(m->request_done);
}
if (ptask) {
#include <http_core.h>
#include <http_log.h>
+#include "h2.h"
#include "h2_private.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_mplx.h"
-#include "h2_request.h"
#include "h2_task.h"
#include "h2_worker.h"
{
h2_worker *worker = (h2_worker *)wctx;
apr_status_t status;
+ int sticky;
while (!worker->aborted) {
- h2_mplx *m;
h2_task *task;
- /* Get a h2_mplx + h2_request from the main workers queue. */
- status = worker->get_next(worker, &m, &task, worker->ctx);
-
+ /* Get a h2_task from the main workers queue. */
+ status = worker->get_next(worker, worker->ctx, &task, &sticky);
while (task) {
- h2_task_do(task, worker->io, m->dummy_socket);
+ h2_task_do(task, worker->io, task->mplx->dummy_socket);
- if (task->frozen) {
- /* this task was handed over to someone else for processing */
- h2_task_thaw(task);
+ /* if someone was waiting on this task, time to wake up */
+ apr_thread_cond_signal(worker->io);
+ /* report the task done and maybe get another one from the same
+ * mplx (= master connection), if we can be sticky.
+ */
+ if (sticky && !worker->aborted) {
+ h2_mplx_task_done(task->mplx, task, &task);
+ }
+ else {
+ h2_mplx_task_done(task->mplx, task, NULL);
task = NULL;
}
- apr_thread_cond_signal(worker->io);
- h2_mplx_task_done(m, task, worker->aborted? NULL : &task);
}
}
#include <http_core.h>
#include <http_log.h>
+#include "h2.h"
#include "h2_private.h"
#include "h2_mplx.h"
-#include "h2_request.h"
+#include "h2_task.h"
#include "h2_worker.h"
#include "h2_workers.h"
/**
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
- * The previous h2_mplx instance might be passed in and will be served
- * with preference, since we can ask it for the next task without aquiring
- * the h2_workers lock.
*/
-static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
- struct h2_task **ptask, void *ctx)
+static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
+ h2_task **ptask, int *psticky)
{
apr_status_t status;
- apr_time_t max_wait, start_wait;
+ apr_time_t max_wait, start_wait = 0;
h2_workers *workers = (h2_workers *)ctx;
+ h2_task *task = NULL;
- max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs));
- start_wait = apr_time_now();
+ *ptask = NULL;
+ *psticky = 0;
status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
- struct h2_task *task = NULL;
- h2_mplx *m = NULL;
- int has_more = 0;
-
- ++workers->idle_worker_count;
+ ++workers->idle_workers;
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
"h2_worker(%d): looking for work", h2_worker_get_id(worker));
* new mplx to arrive. Depending on how many workers do exist,
* we do a timed wait or block indefinitely.
*/
- m = NULL;
while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
+ h2_mplx *m;
+ int has_more = 0;
+
m = H2_MPLX_LIST_FIRST(&workers->mplxs);
H2_MPLX_REMOVE(m);
+ --workers->mplx_count;
task = h2_mplx_pop_task(m, &has_more);
+
if (task) {
if (has_more) {
H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- }
- else {
- has_more = !H2_MPLX_LIST_EMPTY(&workers->mplxs);
+ ++workers->mplx_count;
}
break;
}
*/
cleanup_zombies(workers, 0);
- if (workers->worker_count > workers->min_size) {
- apr_time_t now = apr_time_now();
- if (now >= (start_wait + max_wait)) {
+ if (workers->worker_count > workers->min_workers) {
+ if (start_wait == 0) {
+ start_wait = apr_time_now();
+ max_wait = apr_time_from_sec(apr_atomic_read32(&workers->max_idle_secs));
+ }
+ else if (apr_time_now() >= (start_wait + max_wait)) {
/* waited long enough without getting a task. */
- if (workers->worker_count > workers->min_size) {
+ if (workers->worker_count > workers->min_workers) {
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
workers->s,
"h2_workers: aborting idle worker");
* needed to give up with more than enough workers.
*/
if (task) {
- *pm = m;
+ /* Ok, we got something to give back to the worker for execution.
+ * If we have more idle workers than h2_mplx in our queue, then
+ * we let the worker be sticky, e.g. making it poll the task's
+ * h2_mplx instance for more work before asking back here.
+ * This avoids entering our global lock as long as enough idle
+ * workers remain.
+ */
*ptask = task;
- if (has_more && workers->idle_worker_count > 1) {
+ *psticky = (workers->idle_workers - 1 > workers->mplx_count);
+
+ if (workers->mplx_count && workers->idle_workers > 1) {
apr_thread_cond_signal(workers->mplx_added);
}
status = APR_SUCCESS;
status = APR_EOF;
}
- --workers->idle_worker_count;
+ --workers->idle_workers;
apr_thread_mutex_unlock(workers->lock);
}
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
"h2_workers: starting");
- while (workers->worker_count < workers->min_size
+ while (workers->worker_count < workers->min_workers
&& status == APR_SUCCESS) {
status = add_worker(workers);
}
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
- int min_size, int max_size,
+ int min_workers, int max_workers,
apr_size_t max_tx_handles)
{
apr_status_t status;
if (workers) {
workers->s = s;
workers->pool = pool;
- workers->min_size = min_size;
- workers->max_size = max_size;
+ workers->min_workers = min_workers;
+ workers->max_workers = max_workers;
apr_atomic_set32(&workers->max_idle_secs, 10);
workers->max_tx_handles = max_tx_handles;
}
else {
H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
+ ++workers->mplx_count;
status = APR_SUCCESS;
}
- if (workers->idle_worker_count > 0) {
+ if (workers->idle_workers > 0) {
apr_thread_cond_signal(workers->mplx_added);
}
else if (status == APR_SUCCESS
- && workers->worker_count < workers->max_size) {
+ && workers->worker_count < workers->max_workers) {
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
"h2_workers: got %d worker, adding 1",
workers->worker_count);