return APR_SUCCESS;
}
-AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
- apr_bucket_brigade *bb)
-{
- apr_status_t status = APR_SUCCESS;
- apr_read_type_e block = APR_NONBLOCK_READ;
- conn_rec *c = f->r->connection;
- apr_bucket *flush_upto = NULL;
- apr_bucket_brigade *tmp_bb;
- apr_size_t tmp_bb_len = 0;
- core_server_config *conf;
- int seen_eor = 0;
-
- /*
- * Handle the AsyncFilter directive. We limit the filters that are
- * eligible for asynchronous handling here.
- */
- if (f->frec->ftype < f->c->async_filter) {
- ap_remove_output_filter(f);
- return ap_pass_brigade(f->next, bb);
- }
-
- conf = ap_get_core_module_config(f->r->server->module_config);
-
- /* Reinstate any buffered content */
- ap_filter_reinstate_brigade(f, bb, &flush_upto);
-
- /* After EOR is passed downstream, anything pooled on the request may
- * be destroyed (including bb!), but not tmp_bb which is acquired from
- * c->pool (and released after the below loop).
- */
- tmp_bb = ap_acquire_brigade(f->c);
-
- /* Don't touch *bb after seen_eor */
- while (status == APR_SUCCESS && !seen_eor && !APR_BRIGADE_EMPTY(bb)) {
- apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
- int do_pass = 0;
-
- if (AP_BUCKET_IS_EOR(bucket)) {
- /* pass out everything and never come back again,
- * r is destroyed with this bucket!
- */
- APR_BRIGADE_CONCAT(tmp_bb, bb);
- ap_remove_output_filter(f);
- seen_eor = 1;
- }
- else {
- /* if the core has set aside data, back off and try later */
- if (!flush_upto) {
- if (ap_filter_should_yield(f->next)) {
- break;
- }
- }
- else if (flush_upto == bucket) {
- flush_upto = NULL;
- }
-
- /* have we found a morphing bucket? if so, force it to morph into
- * something safe to pass down to the connection filters without
- * needing to be set aside.
- */
- if (bucket->length == (apr_size_t)-1) {
- const char *data;
- apr_size_t size;
-
- status = apr_bucket_read(bucket, &data, &size, block);
- if (status != APR_SUCCESS) {
- if (!APR_STATUS_IS_EAGAIN(status)
- || block != APR_NONBLOCK_READ) {
- break;
- }
- /* Flush everything so far and retry in blocking mode */
- bucket = apr_bucket_flush_create(c->bucket_alloc);
- block = APR_BLOCK_READ;
- }
- else {
- tmp_bb_len += size;
- block = APR_NONBLOCK_READ;
- }
- }
- else {
- tmp_bb_len += bucket->length;
- }
-
- /* move the bucket to tmp_bb and check whether it exhausts bb or
- * brings tmp_bb above the limit; in both cases it's time to pass
- * everything down the chain.
- */
- APR_BUCKET_REMOVE(bucket);
- APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
- if (APR_BRIGADE_EMPTY(bb)
- || APR_BUCKET_IS_FLUSH(bucket)
- || tmp_bb_len >= conf->flush_max_threshold) {
- do_pass = 1;
- }
- }
-
- if (do_pass || seen_eor) {
- status = ap_pass_brigade(f->next, tmp_bb);
- apr_brigade_cleanup(tmp_bb);
- tmp_bb_len = 0;
- }
- }
-
- /* Don't touch *bb after seen_eor */
- if (!seen_eor) {
- apr_status_t rv;
- APR_BRIGADE_PREPEND(bb, tmp_bb);
- rv = ap_filter_setaside_brigade(f, bb);
- if (status == APR_SUCCESS) {
- status = rv;
- }
- }
-
- ap_release_brigade(f->c, tmp_bb);
-
- return status;
-}
-
extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
AP_DECLARE(int) ap_some_auth_required(request_rec *r)
return OK;
}
+static apr_status_t save_aside_brigade(struct ap_filter_private *fp,
+ apr_bucket_brigade *bb)
+{
+ if (!fp->deferred_pool) {
+ apr_pool_create(&fp->deferred_pool, fp->f->c->pool);
+ apr_pool_tag(fp->deferred_pool, "deferred_pool");
+ }
+ return ap_save_brigade(fp->f, &fp->bb, &bb, fp->deferred_pool);
+}
+
AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
apr_bucket_brigade *bb)
{
+ apr_status_t rv = APR_SUCCESS;
struct ap_filter_private *fp = f->priv;
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
(!fp->bb || APR_BRIGADE_EMPTY(fp->bb)) ? "empty" : "full",
f->frec->name);
+ /* This API is not suitable for request filters */
+ if (f->frec->ftype < AP_FTYPE_CONNECTION) {
+ return APR_ENOTIMPL;
+ }
+
if (!APR_BRIGADE_EMPTY(bb)) {
+ apr_bucket_brigade *tmp_bb = NULL;
+ int batched_buckets = 0;
+ apr_bucket *e, *next;
+
/*
- * Set aside the brigade bb within fp->bb.
+ * Set aside the brigade bb to fp->bb.
*/
ap_filter_prepare_brigade(f);
- /* decide what pool we setaside to, request pool or deferred pool? */
- if (f->r) {
- apr_bucket *e;
- for (e = APR_BRIGADE_FIRST(bb); e != APR_BRIGADE_SENTINEL(bb); e =
- APR_BUCKET_NEXT(e)) {
- if (APR_BUCKET_IS_TRANSIENT(e)) {
- int rv = apr_bucket_setaside(e, f->r->pool);
+ for (e = APR_BRIGADE_FIRST(bb);
+ e != APR_BRIGADE_SENTINEL(bb);
+ e = next) {
+ next = APR_BUCKET_NEXT(e);
+
+ /* Morphing buckets are moved, so assumed to have next EOR's
+ * lifetime or at least the lifetime of the connection.
+ */
+ if (AP_BUCKET_IS_MORPHING(e)) {
+ /* Save buckets batched below? */
+ if (batched_buckets) {
+ batched_buckets = 0;
+ if (!tmp_bb) {
+ tmp_bb = ap_acquire_brigade(f->c);
+ }
+ apr_brigade_split_ex(bb, e, tmp_bb);
+ rv = save_aside_brigade(fp, bb);
+ APR_BRIGADE_CONCAT(bb, tmp_bb);
if (rv != APR_SUCCESS) {
- return rv;
+ break;
}
+ AP_DEBUG_ASSERT(APR_BRIGADE_FIRST(bb) == e);
}
+ APR_BUCKET_REMOVE(e);
+ APR_BRIGADE_INSERT_TAIL(fp->bb, e);
}
- APR_BRIGADE_CONCAT(fp->bb, bb);
- }
- else {
- if (!fp->deferred_pool) {
- apr_pool_create(&fp->deferred_pool, f->c->pool);
- apr_pool_tag(fp->deferred_pool, "deferred_pool");
+ else {
+ /* Batch successive buckets to save. */
+ batched_buckets = 1;
}
- return ap_save_brigade(f, &fp->bb, &bb, fp->deferred_pool);
}
-
+ if (tmp_bb) {
+ ap_release_brigade(f->c, tmp_bb);
+ }
+ if (batched_buckets) {
+ /* Save any remainder. */
+ rv = save_aside_brigade(fp, bb);
+ }
+ if (!APR_BRIGADE_EMPTY(bb)) {
+ /* Anything left in bb is what we could not save (error), clean up.
+ * This destroys anything pipelined so far, including EOR(s), and
+ * swallows all data, so from now this filter should only be passed
+ * connection close data like TLS close_notify.
+ *
+ * XXX: Should we cleanup all previous c->output_filters' setaside
+ * brigades?
+ *
+ * XXX: For each EOR we potentially destroy here, there is a
+ * request handler/module which "thought" everything went well
+ * on the output filters side, and returned OK. Should we mark
+ * something in each EOR's request_rec (e.g. r->aborted) for
+ * the log_transaction hooks to know at least?
+ * Or alternatively (and possibly more robustly) have the
+ * ap_core_output_filter() set r->flushed when it sees an EOR
+ * up to which it sent everything (before destroying it)?
+ * Anyway we can't set c->aborted here, because close_notify
+ * for instance can/should still be sent out.
+ */
+ AP_DEBUG_ASSERT(rv != APR_SUCCESS);
+ f->c->keepalive = AP_CONN_CLOSE;
+ apr_brigade_cleanup(bb);
+ }
}
else if (fp->deferred_pool) {
/*
apr_brigade_cleanup(fp->bb);
apr_pool_clear(fp->deferred_pool);
}
- return APR_SUCCESS;
+
+ return rv;
}
void ap_filter_adopt_brigade(ap_filter_t *f, apr_bucket_brigade *bb)
apr_bucket **flush_upto)
{
apr_bucket *bucket, *next;
- apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
- int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+ apr_size_t bytes_in_brigade, memory_bytes_in_brigade;
+ int eor_buckets_in_brigade, morphing_buckets_in_brigade;
struct ap_filter_private *fp = f->priv;
core_server_config *conf;
(APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
f->frec->name);
+ /* This API is not suitable for request filters */
+ if (f->frec->ftype < AP_FTYPE_CONNECTION) {
+ return APR_ENOTIMPL;
+ }
+
+ /* Buckets in fp->bb are leftover from previous call to setaside, so
+ * they happen before anything added here in bb.
+ */
if (fp->bb) {
APR_BRIGADE_PREPEND(bb, fp->bb);
}
*flush_upto = NULL;
/*
- * Determine if and up to which bucket we need to do a blocking write:
+ * Determine if and up to which bucket the caller needs to do a blocking
+ * write:
*
- * a) The brigade contains a flush bucket: Do a blocking write
- * of everything up that point.
+ * a) The brigade contains at least one flush bucket: do blocking writes
+ * of everything up to the last one.
*
- * b) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least flush_max_threshold bytes in non-file
- * buckets: Do blocking writes until the amount of data in the
- * buffer is less than flush_max_threshold. (The point of this
- * rule is to provide flow control, in case a handler is
- * streaming out lots of data faster than the data can be
+ * b) The brigade contains at least flush_max_threshold bytes in memory,
+ * that is non-file and non-morphing buckets: do blocking writes of
+ * everything up the last bucket above flush_max_threshold.
+ * (The point of this rule is to provide flow control, in case a
+ * handler is streaming out lots of data faster than the data can be
* sent to the client.)
*
- * c) The request is in CONN_STATE_HANDLER state, and the brigade
- * contains at least flush_max_pipelined EOR buckets:
- * Do blocking writes until less than flush_max_pipelined EOR
- * buckets are left. (The point of this rule is to prevent too many
- * FDs being kept open by pipelined requests, possibly allowing a
- * DoS).
+ * c) The brigade contains at least flush_max_pipelined EOR buckets: do
+ * blocking writes until the last EOR above flush_max_pipelined.
+ * (The point of this rule is to prevent too many FDs being kept open
+ * by pipelined requests, possibly allowing a DoS).
*
- * d) The request is being served by a connection filter and the
- * brigade contains a morphing bucket: If there was no other
- * reason to do a blocking write yet, try reading the bucket. If its
- * contents fit into memory before flush_max_threshold is reached,
- * everything is fine. Otherwise we need to do a blocking write the
- * up to and including the morphing bucket, because ap_save_brigade()
- * would read the whole bucket into memory later on.
+ * Note that morphing buckets use no memory until read, so they don't
+ * account for point b) above. Both ap_filter_reinstate_brigade() and
+ * ap_filter_setaside_brigade() assume that morphing buckets have an
+ * appropriate lifetime (until next EOR for instance), so they are simply
+ * setaside or reinstated by moving them from/to fp->bb to/from bb.
*/
bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
+ memory_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
+ morphing_buckets_in_brigade = 0;
conf = ap_get_core_module_config(f->c->base_server->module_config);
bucket = next) {
next = APR_BUCKET_NEXT(bucket);
- if (!APR_BUCKET_IS_METADATA(bucket)) {
- if (bucket->length == (apr_size_t)-1) {
- /*
- * A setaside of morphing buckets would read everything into
- * memory. Instead, we will flush everything up to and
- * including this bucket.
- */
- morphing_bucket_in_brigade = 1;
- }
- else {
- bytes_in_brigade += bucket->length;
- if (!APR_BUCKET_IS_FILE(bucket))
- non_file_bytes_in_brigade += bucket->length;
- }
- }
- else if (AP_BUCKET_IS_EOR(bucket)) {
+ if (AP_BUCKET_IS_EOR(bucket)) {
eor_buckets_in_brigade++;
}
+ else if (AP_BUCKET_IS_MORPHING(bucket)) {
+ morphing_buckets_in_brigade++;
+ }
+ else if (bucket->length) {
+ bytes_in_brigade += bucket->length;
+ if (!APR_BUCKET_IS_FILE(bucket)) {
+ memory_bytes_in_brigade += bucket->length;
+ }
+ }
if (APR_BUCKET_IS_FLUSH(bucket)
- || non_file_bytes_in_brigade >= conf->flush_max_threshold
- || (!f->r && morphing_bucket_in_brigade)
- || eor_buckets_in_brigade > conf->flush_max_pipelined) {
+ || memory_bytes_in_brigade >= conf->flush_max_threshold
+ || eor_buckets_in_brigade >= conf->flush_max_pipelined) {
/* this segment of the brigade MUST be sent before returning. */
if (APLOGctrace6(f->c)) {
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
"FLUSH bucket" :
- (non_file_bytes_in_brigade >= conf->flush_max_threshold) ?
- "max threshold" :
- (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
- "max requests in pipeline";
+ (memory_bytes_in_brigade >= conf->flush_max_threshold) ?
+ "max threshold" : "max requests in pipeline";
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
"will flush because of %s", reason);
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
"seen in brigade%s: bytes: %" APR_SIZE_T_FMT
- ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+ ", memory bytes: %" APR_SIZE_T_FMT ", eor "
"buckets: %d, morphing buckets: %d",
*flush_upto == NULL ? " so far"
: " since last flush point",
bytes_in_brigade,
- non_file_bytes_in_brigade,
+ memory_bytes_in_brigade,
eor_buckets_in_brigade,
- morphing_bucket_in_brigade);
+ morphing_buckets_in_brigade);
}
/*
* Defer the actual blocking write to avoid doing many writes.
*flush_upto = next;
bytes_in_brigade = 0;
- non_file_bytes_in_brigade = 0;
+ memory_bytes_in_brigade = 0;
eor_buckets_in_brigade = 0;
- morphing_bucket_in_brigade = 0;
+ morphing_buckets_in_brigade = 0;
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
- "brigade contains: bytes: %" APR_SIZE_T_FMT
+ "brigade contains%s: bytes: %" APR_SIZE_T_FMT
", non-file bytes: %" APR_SIZE_T_FMT
", eor buckets: %d, morphing buckets: %d",
- bytes_in_brigade, non_file_bytes_in_brigade,
- eor_buckets_in_brigade, morphing_bucket_in_brigade);
+ *flush_upto == NULL ? "" : " since last flush point",
+ bytes_in_brigade, memory_bytes_in_brigade,
+ eor_buckets_in_brigade, morphing_buckets_in_brigade);
return APR_SUCCESS;
}
}
/* Flush outer most filters first for ap_filter_should_yield(f->next)
- * to be relevant in the previous ones (e.g. ap_request_core_filter()
- * won't pass its buckets if its next filters yield already).
+ * to be relevant in the previous ones (async filters won't pass their
+ * buckets if their next filters yield already).
*/
bb = ap_acquire_brigade(c);
for (fp = APR_RING_LAST(x->pending_output_filters);