return rv;
}
+/* An arbitrarily large value to address the pathological case where we
+ * keep reading from one side only, without scheduling the other direction
+ * for too long. This can happen with a large MTU and small read buffers,
+ * for instance when micro-benchmarking a bidirectional transfer of huge
+ * files with client, proxy and backend all on localhost. We could also
+ * simply ignore the case and let the sender stop by itself at some point,
+ * when/if it needs to receive data, or the receiver stop when/if it needs
+ * to send...
+ */
+#define PROXY_TRANSFER_MAX_READS 10000
+
PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                                       request_rec *r,
                                       conn_rec *c_i,
                                       conn_rec *c_o,
                                       apr_bucket_brigade *bb_i,
                                       apr_bucket_brigade *bb_o,
                                       const char *name,
                                       int *sent,
                                       apr_off_t bsize,
                                       int flags)
{
apr_status_t rv;
int flush_each = 0;
+ unsigned int num_reads = 0;
#ifdef DEBUGGING
apr_off_t len;
#endif
/*
* Compat: since FLUSH_EACH is default (and zero) for legacy reasons, we
- * pretend it's no FLUSH_AFTER nor SHOULD_YIELD flags, the latter because
+ * pretend neither FLUSH_AFTER nor YIELD_PENDING is set, the latter because
* flushing would defeat the purpose of checking for pending data (hence
* determine whether or not the output chain/stack is full for stopping).
*/
if (!(flags & (AP_PROXY_TRANSFER_FLUSH_AFTER |
- AP_PROXY_TRANSFER_SHOULD_YIELD))) {
+ AP_PROXY_TRANSFER_YIELD_PENDING))) {
flush_each = 1;
}
- do {
+ for (;;) {
+        /* Yield if the output filters stack is full? This avoids blocking
+         * here and gives the caller a chance to poll for POLLOUT
+         * asynchronously.
+         */
+ if (flags & AP_PROXY_TRANSFER_YIELD_PENDING) {
+ int rc = OK;
+
+ if (!ap_filter_should_yield(c_o->output_filters)) {
+ rc = ap_filter_output_pending(c_o);
+ }
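+            /* rc == OK means the output stack asks to yield or still has
+             * pending data to flush; DECLINED means nothing is pending.
+             */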
+ if (rc == OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "ap_proxy_transfer_between_connections: "
+ "yield (output pending)");
+ rv = APR_INCOMPLETE;
+ break;
+ }
+ if (rc != DECLINED) {
+ rv = AP_FILTER_ERROR;
+ break;
+ }
+ }
+
+ /* Yield if we keep hold of the thread for too long? This gives
+ * the caller a chance to schedule the other direction too.
+ */
+ if ((flags & AP_PROXY_TRANSFER_YIELD_MAX_READS)
+ && ++num_reads > PROXY_TRANSFER_MAX_READS) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "ap_proxy_transfer_between_connections: "
+ "yield (max reads)");
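+            /* Not an error: the caller is expected to poll and call again,
+             * scheduling the other direction in between.
+             */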
+ rv = APR_SUCCESS;
+ break;
+ }
+
apr_brigade_cleanup(bb_i);
rv = ap_get_brigade(c_i->input_filters, bb_i, AP_MODE_READBYTES,
APR_NONBLOCK_READ, bsize);
- if (rv == APR_SUCCESS) {
+ if (rv != APR_SUCCESS) {
+ if (!APR_STATUS_IS_EAGAIN(rv) && !APR_STATUS_IS_EOF(rv)) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(03308)
+ "ap_proxy_transfer_between_connections: "
+ "error on %s - ap_get_brigade",
+ name);
+ if (rv == APR_INCOMPLETE) {
+                    /* Don't return APR_INCOMPLETE: for the caller it would
+                     * mean "should yield", whereas coming from
+                     * ap_http_filter() here it means "incomplete body",
+                     * which is an error.
+                     */
+ rv = APR_EGENERAL;
+ }
+ }
+ break;
+ }
+ {
if (c_o->aborted) {
apr_brigade_cleanup(bb_i);
flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
APR_BRIGADE_INSERT_TAIL(bb_o, b);
}
rv = ap_pass_brigade(c_o->output_filters, bb_o);
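+        /* Whether ap_pass_brigade() succeeded or not, bb_o's buckets are
+         * done (consumed, set aside or errored): always clean them up.
+         */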
+ apr_brigade_cleanup(bb_o);
if (rv != APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(03307)
"ap_proxy_transfer_between_connections: "
"error on %s - ap_pass_brigade",
name);
flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+ break;
}
- else if ((flags & AP_PROXY_TRANSFER_SHOULD_YIELD) &&
- ap_filter_should_yield(c_o->output_filters)) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
- "ap_proxy_transfer_between_connections: "
- "output filters full");
- flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
- rv = APR_INCOMPLETE;
- }
- apr_brigade_cleanup(bb_o);
- }
- else if (!APR_STATUS_IS_EAGAIN(rv) && !APR_STATUS_IS_EOF(rv)) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(03308)
- "ap_proxy_transfer_between_connections: "
- "error on %s - ap_get_brigade",
- name);
}
- } while (rv == APR_SUCCESS);
+ }
if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) {
ap_fflush(c_o->output_filters, bb_o);
apr_brigade_cleanup(bb_i);
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, rv, r,
- "ap_proxy_transfer_between_connections complete");
+ "ap_proxy_transfer_between_connections complete (%s %pI)",
+ (c_i == r->connection) ? "to" : "from",
+ (c_i == r->connection) ? c_o->client_addr
+ : c_i->client_addr);
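+    /* An EAGAIN from the nonblocking read above just means the input is
+     * drained, this is not an error for the caller.
+     */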
if (APR_STATUS_IS_EAGAIN(rv)) {
rv = APR_SUCCESS;
}
-
return rv;
}
apr_bucket_brigade *bb;
struct proxy_tunnel_conn *other;
unsigned int readable:1,
- drain:1;
+ writable:1;
};
PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
tunnel->client->name = "client";
tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
- memset(tunnel->client->pfd, 0, sizeof(*tunnel->client->pfd));
tunnel->client->pfd->p = r->pool;
tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
tunnel->origin->name = "origin";
tunnel->origin->bb = apr_brigade_create(c_o->pool, c_o->bucket_alloc);
tunnel->origin->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
- memset(tunnel->origin->pfd, 0, sizeof(*tunnel->origin->pfd));
tunnel->origin->pfd->p = r->pool;
tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
tunnel->origin->other = tunnel->client;
tunnel->origin->readable = 1;
-#if 0
- apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+    /* The sockets should be nonblocking from now on */
apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
- apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
- apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
-#endif
+ apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_NONBLOCK, 1);
/* No coalescing filters */
    ap_remove_output_filter_byhandle(c_i->output_filters,
                                     "SSL/TLS Coalescing Filter");
ap_remove_output_filter_byhandle(c_o->output_filters,
"SSL/TLS Coalescing Filter");
+    /* A bidirectional non-HTTP stream will confuse mod_reqtimeout */
+ ap_remove_input_filter_byhandle(c_i->input_filters, "reqtimeout");
+
/* The input/output filter stacks should contain connection filters only */
r->input_filters = r->proto_input_filters = c_i->input_filters;
r->output_filters = r->proto_output_filters = c_i->output_filters;
+ /* Won't be reused after tunneling */
c_i->keepalive = AP_CONN_CLOSE;
c_o->keepalive = AP_CONN_CLOSE;
+    /* Start with POLLOUT and let ap_proxy_tunnel_run() schedule both
+     * directions once no output data are pending (anymore).
+     */
+ tunnel->client->pfd->reqevents = APR_POLLOUT;
+ rv = apr_pollset_add(tunnel->pollset, tunnel->client->pfd);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ tunnel->origin->pfd->reqevents = APR_POLLOUT;
+ rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
*ptunnel = tunnel;
return APR_SUCCESS;
}
int rc = OK;
request_rec *r = tunnel->r;
apr_pollset_t *pollset = tunnel->pollset;
+ struct proxy_tunnel_conn *client = tunnel->client,
+ *origin = tunnel->origin;
apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
- struct proxy_tunnel_conn *client = tunnel->client, *origin = tunnel->origin;
apr_size_t read_buf_size = ap_get_read_buf_size(r);
const char *scheme = tunnel->scheme;
apr_status_t rv;
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
- "proxy: %s: tunnel running (timeout %" APR_TIME_T_FMT "."
- "%" APR_TIME_T_FMT ")",
- scheme, timeout > 0 ? apr_time_sec(timeout) : timeout,
- timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
-
- client->pfd->reqevents = 0;
- origin->pfd->reqevents = 0;
- add_pollset(pollset, client->pfd, APR_POLLIN);
- add_pollset(pollset, origin->pfd, APR_POLLIN);
+ "proxy: %s: tunnel running (timeout %lf)",
+ scheme, timeout >= 0 ? (double)timeout / APR_USEC_PER_SEC
+ : (double)-1.0);
/* Loop until both directions of the connection are closed,
* or a failure occurs.
*/
do {
- struct proxy_tunnel_conn *in, *out;
const apr_pollfd_t *results;
apr_int32_t nresults, i;
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
- "proxy: %s: polling client=%hx, origin=%hx",
+ "proxy: %s: polling (client=%hx, origin=%hx)",
scheme, client->pfd->reqevents, origin->pfd->reqevents);
do {
rv = apr_pollset_poll(pollset, timeout, &nresults, &results);
if (rv != APR_SUCCESS) {
if (APR_STATUS_IS_TIMEUP(rv)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
- "proxy: %s: polling timeout", scheme);
+ "proxy: %s: polling timed out "
+ "(client=%hx, origin=%hx)",
+ scheme, client->pfd->reqevents,
+ origin->pfd->reqevents);
rc = HTTP_GATEWAY_TIME_OUT;
}
else {
"proxy: %s: woken up, %i result(s)", scheme, nresults);
for (i = 0; i < nresults; i++) {
- const apr_pollfd_t *cur = &results[i];
- int revents = cur->rtnevents;
+ const apr_pollfd_t *pfd = &results[i];
+ struct proxy_tunnel_conn *tc = pfd->client_data;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: #%i: %s: %hx/%hx", scheme, i,
+ tc->name, pfd->rtnevents, tc->pfd->reqevents);
/* sanity check */
- if (cur->desc.s != client->pfd->desc.s
- && cur->desc.s != origin->pfd->desc.s) {
+ if (pfd->desc.s != client->pfd->desc.s
+ && pfd->desc.s != origin->pfd->desc.s) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
"proxy: %s: unknown socket in pollset", scheme);
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
-
- in = cur->client_data;
- if (revents & APR_POLLOUT) {
- in = in->other;
- }
- else if (!(revents & (APR_POLLIN | APR_POLLHUP))) {
+ if (!(pfd->rtnevents & (APR_POLLIN | APR_POLLHUP | APR_POLLOUT))) {
/* this catches POLLERR/POLLNVAL etc.. */
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
"proxy: %s: polling events error (%x)",
- scheme, revents);
+ scheme, pfd->rtnevents);
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
- out = in->other;
- ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
- "proxy: %s: #%i: %s/%hx => %s/%hx: %x",
- scheme, i, in->name, in->pfd->reqevents,
- out->name, out->pfd->reqevents, revents);
+ if (pfd->rtnevents & APR_POLLOUT) {
+ struct proxy_tunnel_conn *out = tc, *in = tc->other;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s output ready",
+ scheme, out->name);
+
+ rc = ap_filter_output_pending(out->c);
+ if (rc == OK) {
+ /* Keep polling out (only) */
+ continue;
+ }
+ if (rc != DECLINED) {
+ /* Real failure, bail out */
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
+ "proxy: %s: %s flushing failed (%i)",
+ scheme, out->name, rc);
+ goto cleanup;
+ }
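+                    /* DECLINED means no pending data (anymore), reset rc
+                     * to OK for the final return value.
+                     */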
+ rc = OK;
+
+                    /* No more pending data. If the input side is not
+                     * readable anymore it's time to shut down for write
+                     * (this direction is over). Otherwise back to normal
+                     * business.
+                     */
+ del_pollset(pollset, out->pfd, APR_POLLOUT);
+ if (in->readable) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r,
+ "proxy: %s: %s resume writable",
+ scheme, out->name);
+ add_pollset(pollset, in->pfd, APR_POLLIN);
+ out->writable = 1;
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
+ "proxy: %s: %s write shutdown",
+ scheme, out->name);
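+                        /* Half-close for write: this sends FIN, the peer
+                         * will then read EOF.
+                         */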
+                        apr_socket_shutdown(out->pfd->desc.s,
+                                            APR_SHUTDOWN_WRITE);
+ }
+ }
- if (in->readable && (in->drain || !(revents & APR_POLLOUT))) {
+ if (pfd->rtnevents & (APR_POLLIN | APR_POLLHUP)
+ || (tc->readable && tc->other->writable
+ && ap_filter_input_pending(tc->c) == OK)) {
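+                    /* The socket is readable, or data are already buffered
+                     * in the input filters (which poll() cannot see): read
+                     * and forward them now that the other side is writable.
+                     */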
+ struct proxy_tunnel_conn *in = tc, *out = tc->other;
int sent = 0;
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
- "proxy: %s: %s is %s", scheme, in->name,
- (revents & APR_POLLOUT) ? "draining"
- : "readable");
+ "proxy: %s: %s input ready",
+ scheme, in->name);
rv = ap_proxy_transfer_between_connections(r,
- in->c, out->c,
- in->bb, out->bb,
- in->name, &sent,
- read_buf_size,
- AP_PROXY_TRANSFER_SHOULD_YIELD);
+ in->c, out->c,
+ in->bb, out->bb,
+ in->name, &sent,
+ read_buf_size,
+ AP_PROXY_TRANSFER_YIELD_PENDING |
+ AP_PROXY_TRANSFER_YIELD_MAX_READS);
if (sent && out == client) {
tunnel->replied = 1;
}
* side, hence avoid filling the output filters even
* more and hence blocking there.
*/
- ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r,
"proxy: %s: %s wait writable",
scheme, out->name);
- revents &= ~APR_POLLOUT;
- in->drain = 1;
+ out->writable = 0;
}
else if (APR_STATUS_IS_EOF(rv)) {
/* Stop POLLIN and wait for POLLOUT (flush) on the
* other side to shut it down.
*/
- ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
"proxy: %s: %s read shutdown",
scheme, in->name);
- in->readable = in->drain = 0;
+ in->readable = 0;
}
else {
/* Real failure, bail out */
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
- del_pollset(pollset, in->pfd, APR_POLLIN);
- sent = 1;
- }
- else {
- in->drain = 0;
- }
- if (sent) {
+ del_pollset(pollset, in->pfd, APR_POLLIN);
add_pollset(pollset, out->pfd, APR_POLLOUT);
}
}
-
- if (revents & APR_POLLOUT) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
- "proxy: %s: %s is writable",
- scheme, out->name);
-
- rv = ap_filter_output_pending(out->c);
- if (rv == DECLINED) {
- /* No more pending data. If the 'in' side is not readable
- * anymore it's time to shutdown for write (this direction
- * is over). Otherwise draining (if any) is done, back to
- * normal business.
- */
- if (!in->readable) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
- "proxy: %s: %s write shutdown",
- scheme, out->name);
- del_pollset(pollset, out->pfd, APR_POLLOUT);
- apr_socket_shutdown(out->pfd->desc.s, 1);
- }
- else {
- add_pollset(pollset, in->pfd, APR_POLLIN);
- if (!in->drain) {
- del_pollset(pollset, out->pfd, APR_POLLOUT);
- }
- }
- }
- else if (rv != OK) {
- /* Real failure, bail out */
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
- "proxy: %s: %s flushing failed (%i)",
- scheme, out->name, rv);
- rc = HTTP_INTERNAL_SERVER_ERROR;
- goto cleanup;
- }
- }
}
} while (client->pfd->reqevents || origin->pfd->reqevents);
"proxy: %s: tunnel finished", scheme);
cleanup:
- del_pollset(pollset, client->pfd, ~0);
- del_pollset(pollset, origin->pfd, ~0);
return rc;
}