From: Stefan Eissing Date: Thu, 8 Aug 2024 14:00:24 +0000 (+0200) Subject: websocket: introduce blocking sends X-Git-Tag: curl-8_10_0~292 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=3e64569a;p=thirdparty%2Fcurl.git websocket: introduce blocking sends When using `curl_ws_send()`, perform a blocking send of the data under the following conditions: - the websocket is in raw mode and the call is done from within a curl callback. A partial write of the data could subsequently mess up the ws framing, as a callback has a hard time handling this. - the websocket is encoding the data itself, has added it to its internal sendbuf. A partial flush of the buffer has unclear semantics for the caller, as they will have no idea what to send again. Fixes WebSockets tests with CURL_DBG_SOCK_WBLOCK=90 set. Closes #14458 --- diff --git a/docs/TODO b/docs/TODO index 40c808eb08..366b1084bd 100644 --- a/docs/TODO +++ b/docs/TODO @@ -479,6 +479,8 @@ - curl_multi_remove_handle for any of the above. See section 2.3. + - Calling curl_ws_send() from a callback + 2.2 Better support for same name resolves If a name resolve has been initiated for name NN and a second easy handle diff --git a/docs/libcurl/curl_global_trace.md b/docs/libcurl/curl_global_trace.md index 0b0799db39..51b10fb413 100644 --- a/docs/libcurl/curl_global_trace.md +++ b/docs/libcurl/curl_global_trace.md @@ -109,6 +109,10 @@ Traces reading of upload data from the application in order to send it to the se Traces writing of download data, received from the server, to the application. +## `ws` + +Tracing of WebSocket operations when this protocol is enabled in your build. + # TRACE GROUPS Besides the specific component names there are the following group names diff --git a/docs/libcurl/curl_ws_send.md b/docs/libcurl/curl_ws_send.md index 82fce1cff3..262250b546 100644 --- a/docs/libcurl/curl_ws_send.md +++ b/docs/libcurl/curl_ws_send.md @@ -53,6 +53,10 @@ If **CURLWS_RAW_MODE** is enabled in CURLOPT_WS_OPTIONS(3), the To send a message consisting of multiple frames, set the *CURLWS_CONT* bit in all frames except the final one. +Warning: while it is possible to invoke this function from a callback, +such a call is blocking in this situation, e.g. only returns after all data +has been sent or an error is encountered. + # FLAGS ## CURLWS_TEXT diff --git a/lib/bufq.c b/lib/bufq.c index c3245516c9..46e6eaa386 100644 --- a/lib/bufq.c +++ b/lib/bufq.c @@ -91,6 +91,23 @@ static size_t chunk_read(struct buf_chunk *chunk, } } +static size_t chunk_unwrite(struct buf_chunk *chunk, size_t len) +{ + size_t n = chunk->w_offset - chunk->r_offset; + DEBUGASSERT(chunk->w_offset >= chunk->r_offset); + if(!n) { + return 0; + } + else if(n <= len) { + chunk->r_offset = chunk->w_offset = 0; + return n; + } + else { + chunk->w_offset -= len; + return len; + } +} + static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len, Curl_bufq_reader *reader, void *reader_ctx, CURLcode *err) @@ -363,6 +380,49 @@ static void prune_head(struct bufq *q) } } +static struct buf_chunk *chunk_prev(struct buf_chunk *head, + struct buf_chunk *chunk) +{ + while(head) { + if(head == chunk) + return NULL; + if(head->next == chunk) + return head; + head = head->next; + } + return NULL; +} + +static void prune_tail(struct bufq *q) +{ + struct buf_chunk *chunk; + + while(q->tail && chunk_is_empty(q->tail)) { + chunk = q->tail; + q->tail = chunk_prev(q->head, chunk); + if(q->tail) + q->tail->next = NULL; + if(q->head == chunk) + q->head = q->tail; + if(q->pool) { + bufcp_put(q->pool, chunk); + --q->chunk_count; + } + else if((q->chunk_count > q->max_chunks) || + (q->opts & BUFQ_OPT_NO_SPARES)) { + /* SOFT_LIMIT allowed us more than max. free spares until + * we are at max again. Or free them if we are configured + * to not use spares. */ + free(chunk); + --q->chunk_count; + } + else { + chunk->next = q->spare; + q->spare = chunk; + } + } +} + static struct buf_chunk *get_non_full_tail(struct bufq *q) { struct buf_chunk *chunk; @@ -428,6 +488,15 @@ CURLcode Curl_bufq_cwrite(struct bufq *q, return result; } +CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len) +{ + while(len && q->tail) { + len -= chunk_unwrite(q->head, len); + prune_tail(q); + } + return len? CURLE_AGAIN : CURLE_OK; +} + ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len, CURLcode *err) { diff --git a/lib/bufq.h b/lib/bufq.h index 87ffa45da2..ec415648fd 100644 --- a/lib/bufq.h +++ b/lib/bufq.h @@ -182,6 +182,12 @@ CURLcode Curl_bufq_cwrite(struct bufq *q, const char *buf, size_t len, size_t *pnwritten); +/** + * Remove `len` bytes from the end of the buffer queue again. + * Returns CURLE_AGAIN if less than `len` bytes were in the queue. + */ +CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len); + /** * Read buf from the start of the buffer queue. The buf is copied * and the amount of copied bytes is returned. diff --git a/lib/curl_trc.c b/lib/curl_trc.c index 3618275e5d..73748729ef 100644 --- a/lib/curl_trc.c +++ b/lib/curl_trc.c @@ -221,6 +221,24 @@ void Curl_trc_ftp(struct Curl_easy *data, const char *fmt, ...) } #endif /* !CURL_DISABLE_FTP */ +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) +struct curl_trc_feat Curl_trc_feat_ws = { + "WS", + CURL_LOG_LVL_NONE, +}; + +void Curl_trc_ws(struct Curl_easy *data, const char *fmt, ...) +{ + DEBUGASSERT(!strchr(fmt, '\n')); + if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) { + va_list ap; + va_start(ap, fmt); + trc_infof(data, &Curl_trc_feat_ws, fmt, ap); + va_end(ap); + } +} +#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */ + #define TRC_CT_NONE (0) #define TRC_CT_PROTOCOL (1<<(0)) #define TRC_CT_NETWORK (1<<(1)) @@ -240,6 +258,9 @@ static struct trc_feat_def trc_feats[] = { #ifndef CURL_DISABLE_DOH { &Curl_doh_trc, TRC_CT_NETWORK }, #endif +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) + { &Curl_trc_feat_ws, TRC_CT_PROTOCOL }, +#endif }; struct trc_cft_def { diff --git a/lib/curl_trc.h b/lib/curl_trc.h index 3d38018342..c98234c6ac 100644 --- a/lib/curl_trc.h +++ b/lib/curl_trc.h @@ -89,6 +89,11 @@ void Curl_failf(struct Curl_easy *data, do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ftp)) \ Curl_trc_ftp(data, __VA_ARGS__); } while(0) #endif /* !CURL_DISABLE_FTP */ +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) +#define CURL_TRC_WS(data, ...) \ + do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) \ + Curl_trc_ws(data, __VA_ARGS__); } while(0) +#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */ #else /* CURL_HAVE_C99 */ @@ -100,6 +105,9 @@ void Curl_failf(struct Curl_easy *data, #ifndef CURL_DISABLE_FTP #define CURL_TRC_FTP Curl_trc_ftp #endif +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) +#define CURL_TRC_WS Curl_trc_ws +#endif #endif /* !CURL_HAVE_C99 */ @@ -148,6 +156,11 @@ extern struct curl_trc_feat Curl_trc_feat_ftp; void Curl_trc_ftp(struct Curl_easy *data, const char *fmt, ...) CURL_PRINTF(2, 3); #endif +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) +extern struct curl_trc_feat Curl_trc_feat_ws; +void Curl_trc_ws(struct Curl_easy *data, + const char *fmt, ...) CURL_PRINTF(2, 3); +#endif #else /* defined(CURL_DISABLE_VERBOSE_STRINGS) */ diff --git a/lib/ws.c b/lib/ws.c index 5854e5a5d3..9f80fec9f0 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -37,6 +37,7 @@ #include "ws.h" #include "easyif.h" #include "transfer.h" +#include "select.h" #include "nonblock.h" /* The last 3 #include files should be in this order */ @@ -136,6 +137,9 @@ static void ws_dec_info(struct ws_decoder *dec, struct Curl_easy *data, } } +static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws, + const char *buffer, size_t buflen); + typedef ssize_t ws_write_payload(const unsigned char *buf, size_t buflen, int frame_age, int frame_flags, curl_off_t payload_offset, @@ -773,7 +777,7 @@ CURLcode Curl_ws_accept(struct Curl_easy *data, } } #endif - DEBUGF(infof(data, "WS, using chunk size %zu", chunk_size)); + CURL_TRC_WS(data, "WS, using chunk size %zu", chunk_size); Curl_bufq_init2(&ws->recvbuf, chunk_size, WS_CHUNK_COUNT, BUFQ_OPT_SOFT_LIMIT); Curl_bufq_init2(&ws->sendbuf, chunk_size, WS_CHUNK_COUNT, @@ -970,8 +974,8 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer, infof(data, "connection expectedly closed?"); return CURLE_GOT_NOTHING; } - DEBUGF(infof(data, "curl_ws_recv, added %zu bytes from network", - Curl_bufq_len(&ws->recvbuf))); + CURL_TRC_WS(data, "curl_ws_recv, added %zu bytes from network", + Curl_bufq_len(&ws->recvbuf)); } result = ws_dec_pass(&ws->dec, data, &ws->recvbuf, @@ -1001,14 +1005,14 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer, ctx.payload_len, ctx.bufidx); *metap = &ws->frame; *nread = ws->frame.len; - /* infof(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %" - CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)", - buflen, *nread, ws->frame.offset, ws->frame.bytesleft); */ + CURL_TRC_WS(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %" + CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)", + buflen, *nread, ws->frame.offset, ws->frame.bytesleft); return CURLE_OK; } static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws, - bool complete) + bool blocking) { if(!Curl_bufq_is_empty(&ws->sendbuf)) { CURLcode result; @@ -1016,7 +1020,11 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws, size_t outlen, n; while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) { - if(data->set.connect_only) + if(blocking) { + result = ws_send_raw_blocking(data, ws, (char *)out, outlen); + n = result? 0 : outlen; + } + else if(data->set.connect_only || Curl_is_in_callback(data)) result = Curl_senddata(data, out, outlen, &n); else { result = Curl_xfer_send(data, out, outlen, FALSE, &n); @@ -1024,22 +1032,14 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws, result = CURLE_AGAIN; } - if(result) { - if(result == CURLE_AGAIN) { - if(!complete) { - infof(data, "WS: flush EAGAIN, %zu bytes remain in buffer", - Curl_bufq_len(&ws->sendbuf)); - return result; - } - /* TODO: the current design does not allow for buffered writes. - * We need to flush the buffer now. There is no ws_flush() later */ - n = 0; - continue; - } - else if(result) { - failf(data, "WS: flush, write error %d", result); - return result; - } + if(result == CURLE_AGAIN) { + CURL_TRC_WS(data, "flush EAGAIN, %zu bytes remain in buffer", + Curl_bufq_len(&ws->sendbuf)); + return result; + } + else if(result) { + failf(data, "WS: flush, write error %d", result); + return result; } else { infof(data, "WS: flushed %zu bytes", n); @@ -1050,6 +1050,83 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws, return CURLE_OK; } +static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws, + const char *buffer, size_t buflen) +{ + CURLcode result = CURLE_OK; + size_t nwritten; + + (void)ws; + while(buflen) { + result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten); + if(result) + return result; + DEBUGASSERT(nwritten <= buflen); + buffer += nwritten; + buflen -= nwritten; + if(buflen) { + curl_socket_t sock = data->conn->sock[FIRSTSOCKET]; + timediff_t left_ms; + int ev; + + CURL_TRC_WS(data, "ws_send_raw_blocking() partial, %zu left to send", + buflen); + left_ms = Curl_timeleft(data, NULL, FALSE); + if(left_ms < 0) { + failf(data, "Timeout waiting for socket becoming writable"); + return CURLE_SEND_ERROR; + } + + /* POLLOUT socket */ + if(sock == CURL_SOCKET_BAD) + return CURLE_SEND_ERROR; + ev = Curl_socket_check(CURL_SOCKET_BAD, CURL_SOCKET_BAD, sock, + left_ms? left_ms : 500); + if(ev < 0) { + failf(data, "Error while waiting for socket becoming writable"); + return CURLE_SEND_ERROR; + } + } + } + return result; +} + +static CURLcode ws_send_raw(CURL *data, const void *buffer, + size_t buflen, size_t *pnwritten) +{ + struct websocket *ws = data->conn->proto.ws; + CURLcode result; + + if(!ws) { + failf(data, "Not a websocket transfer"); + return CURLE_SEND_ERROR; + } + if(!buflen) + return CURLE_OK; + + if(Curl_is_in_callback(data)) { + /* When invoked from inside callbacks, we do a blocking send as the + * callback will probably not implement partial writes that may then + * mess up the ws framing subsequently. + * We need any pending data to be flushed before sending. */ + result = ws_flush(data, ws, TRUE); + if(result) + return result; + result = ws_send_raw_blocking(data, ws, buffer, buflen); + } + else { + /* We need any pending data to be sent or EAGAIN this call. */ + result = ws_flush(data, ws, FALSE); + if(result) + return result; + result = Curl_senddata(data, buffer, buflen, pnwritten); + } + + CURL_TRC_WS(data, "ws_send_raw(len=%zu) -> %d, %zu", + buflen, result, *pnwritten); + return result; +} + CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer, size_t buflen, size_t *sent, curl_off_t fragsize, @@ -1057,60 +1134,53 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer, { struct websocket *ws; ssize_t n; - size_t nwritten, space; + size_t space, payload_added; CURLcode result; + CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T + ", flags=%x), raw=%d", + buflen, fragsize, flags, data->set.ws_raw_mode); *sent = 0; if(!data->conn && data->set.connect_only) { result = Curl_connect_only_attach(data); if(result) - return result; + goto out; } if(!data->conn) { failf(data, "No associated connection"); - return CURLE_SEND_ERROR; + result = CURLE_SEND_ERROR; + goto out; } if(!data->conn->proto.ws) { failf(data, "Not a websocket transfer"); - return CURLE_SEND_ERROR; + result = CURLE_SEND_ERROR; + goto out; } ws = data->conn->proto.ws; + /* try flushing any content still waiting to be sent. */ + result = ws_flush(data, ws, FALSE); + if(result) + goto out; + if(data->set.ws_raw_mode) { + /* In raw mode, we write directly to the connection */ if(fragsize || flags) { - DEBUGF(infof(data, "ws_send: " - "fragsize and flags cannot be non-zero in raw mode")); + failf(data, "ws_send, raw mode: fragsize and flags cannot be non-zero"); return CURLE_BAD_FUNCTION_ARGUMENT; } - if(!buflen) - /* nothing to do */ - return CURLE_OK; - /* raw mode sends exactly what was requested, and this is from within - the write callback */ - if(Curl_is_in_callback(data)) { - result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten); - } - else - result = Curl_senddata(data, buffer, buflen, &nwritten); - - infof(data, "WS: wanted to send %zu bytes, sent %zu bytes", - buflen, nwritten); - *sent = nwritten; - return result; + result = ws_send_raw(data, buffer, buflen, sent); + goto out; } /* Not RAW mode, buf we do the frame encoding */ - result = ws_flush(data, ws, FALSE); - if(result) - return result; - - /* TODO: the current design does not allow partial writes, afaict. - * It is not clear how the application is supposed to react. */ space = Curl_bufq_space(&ws->sendbuf); - DEBUGF(infof(data, "curl_ws_send(len=%zu), sendbuf len=%zu space %zu", - buflen, Curl_bufq_len(&ws->sendbuf), space)); - if(space < 14) - return CURLE_AGAIN; + CURL_TRC_WS(data, "curl_ws_send(len=%zu), sendbuf=%zu space_left=%zu", + buflen, Curl_bufq_len(&ws->sendbuf), space); + if(space < 14) { + result = CURLE_AGAIN; + goto out; + } if(flags & CURLWS_OFFSET) { if(fragsize) { @@ -1118,7 +1188,7 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer, n = ws_enc_write_head(data, &ws->enc, flags, fragsize, &ws->sendbuf, &result); if(n < 0) - return result; + goto out; } else { if((curl_off_t)buflen > ws->enc.payload_remain) { @@ -1132,16 +1202,66 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer, n = ws_enc_write_head(data, &ws->enc, flags, (curl_off_t)buflen, &ws->sendbuf, &result); if(n < 0) - return result; + goto out; } n = ws_enc_write_payload(&ws->enc, data, buffer, buflen, &ws->sendbuf, &result); if(n < 0) - return result; + goto out; + payload_added = (size_t)n; + + while(!result && (buflen || !Curl_bufq_is_empty(&ws->sendbuf))) { + /* flush, blocking when in callback */ + result = ws_flush(data, ws, Curl_is_in_callback(data)); + if(!result) { + DEBUGASSERT(payload_added <= buflen); + /* all buffered data sent. Try sending the rest if there is any. */ + *sent += payload_added; + buffer = (const char *)buffer + payload_added; + buflen -= payload_added; + payload_added = 0; + if(buflen) { + n = ws_enc_write_payload(&ws->enc, data, + buffer, buflen, &ws->sendbuf, &result); + if(n < 0) + goto out; + payload_added = Curl_bufq_len(&ws->sendbuf); + } + } + else if(result == CURLE_AGAIN) { + /* partially sent. how much of the call data has been part of it? what + * should we report to out caller so it can retry/send the rest? */ + if(payload_added < buflen) { + /* We did not add everything the caller wanted. Return just + * the partial write to our buffer. */ + *sent = payload_added; + result = CURLE_OK; + goto out; + } + else if(!buflen) { + /* We have no payload to report a partial write. EAGAIN would make + * the caller repeat this and add the frame again. + * Flush blocking seems the only way out of this. */ + *sent = (size_t)n; + result = ws_flush(data, ws, TRUE); + goto out; + } + /* We added the complete data to our sendbuf. Report one byte less as + * sent. This parital success should make the caller invoke us again + * with the last byte. */ + *sent = payload_added - 1; + result = Curl_bufq_unwrite(&ws->sendbuf, 1); + if(!result) + result = CURLE_AGAIN; + } + } - *sent = (size_t)n; - return ws_flush(data, ws, TRUE); +out: + CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T + ", flags=%x, raw=%d) -> %d, %zu", + buflen, fragsize, flags, data->set.ws_raw_mode, result, *sent); + return result; } static void ws_free(struct connectdata *conn)