char *authority;
int32_t stream_id;
uint32_t error;
- size_t upload_blocked_len;
h2_tunnel_state state;
BIT(has_final_response);
BIT(closed);
struct Curl_easy *data,
struct tunnel_stream *tunnel)
{
+ struct cf_h2_proxy_ctx *ctx = cf->ctx;
unsigned char bits;
(void)cf;
bits = CURL_CSELECT_IN;
- if(!tunnel->closed && !tunnel->reset && tunnel->upload_blocked_len)
+ if(!tunnel->closed && !tunnel->reset &&
+ !Curl_bufq_is_empty(&ctx->tunnel.sendbuf))
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
bool want_recv, want_send;
if(!cf->connected && ctx->h2) {
- want_send = nghttp2_session_want_write(ctx->h2);
+ want_send = nghttp2_session_want_write(ctx->h2) ||
+ !Curl_bufq_is_empty(&ctx->outbufq) ||
+ !Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
want_recv = nghttp2_session_want_read(ctx->h2);
}
else
ctx->h2, ctx->tunnel.stream_id);
want_recv = (want_recv || c_exhaust || s_exhaust);
want_send = (!s_exhaust && want_send) ||
- (!c_exhaust && nghttp2_session_want_write(ctx->h2));
+ (!c_exhaust && nghttp2_session_want_write(ctx->h2)) ||
+ !Curl_bufq_is_empty(&ctx->outbufq) ||
+ !Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
+ CURL_TRC_CF(data, cf, "adjust_pollset, want_recv=%d want_send=%d",
+ want_recv, want_send);
CF_DATA_RESTORE(cf, save);
}
else if(ctx->sent_goaway && !cf->shutdown) {
/* shutdown in progress */
CF_DATA_SAVE(save, cf, data);
- want_send = nghttp2_session_want_write(ctx->h2);
+ want_send = nghttp2_session_want_write(ctx->h2) ||
+ !Curl_bufq_is_empty(&ctx->outbufq) ||
+ !Curl_bufq_is_empty(&ctx->tunnel.sendbuf);
want_recv = nghttp2_session_want_read(ctx->h2);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
+ CURL_TRC_CF(data, cf, "adjust_pollset, want_recv=%d want_send=%d",
+ want_recv, want_send);
CF_DATA_RESTORE(cf, save);
}
}
}
result = proxy_h2_progress_egress(cf, data);
- if(result == CURLE_AGAIN) {
- /* pending data to send, need to be called again. Ideally, we would
- * monitor the socket for POLLOUT, but we might not be in SENDING
- * transfer state any longer and are unable to make this happen.
- */
- CURL_TRC_CF(data, cf, "[%d] egress blocked, DRAIN",
- ctx->tunnel.stream_id);
- drain_tunnel(cf, data, &ctx->tunnel);
- }
- else if(result) {
+ if(result && (result != CURLE_AGAIN)) {
*err = result;
nread = -1;
}
int rv;
ssize_t nwritten;
CURLcode result;
- int blocked = 0;
(void)eos; /* TODO, maybe useful for blocks? */
if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_SEND_ERROR;
goto out;
}
- else if(ctx->tunnel.upload_blocked_len) {
- /* the data in `buf` has already been submitted or added to the
- * buffers, but have been EAGAINed on the last invocation. */
- DEBUGASSERT(len >= ctx->tunnel.upload_blocked_len);
- if(len < ctx->tunnel.upload_blocked_len) {
- /* Did we get called again with a smaller `len`? This should not
- * happen. We are not prepared to handle that. */
- failf(data, "HTTP/2 proxy, send again with decreased length");
- *err = CURLE_HTTP2;
- nwritten = -1;
- goto out;
- }
- nwritten = (ssize_t)ctx->tunnel.upload_blocked_len;
- ctx->tunnel.upload_blocked_len = 0;
- *err = CURLE_OK;
- }
else {
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err);
- if(nwritten < 0) {
- if(*err != CURLE_AGAIN)
- goto out;
- nwritten = 0;
- }
+ if(nwritten < 0 && (*err != CURLE_AGAIN))
+ goto out;
}
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* Call the nghttp2 send loop and flush to write ALL buffered data,
* headers and/or request body completely out to the network */
result = proxy_h2_progress_egress(cf, data);
- if(result == CURLE_AGAIN) {
- blocked = 1;
- }
- else if(result) {
+ if(result && (result != CURLE_AGAIN)) {
*err = result;
nwritten = -1;
goto out;
}
- else if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
- /* although we wrote everything that nghttp2 wants to send now,
- * there is data left in our stream send buffer unwritten. This may
- * be due to the stream's HTTP/2 flow window being exhausted. */
- blocked = 1;
- }
-
- if(blocked) {
- /* Unable to send all data, due to connection blocked or H2 window
- * exhaustion. Data is left in our stream buffer, or nghttp2's internal
- * frame buffer or our network out buffer. */
- size_t rwin = (size_t)nghttp2_session_get_stream_remote_window_size(
- ctx->h2, ctx->tunnel.stream_id);
- if(rwin == 0) {
- /* H2 flow window exhaustion.
- * FIXME: there is no way to HOLD all transfers that use this
- * proxy connection AND to UNHOLD all of them again when the
- * window increases.
- * We *could* iterate over all data on this conn maybe? */
- CURL_TRC_CF(data, cf, "[%d] remote flow "
- "window is exhausted", ctx->tunnel.stream_id);
- }
- /* Whatever the cause, we need to return CURL_EAGAIN for this call.
- * We have unwritten state that needs us being invoked again and EAGAIN
- * is the only way to ensure that. */
- ctx->tunnel.upload_blocked_len = nwritten;
- CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) BLOCK: win %u/%zu "
- "blocked_len=%zu",
- ctx->tunnel.stream_id, len,
- nghttp2_session_get_remote_window_size(ctx->h2), rwin,
- nwritten);
- drain_tunnel(cf, data, &ctx->tunnel);
- *err = CURLE_AGAIN;
- nwritten = -1;
- goto out;
- }
- else if(proxy_h2_should_close_session(ctx)) {
+ if(proxy_h2_should_close_session(ctx)) {
/* nghttp2 thinks this session is done. If the stream has not been
* closed, this is an error state for out transfer */
if(ctx->tunnel.closed) {
return nwritten;
}
+static CURLcode cf_h2_proxy_flush(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ struct cf_h2_proxy_ctx *ctx = cf->ctx;
+ struct cf_call_data save;
+ CURLcode result = CURLE_OK;
+
+ CF_DATA_SAVE(save, cf, data);
+ if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
+ /* resume the potentially suspended tunnel */
+ int rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id);
+ if(nghttp2_is_fatal(rv)) {
+ result = CURLE_SEND_ERROR;
+ goto out;
+ }
+ }
+
+ result = proxy_h2_progress_egress(cf, data);
+
+out:
+ CURL_TRC_CF(data, cf, "[%d] flush -> %d, "
+ "h2 windows %d-%d (stream-conn), buffers %zu-%zu (stream-conn)",
+ ctx->tunnel.stream_id, result,
+ nghttp2_session_get_stream_remote_window_size(
+ ctx->h2, ctx->tunnel.stream_id),
+ nghttp2_session_get_remote_window_size(ctx->h2),
+ Curl_bufq_len(&ctx->tunnel.sendbuf),
+ Curl_bufq_len(&ctx->outbufq));
+ CF_DATA_RESTORE(cf, save);
+ return result;
+}
+
static bool proxy_h2_connisalive(struct Curl_cfilter *cf,
struct Curl_easy *data,
bool *input_pending)
return result;
}
+static CURLcode cf_h2_proxy_query(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ int query, int *pres1, void *pres2)
+{
+ struct cf_h2_proxy_ctx *ctx = cf->ctx;
+
+ switch(query) {
+ case CF_QUERY_NEED_FLUSH: {
+ if(!Curl_bufq_is_empty(&ctx->outbufq) ||
+ !Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
+ *pres1 = TRUE;
+ return CURLE_OK;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ return cf->next?
+ cf->next->cft->query(cf->next, data, query, pres1, pres2) :
+ CURLE_UNKNOWN_OPTION;
+}
+
+static CURLcode cf_h2_proxy_cntrl(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ int event, int arg1, void *arg2)
+{
+ CURLcode result = CURLE_OK;
+ struct cf_call_data save;
+
+ (void)arg1;
+ (void)arg2;
+
+ switch(event) {
+ case CF_CTRL_FLUSH:
+ CF_DATA_SAVE(save, cf, data);
+ result = cf_h2_proxy_flush(cf, data);
+ CF_DATA_RESTORE(cf, save);
+ break;
+ default:
+ break;
+ }
+ return result;
+}
+
struct Curl_cftype Curl_cft_h2_proxy = {
"H2-PROXY",
CF_TYPE_IP_CONNECT|CF_TYPE_PROXY,
cf_h2_proxy_data_pending,
cf_h2_proxy_send,
cf_h2_proxy_recv,
- Curl_cf_def_cntrl,
+ cf_h2_proxy_cntrl,
cf_h2_proxy_is_alive,
Curl_cf_def_conn_keep_alive,
- Curl_cf_def_query,
+ cf_h2_proxy_query,
};
CURLcode Curl_cf_h2_proxy_insert_after(struct Curl_cfilter *cf,
return FALSE;
}
+bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex)
+{
+ CURLcode result;
+ int pending = FALSE;
+
+ struct Curl_cfilter *cf = data->conn->cfilter[sockindex];
+ result = cf? cf->cft->query(cf, data, CF_QUERY_NEED_FLUSH,
+ &pending, NULL) : CURLE_UNKNOWN_OPTION;
+ return (result || pending == FALSE)? FALSE : TRUE;
+}
+
void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct easy_pollset *ps)
CF_CTRL_DATA_IDLE, 0, NULL);
}
+
+CURLcode Curl_conn_flush(struct Curl_easy *data, int sockindex)
+{
+ return Curl_conn_cf_cntrl(data->conn->cfilter[sockindex], data, FALSE,
+ CF_CTRL_FLUSH, 0, NULL);
+}
+
/**
* Notify connection filters that the transfer represented by `data`
* is done with sending data (e.g. has uploaded everything).
/* update conn info at connection and data */
#define CF_CTRL_CONN_INFO_UPDATE (256+0) /* 0 NULL ignored */
#define CF_CTRL_FORGET_SOCKET (256+1) /* 0 NULL ignored */
+#define CF_CTRL_FLUSH (256+2) /* 0 NULL first fail */
/**
* Handle event/control for the filter.
* were received.
* -1 if not determined yet.
* - CF_QUERY_SOCKET: the socket used by the filter chain
+ * - CF_QUERY_NEED_FLUSH: TRUE iff any of the filters have unsent data
*/
/* query res1 res2 */
#define CF_QUERY_MAX_CONCURRENT 1 /* number - */
#define CF_QUERY_TIMER_CONNECT 4 /* - struct curltime */
#define CF_QUERY_TIMER_APPCONNECT 5 /* - struct curltime */
#define CF_QUERY_STREAM_ERROR 6 /* error code - */
+#define CF_QUERY_NEED_FLUSH 7 /* TRUE/FALSE - */
/**
* Query the cfilter for properties. Filters ignorant of a query will
bool Curl_conn_data_pending(struct Curl_easy *data,
int sockindex);
+/**
+ * Return TRUE if any of the connection filters at chain `sockindex`
+ * have data still to send.
+ */
+bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex);
+
+/**
+ * Flush any pending data on the connection filters at chain `sockindex`.
+ */
+CURLcode Curl_conn_flush(struct Curl_easy *data, int sockindex);
+
/**
* Return the socket used on data's connection for the index.
* Returns CURL_SOCKET_BAD if not available.
switch(ctx->state) {
case EXP100_SENDING_REQUEST:
+ if(!Curl_req_sendbuf_empty(data)) {
+ /* The initial request data has not been fully sent yet. Do
+ * not start the timer yet. */
+ DEBUGF(infof(data, "cr_exp100_read, request not full sent yet"));
+ *nread = 0;
+ *eos = FALSE;
+ return CURLE_OK;
+ }
/* We are now waiting for a reply from the server or
- * a timeout on our side */
+ * a timeout on our side IFF the request has been fully sent. */
DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE"));
ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = Curl_now();
return nwritten;
}
+static CURLcode cf_h2_flush(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ struct cf_h2_ctx *ctx = cf->ctx;
+ struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
+ struct cf_call_data save;
+ CURLcode result = CURLE_OK;
+
+ CF_DATA_SAVE(save, cf, data);
+ if(stream && !Curl_bufq_is_empty(&stream->sendbuf)) {
+ /* resume the potentially suspended stream */
+ int rv = nghttp2_session_resume_data(ctx->h2, stream->id);
+ if(nghttp2_is_fatal(rv)) {
+ result = CURLE_SEND_ERROR;
+ goto out;
+ }
+ }
+
+ result = h2_progress_egress(cf, data);
+
+out:
+ if(stream) {
+ CURL_TRC_CF(data, cf, "[%d] flush -> %d, "
+ "h2 windows %d-%d (stream-conn), "
+ "buffers %zu-%zu (stream-conn)",
+ stream->id, result,
+ nghttp2_session_get_stream_remote_window_size(
+ ctx->h2, stream->id),
+ nghttp2_session_get_remote_window_size(ctx->h2),
+ Curl_bufq_len(&stream->sendbuf),
+ Curl_bufq_len(&ctx->outbufq));
+ }
+ else {
+ CURL_TRC_CF(data, cf, "flush -> %d, "
+ "connection-window=%d, nw_send_buffer(%zu)",
+ result, nghttp2_session_get_remote_window_size(ctx->h2),
+ Curl_bufq_len(&ctx->outbufq));
+ }
+ CF_DATA_RESTORE(cf, save);
+ return result;
+}
+
static void cf_h2_adjust_pollset(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct easy_pollset *ps)
case CF_CTRL_DATA_PAUSE:
result = http2_data_pause(cf, data, (arg1 != 0));
break;
+ case CF_CTRL_FLUSH:
+ result = cf_h2_flush(cf, data);
+ break;
case CF_CTRL_DATA_DONE_SEND:
result = http2_data_done_send(cf, data);
break;
*pres1 = stream? (int)stream->error : 0;
return CURLE_OK;
}
+ case CF_QUERY_NEED_FLUSH: {
+ struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
+ if(!Curl_bufq_is_empty(&ctx->outbufq) ||
+ (stream && !Curl_bufq_is_empty(&stream->sendbuf))) {
+ *pres1 = TRUE;
+ return CURLE_OK;
+ }
+ break;
+ }
default:
break;
}
sock[sockindex] = conn->sockfd;
}
- if(CURL_WANT_SEND(data)) {
+ if(Curl_req_want_send(data)) {
if((conn->sockfd != conn->writesockfd) ||
bitmap == GETSOCK_BLANK) {
/* only if they are not the same socket and we have a readable
Curl_creader_done(data, data->req.upload_aborted);
if(data->req.upload_aborted) {
+ Curl_bufq_reset(&data->req.sendbuf);
if(data->req.writebytecount)
infof(data, "abort upload after having sent %" CURL_FORMAT_CURL_OFF_T
" bytes", data->req.writebytecount);
if(result)
return result;
if(!Curl_bufq_is_empty(&data->req.sendbuf)) {
+ DEBUGF(infof(data, "Curl_req_flush(len=%zu) -> EAGAIN",
+ Curl_bufq_len(&data->req.sendbuf)));
return CURLE_AGAIN;
}
}
+ else if(Curl_xfer_needs_flush(data)) {
+ DEBUGF(infof(data, "Curl_req_flush(), xfer send_pending"));
+ return Curl_xfer_flush(data);
+ }
if(!data->req.upload_done && data->req.eos_read &&
Curl_bufq_is_empty(&data->req.sendbuf)) {
}
#endif /* !USE_HYPER */
+bool Curl_req_sendbuf_empty(struct Curl_easy *data)
+{
+ return !data->req.sendbuf_init || Curl_bufq_is_empty(&data->req.sendbuf);
+}
+
bool Curl_req_want_send(struct Curl_easy *data)
{
- return data->req.sendbuf_init && !Curl_bufq_is_empty(&data->req.sendbuf);
+ /* Not done and
+ * - KEEP_SEND and not PAUSEd.
+ * - or request has buffered data to send
+ * - or transfer connection has pending data to send */
+ return !data->req.done &&
+ (((data->req.keepon & KEEP_SENDBITS) == KEEP_SEND) ||
+ !Curl_req_sendbuf_empty(data) ||
+ Curl_xfer_needs_flush(data));
}
bool Curl_req_done_sending(struct Curl_easy *data)
{
- if(data->req.upload_done) {
- DEBUGASSERT(Curl_bufq_is_empty(&data->req.sendbuf));
- return TRUE;
- }
- return FALSE;
+ return data->req.upload_done && !Curl_req_want_send(data);
}
CURLcode Curl_req_send_more(struct Curl_easy *data)
CURLcode result;
/* Fill our send buffer if more from client can be read. */
- if(!data->req.eos_read && !Curl_bufq_is_full(&data->req.sendbuf)) {
+ if(!data->req.upload_aborted &&
+ !data->req.eos_read &&
+ !(data->req.keepon & KEEP_SEND_PAUSE) &&
+ !Curl_bufq_is_full(&data->req.sendbuf)) {
ssize_t nread = Curl_bufq_sipn(&data->req.sendbuf, 0,
add_from_client, data, &result);
if(nread < 0 && result != CURLE_AGAIN)
if(!data->req.upload_done) {
Curl_bufq_reset(&data->req.sendbuf);
data->req.upload_aborted = TRUE;
+ /* no longer KEEP_SEND and KEEP_SEND_PAUSE */
+ data->req.keepon &= ~KEEP_SENDBITS;
return req_set_upload_done(data);
}
return CURLE_OK;
}
+
+CURLcode Curl_req_stop_send_recv(struct Curl_easy *data)
+{
+ /* stop receiving and ALL sending as well, including PAUSE and HOLD.
+ * We might still be paused on receive client writes though, so
+ * keep those bits around. */
+ data->req.keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
+ return Curl_req_abort_sending(data);
+}
*/
bool Curl_req_want_send(struct Curl_easy *data);
+/**
+ * TRUE iff the request has no buffered bytes yet to send.
+ */
+bool Curl_req_sendbuf_empty(struct Curl_easy *data);
+
/**
* Stop sending any more request data to the server.
* Will clear the send buffer and mark request sending as done.
*/
CURLcode Curl_req_abort_sending(struct Curl_easy *data);
+/**
+ * Stop sending and receiving any more request data.
+ * Will abort sending if not done.
+ */
+CURLcode Curl_req_stop_send_recv(struct Curl_easy *data);
+
#endif /* HEADER_CURL_REQUEST_H */
DEBUGF(infof(data, "nread == 0, stream closed, bailing"));
else
DEBUGF(infof(data, "nread <= 0, server closed connection, bailing"));
- /* stop receiving and ALL sending as well, including PAUSE and HOLD.
- * We might still be paused on receive client writes though, so
- * keep those bits around. */
- k->keepon &= ~(KEEP_RECV|KEEP_SENDBITS);
+ result = Curl_req_stop_send_recv(data);
+ if(result)
+ goto out;
if(k->eos_written) /* already did write this to client, leave */
break;
}
may now close the connection. If there is now any kind of sending going
on from our side, we need to stop that immediately. */
infof(data, "we are done reading and this is set to close, stop send");
- k->keepon &= ~KEEP_SEND; /* no writing anymore either */
- k->keepon &= ~KEEP_SEND_PAUSE; /* no pausing anymore either */
+ Curl_req_abort_sending(data);
}
out:
*/
static CURLcode readwrite_upload(struct Curl_easy *data, int *didwhat)
{
- if((data->req.keepon & KEEP_SEND_PAUSE))
- return CURLE_OK;
-
/* We should not get here when the sending is already done. It
* probably means that someone set `data-req.keepon |= KEEP_SEND`
* when it should not. */
else
fd_read = CURL_SOCKET_BAD;
- if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
+ if(Curl_req_want_send(data))
fd_write = conn->writesockfd;
else
fd_write = CURL_SOCKET_BAD;
}
/* If we still have writing to do, we check if we have a writable socket. */
- if(((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) ||
+ if((Curl_req_want_send(data) && (select_bits & CURL_CSELECT_OUT)) ||
(k->keepon & KEEP_SEND_TIMED)) {
/* write */
return Curl_cw_out_done(data);
}
+bool Curl_xfer_needs_flush(struct Curl_easy *data)
+{
+ int sockindex;
+ sockindex = ((data->conn->writesockfd != CURL_SOCKET_BAD) &&
+ (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]));
+ return Curl_conn_needs_flush(data, sockindex);
+}
+
+CURLcode Curl_xfer_flush(struct Curl_easy *data)
+{
+ int sockindex;
+ sockindex = ((data->conn->writesockfd != CURL_SOCKET_BAD) &&
+ (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]));
+ return Curl_conn_flush(data, sockindex);
+}
+
CURLcode Curl_xfer_send(struct Curl_easy *data,
const void *buf, size_t blen, bool eos,
size_t *pnwritten)
else if(!result && *pnwritten)
data->info.request_size += *pnwritten;
+ DEBUGF(infof(data, "Curl_xfer_send(len=%zu) -> %d, %zu",
+ blen, result, *pnwritten));
return result;
}
*/
CURLcode Curl_xfer_write_done(struct Curl_easy *data, bool premature);
+/**
+ * Return TRUE iff transfer has pending data to send. Checks involved
+ * connection filters.
+ */
+bool Curl_xfer_needs_flush(struct Curl_easy *data);
+
+/**
+ * Flush any pending send data on the transfer connection.
+ */
+CURLcode Curl_xfer_flush(struct Curl_easy *data);
+
/**
* Send data on the socket/connection filter designated
* for transfer's outgoing data.