#include <assert.h>
#include <apr_lib.h>
#include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_core.h>
"link_reverse_map %s --> %s", s, ctx.s);
return ctx.s;
}
+
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_proxy_fifo {
+ void **elems;
+ int nelems;
+ int set;
+ int head;
+ int count;
+ int aborted;
+ apr_thread_mutex_t *lock;
+ apr_thread_cond_t *not_empty;
+ apr_thread_cond_t *not_full;
+};
+
+static int nth_index(h2_proxy_fifo *fifo, int n)
+{
+ return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data)
+{
+ h2_proxy_fifo *fifo = data;
+
+ apr_thread_cond_destroy(fifo->not_empty);
+ apr_thread_cond_destroy(fifo->not_full);
+ apr_thread_mutex_destroy(fifo->lock);
+
+ return APR_SUCCESS;
+}
+
+static int index_of(h2_proxy_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
+{
+ apr_status_t rv;
+ h2_proxy_fifo *fifo;
+
+ fifo = apr_pcalloc(pool, sizeof(*fifo));
+ if (fifo == NULL) {
+ return APR_ENOMEM;
+ }
+
+ rv = apr_thread_mutex_create(&fifo->lock,
+ APR_THREAD_MUTEX_UNNESTED, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_empty, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_full, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+ if (fifo->elems == NULL) {
+ return APR_ENOMEM;
+ }
+ fifo->nelems = capacity;
+ fifo->set = as_set;
+
+ *pfifo = fifo;
+ apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ fifo->aborted = 1;
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ apr_thread_cond_broadcast(fifo->not_full);
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo)
+{
+ return fifo->count;
+}
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo)
+{
+ return fifo->nelems;
+}
+
+static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block)
+{
+ if (fifo->count == 0) {
+ if (!block) {
+ return APR_EAGAIN;
+ }
+ while (fifo->count == 0) {
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
+ if (block) {
+ while (fifo->count == fifo->nelems) {
+ if (fifo->aborted) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_full, fifo->lock);
+ }
+ }
+ else {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EAGAIN;
+ }
+ }
+
+ ap_assert(fifo->count < fifo->nelems);
+ fifo->elems[nth_index(fifo, fifo->count)] = elem;
+ ++fifo->count;
+ if (fifo->count == 1) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 0);
+}
+
+static void *pull_head(h2_proxy_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
+static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ *pelem = NULL;
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ *pelem = pull_head(fifo);
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 0);
+}
+
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ int i, rc;
+ void *e;
+
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ e = fifo->elems[nth_index(fifo, i)];
+ if (e == elem) {
+ ++rc;
+ }
+ else if (rc) {
+ fifo->elems[nth_index(fifo, i-rc)] = e;
+ }
+ }
+ if (rc) {
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ rv = APR_SUCCESS;
+ }
+ else {
+ rv = APR_EAGAIN;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
const char *proxy_server_uri,
const char *s);
+/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_proxy_fifo h2_proxy_fifo;
+
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
+apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue and is successful.
+ */
+apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo);
+apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo);
+
+int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo);
+int h2_proxy_fifo_count(h2_proxy_fifo *fifo);
+
+/**
+ * Push en element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
+apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem);
+apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem);
+
+apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem);
+apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem);
+
+/**
+ * Remove the elem from the queue, will remove multiple appearances.
+ * @param elem the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem);
+
+
#endif /* defined(__mod_h2__h2_proxy_util__) */
#include "h2_version.h"
#include "h2_proxy_session.h"
+#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
+
static void register_hook(apr_pool_t *p);
AP_DECLARE_MODULE(proxy_http2) = {
const char *engine_type;
apr_pool_t *engine_pool;
apr_size_t req_buffer_size;
- request_rec *next;
+ h2_proxy_fifo *requests;
int capacity;
unsigned standalone : 1;
ctx->engine_type = type;
ctx->engine_pool = pool;
ctx->req_buffer_size = req_buffer_size;
- ctx->capacity = 100;
+ ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests));
*pconsumed = out_consumed;
*pctx = ctx;
return status;
}
-static void request_done(h2_proxy_session *session, request_rec *r,
+static void request_done(h2_proxy_ctx *ctx, request_rec *r,
apr_status_t status, int touched)
{
- h2_proxy_ctx *ctx = session->user_data;
const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
if (status != APR_SUCCESS) {
if (!touched) {
/* untouched request, need rescheduling */
- if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
- if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
- /* push to engine */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
- APLOGNO(03369)
- "h2_proxy_session(%s): rescheduled request %s",
- ctx->engine_id, task_id);
- return;
- }
- }
- else if (!ctx->next) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
- "h2_proxy_session(%s): retry untouched request",
- ctx->engine_id);
- ctx->next = r;
- }
+ status = h2_proxy_fifo_push(ctx->requests, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
+ APLOGNO(03369)
+ "h2_proxy_session(%s): rescheduled request %s",
+ ctx->engine_id, task_id);
+ return;
}
else {
const char *uri;
uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
- "not complete, was touched",
+ "not complete, cannot repeat",
ctx->engine_id, task_id, uri);
}
}
if (r == ctx->rbase) {
- ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE;
+ ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS
+ : HTTP_SERVICE_UNAVAILABLE);
}
if (req_engine_done && ctx->engine) {
}
}
+static void session_req_done(h2_proxy_session *session, request_rec *r,
+ apr_status_t status, int touched)
+{
+ request_done(session->user_data, r, status, touched);
+}
+
static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
{
- if (ctx->next) {
+ if (h2_proxy_fifo_count(ctx->requests) > 0) {
return APR_SUCCESS;
}
else if (req_engine_pull && ctx->engine) {
apr_status_t status;
+ request_rec *r = NULL;
+
status = req_engine_pull(ctx->engine, before_leave?
APR_BLOCK_READ: APR_NONBLOCK_READ,
- ctx->capacity, &ctx->next);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
- "h2_proxy_engine(%s): pulled request (%s) %s",
- ctx->engine_id,
- before_leave? "before leave" : "regular",
- (ctx->next? ctx->next->the_request : "NULL"));
+ ctx->capacity, &r);
+ if (status == APR_SUCCESS && r) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
+ "h2_proxy_engine(%s): pulled request (%s) %s",
+ ctx->engine_id,
+ before_leave? "before leave" : "regular",
+ r->the_request);
+ h2_proxy_fifo_push(ctx->requests, r);
+ }
return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
}
return APR_EOF;
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
apr_status_t status = OK;
int h2_front;
+ request_rec *r;
/* Step Four: Send the Request in a new HTTP/2 stream and
* loop until we got the response or encounter errors.
ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
h2_front, 30,
h2_proxy_log2((int)ctx->req_buffer_size),
- request_done);
+ session_req_done);
if (!ctx->session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
APLOGNO(03372) "session unavailable");
"eng(%s): run session %s", ctx->engine_id, ctx->session->id);
ctx->session->user_data = ctx;
- while (1) {
- if (ctx->next) {
- add_request(ctx->session, ctx->next);
- ctx->next = NULL;
+ while (!ctx->owner->aborted) {
+ if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ add_request(ctx->session, r);
}
status = h2_proxy_session_process(ctx->session);
/* ongoing processing, call again */
if (ctx->session->remote_max_concurrent > 0
&& ctx->session->remote_max_concurrent != ctx->capacity) {
- ctx->capacity = (int)ctx->session->remote_max_concurrent;
+ ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent,
+ h2_proxy_fifo_capacity(ctx->requests));
}
s2 = next_request(ctx, 0);
if (s2 == APR_ECONNABORTED) {
status = ctx->r_status = APR_SUCCESS;
break;
}
- if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) {
+ if ((h2_proxy_fifo_count(ctx->requests) == 0)
+ && h2_proxy_ihash_empty(ctx->session->streams)) {
break;
}
}
* a) be reopened on the new session iff safe to do so
* b) reported as done (failed) otherwise
*/
- h2_proxy_session_cleanup(ctx->session, request_done);
+ h2_proxy_session_cleanup(ctx->session, session_req_done);
break;
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"H2: hosting engine %s", ctx->engine_id);
}
- return APR_SUCCESS;
+
+ return h2_proxy_fifo_push(ctx->requests, r);
}
static int proxy_http2_handler(request_rec *r,
apr_status_t status;
h2_proxy_ctx *ctx;
apr_uri_t uri;
- int reconnected = 0;
+ int reconnects = 0;
/* find the scheme */
if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
ctx->conf = conf;
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
- ctx->next = r;
- r = NULL;
+
+ h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
/* If we are not already hosting an engine, try to push the request
* to an already existing engine or host a new engine here. */
- if (!ctx->engine) {
- ctx->r_status = push_request_somewhere(ctx, ctx->next);
+ if (r && !ctx->engine) {
+ ctx->r_status = push_request_somewhere(ctx, r);
+ r = NULL;
if (ctx->r_status == SUSPENDED) {
/* request was pushed to another thread, leave processing here */
- ctx->next = NULL;
goto cleanup;
}
}
}
reconnect:
- if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) {
+ if (next_request(ctx, 1) == APR_SUCCESS) {
/* Still more to do, tear down old conn and start over */
if (ctx->p_conn) {
ctx->p_conn->close = 1;
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
ctx->p_conn = NULL;
}
- reconnected = 1; /* we do this only once, then fail */
- goto run_connect;
+ ++reconnects;
+ if (reconnects < 5 && !ctx->owner->aborted) {
+ goto run_connect;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023)
+ "giving up after %d reconnects, %d requests todo",
+ reconnects, h2_proxy_fifo_count(ctx->requests));
}
cleanup:
ctx->p_conn = NULL;
}
+ /* Any requests will still have need to fail */
+ while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
+ request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, true);
+ }
+
ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
APLOGNO(03377) "leaving handler");