return dst;
}
+#if !AP_MODULE_MAGIC_AT_LEAST(20120211, 106)
+static request_rec *my_ap_create_request(conn_rec *c)
+{
+ apr_pool_t *p;
+ request_rec *r;
+
+ apr_pool_create(&p, c->pool);
+ apr_pool_tag(p, "request");
+ r = apr_pcalloc(p, sizeof(request_rec));
+ AP_READ_REQUEST_ENTRY((intptr_t)r, (uintptr_t)c);
+ r->pool = p;
+ r->connection = c;
+ r->server = c->base_server;
+
+ r->user = NULL;
+ r->ap_auth_type = NULL;
+
+ r->allowed_methods = ap_make_method_list(p, 2);
+
+ r->headers_in = apr_table_make(r->pool, 5);
+ r->trailers_in = apr_table_make(r->pool, 5);
+ r->subprocess_env = apr_table_make(r->pool, 25);
+ r->headers_out = apr_table_make(r->pool, 12);
+ r->err_headers_out = apr_table_make(r->pool, 5);
+ r->trailers_out = apr_table_make(r->pool, 5);
+ r->notes = apr_table_make(r->pool, 5);
+
+ r->request_config = ap_create_request_config(r->pool);
+ /* Must be set before we run create request hook */
+
+ r->proto_output_filters = c->output_filters;
+ r->output_filters = r->proto_output_filters;
+ r->proto_input_filters = c->input_filters;
+ r->input_filters = r->proto_input_filters;
+ ap_run_create_request(r);
+ r->per_dir_config = r->server->lookup_defaults;
+
+ r->sent_bodyct = 0; /* bytect isn't for body */
+
+ r->read_length = 0;
+ r->read_body = REQUEST_NO_BODY;
+
+ r->status = HTTP_OK; /* Until further notice */
+ r->header_only = 0;
+ r->the_request = NULL;
+
+ /* Begin by presuming any module can make its own path_info assumptions,
+ * until some module interjects and changes the value.
+ */
+ r->used_path_info = AP_REQ_DEFAULT_PATH_INFO;
+
+ r->useragent_addr = c->client_addr;
+ r->useragent_ip = c->client_ip;
+ return r;
+}
+#endif
+
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
{
int access_status = HTTP_OK;
+#if AP_MODULE_MAGIC_AT_LEAST(20120211, 106)
request_rec *r = ap_create_request(c);
+#else
+ request_rec *r = my_ap_create_request(c);
+#endif
+#if AP_MODULE_MAGIC_AT_LEAST(20120211, 107)
ap_run_pre_read_request(r, c);
/* Time to populate r with the data we have. */
r->status = HTTP_OK;
goto die;
}
+#else
+ {
+ const char *s;
+
+ r->headers_in = apr_table_clone(r->pool, req->headers);
+ ap_run_pre_read_request(r, c);
+
+ /* Time to populate r with the data we have. */
+ r->request_time = req->request_time;
+ r->method = apr_pstrdup(r->pool, req->method);
+ /* Provide quick information about the request method as soon as known */
+ r->method_number = ap_method_number_of(r->method);
+ if (r->method_number == M_GET && r->method[0] == 'H') {
+ r->header_only = 1;
+ }
+ ap_parse_uri(r, req->path ? req->path : "");
+ r->protocol = (char*)"HTTP/2.0";
+ r->proto_num = HTTP_VERSION(2, 0);
+ r->the_request = apr_psprintf(r->pool, "%s %s HTTP/2.0",
+ r->method, req->path ? req->path : "");
+
+ /* Start with r->hostname = NULL, ap_check_request_header() will get it
+ * from the Host: header, otherwise we get complaints about port numbers.
+ */
+ r->hostname = NULL;
+ ap_update_vhost_from_headers(r);
+
+ /* we may have switched to another server */
+ r->per_dir_config = r->server->lookup_defaults;
+
+ s = apr_table_get(r->headers_in, "Expect");
+ if (s && s[0]) {
+ if (ap_cstr_casecmp(s, "100-continue") == 0) {
+ r->expecting_100 = 1;
+ }
+ else {
+ r->status = HTTP_EXPECTATION_FAILED;
+ access_status = r->status;
+ goto die;
+ }
+ }
+ }
+#endif
/* we may have switched to another server */
r->per_dir_config = r->server->lookup_defaults;
apr_thread_t *thread;
apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
+ volatile apr_uint32_t timed_out;
};
static h2_slot *pop_slot(h2_slot *volatile *phead)
}
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
+static void slot_done(h2_slot *slot);
static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
{
- apr_status_t status;
+ apr_status_t rv;
slot->workers = workers;
slot->task = NULL;
+ apr_thread_mutex_lock(workers->lock);
if (!slot->lock) {
- status = apr_thread_mutex_create(&slot->lock,
+ rv = apr_thread_mutex_create(&slot->lock,
APR_THREAD_MUTEX_DEFAULT,
workers->pool);
- if (status != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- return status;
- }
+ if (rv != APR_SUCCESS) goto cleanup;
}
if (!slot->not_idle) {
- status = apr_thread_cond_create(&slot->not_idle, workers->pool);
- if (status != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- return status;
- }
+ rv = apr_thread_cond_create(&slot->not_idle, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
+ ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
"h2_workers: new thread for slot %d", slot->id);
/* thread will either immediately start work or add itself
* to the idle queue */
apr_atomic_inc32(&workers->worker_count);
- status = apr_thread_create(&slot->thread, workers->thread_attr,
+ slot->timed_out = 0;
+ rv = apr_thread_create(&slot->thread, workers->thread_attr,
slot_run, slot, workers->pool);
- if (status != APR_SUCCESS) {
+ if (rv != APR_SUCCESS) {
apr_atomic_dec32(&workers->worker_count);
+ }
+
+cleanup:
+ apr_thread_mutex_unlock(workers->lock);
+ if (rv != APR_SUCCESS) {
push_slot(&workers->free, slot);
- return status;
}
-
- return APR_SUCCESS;
+ return rv;
}
static apr_status_t add_worker(h2_workers *workers)
{
h2_slot *slot = pop_slot(&workers->idle);
if (slot) {
+ int timed_out = 0;
apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
+ timed_out = slot->timed_out;
+ if (!timed_out) {
+ apr_thread_cond_signal(slot->not_idle);
+ }
apr_thread_mutex_unlock(slot->lock);
+ if (timed_out) {
+ slot_done(slot);
+ wake_idle_worker(workers);
+ }
}
- else if (workers->dynamic) {
+ else if (workers->dynamic && !workers->shutdown) {
add_worker(workers);
}
}
static int get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
+ int non_essential = slot->id >= workers->min_workers;
+ apr_status_t rv;
- while (!workers->aborted) {
+ while (!workers->aborted && !slot->timed_out) {
ap_assert(slot->task == NULL);
+ if (non_essential && workers->shutdown) {
+ /* Terminate non-essential worker on shutdown */
+ break;
+ }
if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
/* The queue is terminated with the MPM child being cleaned up,
- * just leave.
- */
+ * just leave. */
break;
}
if (slot->task) {
apr_thread_mutex_lock(slot->lock);
if (!workers->aborted) {
+
push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, slot->lock);
+ if (non_essential && workers->max_idle_duration) {
+ rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
+ workers->max_idle_duration);
+ if (APR_TIMEUP == rv) {
+ slot->timed_out = 1;
+ }
+ }
+ else {
+ apr_thread_cond_wait(slot->not_idle, slot->lock);
+ }
}
apr_thread_mutex_unlock(slot->lock);
}
} while (slot->task);
}
- slot_done(slot);
+ if (!slot->timed_out) {
+ slot_done(slot);
+ }
apr_thread_exit(thread, APR_SUCCESS);
return NULL;
}
-static apr_status_t workers_pool_cleanup(void *data)
+static void wake_non_essential_workers(h2_workers *workers)
{
- h2_workers *workers = data;
h2_slot *slot;
-
+ /* pop all idle, signal the non essentials and add the others again */
+ if ((slot = pop_slot(&workers->idle))) {
+ wake_non_essential_workers(workers);
+ if (slot->id > workers->min_workers) {
+ apr_thread_mutex_lock(slot->lock);
+ apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
+ }
+ else {
+ push_slot(&workers->idle, slot);
+ }
+ }
+}
+
+static void workers_abort_idle(h2_workers *workers)
+{
+ h2_slot *slot;
+
+ workers->shutdown = 1;
workers->aborted = 1;
h2_fifo_term(workers->mplxs);
apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(slot->lock);
}
+}
+
+static apr_status_t workers_pool_cleanup(void *data)
+{
+ h2_workers *workers = data;
+
+ workers_abort_idle(workers);
/* wait for all the workers to become zombies and join them */
apr_thread_mutex_lock(workers->lock);
int min_workers, int max_workers,
int idle_secs)
{
- apr_status_t status;
+ apr_status_t rv;
h2_workers *workers;
apr_pool_t *pool;
int i, n;
workers->pool = pool;
workers->min_workers = min_workers;
workers->max_workers = max_workers;
- workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+ workers->max_idle_duration = apr_time_from_sec((idle_secs > 0)? idle_secs : 10);
/* FIXME: the fifo set we use here has limited capacity. Once the
* set is full, connections with new requests do a wait. Unfortunately,
* For now, we just make enough room to have many connections inside one
* process.
*/
- status = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
- if (status != APR_SUCCESS) {
- return NULL;
- }
-
- status = apr_threadattr_create(&workers->thread_attr, workers->pool);
- if (status != APR_SUCCESS) {
- return NULL;
- }
-
+ rv = h2_fifo_set_create(&workers->mplxs, pool, 8 * 1024);
+ if (rv != APR_SUCCESS) goto cleanup;
+
+ rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
if (ap_thread_stacksize != 0) {
apr_threadattr_stacksize_set(workers->thread_attr,
ap_thread_stacksize);
(long)ap_thread_stacksize);
}
- status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status == APR_SUCCESS) {
- status = apr_thread_cond_create(&workers->all_done, workers->pool);
+ rv = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->all_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
+
+ n = workers->nslots = workers->max_workers;
+ workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
+ if (workers->slots == NULL) {
+ n = workers->nslots = 0;
+ rv = APR_ENOMEM;
+ goto cleanup;
}
- if (status == APR_SUCCESS) {
- n = workers->nslots = workers->max_workers;
- workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
- if (workers->slots == NULL) {
- n = workers->nslots = 0;
- status = APR_ENOMEM;
- }
- for (i = 0; i < n; ++i) {
- workers->slots[i].id = i;
- }
+ for (i = 0; i < n; ++i) {
+ workers->slots[i].id = i;
}
- if (status == APR_SUCCESS) {
- /* we activate all for now, TODO: support min_workers again.
- * do this in reverse for vanity reasons so slot 0 will most
- * likely be at head of idle queue. */
- n = workers->max_workers;
- for (i = n-1; i >= 0; --i) {
- status = activate_slot(workers, &workers->slots[i]);
- }
- /* the rest of the slots go on the free list */
- for(i = n; i < workers->nslots; ++i) {
- push_slot(&workers->free, &workers->slots[i]);
- }
- workers->dynamic = (workers->worker_count < workers->max_workers);
+ /* Activate the min_workers initially; the remaining slots are kept
+ * on the free list and activated on demand. Do this in reverse for
+ * vanity reasons so slot 0 will most likely be at head of idle queue. */
+ n = workers->min_workers;
+ for (i = n-1; i >= 0; --i) {
+ rv = activate_slot(workers, &workers->slots[i]);
+ if (rv != APR_SUCCESS) goto cleanup;
+ }
+ /* the rest of the slots go on the free list */
+ for(i = n; i < workers->nslots; ++i) {
+ push_slot(&workers->free, &workers->slots[i]);
}
- if (status == APR_SUCCESS) {
+ workers->dynamic = (workers->worker_count < workers->max_workers);
+
+cleanup:
+ if (rv == APR_SUCCESS) {
/* Stop/join the workers threads when the MPM child exits (pchild is
* destroyed), and as a pre_cleanup of pchild thus before the threads
* pools (children of workers->pool) so that they are not destroyed
{
return h2_fifo_remove(workers->mplxs, m);
}
+
+void h2_workers_graceful_shutdown(h2_workers *workers)
+{
+ workers->shutdown = 1;
+ h2_fifo_term(workers->mplxs);
+ wake_non_essential_workers(workers);
+}