/* Replace these */
c->master = master;
- c->pool = p;
+ c->pool = p;
c->current_thread = thread;
c->conn_config = ap_create_conn_config(p);
c->notes = apr_table_make(p, 5);
return io;
}
-void h2_io_destroy(h2_io *io)
-{
- if (io->pool) {
- apr_pool_destroy(io->pool);
- /* gone */
- }
-}
-
void h2_io_set_response(h2_io *io, h2_response *response)
{
AP_DEBUG_ASSERT(io->pool);
*/
h2_io *h2_io_create(int id, apr_pool_t *pool);
-/**
- * Frees any resources hold by the h2_io instance.
- */
-void h2_io_destroy(h2_io *io);
-
/**
* Set the response of this stream.
*/
return sp;
}
-void h2_io_set_destroy(h2_io_set *sp)
-{
- int i;
- for (i = 0; i < sp->list->nelts; ++i) {
- h2_io *io = h2_io_IDX(sp->list, i);
- h2_io_destroy(io);
- }
- sp->list->nelts = 0;
-}
-
static int h2_stream_id_cmp(const void *s1, const void *s2)
{
h2_io **pio1 = (h2_io **)s1;
int last;
APR_ARRAY_PUSH(sp->list, h2_io*) = io;
-    /* Normally, streams get added in ascending order if id. We
-     * keep the array sorted, so we just need to check of the newly
+    /* Normally, streams get added in ascending order of id. We
+     * keep the array sorted, so we just need to check if the newly
* appended stream has a lower id than the last one. if not,
* sorting is not necessary.
*/
--sp->list->nelts;
n = sp->list->nelts - idx;
if (n > 0) {
- /* Close the hole in the array by moving the upper
- * parts down one step.
- */
+ /* There are n h2_io* behind idx. Move the rest down */
h2_io **selts = (h2_io**)sp->list->elts;
memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
}
int i;
for (i = 0; i < sp->list->nelts; ++i) {
h2_io *e = h2_io_IDX(sp->list, i);
- if (e == io) {
+ if (e->id == io->id) {
remove_idx(sp, i);
return e;
}
return NULL;
}
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set)
+h2_io *h2_io_set_shift(h2_io_set *set)
{
-    /* For now, this just removes the first element in the set.
-     * the name is misleading...
+    /* For now, this just removes the first element in the set,
+     * i.e. the io with the lowest stream id, since the set is sorted...
h2_io_set *h2_io_set_create(apr_pool_t *pool);
-void h2_io_set_destroy(h2_io_set *set);
-
apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
* @param ctx user data for the callback
* @return 1 iff iteration completed for all members
*/
-int h2_io_set_iter(h2_io_set *set,
- h2_io_set_iter_fn *iter, void *ctx);
+int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx);
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
+h2_io *h2_io_set_shift(h2_io_set *set);
#endif /* defined(__mod_h2__h2_io_set__) */
"h2_mplx(%ld): destroy, ios=%d",
m->id, (int)h2_io_set_size(m->stream_ios));
m->aborted = 1;
- if (m->ready_ios) {
- h2_io_set_destroy(m->ready_ios);
- m->ready_ios = NULL;
- }
- if (m->stream_ios) {
- h2_io_set_destroy(m->stream_ios);
- m->stream_ios = NULL;
- }
check_tx_free(m);
if (!m->pool) {
return NULL;
}
+ apr_pool_tag(m->pool, "h2_mplx");
apr_allocator_owner_set(allocator, m->pool);
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
h2_mplx_destroy(m);
return NULL;
}
+
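+        /* Other code might want to see a socket for the slave connections
+         * this mplx creates; allocate one without further function. */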
+ status = apr_socket_create(&m->dummy_socket, APR_INET, SOCK_STREAM,
+ APR_PROTO_TCP, m->pool);
+ if (status != APR_SUCCESS) {
+ h2_mplx_destroy(m);
+ return NULL;
+ }
m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool);
h2_io_set_remove(m->stream_ios, io);
h2_io_set_remove(m->ready_ios, io);
- h2_io_destroy(io);
if (pool) {
apr_pool_clear(pool);
return io_stream_done((h2_mplx*)ctx, io, 0);
}
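+/* Log the state of a remaining stream io; used from
+ * h2_mplx_release_and_join while waiting for workers to finish. */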
+static int stream_print(void *ctx, h2_io *io)
+{
+ h2_mplx *m = ctx;
+ if (io && io->request) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
+ "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ m->id, io->id,
+ io->request->method, io->request->authority, io->request->path,
+ io->response? "http" : (io->rst_error? "reset" : "?"),
+ io->response? io->response->http_status : io->rst_error,
+ io->orphaned, io->processing_started, io->processing_done,
+ io->eos_in, io->eos_out);
+ }
+ else if (io) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-%d): NULL -> %s %d"
+ "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ m->id, io->id,
+ io->response? "http" : (io->rst_error? "reset" : "?"),
+ io->response? io->response->http_status : io->rst_error,
+ io->orphaned, io->processing_started, io->processing_done,
+ io->eos_in, io->eos_out);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-NULL): NULL", m->id);
+ }
+ return 1;
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
/* Any remaining ios have handed out requests to workers that are
* not done yet. Any operation they do on their assigned stream ios will
- * be errored ECONNRESET/ABORTED, so that should find out pretty soon.
+ * be errored ECONNRESET/ABORTED, so they should find out pretty soon.
*/
for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
m->join_wait = wait;
"h2_mplx(%ld): release, waiting for %d seconds now for "
"all h2_workers to return, have still %d requests outstanding",
m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+ if (i == 1) {
+ h2_io_set_iter(m->stream_ios, stream_print, m);
+ }
}
m->aborted = 1;
apr_thread_cond_broadcast(m->request_done);
return status;
}
-static const h2_request *pop_request(h2_mplx *m)
-{
- const h2_request *req = NULL;
- int sid;
- while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io) {
- req = io->request;
- io->processing_started = 1;
- if (sid > m->max_stream_started) {
- m->max_stream_started = sid;
- }
- }
- }
- return req;
-}
-
-void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq)
-{
- int acquired;
-
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- if (stream_id) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): request(%d) done", m->id, stream_id);
- if (io) {
- io->processing_done = 1;
- h2_mplx_out_close(m, stream_id, NULL);
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else {
- /* hang around until the stream deregisteres */
- }
- }
- else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): request(%d) done, no io found",
- m->id, stream_id);
- }
- apr_thread_cond_broadcast(m->request_done);
- }
-
- if (preq) {
- /* someone wants another request, if we have */
- *preq = pop_request(m);
- }
- leave_mutex(m, acquired);
- }
-}
-
apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
int stream_id, apr_bucket_brigade *bb,
apr_table_t *trailers,
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
+ h2_io *io = h2_io_set_shift(m->ready_ios);
if (io && !m->aborted) {
stream = h2_stream_set_get(streams, io->id);
if (stream) {
* reset by the client. Should no longer happen since such
* streams should clear io's from the ready queue.
*/
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO()
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
if (!io_pool) {
apr_pool_create(&io_pool, m->pool);
+ apr_pool_tag(io_pool, "h2_io");
}
else {
m->spare_pool = NULL;
return status;
}
-const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
+static h2_task *pop_task(h2_mplx *m)
{
- const h2_request *req = NULL;
+ h2_task *task = NULL;
+ int sid;
+ while (!m->aborted && !task && (sid = h2_iq_shift(m->q)) > 0) {
+ h2_io *io = h2_io_set_get(m->stream_ios, sid);
+ if (io) {
+ conn_rec *c;
+ apr_pool_t *task_pool;
+ apr_allocator_t *task_allocator = NULL;
+
+            /* We create a pool with its own allocator to be used for
+             * processing a request. This is the only way to make the
+             * processing independent of the worker pool and the h2_mplx
+             * pool, and insensitive to which thread it runs in.
+             * In that sense, memory allocation and lifetime are similar
+             * to a master connection.
+             * The main goal is that slave connections and requests will
+             * - one day - be suspended and resumed in different threads.
+             */
+ apr_allocator_create(&task_allocator);
+ apr_pool_create_ex(&task_pool, io->pool, NULL, task_allocator);
+ apr_pool_tag(task_pool, "h2_task");
+ apr_allocator_owner_set(task_allocator, task_pool);
+
+ c = h2_slave_create(m->c, task_pool, m->c->current_thread, m->dummy_socket);
+ task = h2_task_create(m->id, io->request, c, m);
+
+ io->processing_started = 1;
+ if (sid > m->max_stream_started) {
+ m->max_stream_started = sid;
+ }
+ }
+ }
+ return task;
+}
+
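+/* Mutex-protected wrapper around pop_task(); also reports whether
+ * more tasks are waiting in the queue. */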
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+{
+ h2_task *task = NULL;
apr_status_t status;
int acquired;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
- req = NULL;
*has_more = 0;
}
else {
- req = pop_request(m);
+ task = pop_task(m);
*has_more = !h2_iq_empty(m->q);
}
leave_mutex(m, acquired);
}
- return req;
+ return task;
+}
+
+void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
+{
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ if (task) {
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ /* clean our references and report request as done. Signal
+ * that we want another unless we have been aborted */
+            /* TODO: this will keep a worker attached to this h2_mplx as
+             * long as it has requests to handle. Might not be fair to
+             * other mplxs. Perhaps leave after n requests? */
+
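+            /* the task was allocated from task->c->pool in h2_task_create;
+             * destroying that pool frees the task itself */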
+ if (task->c) {
+ apr_pool_destroy(task->c->pool);
+ }
+ task = NULL;
+ if (io) {
+ io->processing_done = 1;
+ h2_mplx_out_close(m, io->id, NULL);
+ if (io->orphaned) {
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else {
+                /* hang around until the stream deregisters */
+ }
+ }
+ apr_thread_cond_broadcast(m->request_done);
+ }
+
+ if (ptask) {
+ /* caller wants another task */
+ *ptask = pop_task(m);
+ }
+ leave_mutex(m, acquired);
+ }
}
}
if (!engine && einit) {
- engine = apr_pcalloc(task->pool, sizeof(*engine));
- engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d",
+ engine = apr_pcalloc(task->c->pool, sizeof(*engine));
+ engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d",
m->id, m->next_eng_id++);
- engine->pub.pool = task->pool;
- engine->pub.type = apr_pstrdup(task->pool, engine_type);
+ engine->pub.pool = task->c->pool;
+ engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
engine->c = r->connection;
engine->m = m;
engine->io = task->io;
"h2_mplx(%ld): request %s pulled by engine %s",
m->c->id, r->the_request, engine->pub.id);
engine->no_live++;
+ r->connection->current_thread = engine->c->current_thread;
*pr = r;
return APR_SUCCESS;
}
m->id, task->id, aborted? "aborted":"done",
engine->pub.id);
h2_task_output_close(task->output);
- h2_mplx_request_done(m, task->stream_id, NULL);
- apr_pool_destroy(task->pool);
engine->no_finished++;
if (waslive) engine->no_live--;
engine->no_assigned--;
+ if (task->c != engine->c) { /* do not release what the engine runs on */
+ h2_mplx_task_done(m, task, NULL);
+ }
}
void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
void *entry;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
- "has still %d requests queued, "
+                  "has still %d requests queued, shutdown=%d, "
"assigned=%ld, live=%ld, finished=%ld",
m->c->id, engine->pub.id, engine->pub.type,
(int)apr_queue_size(engine->queue),
+ engine->shutdown,
(long)engine->no_assigned, (long)engine->no_live,
(long)engine->no_finished);
while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
engine_done(m, engine, task, 0, 1);
}
}
- if (engine->no_assigned > 1 || engine->no_live > 1) {
+ if (engine->no_assigned > 0 || engine->no_live > 0) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
struct apr_thread_cond_t *added_output;
struct apr_thread_cond_t *request_done;
struct apr_thread_cond_t *join_wait;
+ apr_socket_t *dummy_socket;
apr_size_t stream_max_mem;
apr_interval_time_t stream_timeout;
*/
void h2_mplx_abort(h2_mplx *mplx);
-void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq);
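+/**
+ * Get a new task for a worker to process; has_more is set to !=0 iff
+ * more tasks are waiting in the queue.
+ */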
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
+
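+/**
+ * Notify the mplx that a task is done. If ptask is not NULL, a new
+ * task to process is returned in it, if one is available.
+ */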
+void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask);
/**
* Get the highest stream identifier that has been passed on to processing.
*/
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
-const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
-
/**
* Register a callback for the amount of input data consumed per stream. The
- * will only ever be invoked from the thread creating this h2_mplx, e.g. when
+ * callback will only ever be invoked from the thread creating this h2_mplx, e.g. when
APLOG_USE_MODULE(http2);
-
#endif
h2_proxy_session *session = p_conn->data;
if (session && session->ngh2) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "proxy_session(%s): shutdown, state=%d, streams=%d",
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ "proxy_session(%s): pool cleanup, state=%d, streams=%d",
session->id, session->state,
h2_iq_size(session->streams));
dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
const nghttp2_frame *frame, void *user_data)
{
h2_proxy_session *session = user_data;
- switch (frame->hd.type) {
- case NGHTTP2_GOAWAY:
- if (APLOGcinfo(session->c)) {
- char buffer[256];
-
- h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
- "h2_session(%s): sent FRAME[%s]",
- session->id, buffer);
- }
- break;
- default:
- if (APLOGcdebug(session->c)) {
- char buffer[256];
-
- h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
- "h2_session(%s): sent FRAME[%s]",
- session->id, buffer);
- }
- break;
+ if (APLOGcdebug(session->c)) {
+ char buffer[256];
+
+ h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+ "h2_session(%s): sent FRAME[%s]",
+ session->id, buffer);
}
return 0;
}
nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
nghttp2_option_new(&option);
- nghttp2_option_set_peer_max_concurrent_streams(option, 20);
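+    /* guess until the remote SETTINGS tell us otherwise */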
+ nghttp2_option_set_peer_max_concurrent_streams(option, 100);
nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
status = ap_get_brigade(session->c->input_filters, session->input,
AP_MODE_READBYTES,
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
- APR_BUCKET_BUFF_SIZE);
+ 64 * 1024);
if (status == APR_SUCCESS) {
if (APR_BRIGADE_EMPTY(session->input)) {
status = APR_EAGAIN;
return APR_SUCCESS;
}
else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
"h2_proxy_stream(%s-%d): check input",
session->id, stream_id);
h2_iq_remove(session->suspended, stream_id);
r->allowed_methods = ap_make_method_list(p, 2);
- r->headers_in = apr_table_copy(r->pool, req->headers);
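+    /* clone, not copy: deep-copy the keys and values into r->pool
+     * instead of aliasing strings owned by another pool */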
+ r->headers_in = apr_table_clone(r->pool, req->headers);
r->trailers_in = apr_table_make(r->pool, 5);
r->subprocess_env = apr_table_make(r->pool, 25);
r->headers_out = apr_table_make(r->pool, 12);
}
else {
apr_pool_create(&stream_pool, session->pool);
+ apr_pool_tag(stream_pool, "h2_stream");
}
stream = h2_stream_open(stream_id, stream_pool, session);
if (status != APR_SUCCESS) {
return NULL;
}
+ apr_pool_tag(pool, "h2_session");
session = apr_pcalloc(pool, sizeof(h2_session));
if (session) {
#include "h2_private.h"
#include "h2_conn.h"
#include "h2_config.h"
+#include "h2_ctx.h"
#include "h2_from_h1.h"
#include "h2_h2.h"
#include "h2_mplx.h"
}
h2_task *h2_task_create(long session_id, const h2_request *req,
- apr_pool_t *pool, h2_mplx *mplx)
+ conn_rec *c, h2_mplx *mplx)
{
- h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
+ h2_task *task = apr_pcalloc(c->pool, sizeof(h2_task));
if (task == NULL) {
- ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
session_id, req->id);
h2_mplx_out_close(mplx, req->id, NULL);
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
+ task->id = apr_psprintf(c->pool, "%ld-%d", session_id, req->id);
task->stream_id = req->id;
- task->pool = pool;
+ task->c = c;
task->mplx = mplx;
task->request = req;
task->input_eos = !req->body;
task->ser_headers = req->serialize;
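+    /* attach the task to the slave connection's h2_ctx so that
+     * connection processing can find it */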
+ h2_ctx_create_for(c, task);
+
return task;
}
-apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond,
+apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond,
apr_socket_t *socket)
{
apr_status_t status;
AP_DEBUG_ASSERT(task);
task->io = cond;
- task->input = h2_task_input_create(task, c);
- task->output = h2_task_output_create(task, c);
+ task->input = h2_task_input_create(task, task->c);
+ task->output = h2_task_output_create(task, task->c);
- ap_process_connection(c, socket);
+ ap_process_connection(task->c, socket);
if (task->frozen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): process_conn returned frozen task",
task->id);
/* cleanup delayed */
status = APR_EAGAIN;
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): processing done", task->id);
status = APR_SUCCESS;
}
apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
{
if (!task->frozen) {
- conn_rec *c = task->output->c;
+ conn_rec *c = task->c;
task->frozen = 1;
task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
"h2_task(%s), frozen", task->id);
}
return APR_SUCCESS;
{
if (task->frozen) {
task->frozen = 0;
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
"h2_task(%s), thawed", task->id);
}
return APR_SUCCESS;
struct h2_task {
const char *id;
int stream_id;
- apr_pool_t *pool;
+ conn_rec *c;
struct h2_mplx *mplx;
const struct h2_request *request;
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
- apr_pool_t *pool, struct h2_mplx *mplx);
+ conn_rec *c, struct h2_mplx *mplx);
-apr_status_t h2_task_do(h2_task *task, conn_rec *c,
- struct apr_thread_cond_t *cond, apr_socket_t *socket);
+apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond, apr_socket_t *socket);
void h2_task_register_hooks(void);
/*
{
h2_worker *worker = (h2_worker *)wctx;
apr_status_t status;
- apr_pool_t *task_pool = NULL;
-
- (void)thread;
- /* Other code might want to see a socket for this connection this
- * worker processes. Allocate one without further function...
- */
- status = apr_socket_create(&worker->socket,
- APR_INET, SOCK_STREAM,
- APR_PROTO_TCP, worker->pool);
- if (status != APR_SUCCESS) {
- ap_log_perror(APLOG_MARK, APLOG_ERR, status, worker->pool,
- APLOGNO(02948) "h2_worker(%d): alloc socket",
- worker->id);
- worker->worker_done(worker, worker->ctx);
- return NULL;
- }
while (!worker->aborted) {
h2_mplx *m;
- const h2_request *req;
+ h2_task *task;
-    /* Get a h2_mplx + h2_request from the main workers queue. */
+    /* Get a h2_mplx + h2_task from the main workers queue. */
- status = worker->get_next(worker, &m, &req, worker->ctx);
+ status = worker->get_next(worker, &m, &task, worker->ctx);
- while (req) {
- conn_rec *c, *master = m->c;
- h2_task *task;
- int stream_id = req->id;
-
- if (!task_pool) {
- apr_allocator_t *task_allocator = NULL;
- /* We create a root pool with its own allocator to be used for
- * processing a request. This is the only way to have the processing
- * independant of the worker pool as the h2_mplx pool as well as
- * not sensitive to which thread it is in.
- * In that sense, memory allocation and lifetime is similar to a master
- * connection.
- * The main goal in this is that slave connections and requests will
- * - one day - be suspended and resumed in different threads.
- */
- apr_allocator_create(&task_allocator);
- apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator);
- apr_allocator_owner_set(task_allocator, task_pool);
- }
-
- c = h2_slave_create(master, task_pool,
- worker->thread, worker->socket);
-
- task = h2_task_create(m->id, req, task_pool, m);
- h2_ctx_create_for(c, task);
-
- h2_task_do(task, c, worker->io, worker->socket);
+ while (task) {
+ h2_task_do(task, worker->io, m->dummy_socket);
if (task->frozen) {
/* this task was handed over to someone else for processing */
h2_task_thaw(task);
- task_pool = NULL;
- req = NULL;
- h2_mplx_request_done(m, 0, worker->aborted? NULL : &req);
- }
- else {
- /* clean our references and report request as done. Signal
- * that we want another unless we have been aborted */
- /* TODO: this will keep a worker attached to this h2_mplx as
- * long as it has requests to handle. Might no be fair to
- * other mplx's. Perhaps leave after n requests? */
- apr_thread_cond_signal(worker->io);
- if (task_pool) {
- apr_pool_clear(task_pool);
- }
- req = NULL;
- h2_mplx_request_done(m, stream_id, worker->aborted? NULL : &req);
+ task = NULL;
}
+ apr_thread_cond_signal(worker->io);
+ h2_mplx_task_done(m, task, worker->aborted? NULL : &task);
}
}
- if (worker->socket) {
- apr_socket_close(worker->socket);
- worker->socket = NULL;
- }
-
worker->worker_done(worker, worker->ctx);
return NULL;
}
apr_allocator_create(&allocator);
apr_allocator_max_free_set(allocator, ap_max_mem_free);
apr_pool_create_ex(&pool, parent_pool, NULL, allocator);
+ apr_pool_tag(pool, "h2_worker");
apr_allocator_owner_set(allocator, pool);
w = apr_pcalloc(pool, sizeof(h2_worker));
* gets aborted (idle timeout, for example). */
typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker,
struct h2_mplx **pm,
- const struct h2_request **preq,
+ struct h2_task **ptask,
void *ctx);
/* Invoked just before the worker thread exits. */
apr_thread_t *thread;
apr_pool_t *pool;
struct apr_thread_cond_t *io;
- apr_socket_t *socket;
h2_worker_mplx_next_fn *get_next;
h2_worker_done_fn *worker_done;
* the h2_workers lock.
*/
static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
- const h2_request **preq, void *ctx)
+ struct h2_task **ptask, void *ctx)
{
apr_status_t status;
apr_time_t max_wait, start_wait;
status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
- const h2_request *req = NULL;
+ struct h2_task *task = NULL;
h2_mplx *m = NULL;
int has_more = 0;
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
"h2_worker(%d): looking for work", h2_worker_get_id(worker));
- while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) {
+ while (!task && !h2_worker_is_aborted(worker) && !workers->aborted) {
/* Get the next h2_mplx to process that has a task to hand out.
-     * If it does, place it at the end of the queu and return the
+     * If it does, place it at the end of the queue and return the
* we do a timed wait or block indefinitely.
*/
m = NULL;
- while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
+ while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
m = H2_MPLX_LIST_FIRST(&workers->mplxs);
H2_MPLX_REMOVE(m);
- req = h2_mplx_pop_request(m, &has_more);
- if (req) {
+ task = h2_mplx_pop_task(m, &has_more);
+ if (task) {
if (has_more) {
H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
}
}
}
- if (!req) {
+ if (!task) {
/* Need to wait for a new mplx to arrive.
*/
cleanup_zombies(workers, 0);
/* Here, we either have gotten task and mplx for the worker or
* needed to give up with more than enough workers.
*/
- if (req) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): start request(%ld-%d)",
- h2_worker_get_id(worker), m->id, req->id);
+ if (task) {
*pm = m;
- *preq = req;
-
+ *ptask = task;
if (has_more && workers->idle_worker_count > 1) {
apr_thread_cond_signal(workers->mplx_added);
}
* happen on the pool handed to us, which we do not guard.
*/
apr_pool_create(&pool, server_pool);
+ apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (workers) {
workers->s = s;
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
if (ctx) {
- engine->capacity = 20; /* conservative guess until we know */
+ engine->capacity = 100; /* guess until we know */
ctx->engine = engine;
return APR_SUCCESS;
}
}
static void request_done(h2_proxy_session *session, request_rec *r)
-{
+{
h2_proxy_ctx *ctx = session->user_data;
if (req_engine_done && r != ctx->rbase) {
apr_status_t status = OK;
h2_proxy_session *session;
+setup_session:
/* Step Two: Make the Connection (or check that an already existing
* socket is still usable). On success, we have a socket connected to
* backend->hostname. */
return HTTP_SERVICE_UNAVAILABLE;
}
+run_session:
session->user_data = ctx;
add_request(session, r);
-
status = APR_EAGAIN;
while (APR_STATUS_IS_EAGAIN(status)) {
status = h2_proxy_session_process(session);
}
}
- if (session->state == H2_PROXYS_ST_DONE) {
+ if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) {
ctx->p_conn->close = 1;
}
+ if (!ctx->standalone) {
+ ctx->engine->capacity = session->remote_max_concurrent;
+ if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_proxy_session(%s): idle, pulled request %s",
+ session->id, r->the_request);
+ add_request(session, r);
+ if (ctx->p_conn->close) {
+ goto setup_session;
+ }
+ goto run_session;
+ }
+ }
+
if (session->streams && !h2_iq_empty(session->streams)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status,
ctx->p_conn->connection,
}
status = proxy_engine_run(ctx, r);
- if (!ctx->standalone && status == APR_SUCCESS) {
- apr_status_t s2;
- do {
- s2 = req_engine_pull(ctx->engine, APR_BLOCK_READ, &r);
- if (s2 == APR_SUCCESS) {
- s2 = proxy_engine_run(ctx, r);
- }
- } while (s2 != APR_EOF);
- }
cleanup:
if (!ctx->standalone && ctx->engine && req_engine_exit) {