From: Stefan Eissing Date: Mon, 15 Jun 2026 10:34:09 +0000 (+0200) Subject: cf-capsule: complete filter X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=7333f6674c8d3dbbe3728c2bf18f24d43a666ea0;p=thirdparty%2Fcurl.git cf-capsule: complete filter Make the capsule filter complete, implement: - pollset handling - flush handling and querying - shutdown handling - replace allocated send buffer with a bufq Closes #22019 --- diff --git a/lib/vquic/capsule.c b/lib/vquic/capsule.c index 2123526fff..4bbdae3ace 100644 --- a/lib/vquic/capsule.c +++ b/lib/vquic/capsule.c @@ -163,24 +163,29 @@ UNITTEST size_t capsule_encap_udp_hdr(uint8_t *hdr, size_t hdrlen, return off; } -CURLcode Curl_capsule_encap_udp_datagram(struct dynbuf *dyn, +CURLcode Curl_capsule_encap_udp_datagram(struct bufq *q, const void *buf, size_t blen) { CURLcode result; uint8_t hdr[HTTP_CAPSULE_HEADER_MAX_SIZE]; - size_t hdr_len; + size_t hdr_len, nwritten; - curlx_dyn_init(dyn, HTTP_CAPSULE_HEADER_MAX_SIZE + blen); hdr_len = capsule_encap_udp_hdr(hdr, sizeof(hdr), blen); DEBUGASSERT(hdr_len); if(!hdr_len) return CURLE_FAILED_INIT; - result = curlx_dyn_addn(dyn, hdr, hdr_len); - if(result) - return result; - - return curlx_dyn_addn(dyn, buf, blen); + result = Curl_bufq_write(q, hdr, hdr_len, &nwritten); + if(!result && (nwritten != hdr_len)) + return CURLE_WRITE_ERROR; + if(!result) { + result = Curl_bufq_write(q, buf, blen, &nwritten); + if(!result && (nwritten != blen)) + return CURLE_WRITE_ERROR; + } + if(result == CURLE_AGAIN) + return CURLE_WRITE_ERROR; + return result; } size_t Curl_capsule_process_udp_raw(struct Curl_cfilter *cf, diff --git a/lib/vquic/capsule.h b/lib/vquic/capsule.h index 315ec8a61e..7861e6fd73 100644 --- a/lib/vquic/capsule.h +++ b/lib/vquic/capsule.h @@ -38,12 +38,12 @@ /** * Encapsulate UDP payload into HTTP Datagram capsule format - * @param dyn Dynamic buffer to write capsule to + * @param q the bufq to write the capsule to * @param buf Payload buffer * @param blen Payload buffer length * @return CURLE_OK on success, error code on failure */ -CURLcode Curl_capsule_encap_udp_datagram(struct dynbuf *dyn, +CURLcode Curl_capsule_encap_udp_datagram(struct bufq *q, const void *buf, size_t blen); struct Curl_easy; diff --git a/lib/vquic/cf-capsule.c b/lib/vquic/cf-capsule.c index 862b96cf93..3ff7827bba 100644 --- a/lib/vquic/cf-capsule.c +++ b/lib/vquic/cf-capsule.c @@ -25,41 +25,37 @@ #if !defined(CURL_DISABLE_PROXY) && !defined(CURL_DISABLE_HTTP) -#include #include "urldata.h" #include "cfilters.h" #include "curl_trc.h" -#include "curlx/dynbuf.h" #include "bufq.h" +#include "select.h" #include "vquic/capsule.h" #include "vquic/cf-capsule.h" -/* recv buffer: 4 chunks of 16KB = 64KB, enough for large datagrams */ +/* send/recv buffer: 4 chunks of 16KB = 64KB, enough for large datagrams */ #define CAPSULE_RECV_CHUNKS 4 +#define CAPSULE_SEND_CHUNKS 4 #define CAPSULE_CHUNK_SIZE (16 * 1024) struct cf_capsule_ctx { struct bufq recvbuf; - struct cf_call_data call_data; - unsigned char *pending; /* unsent capsule bytes from partial write */ - size_t pending_len; /* total length of pending buffer */ - size_t pending_offset; /* bytes already sent from pending */ - size_t pending_payload; /* original payload len for pending capsule */ + struct bufq sendbuf; }; -static void capsule_cf_destroy(struct Curl_cfilter *cf, +static void cf_capsule_destroy(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_capsule_ctx *ctx = cf->ctx; (void)data; if(ctx) { Curl_bufq_free(&ctx->recvbuf); - curlx_free(ctx->pending); + Curl_bufq_free(&ctx->sendbuf); curlx_safefree(ctx); } } -static CURLcode capsule_cf_connect(struct Curl_cfilter *cf, +static CURLcode cf_capsule_connect(struct Curl_cfilter *cf, struct Curl_easy *data, bool *done) { @@ -77,79 +73,63 @@ static CURLcode capsule_cf_connect(struct Curl_cfilter *cf, return CURLE_OK; } -static CURLcode capsule_cf_send(struct Curl_cfilter *cf, +static CURLcode cf_capsule_flush(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_capsule_ctx *ctx = cf->ctx; + CURLcode result = CURLE_OK; + size_t nwritten; + + if(Curl_bufq_is_empty(&ctx->sendbuf)) + return CURLE_OK; + + result = Curl_cf_send_bufq(cf->next, data, &ctx->sendbuf, NULL, 0, + &nwritten); + if(result) { + if(result == CURLE_AGAIN) { + CURL_TRC_CF(data, cf, "flush send buffer(%zu) -> EAGAIN", + Curl_bufq_len(&ctx->sendbuf)); + } + return result; + } + return Curl_bufq_is_empty(&ctx->sendbuf) ? CURLE_OK : CURLE_AGAIN; +} + +static CURLcode cf_capsule_send(struct Curl_cfilter *cf, struct Curl_easy *data, const uint8_t *buf, size_t len, bool eos, size_t *pnwritten) { struct cf_capsule_ctx *ctx = cf->ctx; - struct dynbuf dyn; - size_t nwritten = 0; - size_t capsule_len; - size_t remaining; CURLcode result; (void)eos; *pnwritten = 0; - if(ctx->pending) { - /* flush remaining bytes from a partially sent capsule */ - remaining = ctx->pending_len - ctx->pending_offset; - result = Curl_conn_cf_send(cf->next, data, - ctx->pending + ctx->pending_offset, - remaining, FALSE, &nwritten); - if(result && result != CURLE_AGAIN) { - curlx_safefree(ctx->pending); + if(Curl_bufq_is_full(&ctx->sendbuf)) { + result = cf_capsule_flush(cf, data); + if(result) return result; - } - ctx->pending_offset += nwritten; - if(ctx->pending_offset < ctx->pending_len) - return CURLE_AGAIN; - /* pending capsule has been fully flushed */ - *pnwritten = ctx->pending_payload; - curlx_safefree(ctx->pending); - return CURLE_OK; } /* encapsulate new payload into a capsule */ - result = Curl_capsule_encap_udp_datagram(&dyn, buf, len); - if(result) { - curlx_dyn_free(&dyn); + result = Curl_capsule_encap_udp_datagram(&ctx->sendbuf, buf, len); + if(result) return result; - } - capsule_len = curlx_dyn_len(&dyn); - result = Curl_conn_cf_send(cf->next, data, - (const uint8_t *)curlx_dyn_ptr(&dyn), - capsule_len, FALSE, &nwritten); - if(result && result != CURLE_AGAIN) { - curlx_dyn_free(&dyn); - return result; + result = cf_capsule_flush(cf, data); + if(result == CURLE_AGAIN) { + /* Could not send it (or all), report success nevertheless as we + * have the payload buffered now and will flush it later. */ + result = CURLE_OK; } - if(nwritten < capsule_len) { - /* partial or zero write - save unsent capsule bytes as pending */ - remaining = capsule_len - nwritten; - ctx->pending = curlx_malloc(remaining); - if(!ctx->pending) { - curlx_dyn_free(&dyn); - return CURLE_OUT_OF_MEMORY; - } - memcpy(ctx->pending, curlx_dyn_ptr(&dyn) + nwritten, remaining); - ctx->pending_len = remaining; - ctx->pending_offset = 0; - ctx->pending_payload = len; - curlx_dyn_free(&dyn); - return CURLE_AGAIN; - } - - /* entire capsule sent */ - curlx_dyn_free(&dyn); - *pnwritten = len; - return CURLE_OK; + if(!result) + *pnwritten = len; + return result; } -static CURLcode capsule_cf_recv(struct Curl_cfilter *cf, +static CURLcode cf_capsule_recv(struct Curl_cfilter *cf, struct Curl_easy *data, char *buf, size_t len, size_t *pnread) @@ -178,7 +158,7 @@ static CURLcode capsule_cf_recv(struct Curl_cfilter *cf, return result; } -static bool capsule_cf_data_pending(struct Curl_cfilter *cf, +static bool cf_capsule_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { struct cf_capsule_ctx *ctx = cf->ctx; @@ -188,26 +168,98 @@ static bool capsule_cf_data_pending(struct Curl_cfilter *cf, return cf->next ? cf->next->cft->has_data_pending(cf->next, data) : FALSE; } +static CURLcode cf_capsule_cntrl(struct Curl_cfilter *cf, + struct Curl_easy *data, + int event, int arg1, void *arg2) +{ + CURLcode result = CURLE_OK; + + (void)arg1; + (void)arg2; + switch(event) { + case CF_CTRL_FLUSH: + result = cf_capsule_flush(cf, data); + break; + default: + break; + } + return result; +} + +static CURLcode cf_capsule_query(struct Curl_cfilter *cf, + struct Curl_easy *data, + int query, int *pres1, void *pres2) +{ + struct cf_capsule_ctx *ctx = cf->ctx; + + (void)pres2; + switch(query) { + case CF_QUERY_NEED_FLUSH: { + if(!Curl_bufq_is_empty(&ctx->sendbuf)) { + *pres1 = TRUE; + return CURLE_OK; + } + break; + } + default: + break; + } + return cf->next ? + cf->next->cft->query(cf->next, data, query, pres1, pres2) : + CURLE_UNKNOWN_OPTION; +} + +static CURLcode cf_capsule_adjust_pollset(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct easy_pollset *ps) +{ + struct cf_capsule_ctx *ctx = cf->ctx; + + if(!Curl_bufq_is_empty(&ctx->sendbuf)) { + curl_socket_t sock = Curl_conn_cf_get_socket(cf, data); + if(sock != CURL_SOCKET_BAD) + return Curl_pollset_add_out(data, ps, sock); + } + return CURLE_OK; +} + +static CURLcode cf_capsule_shutdown(struct Curl_cfilter *cf, + struct Curl_easy *data, bool *done) +{ + CURLcode result = CURLE_OK; + + if(!cf->connected || cf->shutdown) { + *done = TRUE; + } + else { + result = cf_capsule_flush(cf, data); + *done = !result; + if(result == CURLE_AGAIN) + result = CURLE_OK; + } + return result; +} + struct Curl_cftype Curl_cft_capsule = { "CAPSULE", 0, 0, - capsule_cf_destroy, - capsule_cf_connect, - Curl_cf_def_shutdown, - Curl_cf_def_adjust_pollset, - capsule_cf_data_pending, - capsule_cf_send, - capsule_cf_recv, - Curl_cf_def_cntrl, + cf_capsule_destroy, + cf_capsule_connect, + cf_capsule_shutdown, + cf_capsule_adjust_pollset, + cf_capsule_data_pending, + cf_capsule_send, + cf_capsule_recv, + cf_capsule_cntrl, Curl_cf_def_conn_is_alive, Curl_cf_def_conn_keep_alive, - Curl_cf_def_query, + cf_capsule_query, }; -CURLcode Curl_cf_capsule_create(struct Curl_cfilter **pcf, - struct Curl_easy *data, - struct connectdata *conn) +static CURLcode cf_capsule_create(struct Curl_cfilter **pcf, + struct Curl_easy *data, + struct connectdata *conn) { struct Curl_cfilter *cf = NULL; struct cf_capsule_ctx *ctx; @@ -224,6 +276,8 @@ CURLcode Curl_cf_capsule_create(struct Curl_cfilter **pcf, Curl_bufq_init2(&ctx->recvbuf, CAPSULE_CHUNK_SIZE, CAPSULE_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT); + Curl_bufq_init2(&ctx->sendbuf, CAPSULE_CHUNK_SIZE, CAPSULE_SEND_CHUNKS, + BUFQ_OPT_SOFT_LIMIT); result = Curl_cf_create(&cf, &Curl_cft_capsule, ctx); @@ -231,6 +285,7 @@ out: *pcf = (!result) ? cf : NULL; if(result && ctx) { Curl_bufq_free(&ctx->recvbuf); + Curl_bufq_free(&ctx->sendbuf); curlx_free(ctx); } return result; @@ -242,7 +297,7 @@ CURLcode Curl_cf_capsule_insert_after(struct Curl_cfilter *cf_at, struct Curl_cfilter *cf; CURLcode result; - result = Curl_cf_capsule_create(&cf, data, cf_at->conn); + result = cf_capsule_create(&cf, data, cf_at->conn); if(!result) Curl_conn_cf_insert_after(cf_at, cf); return result; diff --git a/lib/vquic/cf-capsule.h b/lib/vquic/cf-capsule.h index e45983543a..437c9681b6 100644 --- a/lib/vquic/cf-capsule.h +++ b/lib/vquic/cf-capsule.h @@ -27,10 +27,6 @@ #if !defined(CURL_DISABLE_PROXY) && !defined(CURL_DISABLE_HTTP) -CURLcode Curl_cf_capsule_create(struct Curl_cfilter **pcf, - struct Curl_easy *data, - struct connectdata *conn); - /* Insert a capsule protocol filter after `cf_at` in the filter chain. * The capsule filter encapsulates/decapsulates UDP datagrams using * the HTTP Datagram capsule format (RFC 9297). */ diff --git a/lib/vquic/vquic.c b/lib/vquic/vquic.c index 9463d1db55..8cd006bcb5 100644 --- a/lib/vquic/vquic.c +++ b/lib/vquic/vquic.c @@ -280,16 +280,17 @@ static CURLcode send_packet_no_gso_cf(struct Curl_cfilter *cf, len = CURLMIN(gsolen, (size_t)(end - p)); result = Curl_conn_cf_send(cf->next, data, p, len, FALSE, &sent); /* Report forward progress even if we return CURLE_AGAIN later. */ - *psent += sent; VERBOSE(++calls); /* Preserve lower-filter errors (including CURLE_AGAIN). */ if(result) goto out; - if(sent < len) { - /* We need whole datagrams here. Partial accept means blocked. */ - result = CURLE_AGAIN; + + if(sent != len) { + /* We can only send the complete datagram, not parts. */ + result = CURLE_SEND_ERROR; goto out; } + *psent += sent; } out: diff --git a/lib/vquic/vquic_int.h b/lib/vquic/vquic_int.h index 82bd5b0358..db5183d431 100644 --- a/lib/vquic/vquic_int.h +++ b/lib/vquic/vquic_int.h @@ -90,14 +90,6 @@ void vquic_ctx_set_time(struct cf_quic_ctx *qctx, void vquic_ctx_update_time(struct cf_quic_ctx *qctx, const struct curltime *pnow); -void vquic_push_blocked_pkt(struct Curl_cfilter *cf, - struct cf_quic_ctx *qctx, - const uint8_t *pkt, size_t pktlen, size_t gsolen); - -CURLcode vquic_send_blocked_pkts(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct cf_quic_ctx *qctx); - CURLcode vquic_send(struct Curl_cfilter *cf, struct Curl_easy *data, struct cf_quic_ctx *qctx, size_t gsolen); diff --git a/tests/unit/unit3400.c b/tests/unit/unit3400.c index 03b70b0dcd..060cc61799 100644 --- a/tests/unit/unit3400.c +++ b/tests/unit/unit3400.c @@ -98,7 +98,6 @@ static void check_capsule_result(struct bufq *q, static void test_capsule_encode_decode_roundtrip(void) { - struct dynbuf dyn; struct bufq q; unsigned char payload[128]; unsigned char out[128]; @@ -106,20 +105,19 @@ static void test_capsule_encode_decode_roundtrip(void) size_t payload_len; size_t i, nread; + Curl_bufq_init2(&q, 32, 8, BUFQ_OPT_NONE); + for(i = 0; i < sizeof(payload); ++i) payload[i] = (unsigned char)i; for(i = 0; i < 2; ++i) { payload_len = i ? 64 : 7; memset(out, 0, sizeof(out)); + Curl_bufq_reset(&q); - result = Curl_capsule_encap_udp_datagram(&dyn, payload, payload_len); + result = Curl_capsule_encap_udp_datagram(&q, payload, payload_len); fail_unless(result == CURLE_OK, "failed to encapsulate UDP datagram"); - Curl_bufq_init2(&q, 32, 8, BUFQ_OPT_NONE); - queue_bytes(&q, (const unsigned char *)curlx_dyn_ptr(&dyn), - curlx_dyn_len(&dyn)); - err = CURLE_OK; nread = Curl_capsule_process_udp_raw(NULL, NULL, &q, out, sizeof(out), &err); @@ -128,10 +126,8 @@ static void test_capsule_encode_decode_roundtrip(void) fail_unless(!memcmp(out, payload, payload_len), "decoded payload bytes mismatch"); fail_unless(Curl_bufq_is_empty(&q), "decoded capsule must be consumed"); - - Curl_bufq_free(&q); - curlx_dyn_free(&dyn); } + Curl_bufq_free(&q); } static void test_capsule_sequential_decode(void)