const char *name,
int *sent,
apr_off_t bsize,
- int after)
+ int flags)
{
apr_status_t rv;
+ int flush_each = 0;
#ifdef DEBUGGING
apr_off_t len;
#endif
+ /*
+ * Compat: since FLUSH_EACH is default (and zero) for legacy reasons, we
+ * pretend it's no FLUSH_AFTER nor SHOULD_YIELD flags, the latter because
+ * flushing would defeat the purpose of checking for pending data (hence
+ * determine whether or not the output chain/stack is full for stopping).
+ */
+ if (!(flags & (AP_PROXY_TRANSFER_FLUSH_AFTER |
+ AP_PROXY_TRANSFER_SHOULD_YIELD))) {
+ flush_each = 1;
+ }
+
do {
apr_brigade_cleanup(bb_i);
rv = ap_get_brigade(c_i->input_filters, bb_i, AP_MODE_READBYTES,
if (rv == APR_SUCCESS) {
if (c_o->aborted) {
apr_brigade_cleanup(bb_i);
- return APR_EPIPE;
+ flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+ rv = APR_EPIPE;
+ break;
}
if (APR_BRIGADE_EMPTY(bb_i)) {
break;
*sent = 1;
}
ap_proxy_buckets_lifetime_transform(r, bb_i, bb_o);
- if (!after) {
+ if (flush_each) {
apr_bucket *b;
/*
* Do not use ap_fflush here since this would cause the flush
* bucket to be sent in a separate brigade afterwards which
* causes some filters to set aside the buckets from the first
- * brigade and process them when the flush arrives in the second
+ * brigade and process them when FLUSH arrives in the second
* brigade. As set asides of our transformed buckets involve
* memory copying we try to avoid this. If we have the flush
* bucket in the first brigade they directly process the
"ap_proxy_transfer_between_connections: "
"error on %s - ap_pass_brigade",
name);
+ flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+ }
+ else if ((flags & AP_PROXY_TRANSFER_SHOULD_YIELD) &&
+ ap_filter_should_yield(c_o->output_filters)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "ap_proxy_transfer_between_connections: "
+ "output filters full");
+ flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+ rv = APR_INCOMPLETE;
}
apr_brigade_cleanup(bb_o);
}
}
} while (rv == APR_SUCCESS);
- if (after) {
+ if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) {
ap_fflush(c_o->output_filters, bb_o);
apr_brigade_cleanup(bb_o);
}
return rv;
}
+struct proxy_tunnel_conn {
+ conn_rec *c;
+ const char *name;
+ apr_pollfd_t *pfd;
+ apr_bucket_brigade *bb;
+ struct proxy_tunnel_conn *other;
+ unsigned int readable:1,
+ drain:1;
+};
+
PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
- request_rec *r,
- conn_rec *origin)
+ request_rec *r, conn_rec *c_o,
+ const char *scheme)
{
apr_status_t rv;
- apr_pollfd_t *pfds;
- conn_rec *c = r->connection;
+ conn_rec *c_i = r->connection;
proxy_tunnel_rec *tunnel;
*ptunnel = NULL;
tunnel = apr_pcalloc(r->pool, sizeof(*tunnel));
- tunnel->r = r;
- tunnel->origin = origin;
- tunnel->bb_i = apr_brigade_create(r->pool,
- c->bucket_alloc);
- tunnel->bb_o = apr_brigade_create(origin->pool,
- origin->bucket_alloc);
-
- tunnel->timeout = -1;
- rv = apr_pollset_create(&tunnel->pollset, 2, r->pool,
- APR_POLLSET_NOCOPY);
+ rv = apr_pollset_create(&tunnel->pollset, 2, r->pool, APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
return rv;
}
+ tunnel->r = r;
+ tunnel->scheme = apr_pstrdup(r->pool, scheme);
+ tunnel->client = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
+ tunnel->origin = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
tunnel->pfds = apr_array_make(r->pool, 2, sizeof(apr_pollfd_t));
- apr_array_push(tunnel->pfds); /* pfds[0] */
- apr_array_push(tunnel->pfds); /* pfds[1] */
+ tunnel->timeout = -1;
- pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
- pfds[0].desc.s = ap_get_conn_socket(c);
- pfds[1].desc.s = ap_get_conn_socket(origin);
- pfds[0].desc_type = pfds[1].desc_type = APR_POLL_SOCKET;
- pfds[0].reqevents = pfds[1].reqevents = APR_POLLIN | APR_POLLHUP;
- pfds[0].p = pfds[1].p = r->pool;
+ tunnel->client->c = c_i;
+ tunnel->client->name = "client";
+ tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
+ tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
+ memset(tunnel->client->pfd, 0, sizeof(*tunnel->client->pfd));
+ tunnel->client->pfd->p = r->pool;
+ tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
+ tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
+ tunnel->client->pfd->client_data = tunnel->client;
+ tunnel->client->other = tunnel->origin;
+ tunnel->client->readable = 1;
+
+ tunnel->origin->c = c_o;
+ tunnel->origin->name = "origin";
+ tunnel->origin->bb = apr_brigade_create(c_o->pool, c_o->bucket_alloc);
+ tunnel->origin->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
+ memset(tunnel->origin->pfd, 0, sizeof(*tunnel->origin->pfd));
+ tunnel->origin->pfd->p = r->pool;
+ tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
+ tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
+ tunnel->origin->pfd->client_data = tunnel->origin;
+ tunnel->origin->other = tunnel->client;
+ tunnel->origin->readable = 1;
+
+#if 0
+ apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+ apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+ apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
+ apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
+#endif
+
+ /* No coalescing filters */
+ ap_remove_output_filter_byhandle(c_i->output_filters,
+ "SSL/TLS Coalescing Filter");
+ ap_remove_output_filter_byhandle(c_o->output_filters,
+ "SSL/TLS Coalescing Filter");
/* The input/output filter stacks should contain connection filters only */
- r->output_filters = c->output_filters;
- r->proto_output_filters = c->output_filters;
- r->input_filters = c->input_filters;
- r->proto_input_filters = c->input_filters;
+ r->input_filters = r->proto_input_filters = c_i->input_filters;
+ r->output_filters = r->proto_output_filters = c_i->output_filters;
- c->keepalive = AP_CONN_CLOSE;
- origin->keepalive = AP_CONN_CLOSE;
+ c_i->keepalive = AP_CONN_CLOSE;
+ c_o->keepalive = AP_CONN_CLOSE;
*ptunnel = tunnel;
return APR_SUCCESS;
}
-PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
+static void add_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
+ apr_int16_t events)
{
apr_status_t rv;
- request_rec *r = tunnel->r;
- conn_rec *c_i = r->connection;
- conn_rec *c_o = tunnel->origin;
- apr_socket_t *sock_i = ap_get_conn_socket(c_i);
- apr_socket_t *sock_o = ap_get_conn_socket(c_o);
- apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
- apr_pollfd_t *pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
- apr_pollset_t *pollset = tunnel->pollset;
- const apr_pollfd_t *signalled;
- apr_int32_t pollcnt, pi;
- int done = 0;
- AP_DEBUG_ASSERT(tunnel->pfds->nelts == 2);
- AP_DEBUG_ASSERT(pfds[0].desc.s == sock_i);
- AP_DEBUG_ASSERT(pfds[1].desc.s == sock_o);
+ if (events & APR_POLLIN) {
+ events |= APR_POLLHUP;
+ }
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
- "proxy: tunnel: running (timeout %" APR_TIME_T_FMT "."
- "%" APR_TIME_T_FMT ")",
- timeout > 0 ? apr_time_sec(timeout) : timeout,
- timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
+ if ((pfd->reqevents & events) == events) {
+ return;
+ }
-#if 0
- apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
- apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
- apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
- apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
-#endif
+ if (pfd->reqevents) {
+ rv = apr_pollset_remove(pollset, pfd);
+ if (rv != APR_SUCCESS) {
+ AP_DEBUG_ASSERT(1);
+ }
+ }
+
+ pfd->reqevents |= events;
+ rv = apr_pollset_add(pollset, pfd);
+ if (rv != APR_SUCCESS) {
+ AP_DEBUG_ASSERT(1);
+ }
+}
+
+static void del_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
+ apr_int16_t events)
+{
+ apr_status_t rv;
+
+ if (events & APR_POLLIN) {
+ events |= APR_POLLHUP;
+ }
+
+ if ((pfd->reqevents & events) == 0) {
+ return;
+ }
- apr_pollset_add(pollset, &pfds[0]);
- apr_pollset_add(pollset, &pfds[1]);
+ rv = apr_pollset_remove(pollset, pfd);
+ if (rv != APR_SUCCESS) {
+ AP_DEBUG_ASSERT(1);
+ }
- do { /* Loop until done (one side closes the connection, or an error) */
- rv = apr_pollset_poll(tunnel->pollset, timeout, &pollcnt, &signalled);
+ pfd->reqevents &= ~events;
+ if (pfd->reqevents) {
+ rv = apr_pollset_add(pollset, pfd);
if (rv != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(rv)) {
- continue;
- }
+ AP_DEBUG_ASSERT(1);
+ }
+ }
+}
+
+PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
+{
+ int rc = OK;
+ request_rec *r = tunnel->r;
+ apr_pollset_t *pollset = tunnel->pollset;
+ apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
+ struct proxy_tunnel_conn *client = tunnel->client, *origin = tunnel->origin;
+ apr_size_t read_buf_size = ap_get_read_buf_size(r);
+ const char *scheme = tunnel->scheme;
+ apr_status_t rv;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
+ "proxy: %s: tunnel running (timeout %" APR_TIME_T_FMT "."
+ "%" APR_TIME_T_FMT ")",
+ scheme, timeout > 0 ? apr_time_sec(timeout) : timeout,
+ timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
+
+ client->pfd->reqevents = 0;
+ origin->pfd->reqevents = 0;
+ add_pollset(pollset, client->pfd, APR_POLLIN);
+ add_pollset(pollset, origin->pfd, APR_POLLIN);
+
+ /* Loop until both directions of the connection are closed,
+ * or a failure occurs.
+ */
+ do {
+ struct proxy_tunnel_conn *in, *out;
+ const apr_pollfd_t *results;
+ apr_int32_t nresults, i;
- apr_pollset_remove(pollset, &pfds[1]);
- apr_pollset_remove(pollset, &pfds[0]);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: polling client=%hx, origin=%hx",
+ scheme, client->pfd->reqevents, origin->pfd->reqevents);
+ do {
+ rv = apr_pollset_poll(pollset, timeout, &nresults, &results);
+ } while (APR_STATUS_IS_EINTR(rv));
+ if (rv != APR_SUCCESS) {
if (APR_STATUS_IS_TIMEUP(rv)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
- "proxy: tunnel: woken up, i=%d", (int)pollcnt);
-
- return HTTP_GATEWAY_TIME_OUT;
+ "proxy: %s: polling timeout", scheme);
+ rc = HTTP_GATEWAY_TIME_OUT;
}
-
- ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
- "proxy: tunnel: polling failed");
- return HTTP_INTERNAL_SERVER_ERROR;
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
+ "proxy: %s: polling failed", scheme);
+ rc = HTTP_INTERNAL_SERVER_ERROR;
+ }
+ goto cleanup;
}
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10215)
- "proxy: tunnel: woken up, i=%d", (int)pollcnt);
-
- for (pi = 0; pi < pollcnt; pi++) {
- const apr_pollfd_t *cur = &signalled[pi];
- apr_int16_t pollevent = cur->rtnevents;
-
- if (cur->desc.s == sock_o) {
- if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10216)
- "proxy: tunnel: backend was readable");
- rv = ap_proxy_transfer_between_connections(r, c_o, c_i,
- tunnel->bb_o,
- tunnel->bb_i,
- "backend",
- &tunnel->replied,
- AP_IOBUFSIZE,
- 0);
- done |= (rv != APR_SUCCESS);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r, APLOGNO(10215)
+ "proxy: %s: woken up, %i result(s)", scheme, nresults);
+
+ for (i = 0; i < nresults; i++) {
+ const apr_pollfd_t *cur = &results[i];
+ int revents = cur->rtnevents;
+
+ /* sanity check */
+ if (cur->desc.s != client->pfd->desc.s
+ && cur->desc.s != origin->pfd->desc.s) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
+ "proxy: %s: unknown socket in pollset", scheme);
+ rc = HTTP_INTERNAL_SERVER_ERROR;
+ goto cleanup;
+ }
+
+ in = cur->client_data;
+ if (revents & APR_POLLOUT) {
+ in = in->other;
+ }
+ else if (!(revents & (APR_POLLIN | APR_POLLHUP))) {
+ /* this catches POLLERR/POLLNVAL etc.. */
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
+ "proxy: %s: polling events error (%x)",
+ scheme, revents);
+ rc = HTTP_INTERNAL_SERVER_ERROR;
+ goto cleanup;
+ }
+ out = in->other;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: #%i: %s/%hx => %s/%hx: %x",
+ scheme, i, in->name, in->pfd->reqevents,
+ out->name, out->pfd->reqevents, revents);
+
+ if (in->readable && (in->drain || !(revents & APR_POLLOUT))) {
+ int sent = 0;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s is %s", scheme, in->name,
+ (revents & APR_POLLOUT) ? "draining"
+ : "readable");
+
+ rv = ap_proxy_transfer_between_connections(r,
+ in->c, out->c,
+ in->bb, out->bb,
+ in->name, &sent,
+ read_buf_size,
+ AP_PROXY_TRANSFER_SHOULD_YIELD);
+ if (sent && out == client) {
+ tunnel->replied = 1;
+ }
+ if (rv != APR_SUCCESS) {
+ if (APR_STATUS_IS_INCOMPLETE(rv)) {
+ /* Pause POLLIN while waiting for POLLOUT on the other
+ * side, hence avoid filling the output filters even
+ * more and hence blocking there.
+ */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s wait writable",
+ scheme, out->name);
+ revents &= ~APR_POLLOUT;
+ in->drain = 1;
+ }
+ else if (APR_STATUS_IS_EOF(rv)) {
+ /* Stop POLLIN and wait for POLLOUT (flush) on the
+ * other side to shut it down.
+ */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s read shutdown",
+ scheme, in->name);
+ in->readable = in->drain = 0;
+ }
+ else {
+ /* Real failure, bail out */
+ rc = HTTP_INTERNAL_SERVER_ERROR;
+ goto cleanup;
+ }
+ del_pollset(pollset, in->pfd, APR_POLLIN);
+ sent = 1;
}
- else if (pollevent & APR_POLLERR) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10217)
- "proxy: tunnel: error on backend connection");
- c_o->aborted = 1;
- done = 1;
+ else {
+ in->drain = 0;
}
- else {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10218)
- "proxy: tunnel: unknown event %d on backend connection",
- (int)pollevent);
- done = 1;
+
+ if (sent) {
+ add_pollset(pollset, out->pfd, APR_POLLOUT);
}
}
- else if (cur->desc.s == sock_i) {
- if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10219)
- "proxy: tunnel: client was readable");
- rv = ap_proxy_transfer_between_connections(r, c_i, c_o,
- tunnel->bb_i,
- tunnel->bb_o,
- "client", NULL,
- AP_IOBUFSIZE,
- 0);
- done |= (rv != APR_SUCCESS);
- }
- else if (pollevent & APR_POLLERR) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
- "proxy: tunnel: error on client connection");
- c_i->aborted = 1;
- done = 1;
+
+ if (revents & APR_POLLOUT) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s is writable",
+ scheme, out->name);
+
+ rv = ap_filter_output_pending(out->c);
+ if (rv == DECLINED) {
+ /* No more pending data. If the 'in' side is not readable
+ * anymore it's time to shutdown for write (this direction
+ * is over). Otherwise draining (if any) is done, back to
+ * normal business.
+ */
+ if (!in->readable) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+ "proxy: %s: %s write shutdown",
+ scheme, out->name);
+ del_pollset(pollset, out->pfd, APR_POLLOUT);
+ apr_socket_shutdown(out->pfd->desc.s, 1);
+ }
+ else {
+ add_pollset(pollset, in->pfd, APR_POLLIN);
+ if (!in->drain) {
+ del_pollset(pollset, out->pfd, APR_POLLOUT);
+ }
+ }
}
- else {
+ else if (rv != OK) {
+ /* Real failure, bail out */
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
- "proxy: tunnel: unknown event %d on client connection",
- (int)pollevent);
- done = 1;
+ "proxy: %s: %s flushing failed (%i)",
+ scheme, out->name, rv);
+ rc = HTTP_INTERNAL_SERVER_ERROR;
+ goto cleanup;
}
}
- else {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
- "proxy: tunnel: unknown socket in pollset");
- done = 1;
- }
}
- } while (!done);
+ } while (client->pfd->reqevents || origin->pfd->reqevents);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10223)
- "proxy: tunnel: finished");
-
- apr_pollset_remove(pollset, &pfds[1]);
- apr_pollset_remove(pollset, &pfds[0]);
+ "proxy: %s: tunnel finished", scheme);
- if (!tunnel->replied) {
- return HTTP_BAD_GATEWAY;
- }
-
- return OK;
+cleanup:
+ del_pollset(pollset, client->pfd, ~0);
+ del_pollset(pollset, origin->pfd, ~0);
+ return rc;
}
PROXY_DECLARE (const char *) ap_proxy_show_hcmethod(hcmethod_t method)