#include <stddef.h>
#include <stdlib.h>
-#include <apr_queue.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
/* iterate until all ios have been orphaned or destroyed */
}
- /* Any remaining ios have handed out requests to workers that are
- * not done yet. Any operation they do on their assigned stream ios will
- * be errored ECONNRESET/ABORTED, so they should find out pretty soon.
+ /* If we still have busy workers, we cannot release our memory
+ * pool yet, as slave connections have child pools of their respective
+ * h2_io's.
+ * Any remaining ios are processed in these workers. Any operation
+     * they do on their input/outputs will fail with ECONNRESET/ABORTED,
+     * so processing them should fail and the workers *should* return soon.
*/
- for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
+ for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join, waiting on %d worker to report back",
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
"h2_mplx(%ld): release, waiting for %d seconds now for "
- "all h2_workers to return, have still %d requests outstanding",
- m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+                      "%d h2_workers to return, still have %d requests outstanding",
+ m->id, i*wait_secs, m->workers_busy,
+ (int)h2_io_set_size(m->stream_ios));
if (i == 1) {
h2_io_set_iter(m->stream_ios, stream_print, m);
}
/* TODO: this will keep a worker attached to this h2_mplx as
 * long as it has requests to handle. Might not be fair to
* other mplx's. Perhaps leave after n requests? */
-
- if (task->c) {
- apr_pool_destroy(task->c->pool);
- }
task = NULL;
if (io) {
io->processing_done = 1;
/*******************************************************************************
* HTTP/2 request engines
******************************************************************************/
-
+
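+/* A request scheduled to an engine, kept in a doubly linked APR_RING
+ * list. Entries are allocated from their request's pool, so they
+ * disappear together with the request itself. */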
+typedef struct h2_req_entry h2_req_entry;
+struct h2_req_entry {
+ APR_RING_ENTRY(h2_req_entry) link;
+ request_rec *r;
+};
+
+#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
+#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link)
+#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
+
typedef struct h2_req_engine_i h2_req_engine_i;
struct h2_req_engine_i {
h2_req_engine pub;
h2_mplx *m;
unsigned int shutdown : 1; /* engine is being shut down */
apr_thread_cond_t *io; /* condition var for waiting on data */
- apr_queue_t *queue; /* queue of scheduled request_rec* */
+    APR_RING_HEAD(h2_req_entries, h2_req_entry) entries; /* scheduled request_rec* */
apr_size_t no_assigned; /* # of assigned requests */
apr_size_t no_live; /* # of live */
apr_size_t no_finished; /* # of finished */
};
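+
+/* Ring accessors and insertion helpers, in the style of the brigade
+ * macros in apr_buckets.h (hence the ap__b temporary). */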
+#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
+
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {                          \
+        h2_req_entry *ap__b = (e);                                     \
+        APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link);          \
+    } while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {                          \
+        h2_req_entry *ap__b = (e);                                     \
+        APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link);          \
+    } while (0)
+
static apr_status_t h2_mplx_engine_schedule(h2_mplx *m,
h2_req_engine_i *engine,
request_rec *r)
{
- if (!engine->queue) {
- apr_queue_create(&engine->queue, 100, engine->pub.pool);
- }
- return apr_queue_trypush(engine->queue, r);
+ h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
+
+ APR_RING_ELEM_INIT(entry, link);
+ entry->r = r;
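+    /* Unlike the old apr_queue (bounded at 100 entries), appending to
+     * the ring cannot fail, so scheduling always succeeds. */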
+ H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
+ return APR_SUCCESS;
}
engine->pub.pool = task->c->pool;
engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
engine->c = r->connection;
+ APR_RING_INIT(&engine->entries, h2_req_entry, link);
engine->m = m;
engine->io = task->io;
engine->no_assigned = 1;
return status;
}
-static request_rec *get_non_frozen(apr_queue_t *equeue)
+static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
{
- request_rec *r, *first = NULL;
+ h2_req_entry *entry;
h2_task *task;
- void *elem;
-
- if (equeue) {
- /* FIFO queue, try to find a request_rec whose task is not frozen */
- while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) {
- r = elem;
- task = h2_ctx_rget_task(r);
- AP_DEBUG_ASSERT(task);
- if (!task->frozen) {
- return r;
- }
- apr_queue_push(equeue, r);
- if (!first) {
- first = r;
- }
- else if (r == first) {
- return NULL; /* walked the whole queue */
- }
+
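+    /* Walk the list in FIFO order; pop and return the first entry whose
+     * task is not frozen. Frozen tasks remain queued. */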
+ for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+ entry = H2_REQ_ENTRY_NEXT(entry)) {
+ task = h2_ctx_rget_task(entry->r);
+ AP_DEBUG_ASSERT(task);
+ if (!task->frozen) {
+ H2_REQ_ENTRY_REMOVE(entry);
+ return entry;
}
}
return NULL;
static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
apr_read_type_e block, request_rec **pr)
{
- request_rec *r;
+ h2_req_entry *entry;
AP_DEBUG_ASSERT(m);
AP_DEBUG_ASSERT(engine);
return APR_EOF;
}
- if (engine->queue && (r = get_non_frozen(engine->queue))) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ if (!H2_REQ_ENTRIES_EMPTY(&engine->entries)
+ && (entry = pop_non_frozen(engine))) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
"h2_mplx(%ld): request %s pulled by engine %s",
- m->c->id, r->the_request, engine->pub.id);
+ m->c->id, entry->r->the_request, engine->pub.id);
engine->no_live++;
- r->connection->current_thread = engine->c->current_thread;
- *pr = r;
+ entry->r->connection->current_thread = engine->c->current_thread;
+ *pr = entry->r;
return APR_SUCCESS;
}
else if (APR_NONBLOCK_READ == block) {
*pr = NULL;
return APR_EAGAIN;
}
- else if (!engine->queue || !apr_queue_size(engine->queue)) {
+ else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
engine->shutdown = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): emtpy queue, shutdown engine %s",
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- if (engine->queue && apr_queue_size(engine->queue)) {
- void *entry;
+ if (!m->aborted
+ && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+ h2_req_entry *entry;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
- "has still %d requests queued, shutdown=%d,"
+                      "still has requests queued, shutdown=%d, "
"assigned=%ld, live=%ld, finished=%ld",
m->c->id, engine->pub.id, engine->pub.type,
- (int)apr_queue_size(engine->queue),
engine->shutdown,
(long)engine->no_assigned, (long)engine->no_live,
(long)engine->no_finished);
- while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
- request_rec *r = entry;
+ for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+ entry = H2_REQ_ENTRY_NEXT(entry)) {
+ request_rec *r = entry->r;
h2_task *task = h2_ctx_rget_task(r);
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): engine %s has queued task %s, "
engine_done(m, engine, task, 0, 1);
}
}
- if (engine->no_assigned > 1 || engine->no_live > 1) {
+ if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
} h2_proxy_stream;
-static int ngstatus_from_apr_status(apr_status_t rv)
-{
- if (rv == APR_SUCCESS) {
- return NGHTTP2_NO_ERROR;
- }
- else if (APR_STATUS_IS_EAGAIN(rv)) {
- return NGHTTP2_ERR_WOULDBLOCK;
- }
- else if (APR_STATUS_IS_EOF(rv)) {
- return NGHTTP2_ERR_EOF;
- }
- return NGHTTP2_ERR_PROTO;
-}
-
static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
int arg, const char *msg);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"proxy_session(%s): pool cleanup, state=%d, streams=%d",
session->id, session->state,
- h2_iq_size(session->streams));
+ (int)h2_ihash_count(session->streams));
dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
char buffer[256];
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
"h2_session(%s): recv FRAME[%s]",
session->id, buffer);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO()
"h2_session(%s-%d): passing output",
session->id, stream->id);
- return NGHTTP2_ERR_CALLBACK_FAILURE;
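+        /* reset only this stream instead of failing the whole session */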
+ nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
+ stream_id, NGHTTP2_STREAM_CLOSED);
+ return NGHTTP2_ERR_STREAM_CLOSING;
}
return 0;
}
h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
return NGHTTP2_ERR_DEFERRED;
}
- return ngstatus_from_apr_status(status);
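+    /* on any other error, likewise reset just this stream */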
+ else {
+ nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
+ stream_id, NGHTTP2_STREAM_CLOSED);
+ return NGHTTP2_ERR_STREAM_CLOSING;
+ }
}
h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
session->state = H2_PROXYS_ST_INIT;
session->window_bits_default = 30;
session->window_bits_connection = 30;
- session->streams = h2_iq_create(pool, 25);
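+    /* streams now live in an ihash keyed by each stream's int `id`
+     * member (located via offsetof), replacing the old int queue */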
+ session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
session->suspended = h2_iq_create(pool, 5);
session->done = done;
if (rv > 0) {
stream->id = rv;
stream->state = H2_STREAM_ST_OPEN;
- h2_iq_add(session->streams, stream->id, NULL, NULL);
+ h2_ihash_add(session->streams, stream);
dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
return APR_SUCCESS;
return APR_EGENERAL;
}
-static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
+ apr_interval_time_t timeout)
{
apr_status_t status;
+ apr_socket_t *socket = NULL;
+    apr_interval_time_t save_timeout = -1;
+
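+    /* For a blocking read, temporarily impose the given timeout on the
+     * connection socket and restore the previous value afterwards. */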
+ if (block) {
+ socket = ap_get_conn_socket(session->c);
+ if (socket) {
+ apr_socket_timeout_get(socket, &save_timeout);
+ apr_socket_timeout_set(socket, timeout);
+ }
+ else {
+            /* cannot do a timed, blocking read without the socket */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c,
+ "h2_session(%s): unable to get conn socket",
+ session->id);
+ return APR_ENOTIMPL;
+ }
+ }
+
status = ap_get_brigade(session->c->input_filters, session->input,
AP_MODE_READBYTES,
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
64 * 1024);
+ if (socket && save_timeout != -1) {
+ apr_socket_timeout_set(socket, save_timeout);
+ }
+
if (status == APR_SUCCESS) {
if (APR_BRIGADE_EMPTY(session->input)) {
status = APR_EAGAIN;
feed_brigade(session, session->input);
}
}
+ else if (APR_STATUS_IS_TIMEUP(status)) {
+ /* nop */
+ }
else if (!APR_STATUS_IS_EAGAIN(status)) {
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
}
{
switch (session->state) {
case H2_PROXYS_ST_INIT:
- if (h2_iq_empty(session->streams)) {
+ if (h2_ihash_is_empty(session->streams)) {
transit(session, "init", H2_PROXYS_ST_IDLE);
}
else {
* CPU cycles. Ideally, we'd like to do a blocking read, but that
* is not possible if we have scheduled tasks and wait
* for them to produce something. */
- if (h2_iq_empty(session->streams)) {
+ if (h2_ihash_is_empty(session->streams)) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* finished processing existing ones. Time to leave. */
* task processing in other threads. Do a busy wait with
* backoff timer. */
transit(session, "no io", H2_PROXYS_ST_WAIT);
+ session->wait_timeout = 25;
}
break;
default:
stream->data_received = 1;
}
stream->state = H2_STREAM_ST_CLOSED;
- h2_iq_remove(session->streams, stream_id);
+ h2_ihash_remove(session->streams, stream_id);
h2_iq_remove(session->suspended, stream_id);
if (session->done) {
session->done(session, stream->r);
}
if (nghttp2_session_want_read(session->ngh2)) {
- if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+ status = h2_proxy_session_read(session, 0, 0);
+ if (status == APR_SUCCESS) {
have_read = 1;
}
}
if (check_suspended(session) == APR_EAGAIN) {
/* no stream has become resumed. Do a blocking read with
* ever increasing timeouts... */
- if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+            status = h2_proxy_session_read(session, 1, session->wait_timeout);
+ if (status == APR_SUCCESS) {
dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
}
+ else if (APR_STATUS_IS_TIMEUP(status)) {
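+            /* exponential backoff: double the wait, capped at 100 ms */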
+ session->wait_timeout = H2MIN(apr_time_from_msec(100),
+ 2*session->wait_timeout);
+ }
}
break;
return APR_EAGAIN;
}
+typedef struct {
+ h2_proxy_session *session;
+ h2_proxy_request_done *done;
+} cleanup_iter_ctx;
+
+static int cleanup_iter(void *udata, void *val)
+{
+ cleanup_iter_ctx *ctx = udata;
+ h2_proxy_stream *stream = val;
+ ctx->done(ctx->session, stream->r);
+ return 1;
+}
+
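+/* Report each stream still known to the session as done to `done`,
+ * then forget them all. */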
+void h2_proxy_session_cleanup(h2_proxy_session *session,
+ h2_proxy_request_done *done)
+{
+ cleanup_iter_ctx ctx;
+ ctx.session = session;
+ ctx.done = done;
+ h2_ihash_iter(session->streams, cleanup_iter, &ctx);
+ h2_ihash_clear(session->streams);
+}
+
#include <nghttp2/nghttp2.h>
struct h2_int_queue;
+struct h2_ihash_t;
typedef enum {
H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */
int window_bits_connection;
h2_proxys_state state;
+ apr_interval_time_t wait_timeout;
- struct h2_int_queue *streams;
+ struct h2_ihash_t *streams;
struct h2_int_queue *suspended;
apr_size_t remote_max_concurrent;
int max_stream_recv;
apr_status_t h2_proxy_session_process(h2_proxy_session *s);
+void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url"
apr_hash_set(ih->hash, &id, sizeof(id), NULL);
}
+void h2_ihash_clear(h2_ihash_t *ih)
+{
+ apr_hash_clear(ih->hash);
+}
+
/*******************************************************************************
* h2_util for apt_table_t
******************************************************************************/
void h2_ihash_add(h2_ihash_t *ih, void *val);
void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_clear(h2_ihash_t *ih);
/*******************************************************************************
* common helpers
}
static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session,
- request_rec *r)
+ request_rec *r, int before_leave)
{
if (!r && !ctx->standalone) {
ctx->engine->capacity = session->remote_max_concurrent;
- if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) {
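+        /* when about to leave, block on the pull so a request being
+         * scheduled right now is not missed */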
+ if (req_engine_pull(ctx->engine,
+ before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ,
+ &r) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): pulled request %s",
session->id, r->the_request);
session->user_data = ctx;
status = h2_proxy_session_process(session);
while (APR_STATUS_IS_EAGAIN(status)) {
- r = next_request(ctx, session, r);
+ r = next_request(ctx, session, r, 0);
if (r) {
add_request(session, r);
r = NULL;
ctx->p_conn->close = 1;
}
- r = next_request(ctx, session, r);
+ r = next_request(ctx, session, r, 1);
if (r) {
if (ctx->p_conn->close) {
+            /* The connection is (or will be) closed and the session is
+             * terminated. Any open stream of that session needs to be
+             * a) reopened on the new session, if it is safe to do so, or
+             * b) reported as done (failed) otherwise.
+             */
+ h2_proxy_session_cleanup(session, request_done);
goto setup_backend;
}
add_request(session, r);
goto run_session;
}
- if (session->streams && !h2_iq_empty(session->streams)) {
+ if (session->streams && !h2_ihash_is_empty(session->streams)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status,
ctx->p_conn->connection,
"session run done with %d streams unfinished",
- h2_iq_size(session->streams));
+ (int)h2_ihash_count(session->streams));
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status,
ctx->p_conn->connection, "eng(%s): session run done",