struct h1_req_parser h1; /* parsing the request */
struct dynhds resp_trailers; /* response trailer fields */
size_t resp_hds_len; /* amount of response header bytes in recvbuf */
- size_t upload_blocked_len;
- curl_off_t upload_left; /* number of request bytes left to upload */
curl_off_t nrcvd_data; /* number of DATA bytes received */
char **push_headers; /* allocated array */
BIT(bodystarted);
BIT(send_closed); /* transfer is done sending, we might have still
buffered data in stream->sendbuf to upload. */
+ BIT(body_eos); /* the complete body has been added to `sendbuf` and
+ * is being/has been processed from there. */
};
#define H2_STREAM_CTX(ctx,data) ((struct h2_stream_ctx *)(\
stream->close_handled = FALSE;
stream->error = NGHTTP2_NO_ERROR;
stream->local_window_size = H2_STREAM_WINDOW_SIZE_INITIAL;
- stream->upload_left = 0;
stream->nrcvd_data = 0;
return stream;
}
(void)cf;
bits = CURL_CSELECT_IN;
if(!stream->send_closed &&
- (stream->upload_left || stream->upload_blocked_len))
+ (!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
bits |= CURL_CSELECT_OUT;
if(data->state.select_bits != bits) {
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
drain_stream(cf, data, stream);
break;
case NGHTTP2_WINDOW_UPDATE:
- if(CURL_WANT_SEND(data)) {
+ if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) {
+ /* need more data, force processing of transfer */
drain_stream(cf, data, stream);
}
+ else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
+ /* resume the potentially suspended stream */
+ rv = nghttp2_session_resume_data(ctx->h2, stream->id);
+ if(nghttp2_is_fatal(rv))
+ return CURLE_SEND_ERROR;
+ }
break;
default:
break;
(void)source;
(void)cf;
- if(stream_id) {
- /* get the stream from the hash based on Stream ID, stream ID zero is for
- connection-oriented stuff */
- data_s = nghttp2_session_get_stream_user_data(session, stream_id);
- if(!data_s)
- /* Receiving a Stream ID not in the hash should not happen, this is an
- internal error more than anything else! */
- return NGHTTP2_ERR_CALLBACK_FAILURE;
-
- stream = H2_STREAM_CTX(ctx, data_s);
- if(!stream)
- return NGHTTP2_ERR_CALLBACK_FAILURE;
- }
- else
+ if(!stream_id)
return NGHTTP2_ERR_INVALID_ARGUMENT;
+ /* get the stream from the hash based on Stream ID, stream ID zero is for
+ connection-oriented stuff */
+ data_s = nghttp2_session_get_stream_user_data(session, stream_id);
+ if(!data_s)
+ /* Receiving a Stream ID not in the hash should not happen, this is an
+ internal error more than anything else! */
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+
+ stream = H2_STREAM_CTX(ctx, data_s);
+ if(!stream)
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+
nread = Curl_bufq_read(&stream->sendbuf, buf, length, &result);
if(nread < 0) {
if(result != CURLE_AGAIN)
nread = 0;
}
- if(nread > 0 && stream->upload_left != -1)
- stream->upload_left -= nread;
-
- CURL_TRC_CF(data_s, cf, "[%d] req_body_read(len=%zu) left=%"
- CURL_FORMAT_CURL_OFF_T " -> %zd, %d",
- stream_id, length, stream->upload_left, nread, result);
+ CURL_TRC_CF(data_s, cf, "[%d] req_body_read(len=%zu) eos=%d -> %zd, %d",
+ stream_id, length, stream->body_eos, nread, result);
- if(stream->upload_left == 0)
- *data_flags = NGHTTP2_DATA_FLAG_EOF;
- else if(nread == 0)
+ if(nread == 0)
return NGHTTP2_ERR_DEFERRED;
+ if(stream->body_eos && Curl_bufq_is_empty(&stream->sendbuf))
+ *data_flags = NGHTTP2_DATA_FLAG_EOF;
return nread;
}
CURL_TRC_CF(data, cf, "[%d] data done send", stream->id);
if(!stream->send_closed) {
stream->send_closed = TRUE;
- if(stream->upload_left) {
+ if(!Curl_bufq_is_empty(&stream->sendbuf)) {
+ /* TODO: if we had not seen EOS on send(), it seems the request
+ * is now aborted? */
/* we now know that everything that is buffered is all there is. */
- stream->upload_left = Curl_bufq_len(&stream->sendbuf);
+ stream->body_eos = TRUE;
/* resume sending here to trigger the callback to get called again so
that it can signal EOF to nghttp2 */
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
out:
result = h2_progress_egress(cf, data);
if(result == CURLE_AGAIN) {
- /* pending data to send, need to be called again. Ideally, we would
- * monitor the socket for POLLOUT, but we might not be in SENDING
- * transfer state any longer and are unable to make this happen.
- */
- drain_stream(cf, data, stream);
+ /* pending data to send, need to be called again. Ideally, we
+ * monitor the socket for POLLOUT, but when not SENDING
+ * any more, we force processing of the transfer. */
+ if(!CURL_WANT_SEND(data))
+ drain_stream(cf, data, stream);
}
else if(result) {
*err = result;
return nread;
}
+static ssize_t cf_h2_body_send(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct h2_stream_ctx *stream,
+ const void *buf, size_t blen, bool eos,
+ CURLcode *err)
+{
+ struct cf_h2_ctx *ctx = cf->ctx;
+ ssize_t nwritten;
+
+ if(stream->closed) {
+ if(stream->resp_hds_complete) {
+ /* Server decided to close the stream after having sent us a final
+ * response. This is valid if it is not interested in the request
+ * body. This happens on 30x or 40x responses.
+ * We silently discard the data sent, since this is not a transport
+ * error situation. */
+ CURL_TRC_CF(data, cf, "[%d] discarding data"
+ "on closed stream with response", stream->id);
+ if(eos)
+ stream->body_eos = TRUE;
+ *err = CURLE_OK;
+ return (ssize_t)blen;
+ }
+ /* Server closed before we got a response, this is an error */
+ infof(data, "stream %u closed", stream->id);
+ *err = CURLE_SEND_ERROR;
+ return -1;
+ }
+
+ nwritten = Curl_bufq_write(&stream->sendbuf, buf, blen, err);
+ if(nwritten < 0)
+ return -1;
+
+ if(eos && (blen == (size_t)nwritten))
+ stream->body_eos = TRUE;
+
+ if(eos || !Curl_bufq_is_empty(&stream->sendbuf)) {
+ /* resume the potentially suspended stream */
+ int rv = nghttp2_session_resume_data(ctx->h2, stream->id);
+ if(nghttp2_is_fatal(rv)) {
+ *err = CURLE_SEND_ERROR;
+ return -1;
+ }
+ }
+ return nwritten;
+}
+
static ssize_t h2_submit(struct h2_stream_ctx **pstream,
struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len,
- size_t *phdslen, CURLcode *err)
+ bool eos, CURLcode *err)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = NULL;
nghttp2_priority_spec pri_spec;
ssize_t nwritten;
- *phdslen = 0;
Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST);
*err = http2_data_setup(cf, data, &stream);
nwritten = Curl_h1_req_parse_read(&stream->h1, buf, len, NULL, 0, err);
if(nwritten < 0)
goto out;
- *phdslen = (size_t)nwritten;
if(!stream->h1.done) {
/* need more data */
goto out;
case HTTPREQ_POST_FORM:
case HTTPREQ_POST_MIME:
case HTTPREQ_PUT:
- if(data->state.infilesize != -1)
- stream->upload_left = data->state.infilesize;
- else
- /* data sending without specifying the data amount up front */
- stream->upload_left = -1; /* unknown */
-
data_prd.read_callback = req_body_read_callback;
data_prd.source.ptr = NULL;
stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
&data_prd, data);
break;
default:
- stream->upload_left = 0; /* no request body */
stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
NULL, data);
}
body = (const char *)buf + nwritten;
bodylen = len - nwritten;
- if(bodylen) {
- /* We have request body to send in DATA frame */
- ssize_t n = Curl_bufq_write(&stream->sendbuf, body, bodylen, err);
- if(n < 0) {
+ if(bodylen || eos) {
+ ssize_t n = cf_h2_body_send(cf, data, stream, body, bodylen, eos, err);
+ if(n >= 0)
+ nwritten += n;
+ else if(*err == CURLE_AGAIN)
+ *err = CURLE_OK;
+ else if(*err != CURLE_AGAIN) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
- nwritten += n;
}
out:
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
struct cf_call_data save;
- int rv;
ssize_t nwritten;
- size_t hdslen = 0;
CURLcode result;
- int blocked = 0, was_blocked = 0;
CF_DATA_SAVE(save, cf, data);
- (void)eos; /* TODO: use for stream EOF */
- if(stream && stream->id != -1) {
- if(stream->upload_blocked_len) {
- /* the data in `buf` has already been submitted or added to the
- * buffers, but have been EAGAINed on the last invocation. */
- /* TODO: this assertion triggers in OSSFuzz runs and it is not
- * clear why. Disable for now to let OSSFuzz continue its tests. */
- DEBUGASSERT(len >= stream->upload_blocked_len);
- if(len < stream->upload_blocked_len) {
- /* Did we get called again with a smaller `len`? This should not
- * happen. We are not prepared to handle that. */
- failf(data, "HTTP/2 send again with decreased length (%zd vs %zd)",
- len, stream->upload_blocked_len);
- *err = CURLE_HTTP2;
- nwritten = -1;
- goto out;
- }
- nwritten = (ssize_t)stream->upload_blocked_len;
- stream->upload_blocked_len = 0;
- was_blocked = 1;
- }
- else if(stream->closed) {
- if(stream->resp_hds_complete) {
- /* Server decided to close the stream after having sent us a findl
- * response. This is valid if it is not interested in the request
- * body. This happens on 30x or 40x responses.
- * We silently discard the data sent, since this is not a transport
- * error situation. */
- CURL_TRC_CF(data, cf, "[%d] discarding data"
- "on closed stream with response", stream->id);
- *err = CURLE_OK;
- nwritten = (ssize_t)len;
- goto out;
- }
- infof(data, "stream %u closed", stream->id);
- *err = CURLE_SEND_ERROR;
- nwritten = -1;
+ if(!stream || stream->id == -1) {
+ nwritten = h2_submit(&stream, cf, data, buf, len, eos, err);
+ if(nwritten < 0) {
goto out;
}
- else {
- /* If stream_id != -1, we have dispatched request HEADERS and
- * optionally request body, and now are going to send or sending
- * more request body in DATA frame */
- nwritten = Curl_bufq_write(&stream->sendbuf, buf, len, err);
- if(nwritten < 0 && *err != CURLE_AGAIN)
- goto out;
- }
-
- if(!Curl_bufq_is_empty(&stream->sendbuf)) {
- /* req body data is buffered, resume the potentially suspended stream */
- rv = nghttp2_session_resume_data(ctx->h2, stream->id);
- if(nghttp2_is_fatal(rv)) {
- *err = CURLE_SEND_ERROR;
- nwritten = -1;
- goto out;
- }
- }
+ DEBUGASSERT(stream);
}
- else {
- nwritten = h2_submit(&stream, cf, data, buf, len, &hdslen, err);
+ else if(stream->body_eos) {
+ /* We already wrote this, but CURLE_AGAINed the call due to not
+ * being able to flush stream->sendbuf. Make a 0-length write
+ * to trigger flushing again.
+ * If this works, we report to have written `len` bytes. */
+ DEBUGASSERT(eos);
+ nwritten = cf_h2_body_send(cf, data, stream, buf, 0, eos, err);
+ CURL_TRC_CF(data, cf, "[%d] cf_body_send last CHUNK -> %zd, %d, eos=%d",
+ stream->id, nwritten, *err, eos);
if(nwritten < 0) {
goto out;
}
- DEBUGASSERT(stream);
- DEBUGASSERT(hdslen <= (size_t)nwritten);
+ nwritten = len;
+ }
+ else {
+ nwritten = cf_h2_body_send(cf, data, stream, buf, len, eos, err);
+ CURL_TRC_CF(data, cf, "[%d] cf_body_send(len=%zu) -> %zd, %d, eos=%d",
+ stream->id, len, nwritten, *err, eos);
}
/* Call the nghttp2 send loop and flush to write ALL buffered data,
* headers and/or request body completely out to the network */
result = h2_progress_egress(cf, data);
+
/* if the stream has been closed in egress handling (nghttp2 does that
* when it does not like the headers, for example */
- if(stream && stream->closed && !was_blocked) {
+ if(stream && stream->closed) {
infof(data, "stream %u closed", stream->id);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
- else if(result == CURLE_AGAIN) {
- blocked = 1;
- }
- else if(result) {
+ else if(result && (result != CURLE_AGAIN)) {
*err = result;
nwritten = -1;
goto out;
}
- else if(stream && !Curl_bufq_is_empty(&stream->sendbuf)) {
- /* although we wrote everything that nghttp2 wants to send now,
- * there is data left in our stream send buffer unwritten. This may
- * be due to the stream's HTTP/2 flow window being exhausted. */
- blocked = 1;
- }
-
- if(stream && blocked && nwritten > 0) {
- /* Unable to send all data, due to connection blocked or H2 window
- * exhaustion. Data is left in our stream buffer, or nghttp2's internal
- * frame buffer or our network out buffer. */
- size_t rwin = (size_t)nghttp2_session_get_stream_remote_window_size(
- ctx->h2, stream->id);
- /* At the start of a stream, we are called with request headers
- * and, possibly, parts of the body. Later, only body data.
- * If we cannot send pure body data, we EAGAIN. If there had been
- * header, we return that *they* have been written and remember the
- * block on the data length only. */
- stream->upload_blocked_len = ((size_t)nwritten) - hdslen;
- CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) BLOCK: win %u/%zu "
- "hds_len=%zu blocked_len=%zu",
- stream->id, len,
- nghttp2_session_get_remote_window_size(ctx->h2), rwin,
- hdslen, stream->upload_blocked_len);
- if(hdslen) {
- *err = CURLE_OK;
- nwritten = hdslen;
- }
- else {
- *err = CURLE_AGAIN;
- nwritten = -1;
- goto out;
- }
+ else if(stream && stream->body_eos &&
+ (!Curl_bufq_is_empty(&stream->sendbuf) ||
+ !Curl_bufq_is_empty(&ctx->outbufq))) {
+ /* We added the last send chunk to stream->sendbuf, but were unable
+ * to send it all off. Either the socket EAGAINed or the HTTP/2 flow
+ * control prevents it. This should be a call with `eos` set and
+ * we CURLE_AGAIN it until we flushed everything. */
+ CURL_TRC_CF(data, cf, "[%d] could not flush last send chunk -> EAGAIN",
+ stream->id);
+ *err = CURLE_AGAIN;
+ nwritten = -1;
}
- else if(should_close_session(ctx)) {
+
+ if(should_close_session(ctx)) {
/* nghttp2 thinks this session is done. If the stream has not been
* closed, this is an error state for out transfer */
if(stream->closed) {
out:
if(stream) {
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) -> %zd, %d, "
- "upload_left=%" CURL_FORMAT_CURL_OFF_T ", "
- "h2 windows %d-%d (stream-conn), "
+ "eos=%d, h2 windows %d-%d (stream-conn), "
"buffers %zu-%zu (stream-conn)",
stream->id, len, nwritten, *err,
- stream->upload_left,
+ stream->body_eos,
nghttp2_session_get_stream_remote_window_size(
ctx->h2, stream->id),
nghttp2_session_get_remote_window_size(ctx->h2),
stream->id);
want_recv = (want_recv || c_exhaust || s_exhaust);
want_send = (!s_exhaust && want_send) ||
- (!c_exhaust && nghttp2_session_want_write(ctx->h2));
+ (!c_exhaust && nghttp2_session_want_write(ctx->h2)) ||
+ !Curl_bufq_is_empty(&ctx->outbufq);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
CF_DATA_RESTORE(cf, save);
else if(ctx->sent_goaway && !cf->shutdown) {
/* shutdown in progress */
CF_DATA_SAVE(save, cf, data);
- want_send = nghttp2_session_want_write(ctx->h2);
+ want_send = nghttp2_session_want_write(ctx->h2) ||
+ !Curl_bufq_is_empty(&ctx->outbufq);
want_recv = nghttp2_session_want_read(ctx->h2);
Curl_pollset_set(data, ps, sock, want_recv, want_send);
CF_DATA_RESTORE(cf, save);
}
/* GOAWAY submitted, process egress and ingress until nghttp2 is done. */
result = CURLE_OK;
- if(nghttp2_session_want_write(ctx->h2))
+ if(nghttp2_session_want_write(ctx->h2) ||
+ !Curl_bufq_is_empty(&ctx->outbufq))
result = h2_progress_egress(cf, data);
if(!result && nghttp2_session_want_read(ctx->h2))
result = h2_progress_ingress(cf, data, 0);
+ if(result == CURLE_AGAIN)
+ result = CURLE_OK;
+
*done = (ctx->conn_closed ||
(!result && !nghttp2_session_want_write(ctx->h2) &&
- !nghttp2_session_want_read(ctx->h2)));
+ !nghttp2_session_want_read(ctx->h2) &&
+ Curl_bufq_is_empty(&ctx->outbufq)));
out:
CF_DATA_RESTORE(cf, save);