-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: elimination of fixed master connectin buffer for TLS
+ connections. New scratch bucket handling optimized for TLS write sizes.
+ File bucket data read directly into scratch buffers, avoiding one
+ copy. Non-TLS connections continue to pass buckets unchanged to the core
+ filters to allow sendfile() usage.
+
*) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these
modules. This simplifies building on platforms such as Windows, as module
reference used in logging is now clear.
}
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
- const h2_config *cfg,
- apr_pool_t *pool)
+ const h2_config *cfg)
{
io->c = c;
io->output = apr_brigade_create(c->pool, c->bucket_alloc);
- io->buflen = 0;
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;
- if (io->buffer_output) {
- io->bufsize = WRITE_BUFFER_SIZE;
- io->buffer = apr_pcalloc(pool, io->bufsize);
- }
- else {
- io->bufsize = 0;
- }
-
if (io->is_tls) {
/* This is what we start with,
* see https://issues.apache.org/jira/browse/TS-2503
io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS)
* APR_USEC_PER_SEC);
- io->write_size = WRITE_SIZE_INITIAL;
+ io->write_size = (io->cooldown_usecs > 0?
+ WRITE_SIZE_INITIAL : WRITE_SIZE_MAX);
}
else {
io->warmup_size = 0;
io->cooldown_usecs = 0;
- io->write_size = io->bufsize;
+ io->write_size = 0;
}
if (APLOGctrace1(c)) {
return APR_SUCCESS;
}
+#define LOG_SCRATCH 0
+
+static void append_scratch(h2_conn_io *io)
+{
+ if (io->scratch && io->slen > 0) {
+ apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen,
+ apr_bucket_free,
+ io->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): append_scratch(%ld)",
+ io->c->id, (long)io->slen);
+#endif
+ io->scratch = NULL;
+ io->slen = io->ssize = 0;
+ }
+}
+
+static apr_size_t assure_scratch_space(h2_conn_io *io) {
+ apr_size_t remain = io->ssize - io->slen;
+ if (io->scratch && remain == 0) {
+ append_scratch(io);
+ }
+ if (!io->scratch) {
+ /* we control the size and it is larger than what buckets usually
+ * allocate. */
+ io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc);
+ io->ssize = io->write_size;
+ io->slen = 0;
+ remain = io->ssize;
+ }
+ return remain;
+}
+
+static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
+{
+ apr_status_t status;
+ const char *data;
+ apr_size_t len;
+
+ if (!b->length) {
+ return APR_SUCCESS;
+ }
+
+ AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen));
+ if (APR_BUCKET_IS_FILE(b)) {
+ apr_bucket_file *f = (apr_bucket_file *)b->data;
+ apr_file_t *fd = f->fd;
+ apr_off_t offset = b->start;
+ apr_size_t len = b->length;
+
+ /* file buckets will either mmap (which we do not want) or
+ * read 8000 byte chunks and split themself. However, we do
+ * know *exactly* how many bytes we need where.
+ */
+ status = apr_file_seek(fd, APR_SET, &offset);
+ if (status != APR_SUCCESS) {
+ return status;
+ }
+ status = apr_file_read(fd, io->scratch + io->slen, &len);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c,
+ "h2_conn_io(%ld): FILE_to_scratch(%ld)",
+ io->c->id, (long)len);
+#endif
+ if (status != APR_SUCCESS && status != APR_EOF) {
+ return status;
+ }
+ io->slen += len;
+ }
+ else {
+ status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+ if (status == APR_SUCCESS) {
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): read_to_scratch(%ld)",
+ io->c->id, (long)b->length);
+#endif
+ memcpy(io->scratch+io->slen, data, len);
+ io->slen += len;
+ }
+ }
+ return status;
+}
+
int h2_conn_io_is_buffered(h2_conn_io *io)
{
- return io->bufsize > 0;
+ return io->buffer_output;
}
typedef struct {
return status;
}
-/* Bring the current buffer content into the output brigade, appropriately
- * chunked.
- */
-static apr_status_t bucketeer_buffer(h2_conn_io *io)
+static void check_write_size(h2_conn_io *io)
{
- const char *data = io->buffer;
- apr_size_t remaining = io->buflen;
- apr_bucket *b;
- int bcount, i;
-
if (io->write_size > WRITE_SIZE_INITIAL
&& (io->cooldown_usecs > 0)
&& (apr_time_now() - io->last_write) >= io->cooldown_usecs) {
"h2_conn_io(%ld): threshold reached, write size now %ld",
(long)io->c->id, (long)io->write_size);
}
-
- bcount = (int)(remaining / io->write_size);
- for (i = 0; i < bcount; ++i) {
- b = apr_bucket_transient_create(data, io->write_size,
- io->output->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- data += io->write_size;
- remaining -= io->write_size;
- }
-
- if (remaining > 0) {
- b = apr_bucket_transient_create(data, remaining,
- io->output->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- }
- return APR_SUCCESS;
-}
-
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
-{
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- if (flush) {
- b = apr_bucket_flush_create(io->c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(io->output, b);
- }
- return APR_SUCCESS;
}
static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
pass_out_ctx ctx;
apr_bucket *b;
- if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+ append_scratch(io);
+ if (APR_BRIGADE_EMPTY(io->output)) {
return APR_SUCCESS;
}
-
- if (io->buflen > 0) {
- /* something in the buffer, put it in the output brigade */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
- "h2_conn_io: flush, flushing %ld bytes",
- (long)io->buflen);
- bucketeer_buffer(io);
- }
if (flush) {
b = apr_bucket_flush_create(io->c->bucket_alloc);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
- io->buflen = 0;
ctx.c = io->c;
ctx.io = eoc? NULL : io;
if (!APR_BRIGADE_EMPTY(io->output)) {
len = h2_brigade_mem_size(io->output);
- }
- len += io->buflen;
- if (len >= WRITE_BUFFER_SIZE) {
- return h2_conn_io_flush_int(io, 1, 0);
+ if (len >= WRITE_BUFFER_SIZE) {
+ return h2_conn_io_flush_int(io, 1, 0);
+ }
}
return APR_SUCCESS;
}
return h2_conn_io_flush_int(io, 1, 1);
}
-apr_status_t h2_conn_io_write(h2_conn_io *io,
- const char *buf, size_t length)
+apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
{
apr_status_t status = APR_SUCCESS;
- pass_out_ctx ctx;
+ apr_size_t remain;
- ctx.c = io->c;
- ctx.io = io;
- if (io->bufsize > 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
- "h2_conn_io: buffering %ld bytes", (long)length);
-
- if (!APR_BRIGADE_EMPTY(io->output)) {
- status = h2_conn_io_flush_int(io, 0, 0);
+ if (io->buffer_output) {
+ while (length > 0) {
+ remain = assure_scratch_space(io);
+ if (remain >= length) {
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): write_to_scratch(%ld)",
+ io->c->id, (long)length);
+#endif
+ memcpy(io->scratch + io->slen, data, length);
+ io->slen += length;
+ length = 0;
+ }
+ else {
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): write_to_scratch(%ld)",
+ io->c->id, (long)remain);
+#endif
+ memcpy(io->scratch + io->slen, data, remain);
+ io->slen += remain;
+ data += remain;
+ length -= remain;
+ }
}
+ }
+ else {
+ status = apr_brigade_write(io->output, NULL, NULL, data, length);
+ }
+ return status;
+}
+
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
+{
+ apr_bucket *b;
+ apr_status_t status = APR_SUCCESS;
+
+ check_write_size(io);
+ while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+ b = APR_BRIGADE_FIRST(bb);
- while (length > 0 && (status == APR_SUCCESS)) {
- apr_size_t avail = io->bufsize - io->buflen;
- if (avail <= 0) {
+ if (APR_BUCKET_IS_METADATA(b)) {
+ /* need to finish any open scratch bucket, as meta data
+ * needs to be forward "in order". */
+ append_scratch(io);
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+
+ if (APR_BUCKET_IS_FLUSH(b)) {
status = h2_conn_io_flush_int(io, 0, 0);
}
- else if (length > avail) {
- memcpy(io->buffer + io->buflen, buf, avail);
- io->buflen += avail;
- length -= avail;
- buf += avail;
+ }
+ else if (io->buffer_output) {
+ apr_size_t remain = assure_scratch_space(io);
+ if (b->length > remain) {
+ apr_bucket_split(b, remain);
+ if (io->slen == 0) {
+ /* complete write_size bucket, append unchanged */
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+ "h2_conn_io(%ld): pass bucket(%ld)",
+ io->c->id, (long)b->length);
+#endif
+ continue;
+ }
}
else {
- memcpy(io->buffer + io->buflen, buf, length);
- io->buflen += length;
- length = 0;
- break;
+ /* bucket fits in remain, copy to scratch */
+ read_to_scratch(io, b);
+ apr_bucket_delete(b);
+ continue;
}
}
-
+ else {
+ /* no buffering, forward buckets setaside on flush */
+ if (APR_BUCKET_IS_TRANSIENT(b)) {
+ apr_bucket_setaside(b, io->c->pool);
+ }
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(io->output, b);
+ }
}
- else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
- "h2_conn_io: writing %ld bytes to brigade", (long)length);
- status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
+ if (status == APR_SUCCESS) {
+ return h2_conn_io_consider_pass(io);
}
-
return status;
}
apr_int64_t bytes_written;
int buffer_output;
- char *buffer;
- apr_size_t buflen;
- apr_size_t bufsize;
+ char *scratch;
+ apr_size_t ssize;
+ apr_size_t slen;
} h2_conn_io;
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
- const struct h2_config *cfg,
- apr_pool_t *pool);
+ const struct h2_config *cfg);
int h2_conn_io_is_buffered(h2_conn_io *io);
const char *buf,
size_t length);
-/**
- * Append a bucket to the buffered output.
- * @param io the connection io
- * @param b the bucket to append
- */
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
/**
* Append an End-Of-Connection bucket to the output that, once destroyed,
}
static void have_out_data_for(h2_mplx *m, int stream_id);
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
static void check_tx_reservation(h2_mplx *m)
{
{
h2_mplx *m = ctx;
h2_stream *stream = val;
+ h2_task *task = h2_ihash_get(m->tasks, stream->id);
h2_ihash_remove(m->spurge, stream->id);
h2_stream_destroy(stream);
+ if (task) {
+ task_destroy(m, task, 1);
+ }
return 0;
}
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{
- h2_task *task = h2_ihash_get(m->tasks, stream->id);
+ h2_task *task;
/* Situation: we are, on the master connection, done with processing
* the stream. Either we have handled it successfully, or the stream
* memory. We should either copy it on task creation or wait with the
* stream destruction until the task is done.
*/
+ h2_iq_remove(m->q, stream->id);
+ h2_ihash_remove(m->ready_tasks, stream->id);
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
}
h2_stream_cleanup(stream);
+ task = h2_ihash_get(m->tasks, stream->id);
if (task) {
- /* Remove task from ready set, we will never submit it */
- h2_ihash_remove(m->ready_tasks, stream->id);
- task->input.beam = NULL;
-
if (!task->worker_done) {
/* task still running, cleanup once it is done */
if (rst_error) {
h2_task_rst(task, rst_error);
}
- /* FIXME: this should work, but does not
+ /* FIXME: this should work, but does not
h2_ihash_add(m->shold, stream);
return;*/
+ task->input.beam = NULL;
}
else {
/* already finished */
- h2_iq_remove(m->q, task->stream_id);
task_destroy(m, task, 0);
}
}
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
+ }
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): start release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ }
+
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
+ if (!h2_ihash_empty(m->shold)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams in hold",
+ m->id, (int)h2_ihash_count(m->shold));
+ }
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): 2. release_join with %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ }
+
/* If we still have busy workers, we cannot release our memory
- * pool yet, as slave connections have child pools of their respective
- * h2_io's.
- * Any remaining ios are processed in these workers. Any operation
- * they do on their input/outputs will be errored ECONNRESET/ABORTED,
- * so processing them should fail and workers *should* return.
+ * pool yet, as tasks have references to us.
+ * Any operation on the task slave connection will from now on
+ * be errored ECONNRESET/ABORTED, so processing them should fail
+ * and workers *should* return in a timely fashion.
*/
for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join, waiting on %d tasks to report back",
- m->id, (int)h2_ihash_count(m->tasks));
-
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) {
apr_thread_cond_broadcast(m->task_thawed);
}
}
- AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+
AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
- purge_streams(m);
+ if (!h2_ihash_empty(m->spurge)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): release_join %d streams to purge",
+ m->id, (int)h2_ihash_count(m->spurge));
+ purge_streams(m);
+ }
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): release_join (%d tasks left) -> destroy",
- m->id, (int)h2_ihash_count(m->tasks));
+ if (!h2_ihash_empty(m->tasks)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ "h2_mplx(%ld): release_join -> destroy, "
+ "%d tasks still present",
+ m->id, (int)h2_ihash_count(m->tasks));
+ }
leave_mutex(m, acquired);
h2_mplx_destroy(m);
/* all gone */
h2_task_thaw(task);
/* we do not want the task to block on writing response
* bodies into the mplx. */
- /* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
return;
}
else {
- h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+ h2_stream *stream;
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task);
+ stream = h2_ihash_get(m->streams, task->stream_id);
if (ngn) {
apr_off_t bytes = 0;
h2_beam_on_consumed(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%s): request done, %f ms"
- " elapsed", task->id,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): request done, %f ms elapsed", task->id,
(task->done_at - task->started_at) / 1000.0);
if (task->started_at > m->last_idle_block) {
/* this task finished without causing an 'idle block', e.g.
if (stream) {
/* hang around until the stream deregisters */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream still open",
+ task->id);
}
else {
+ /* stream done, was it placed in hold? */
stream = h2_ihash_get(m->shold, task->stream_id);
- task_destroy(m, task, 0);
if (stream) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream in hold",
+ task->id);
stream->response = NULL; /* ref from task memory */
/* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%s): task_done, stream not found",
+ task->id);
+ task_destroy(m, task, 0);
+ }
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
return (s == end)? APR_EINVAL : APR_SUCCESS;
}
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
+ request_rec *r)
{
apr_status_t status;
const char *scheme, *authority;
- scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+ scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme
: ap_http_scheme(r));
- authority = r->hostname;
+ authority = apr_pstrdup(pool, r->hostname);
if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
apr_port_t defport = apr_uri_port_of_scheme(scheme);
if (defport != r->server->port) {
/* port info missing and port is not default for scheme: append */
- authority = apr_psprintf(r->pool, "%s:%d", authority,
+ authority = apr_psprintf(pool, "%s:%d", authority,
(int)r->server->port);
}
}
- status = h2_req_make(req, r->pool, r->method, scheme, authority,
- apr_uri_unparse(r->pool, &r->parsed_uri,
- APR_URI_UNP_OMITSITEPART),
+ status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme,
+ authority, apr_uri_unparse(pool, &r->parsed_uri,
+ APR_URI_UNP_OMITSITEPART),
r->headers_in);
return status;
}
#include "h2.h"
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
+ request_rec *r);
apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
const char *name, size_t nlen,
uint32_t error_code)
{
conn_rec *c = session->c;
+ apr_bucket *b;
+ apr_status_t status;
+
if (!error_code) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
h2_stream_rst(stream, error_code);
}
- return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(c->bucket_alloc, stream), 0);
+ b = h2_bucket_eos_create(c->bucket_alloc, stream);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ apr_brigade_cleanup(session->bbtmp);
+ return status;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
return 0;
}
-static apr_status_t pass_data(void *ctx,
- const char *data, apr_off_t length)
-{
- return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2,
"h2_stream(%ld-%d): send_data_cb for %ld bytes",
session->id, (int)stream_id, (long)length);
- if (h2_conn_io_is_buffered(&session->io)) {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (status == APR_SUCCESS) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
-
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_readx(stream, pass_data, session, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- if (padlen) {
- status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
- }
- }
- }
+ status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+ if (padlen && status == APR_SUCCESS) {
+ status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
- else {
- status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
- if (padlen && status == APR_SUCCESS) {
- status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
- }
- if (status == APR_SUCCESS) {
- apr_off_t len = length;
- status = h2_stream_read_to(stream, session->io.output, &len, &eos);
- if (status == APR_SUCCESS && len != length) {
- status = APR_EINVAL;
- }
- }
-
- if (status == APR_SUCCESS && padlen) {
- b = apr_bucket_immortal_create(immortal_zeros, padlen,
- session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b, 0);
+
+ if (status == APR_SUCCESS) {
+ apr_off_t len = length;
+ status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+ if (status == APR_SUCCESS && len != length) {
+ status = APR_EINVAL;
}
}
+ if (status == APR_SUCCESS && padlen) {
+ b = apr_bucket_immortal_create(immortal_zeros, padlen,
+ session->c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+ }
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_pass(&session->io, session->bbtmp);
+ }
+
+ apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) {
stream->data_frames_sent++;
h2_conn_io_consider_pass(&session->io);
return APR_SUCCESS;
}
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
{
- AP_DEBUG_ASSERT(session);
- /* This is an early cleanup of the session that may
- * discard what is no longer necessary for *new* streams
- * and general HTTP/2 processing.
- * At this point, all frames are in transit or somehwere in
- * our buffers or passed down output filters.
- * h2 streams might still being written out.
- */
- if (session->c) {
- h2_ctx_clear(session->c);
+ AP_DEBUG_ASSERT(session);
+
+ h2_ihash_clear(session->streams);
+ if (session->mplx) {
+ h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+ h2_mplx_release_and_join(session->mplx, session->iowait);
+ session->mplx = NULL;
}
+
+ ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+ session->c->input_filters), "H2_IN");
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
-}
+ if (session->c) {
+ h2_ctx_clear(session->c);
+ }
-static void h2_session_destroy(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
-
- h2_session_cleanup(session);
- AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
- h2_ihash_clear(session->streams);
- session->open_streams = 0;
-
- ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
- session->c->input_filters), "H2_IN");
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
}
- if (session->mplx) {
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
- h2_mplx_release_and_join(session->mplx, session->iowait);
- session->mplx = NULL;
- }
if (session->pool) {
apr_pool_destroy(session->pool);
}
h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c);
- h2_conn_io_init(&session->io, c, session->config, session->pool);
+ h2_conn_io_init(&session->io, c, session->config);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): cleanup by EOS bucket destroy",
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
session->id, stream->id);
h2_ihash_remove(session->streams, stream->id);
--session->open_streams;
void h2_stream_destroy(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_stream(%ld-%d): destroy",
+ stream->session->id, stream->id);
if (stream->input) {
h2_beam_destroy(stream->input);
stream->input = NULL;
return APR_ECONNRESET;
}
set_state(stream, H2_STREAM_ST_OPEN);
- status = h2_request_rwrite(stream->request, r);
+ status = h2_request_rwrite(stream->request, stream->pool, r);
stream->request->serialize = h2_config_geti(h2_config_rget(r),
H2_CONF_SER_HEADERS);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
return status;
}
+static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
+
apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos)
{
conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
- apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+ apr_off_t requested;
if (stream->rst_error) {
*plen = 0;
return APR_ECONNRESET;
}
+ if (*plen > 0) {
+ requested = H2MIN(*plen, DATA_CHUNK_SIZE);
+ }
+ else {
+ requested = DATA_CHUNK_SIZE;
+ }
+ *plen = requested;
+
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
h2_util_bb_avail(stream->buffer, plen, peos);
- if (!*peos && !*plen) {
+ if (!*peos && *plen < requested) {
/* try to get more data */
- status = fill_buffer(stream, H2MIN(requested, 32*1024));
+ status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
}
-apr_status_t h2_stream_readx(h2_stream *stream,
- h2_io_data_cb *cb, void *ctx,
- apr_off_t *plen, int *peos)
-{
- conn_rec *c = stream->session->c;
- apr_status_t status = APR_SUCCESS;
-
- if (stream->rst_error) {
- return APR_ECONNRESET;
- }
- status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && !*peos && !*plen) {
- status = APR_EAGAIN;
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
- "h2_stream(%ld-%d): readx, len=%ld eos=%d",
- c->id, stream->id, (long)*plen, *peos);
- return status;
-}
-
-
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos);
-/**
- * Read data from the stream output.
- *
- * @param stream the stream to read from
- * @param cb callback to invoke for byte chunks read. Might be invoked
- * multiple times (with different values) for one read operation.
- * @param ctx context data for callback
- * @param plen (in-/out) max. number of bytes to read and on return actual
- * number of bytes read
- * @param peos (out) != 0 iff end of stream has been reached while reading
- * @return APR_SUCCESS if out information was computed successfully.
- * APR_EAGAIN if not data is available and end of stream has not been
- * reached yet.
- */
-apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb,
- void *ctx, apr_off_t *plen, int *peos);
-
/**
* Read a maximum number of bytes into the bucket brigade.
*