#include "http1.h"
#include "select.h"
#include "inet_pton.h"
+#include "transfer.h"
#include "vquic.h"
#include "vquic_int.h"
#include "vquic-tls.h"
struct h3_stream_ctx {
int64_t id; /* HTTP/3 protocol identifier */
struct bufq sendbuf; /* h3 request body */
- struct bufq recvbuf; /* h3 response body */
struct h1_req_parser h1; /* h1 request parsing */
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
size_t upload_blocked_len; /* the amount written last and EGAINed */
- size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
int status_code; /* HTTP status code */
Curl_bufq_initp(&stream->sendbuf, &ctx->stream_bufcp,
H3_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
stream->sendbuf_len_in_flight = 0;
- /* on recv, we need a flexible buffer limit since we also write
- * headers to it that are not counted against the nghttp3 flow limits. */
- Curl_bufq_initp(&stream->recvbuf, &ctx->stream_bufcp,
- H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
- stream->recv_buf_nonflow = 0;
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
H3_STREAM_LCTX(data) = stream;
}
Curl_bufq_free(&stream->sendbuf);
- Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
H3_STREAM_LCTX(data) = NULL;
return 0;
}
-static void report_consumed_data(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- size_t consumed)
-{
- struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
-
- if(!stream)
- return;
- /* the HTTP/1.1 response headers are written to the buffer, but
- * consuming those does not count against flow control. */
- if(stream->recv_buf_nonflow) {
- if(consumed >= stream->recv_buf_nonflow) {
- consumed -= stream->recv_buf_nonflow;
- stream->recv_buf_nonflow = 0;
- }
- else {
- stream->recv_buf_nonflow -= consumed;
- consumed = 0;
- }
- }
- if(consumed > 0) {
- CURL_TRC_CF(data, cf, "[%" PRId64 "] ACK %zu bytes of DATA",
- stream->id, consumed);
- ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id,
- consumed);
- ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
- }
-}
-
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
int64_t stream_id, uint64_t offset,
const uint8_t *buf, size_t buflen,
return 0;
}
-/*
- * write_resp_raw() copies response data in raw format to the `data`'s
- * receive buffer. If not enough space is available, it appends to the
- * `data`'s overflow buffer.
- */
-static CURLcode write_resp_raw(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- const void *mem, size_t memlen,
- bool flow)
+static CURLcode write_resp_hds(struct Curl_easy *data,
+ const char *buf, size_t blen)
{
- struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
- CURLcode result = CURLE_OK;
- ssize_t nwritten;
-
- (void)cf;
- if(!stream) {
- return CURLE_RECV_ERROR;
- }
- nwritten = Curl_bufq_write(&stream->recvbuf, mem, memlen, &result);
- if(nwritten < 0) {
- return result;
- }
-
- if(!flow)
- stream->recv_buf_nonflow += (size_t)nwritten;
-
- if((size_t)nwritten < memlen) {
- /* This MUST not happen. Our recbuf is dimensioned to hold the
- * full max_stream_window and then some for this very reason. */
- DEBUGASSERT(0);
- return CURLE_RECV_ERROR;
- }
- return result;
+ bool done;
+ return Curl_xfer_write_resp(data, (char *)buf, blen, FALSE, &done);
}
static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
- const uint8_t *buf, size_t buflen,
+ const uint8_t *buf, size_t blen,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURLcode result;
+ bool done;
(void)conn;
(void)stream3_id;
if(!stream)
return NGHTTP3_ERR_CALLBACK_FAILURE;
- result = write_resp_raw(cf, data, buf, buflen, TRUE);
+ result = Curl_xfer_write_resp(data, (char *)buf, blen, FALSE, &done);
if(result) {
CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu, ERROR receiving %d",
- stream->id, buflen, result);
+ stream->id, blen, result);
return NGHTTP3_ERR_CALLBACK_FAILURE;
}
- CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu", stream->id, buflen);
- h3_drain_stream(cf, data);
+ if(blen) {
+ CURL_TRC_CF(data, cf, "[%" PRId64 "] ACK %zu bytes of DATA",
+ stream->id, blen);
+ ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->id, blen);
+ ngtcp2_conn_extend_max_offset(ctx->qconn, blen);
+ }
+ CURL_TRC_CF(data, cf, "[%" PRId64 "] DATA len=%zu", stream->id, blen);
return 0;
}
if(!stream)
return 0;
/* add a CRLF only if we've received some headers */
- result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
+ result = write_resp_hds(data, "\r\n", 2);
if(result) {
return -1;
}
ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
stream->status_code);
CURL_TRC_CF(data, cf, "[%" PRId64 "] status: %s", stream_id, line);
- result = write_resp_raw(cf, data, line, ncopy, FALSE);
+ result = write_resp_hds(data, line, ncopy);
if(result) {
return -1;
}
CURL_TRC_CF(data, cf, "[%" PRId64 "] header: %.*s: %.*s",
stream_id, (int)h3name.len, h3name.base,
(int)h3val.len, h3val.base);
- result = write_resp_raw(cf, data, h3name.base, h3name.len, FALSE);
+ result = write_resp_hds(data, (const char *)h3name.base, h3name.len);
if(result) {
return -1;
}
- result = write_resp_raw(cf, data, ": ", 2, FALSE);
+ result = write_resp_hds(data, ": ", 2);
if(result) {
return -1;
}
- result = write_resp_raw(cf, data, h3val.base, h3val.len, FALSE);
+ result = write_resp_hds(data, (const char *)h3val.base, h3val.len);
if(result) {
return -1;
}
- result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
+ result = write_resp_hds(data, "\r\n", 2);
if(result) {
return -1;
}
/* incoming data frames on the h3 stream */
static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
- char *buf, size_t len, CURLcode *err)
+ char *buf, size_t blen, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct pkt_io_ctx pktx;
(void)ctx;
+ (void)buf;
CF_DATA_SAVE(save, cf, data);
DEBUGASSERT(cf->connected);
goto out;
}
- if(!Curl_bufq_is_empty(&stream->recvbuf)) {
- nread = Curl_bufq_read(&stream->recvbuf,
- (unsigned char *)buf, len, err);
- if(nread < 0) {
- CURL_TRC_CF(data, cf, "[%" PRId64 "] read recvbuf(len=%zu) "
- "-> %zd, %d", stream->id, len, nread, *err);
- goto out;
- }
- report_consumed_data(cf, data, nread);
- }
-
if(cf_progress_ingress(cf, data, &pktx)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
}
- /* recvbuf had nothing before, maybe after progressing ingress? */
- if(nread < 0 && !Curl_bufq_is_empty(&stream->recvbuf)) {
- nread = Curl_bufq_read(&stream->recvbuf,
- (unsigned char *)buf, len, err);
- if(nread < 0) {
- CURL_TRC_CF(data, cf, "[%" PRId64 "] read recvbuf(len=%zu) "
- "-> %zd, %d", stream->id, len, nread, *err);
- goto out;
- }
- report_consumed_data(cf, data, nread);
- }
-
- if(nread > 0) {
- h3_drain_stream(cf, data);
- }
- else {
- if(stream->closed) {
- nread = recv_closed_stream(cf, data, stream, err);
- goto out;
- }
- *err = CURLE_AGAIN;
- nread = -1;
+ if(stream->closed) {
+ nread = recv_closed_stream(cf, data, stream, err);
+ goto out;
}
+ *err = CURLE_AGAIN;
+ nread = -1;
out:
if(cf_progress_egress(cf, data, &pktx)) {
nread = -1;
}
}
- CURL_TRC_CF(data, cf, "[%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
- stream? stream->id : -1, len, nread, *err);
+ CURL_TRC_CF(data, cf, "[%" PRId64 "] cf_recv(blen=%zu) -> %zd, %d",
+ stream? stream->id : -1, blen, nread, *err);
CF_DATA_RESTORE(cf, save);
return nread;
}
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct pkt_io_ctx local_pktx;
size_t pkts_chunk = 128, i;
- size_t pkts_max = 10 * pkts_chunk;
CURLcode result = CURLE_OK;
if(!pktx) {
if(result)
return result;
- for(i = 0; i < pkts_max; i += pkts_chunk) {
+ for(i = 0; i < 4; ++i) {
+ if(i)
+ pktx_update_time(pktx, cf);
pktx->pkt_count = 0;
result = vquic_recv_packets(cf, data, &ctx->q, pkts_chunk,
recv_pkt, pktx);
- if(result) /* error */
- break;
- if(pktx->pkt_count < pkts_chunk) /* got less than we could */
- break;
- /* give egress a chance before we receive more */
- result = cf_progress_egress(cf, data, pktx);
- if(result) /* error */
+ if(result || !pktx->pkt_count) /* error or got nothing */
break;
}
return result;
static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
- const struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
(void)cf;
- return stream && !Curl_bufq_is_empty(&stream->recvbuf);
+ (void)data;
+ return FALSE;
}
static CURLcode h3_data_pause(struct Curl_cfilter *cf,