From: Stefan Eissing Date: Tue, 16 Jan 2024 11:06:57 +0000 (+0100) Subject: websockets: refactor decode chain X-Git-Tag: curl-8_6_0~76 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3378d2bd0931433999ef4c0ca95040c394146484;p=thirdparty%2Fcurl.git websockets: refactor decode chain - use client writer stack for decoding frames - move websocket protocol handler to ws.c Closes #12713 --- diff --git a/lib/http.c b/lib/http.c index 14963ca0e4..6cb0a10b80 100644 --- a/lib/http.c +++ b/lib/http.c @@ -100,29 +100,14 @@ * Forward declarations. */ -static int http_getsock_do(struct Curl_easy *data, - struct connectdata *conn, - curl_socket_t *socks); static bool http_should_fail(struct Curl_easy *data); -static CURLcode http_setup_conn(struct Curl_easy *data, - struct connectdata *conn); -static CURLcode http_write_resp(struct Curl_easy *data, - const char *buf, size_t blen, - bool is_eos, - bool *done); - -#ifdef USE_WEBSOCKETS -static CURLcode ws_setup_conn(struct Curl_easy *data, - struct connectdata *conn); -#endif - /* * HTTP handler interface. */ const struct Curl_handler Curl_handler_http = { "HTTP", /* scheme */ - http_setup_conn, /* setup_connection */ + Curl_http_setup_conn, /* setup_connection */ Curl_http, /* do_it */ Curl_http_done, /* done */ ZERO_NULL, /* do_more */ @@ -130,11 +115,11 @@ const struct Curl_handler Curl_handler_http = { ZERO_NULL, /* connecting */ ZERO_NULL, /* doing */ ZERO_NULL, /* proto_getsock */ - http_getsock_do, /* doing_getsock */ + Curl_http_getsock_do, /* doing_getsock */ ZERO_NULL, /* domore_getsock */ ZERO_NULL, /* perform_getsock */ ZERO_NULL, /* disconnect */ - http_write_resp, /* write_resp */ + Curl_http_write_resp, /* write_resp */ ZERO_NULL, /* connection_check */ ZERO_NULL, /* attach connection */ PORT_HTTP, /* defport */ @@ -144,39 +129,13 @@ const struct Curl_handler Curl_handler_http = { PROTOPT_USERPWDCTRL }; -#ifdef USE_WEBSOCKETS -const struct Curl_handler Curl_handler_ws = { - "WS", /* scheme */ - ws_setup_conn, /* setup_connection */ - Curl_http, /* do_it */ - Curl_http_done, /* done */ - ZERO_NULL, /* do_more */ - Curl_http_connect, /* connect_it */ - ZERO_NULL, /* connecting */ - ZERO_NULL, /* doing */ - ZERO_NULL, /* proto_getsock */ - http_getsock_do, /* doing_getsock */ - ZERO_NULL, /* domore_getsock */ - ZERO_NULL, /* perform_getsock */ - Curl_ws_disconnect, /* disconnect */ - http_write_resp, /* write_resp */ - ZERO_NULL, /* connection_check */ - ZERO_NULL, /* attach connection */ - PORT_HTTP, /* defport */ - CURLPROTO_WS, /* protocol */ - CURLPROTO_HTTP, /* family */ - PROTOPT_CREDSPERREQUEST | /* flags */ - PROTOPT_USERPWDCTRL -}; -#endif - #ifdef USE_SSL /* * HTTPS handler interface. */ const struct Curl_handler Curl_handler_https = { "HTTPS", /* scheme */ - http_setup_conn, /* setup_connection */ + Curl_http_setup_conn, /* setup_connection */ Curl_http, /* do_it */ Curl_http_done, /* done */ ZERO_NULL, /* do_more */ @@ -184,11 +143,11 @@ const struct Curl_handler Curl_handler_https = { NULL, /* connecting */ ZERO_NULL, /* doing */ NULL, /* proto_getsock */ - http_getsock_do, /* doing_getsock */ + Curl_http_getsock_do, /* doing_getsock */ ZERO_NULL, /* domore_getsock */ ZERO_NULL, /* perform_getsock */ ZERO_NULL, /* disconnect */ - http_write_resp, /* write_resp */ + Curl_http_write_resp, /* write_resp */ ZERO_NULL, /* connection_check */ ZERO_NULL, /* attach connection */ PORT_HTTPS, /* defport */ @@ -198,36 +157,10 @@ const struct Curl_handler Curl_handler_https = { PROTOPT_USERPWDCTRL }; -#ifdef USE_WEBSOCKETS -const struct Curl_handler Curl_handler_wss = { - "WSS", /* scheme */ - ws_setup_conn, /* setup_connection */ - Curl_http, /* do_it */ - Curl_http_done, /* done */ - ZERO_NULL, /* do_more */ - Curl_http_connect, /* connect_it */ - NULL, /* connecting */ - ZERO_NULL, /* doing */ - NULL, /* proto_getsock */ - http_getsock_do, /* doing_getsock */ - ZERO_NULL, /* domore_getsock */ - ZERO_NULL, /* perform_getsock */ - Curl_ws_disconnect, /* disconnect */ - http_write_resp, /* write_resp */ - ZERO_NULL, /* connection_check */ - ZERO_NULL, /* attach connection */ - PORT_HTTPS, /* defport */ - CURLPROTO_WSS, /* protocol */ - CURLPROTO_HTTP, /* family */ - PROTOPT_SSL | PROTOPT_CREDSPERREQUEST | /* flags */ - PROTOPT_USERPWDCTRL -}; -#endif - #endif -static CURLcode http_setup_conn(struct Curl_easy *data, - struct connectdata *conn) +CURLcode Curl_http_setup_conn(struct Curl_easy *data, + struct connectdata *conn) { /* allocate the HTTP-specific struct for the Curl_easy, only to survive during this request */ @@ -250,16 +183,6 @@ static CURLcode http_setup_conn(struct Curl_easy *data, return CURLE_OK; } -#ifdef USE_WEBSOCKETS -static CURLcode ws_setup_conn(struct Curl_easy *data, - struct connectdata *conn) -{ - /* websockets is 1.1 only (for now) */ - data->state.httpwant = CURL_HTTP_VERSION_1_1; - return http_setup_conn(data, conn); -} -#endif - #ifndef CURL_DISABLE_PROXY /* * checkProxyHeaders() checks the linked list of custom proxy headers @@ -1594,9 +1517,9 @@ CURLcode Curl_http_connect(struct Curl_easy *data, bool *done) /* this returns the socket to wait for in the DO and DOING state for the multi interface and then we're always _sending_ a request and thus we wait for the single socket to become writable only */ -static int http_getsock_do(struct Curl_easy *data, - struct connectdata *conn, - curl_socket_t *socks) +int Curl_http_getsock_do(struct Curl_easy *data, + struct connectdata *conn, + curl_socket_t *socks) { /* write mode */ (void)conn; @@ -4133,11 +4056,10 @@ static CURLcode http_rw_headers(struct Curl_easy *data, if(result) return result; k->header = FALSE; /* no more header to parse! */ - if(data->set.connect_only) { + *pconsumed += blen; /* ws accept handled the data */ + blen = 0; + if(data->set.connect_only) k->keepon &= ~KEEP_RECV; /* read no more content */ - *pconsumed += blen; - blen = 0; - } } #endif else { @@ -4611,10 +4533,10 @@ CURLcode Curl_http_write_resp_hds(struct Curl_easy *data, } } -static CURLcode http_write_resp(struct Curl_easy *data, - const char *buf, size_t blen, - bool is_eos, - bool *done) +CURLcode Curl_http_write_resp(struct Curl_easy *data, + const char *buf, size_t blen, + bool is_eos, + bool *done) { CURLcode result; size_t consumed; diff --git a/lib/http.h b/lib/http.h index 8116ec1aac..ad2697c9e7 100644 --- a/lib/http.h +++ b/lib/http.h @@ -54,14 +54,6 @@ extern const struct Curl_handler Curl_handler_http; extern const struct Curl_handler Curl_handler_https; #endif -#ifdef USE_WEBSOCKETS -extern const struct Curl_handler Curl_handler_ws; - -#ifdef USE_SSL -extern const struct Curl_handler Curl_handler_wss; -#endif -#endif /* websockets */ - struct dynhds; CURLcode Curl_bump_headersize(struct Curl_easy *data, @@ -147,9 +139,17 @@ CURLcode Curl_http_firstwrite(struct Curl_easy *data, bool *done); /* protocol-specific functions set up to be called by the main engine */ +CURLcode Curl_http_setup_conn(struct Curl_easy *data, + struct connectdata *conn); CURLcode Curl_http(struct Curl_easy *data, bool *done); CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature); CURLcode Curl_http_connect(struct Curl_easy *data, bool *done); +int Curl_http_getsock_do(struct Curl_easy *data, struct connectdata *conn, + curl_socket_t *socks); +CURLcode Curl_http_write_resp(struct Curl_easy *data, + const char *buf, size_t blen, + bool is_eos, + bool *done); /* These functions are in http.c */ CURLcode Curl_http_input_auth(struct Curl_easy *data, bool proxy, diff --git a/lib/sendf.c b/lib/sendf.c index 2a2dfce8e8..db3189a298 100644 --- a/lib/sendf.c +++ b/lib/sendf.c @@ -296,13 +296,6 @@ static CURLcode chop_write(struct Curl_easy *data, if(!skip_body_write && ((type & CLIENTWRITE_BODY) || ((type & CLIENTWRITE_HEADER) && data->set.include_header))) { -#ifdef USE_WEBSOCKETS - if(conn->handler->protocol & (CURLPROTO_WS|CURLPROTO_WSS)) { - writebody = Curl_ws_writecb; - writebody_ptr = data; - } - else -#endif writebody = data->set.fwrite_func; } if((type & (CLIENTWRITE_HEADER|CLIENTWRITE_INFO)) && diff --git a/lib/ws.c b/lib/ws.c index 78c1404699..d9765182d9 100644 --- a/lib/ws.c +++ b/lib/ws.c @@ -24,7 +24,7 @@ #include "curl_setup.h" #include -#ifdef USE_WEBSOCKETS +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) #include "urldata.h" #include "bufq.h" @@ -354,6 +354,136 @@ static void update_meta(struct websocket *ws, ws->frame.bytesleft = (payload_len - payload_offset - cur_len); } +/* WebSockets decoding client writer */ +struct ws_cw_ctx { + struct Curl_cwriter super; + struct bufq buf; +}; + +static CURLcode ws_cw_init(struct Curl_easy *data, + struct Curl_cwriter *writer) +{ + struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer; + (void)data; + Curl_bufq_init2(&ctx->buf, WS_CHUNK_SIZE, 1, BUFQ_OPT_SOFT_LIMIT); + return CURLE_OK; +} + +static void ws_cw_close(struct Curl_easy *data, struct Curl_cwriter *writer) +{ + struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer; + (void) data; + Curl_bufq_free(&ctx->buf); +} + +struct ws_cw_dec_ctx { + struct Curl_easy *data; + struct websocket *ws; + struct Curl_cwriter *next_writer; + int cw_type; +}; + +static ssize_t ws_cw_dec_next(const unsigned char *buf, size_t buflen, + int frame_age, int frame_flags, + curl_off_t payload_offset, + curl_off_t payload_len, + void *user_data, + CURLcode *err) +{ + struct ws_cw_dec_ctx *ctx = user_data; + struct Curl_easy *data = ctx->data; + struct websocket *ws = ctx->ws; + curl_off_t remain = (payload_len - (payload_offset + buflen)); + + (void)frame_age; + if((frame_flags & CURLWS_PING) && !remain) { + /* auto-respond to PINGs, only works for single-frame payloads atm */ + size_t bytes; + infof(data, "WS: auto-respond to PING with a PONG"); + /* send back the exact same content as a PONG */ + *err = curl_ws_send(data, buf, buflen, &bytes, 0, CURLWS_PONG); + if(*err) + return -1; + } + else if(buflen || !remain) { + /* forward the decoded frame to the next client writer. */ + update_meta(ws, frame_age, frame_flags, payload_offset, + payload_len, buflen); + + *err = Curl_cwriter_write(data, ctx->next_writer, ctx->cw_type, + (const char *)buf, buflen); + if(*err) + return -1; + } + *err = CURLE_OK; + return (ssize_t)buflen; +} + +static CURLcode ws_cw_write(struct Curl_easy *data, + struct Curl_cwriter *writer, int type, + const char *buf, size_t nbytes) +{ + struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer; + struct websocket *ws; + CURLcode result; + + if(!(type & CLIENTWRITE_BODY) || data->set.ws_raw_mode) + return Curl_cwriter_write(data, writer->next, type, buf, nbytes); + + ws = data->conn->proto.ws; + if(!ws) { + failf(data, "WS: not a websocket transfer"); + return CURLE_FAILED_INIT; + } + + if(nbytes) { + ssize_t nwritten; + nwritten = Curl_bufq_write(&ctx->buf, (const unsigned char *)buf, + nbytes, &result); + if(nwritten < 0) { + infof(data, "WS: error adding data to buffer %d", result); + return result; + } + } + + while(!Curl_bufq_is_empty(&ctx->buf)) { + struct ws_cw_dec_ctx pass_ctx; + pass_ctx.data = data; + pass_ctx.ws = ws; + pass_ctx.next_writer = writer->next; + pass_ctx.cw_type = type; + result = ws_dec_pass(&ws->dec, data, &ctx->buf, + ws_cw_dec_next, &pass_ctx); + if(result == CURLE_AGAIN) + /* insufficient amount of data, keep it for later. + * we pretend to have written all since we have a copy */ + return CURLE_OK; + else if(result) { + infof(data, "WS: decode error %d", (int)result); + return result; + } + } + + if((type & CLIENTWRITE_EOS) && !Curl_bufq_is_empty(&ctx->buf)) { + infof(data, "WS: decode ending with %zd frame bytes remaining", + Curl_bufq_len(&ctx->buf)); + return CURLE_RECV_ERROR; + } + + return CURLE_OK; +} + +/* WebSocket payload decoding client writer. */ +static const struct Curl_cwtype ws_cw_decode = { + "ws-decode", + NULL, + ws_cw_init, + ws_cw_write, + ws_cw_close, + sizeof(struct ws_cw_ctx) +}; + + static void ws_enc_info(struct ws_encoder *enc, struct Curl_easy *data, const char *msg) { @@ -618,6 +748,7 @@ CURLcode Curl_ws_accept(struct Curl_easy *data, { struct SingleRequest *k = &data->req; struct websocket *ws; + struct Curl_cwriter *ws_dec_writer; CURLcode result; DEBUGASSERT(data->conn); @@ -627,7 +758,8 @@ CURLcode Curl_ws_accept(struct Curl_easy *data, if(!ws) return CURLE_OUT_OF_MEMORY; data->conn->proto.ws = ws; - Curl_bufq_init(&ws->recvbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT); + Curl_bufq_init2(&ws->recvbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT, + BUFQ_OPT_SOFT_LIMIT); Curl_bufq_init2(&ws->sendbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT, BUFQ_OPT_SOFT_LIMIT); ws_dec_init(&ws->dec); @@ -666,6 +798,18 @@ CURLcode Curl_ws_accept(struct Curl_easy *data, infof(data, "Received 101, switch to WebSocket; mask %02x%02x%02x%02x", ws->enc.mask[0], ws->enc.mask[1], ws->enc.mask[2], ws->enc.mask[3]); + /* Install our client writer that decodes WS frames payload */ + result = Curl_cwriter_create(&ws_dec_writer, data, &ws_cw_decode, + CURL_CW_CONTENT_DECODE); + if(result) + return result; + + result = Curl_cwriter_add(data, ws_dec_writer); + if(result) { + Curl_cwriter_free(data, ws_dec_writer); + return result; + } + if(data->set.connect_only) { ssize_t nwritten; /* In CONNECT_ONLY setup, the payloads from `mem` need to be received @@ -677,105 +821,15 @@ CURLcode Curl_ws_accept(struct Curl_easy *data, return result; infof(data, "%zu bytes websocket payload", nread); } - k->upgr101 = UPGR101_RECEIVED; - - return result; -} - -static ssize_t ws_client_write(const unsigned char *buf, size_t buflen, - int frame_age, int frame_flags, - curl_off_t payload_offset, - curl_off_t payload_len, - void *userp, - CURLcode *err) -{ - struct Curl_easy *data = userp; - struct websocket *ws; - size_t wrote; - curl_off_t remain = (payload_len - (payload_offset + buflen)); - - (void)frame_age; - if(!data->conn || !data->conn->proto.ws) { - *err = CURLE_FAILED_INIT; - return -1; - } - ws = data->conn->proto.ws; - - if((frame_flags & CURLWS_PING) && !remain) { - /* auto-respond to PINGs, only works for single-frame payloads atm */ - size_t bytes; - infof(data, "WS: auto-respond to PING with a PONG"); - /* send back the exact same content as a PONG */ - *err = curl_ws_send(data, buf, buflen, &bytes, 0, CURLWS_PONG); - if(*err) - return -1; - } - else if(buflen || !remain) { - /* deliver the decoded frame to the user callback. The application - * may invoke curl_ws_meta() to access frame information. */ - update_meta(ws, frame_age, frame_flags, payload_offset, - payload_len, buflen); - Curl_set_in_callback(data, true); - wrote = data->set.fwrite_func((char *)buf, 1, - buflen, data->set.out); - Curl_set_in_callback(data, false); - if(wrote != buflen) { - *err = CURLE_RECV_ERROR; - return -1; + else { /* !connect_only */ + /* And pass any additional data to the writers */ + if(nread) { + result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)mem, nread); } } - *err = CURLE_OK; - return (ssize_t)buflen; -} - -/* Curl_ws_writecb() is the write callback for websocket traffic. The - websocket data is provided to this raw, in chunks. This function should - handle/decode the data and call the "real" underlying callback accordingly. -*/ -size_t Curl_ws_writecb(char *buffer, size_t size /* 1 */, - size_t nitems, void *userp) -{ - struct Curl_easy *data = userp; - - if(data->set.ws_raw_mode) - return data->set.fwrite_func(buffer, size, nitems, data->set.out); - else if(nitems) { - struct websocket *ws; - CURLcode result; - - if(!data->conn || !data->conn->proto.ws) { - failf(data, "WS: not a websocket transfer"); - return nitems - 1; - } - ws = data->conn->proto.ws; - - if(buffer) { - ssize_t nwritten; - - nwritten = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)buffer, - nitems, &result); - if(nwritten < 0) { - infof(data, "WS: error adding data to buffer %d", (int)result); - return nitems - 1; - } - buffer = NULL; - } - - while(!Curl_bufq_is_empty(&ws->recvbuf)) { + k->upgr101 = UPGR101_RECEIVED; - result = ws_dec_pass(&ws->dec, data, &ws->recvbuf, - ws_client_write, data); - if(result == CURLE_AGAIN) - /* insufficient amount of data, keep it for later. - * we pretend to have written all since we have a copy */ - return nitems; - else if(result) { - infof(data, "WS: decode error %d", (int)result); - return nitems - 1; - } - } - } - return nitems; + return result; } struct ws_collect { @@ -1085,14 +1139,23 @@ static void ws_free(struct connectdata *conn) } } +static CURLcode ws_setup_conn(struct Curl_easy *data, + struct connectdata *conn) +{ + /* websockets is 1.1 only (for now) */ + data->state.httpwant = CURL_HTTP_VERSION_1_1; + return Curl_http_setup_conn(data, conn); +} + + void Curl_ws_done(struct Curl_easy *data) { (void)data; } -CURLcode Curl_ws_disconnect(struct Curl_easy *data, - struct connectdata *conn, - bool dead_connection) +static CURLcode ws_disconnect(struct Curl_easy *data, + struct connectdata *conn, + bool dead_connection) { (void)data; (void)dead_connection; @@ -1110,6 +1173,57 @@ CURL_EXTERN const struct curl_ws_frame *curl_ws_meta(struct Curl_easy *data) return NULL; } +const struct Curl_handler Curl_handler_ws = { + "WS", /* scheme */ + ws_setup_conn, /* setup_connection */ + Curl_http, /* do_it */ + Curl_http_done, /* done */ + ZERO_NULL, /* do_more */ + Curl_http_connect, /* connect_it */ + ZERO_NULL, /* connecting */ + ZERO_NULL, /* doing */ + ZERO_NULL, /* proto_getsock */ + Curl_http_getsock_do, /* doing_getsock */ + ZERO_NULL, /* domore_getsock */ + ZERO_NULL, /* perform_getsock */ + ws_disconnect, /* disconnect */ + Curl_http_write_resp, /* write_resp */ + ZERO_NULL, /* connection_check */ + ZERO_NULL, /* attach connection */ + PORT_HTTP, /* defport */ + CURLPROTO_WS, /* protocol */ + CURLPROTO_HTTP, /* family */ + PROTOPT_CREDSPERREQUEST | /* flags */ + PROTOPT_USERPWDCTRL +}; + +#ifdef USE_SSL +const struct Curl_handler Curl_handler_wss = { + "WSS", /* scheme */ + ws_setup_conn, /* setup_connection */ + Curl_http, /* do_it */ + Curl_http_done, /* done */ + ZERO_NULL, /* do_more */ + Curl_http_connect, /* connect_it */ + NULL, /* connecting */ + ZERO_NULL, /* doing */ + NULL, /* proto_getsock */ + Curl_http_getsock_do, /* doing_getsock */ + ZERO_NULL, /* domore_getsock */ + ZERO_NULL, /* perform_getsock */ + ws_disconnect, /* disconnect */ + Curl_http_write_resp, /* write_resp */ + ZERO_NULL, /* connection_check */ + ZERO_NULL, /* attach connection */ + PORT_HTTPS, /* defport */ + CURLPROTO_WSS, /* protocol */ + CURLPROTO_HTTP, /* family */ + PROTOPT_SSL | PROTOPT_CREDSPERREQUEST | /* flags */ + PROTOPT_USERPWDCTRL +}; +#endif + + #else CURL_EXTERN CURLcode curl_ws_recv(CURL *curl, void *buffer, size_t buflen, diff --git a/lib/ws.h b/lib/ws.h index 0308a42545..5f40d4528b 100644 --- a/lib/ws.h +++ b/lib/ws.h @@ -25,7 +25,7 @@ ***************************************************************************/ #include "curl_setup.h" -#ifdef USE_WEBSOCKETS +#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP) #ifdef USE_HYPER #define REQTYPE void @@ -75,11 +75,14 @@ struct websocket { CURLcode Curl_ws_request(struct Curl_easy *data, REQTYPE *req); CURLcode Curl_ws_accept(struct Curl_easy *data, const char *mem, size_t len); -size_t Curl_ws_writecb(char *buffer, size_t size, size_t nitems, void *userp); void Curl_ws_done(struct Curl_easy *data); -CURLcode Curl_ws_disconnect(struct Curl_easy *data, - struct connectdata *conn, - bool dead_connection); + +extern const struct Curl_handler Curl_handler_ws; +#ifdef USE_SSL +extern const struct Curl_handler Curl_handler_wss; +#endif + + #else #define Curl_ws_request(x,y) CURLE_OK #define Curl_ws_done(x) Curl_nop_stmt