From: Stefan Eissing Date: Wed, 10 Apr 2024 12:52:34 +0000 (+0200) Subject: cw-out: improved error handling X-Git-Tag: curl-8_8_0~219 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=270a25c011367f6300d5c01497fb1f23527458f4;p=thirdparty%2Fcurl.git cw-out: improved error handling - remember error encountered in invoking write callback and always fail afterwards without further invokes - check behaviour in test_02_17 with h2-pausing client Reported-by: Pavel Kropachev Fixes #13337 Closes #13340 --- diff --git a/lib/cw-out.c b/lib/cw-out.c index 07172b6151..4e56c6a1bb 100644 --- a/lib/cw-out.c +++ b/lib/cw-out.c @@ -102,6 +102,8 @@ static void cw_out_buf_free(struct cw_out_buf *cwbuf) struct cw_out_ctx { struct Curl_cwriter super; struct cw_out_buf *buf; + BIT(paused); + BIT(errored); }; static CURLcode cw_out_write(struct Curl_easy *data, @@ -201,7 +203,10 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx, size_t max_write, min_write; size_t wlen, nwritten; - (void)ctx; + /* If we errored once, we do not invoke the client callback again */ + if(ctx->errored) + return CURLE_WRITE_ERROR; + /* write callbacks may get NULLed by the client between calls. */ cw_get_writefunc(data, otype, &wcb, &wcb_data, &max_write, &min_write); if(!wcb) { @@ -210,7 +215,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx, } *pconsumed = 0; - while(blen && !(data->req.keepon & KEEP_RECV_PAUSE)) { + while(blen && !ctx->paused) { if(!flush_all && blen < min_write) break; wlen = max_write? CURLMIN(blen, max_write) : blen; @@ -230,10 +235,15 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx, } /* mark the connection as RECV paused */ data->req.keepon |= KEEP_RECV_PAUSE; + ctx->paused = TRUE; CURL_TRC_WRITE(data, "cw_out, PAUSE requested by client"); break; } - if(nwritten != wlen) { + else if(CURL_WRITEFUNC_ERROR == nwritten) { + failf(data, "client returned ERROR on write of %zu bytes", wlen); + return CURLE_WRITE_ERROR; + } + else if(nwritten != wlen) { failf(data, "Failure writing output to destination, " "passed %zu returned %zd", wlen, nwritten); return CURLE_WRITE_ERROR; @@ -287,7 +297,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx, if(!cwbuf) return CURLE_OK; - if(data->req.keepon & KEEP_RECV_PAUSE) + if(ctx->paused) return CURLE_OK; /* write the end of the chain until it blocks or gets empty */ @@ -300,7 +310,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx, return result; if(*plast) { /* could not write last, paused again? */ - DEBUGASSERT(data->req.keepon & KEEP_RECV_PAUSE); + DEBUGASSERT(ctx->paused); return CURLE_OK; } } @@ -342,14 +352,14 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx, bool flush_all, const char *buf, size_t blen) { - CURLcode result; + CURLcode result = CURLE_OK; /* if we have buffered data and it is a different type than what * we are writing now, try to flush all */ if(ctx->buf && ctx->buf->type != otype) { result = cw_out_flush_chain(ctx, data, &ctx->buf, TRUE); if(result) - return result; + goto out; } if(ctx->buf) { @@ -359,7 +369,7 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx, return result; result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all); if(result) - return result; + goto out; } else { /* nothing buffered, try direct write */ @@ -372,10 +382,18 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx, /* did not write all, append the rest */ result = cw_out_append(ctx, otype, buf + consumed, blen - consumed); if(result) - return result; + goto out; } } - return CURLE_OK; + +out: + if(result) { + /* We do not want to invoked client callbacks a second time after + * encountering an error. See issue #13337 */ + ctx->errored = TRUE; + cw_out_bufs_free(ctx); + } + return result; } static CURLcode cw_out_write(struct Curl_easy *data, @@ -413,10 +431,12 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data) return FALSE; ctx = (struct cw_out_ctx *)cw_out; - return cw_out_bufs_len(ctx) > 0; + CURL_TRC_WRITE(data, "cw-out is%spaused", ctx->paused? "" : " not"); + return ctx->paused; } -static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all) +static CURLcode cw_out_flush(struct Curl_easy *data, + bool unpause, bool flush_all) { struct Curl_cwriter *cw_out; CURLcode result = CURLE_OK; @@ -424,18 +444,31 @@ static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all) cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out); if(cw_out) { struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out; + if(ctx->errored) + return CURLE_WRITE_ERROR; + if(unpause && ctx->paused) + ctx->paused = FALSE; + if(ctx->paused) + return CURLE_OK; /* not doing it */ result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all); + if(result) { + ctx->errored = TRUE; + cw_out_bufs_free(ctx); + return result; + } } return result; } -CURLcode Curl_cw_out_flush(struct Curl_easy *data) +CURLcode Curl_cw_out_unpause(struct Curl_easy *data) { - return cw_out_flush(data, FALSE); + CURL_TRC_WRITE(data, "cw-out unpause"); + return cw_out_flush(data, TRUE, FALSE); } CURLcode Curl_cw_out_done(struct Curl_easy *data) { - return cw_out_flush(data, TRUE); + CURL_TRC_WRITE(data, "cw-out done"); + return cw_out_flush(data, FALSE, TRUE); } diff --git a/lib/cw-out.h b/lib/cw-out.h index c13e85380b..ca4c2e435d 100644 --- a/lib/cw-out.h +++ b/lib/cw-out.h @@ -43,7 +43,7 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data); /** * Flush any buffered date to the client, chunk collation still applies. */ -CURLcode Curl_cw_out_flush(struct Curl_easy *data); +CURLcode Curl_cw_out_unpause(struct Curl_easy *data); /** * Mark EndOfStream reached and flush ALL data to the client. diff --git a/lib/easy.c b/lib/easy.c index 30fd6ca89a..f4f4d2cc63 100644 --- a/lib/easy.c +++ b/lib/easy.c @@ -58,7 +58,6 @@ #include "multiif.h" #include "select.h" #include "cfilters.h" -#include "cw-out.h" #include "sendf.h" /* for failf function prototype */ #include "connect.h" /* for Curl_getconnectinfo */ #include "slist.h" @@ -1086,6 +1085,7 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action) int oldstate; int newstate; bool recursive = FALSE; + bool keep_changed, unpause_read, not_all_paused; if(!GOOD_EASY_HANDLE(data) || !data->conn) /* crazy input, don't continue */ @@ -1101,51 +1101,47 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action) ((action & CURLPAUSE_RECV)?KEEP_RECV_PAUSE:0) | ((action & CURLPAUSE_SEND)?KEEP_SEND_PAUSE:0); - if((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) == oldstate) { - /* Not changing any pause state, return */ - DEBUGF(infof(data, "pause: no change, early return")); - return CURLE_OK; - } - - /* Unpause parts in active mime tree. */ - if((k->keepon & ~newstate & KEEP_SEND_PAUSE) && - (data->mstate == MSTATE_PERFORMING || - data->mstate == MSTATE_RATELIMITING)) { - result = Curl_creader_unpause(data); - if(result) - return result; - } - - /* put it back in the keepon */ + keep_changed = ((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) != oldstate); + not_all_paused = (newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) != + (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE); + unpause_read = ((k->keepon & ~newstate & KEEP_SEND_PAUSE) && + (data->mstate == MSTATE_PERFORMING || + data->mstate == MSTATE_RATELIMITING)); + /* Unpausing writes is detected on the next run in + * transfer.c:Curl_readwrite(). This is because this may result + * in a transfer error if the application's callbacks fail */ + + /* Set the new keepon state, so it takes effect no matter what error + * may happen afterwards. */ k->keepon = newstate; - if(!(newstate & KEEP_RECV_PAUSE)) { - Curl_conn_ev_data_pause(data, FALSE); - result = Curl_cw_out_flush(data); - if(result) - return result; - } - - /* if there's no error and we're not pausing both directions, we want - to have this handle checked soon */ - if((newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) != - (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) { - Curl_expire(data, 0, EXPIRE_RUN_NOW); /* get this handle going again */ - + /* If not completely pausing both directions now, run again in any case. */ + if(not_all_paused) { + Curl_expire(data, 0, EXPIRE_RUN_NOW); /* reset the too-slow time keeper */ data->state.keeps_speed.tv_sec = 0; - - if(!Curl_cw_out_is_paused(data)) - /* if not pausing again, force a recv/send check of this connection as - the data might've been read off the socket already */ - data->state.select_bits = CURL_CSELECT_IN | CURL_CSELECT_OUT; - if(data->multi) { - if(Curl_update_timer(data->multi)) - return CURLE_ABORTED_BY_CALLBACK; + /* Simulate socket events on next run for unpaused directions */ + if(!(newstate & KEEP_SEND_PAUSE)) + data->state.select_bits |= CURL_CSELECT_OUT; + if(!(newstate & KEEP_RECV_PAUSE)) + data->state.select_bits |= CURL_CSELECT_IN; + /* On changes, tell application to update its timers. */ + if(keep_changed && data->multi) { + if(Curl_update_timer(data->multi)) { + result = CURLE_ABORTED_BY_CALLBACK; + goto out; + } } } - if(!data->state.done) + if(unpause_read) { + result = Curl_creader_unpause(data); + if(result) + goto out; + } + +out: + if(!result && !data->state.done && keep_changed) /* This transfer may have been moved in or out of the bundle, update the corresponding socket callback, if used */ result = Curl_updatesocket(data); diff --git a/lib/multi.c b/lib/multi.c index 63cdd11af2..d9094ae3f2 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -2521,7 +2521,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, Curl_posttransfer(data); multi_done(data, result, TRUE); } - else if(data->req.done) { + else if(data->req.done && !Curl_cwriter_is_paused(data)) { /* call this even if the readwrite function returned error */ Curl_posttransfer(data); diff --git a/lib/sendf.c b/lib/sendf.c index 7e099a2d15..7b00c4ce48 100644 --- a/lib/sendf.c +++ b/lib/sendf.c @@ -506,6 +506,16 @@ void Curl_cwriter_remove_by_name(struct Curl_easy *data, } } +bool Curl_cwriter_is_paused(struct Curl_easy *data) +{ + return Curl_cw_out_is_paused(data); +} + +CURLcode Curl_cwriter_unpause(struct Curl_easy *data) +{ + return Curl_cw_out_unpause(data); +} + CURLcode Curl_creader_read(struct Curl_easy *data, struct Curl_creader *reader, char *buf, size_t blen, size_t *nread, bool *eos) diff --git a/lib/sendf.h b/lib/sendf.h index d736ce44ad..3838f876eb 100644 --- a/lib/sendf.h +++ b/lib/sendf.h @@ -180,6 +180,16 @@ CURLcode Curl_cwriter_write(struct Curl_easy *data, struct Curl_cwriter *writer, int type, const char *buf, size_t nbytes); +/** + * Return TRUE iff client writer is paused. + */ +bool Curl_cwriter_is_paused(struct Curl_easy *data); + +/** + * Unpause client writer and flush any buffered date to the client. + */ +CURLcode Curl_cwriter_unpause(struct Curl_easy *data); + /** * Default implementations for do_init, do_write, do_close that * do nothing and pass the data through. diff --git a/lib/transfer.c b/lib/transfer.c index 4b32b53e99..8c7e33dac5 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -272,7 +272,7 @@ static CURLcode readwrite_data(struct Curl_easy *data, DEBUGF(infof(data, "nread == 0, stream closed, bailing")); else DEBUGF(infof(data, "nread <= 0, server closed connection, bailing")); - k->keepon = 0; /* stop sending as well */ + k->keepon &= ~(KEEP_RECV|KEEP_SEND); /* stop sending as well */ if(k->eos_written) /* already did write this to client, leave */ break; } @@ -409,6 +409,14 @@ CURLcode Curl_readwrite(struct Curl_easy *data) int didwhat = 0; int select_bits; + /* Check if client writes had been paused and can resume now. */ + if(!(k->keepon & KEEP_RECV_PAUSE) && Curl_cwriter_is_paused(data)) { + Curl_conn_ev_data_pause(data, FALSE); + result = Curl_cwriter_unpause(data); + if(result) + goto out; + } + if(data->state.select_bits) { if(select_bits_paused(data, data->state.select_bits)) { /* leave the bits unchanged, so they'll tell us what to do when diff --git a/tests/http/clients/h2-pausing.c b/tests/http/clients/h2-pausing.c index 40ae361f1b..c12e8fab00 100644 --- a/tests/http/clients/h2-pausing.c +++ b/tests/http/clients/h2-pausing.c @@ -27,6 +27,7 @@ */ /* This is based on the poc client of issue #11982 */ +#include #include #include #include @@ -141,11 +142,24 @@ static int err(void) exit(2); } +static void usage(const char *msg) +{ + if(msg) + fprintf(stderr, "%s\n", msg); + fprintf(stderr, + "usage: [options] url\n" + " pause downloads with following options:\n" + " -V http_version (http/1.1, h2, h3) http version to use\n" + ); +} + struct handle { int idx; int paused; int resumed; + int errored; + int fail_write; CURL *h; }; @@ -165,8 +179,15 @@ static size_t cb(void *data, size_t size, size_t nmemb, void *clientp) ++handle->paused; fprintf(stderr, "INFO: [%d] write, PAUSING %d time on %lu bytes\n", handle->idx, handle->paused, (long)realsize); + assert(handle->paused == 1); return CURL_WRITEFUNC_PAUSE; } + if(handle->fail_write) { + ++handle->errored; + fprintf(stderr, "INFO: [%d] FAIL write of %lu bytes, %d time\n", + handle->idx, (long)realsize, handle->errored); + return CURL_WRITEFUNC_ERROR; + } fprintf(stderr, "INFO: [%d] write, accepting %lu bytes\n", handle->idx, (long)realsize); return realsize; @@ -186,15 +207,43 @@ int main(int argc, char *argv[]) char *url, *host = NULL, *port = NULL; int all_paused = 0; int resume_round = -1; + int http_version = CURL_HTTP_VERSION_2_0; + int ch; + + while((ch = getopt(argc, argv, "hV:")) != -1) { + switch(ch) { + case 'h': + usage(NULL); + return 2; + case 'V': { + if(!strcmp("http/1.1", optarg)) + http_version = CURL_HTTP_VERSION_1_1; + else if(!strcmp("h2", optarg)) + http_version = CURL_HTTP_VERSION_2_0; + else if(!strcmp("h3", optarg)) + http_version = CURL_HTTP_VERSION_3ONLY; + else { + usage("invalid http version"); + return 1; + } + break; + } + default: + usage("invalid option"); + return 1; + } + } + argc -= optind; + argv += optind; - if(argc != 2) { + if(argc != 1) { fprintf(stderr, "ERROR: need URL as argument\n"); return 2; } - url = argv[1]; + url = argv[0]; curl_global_init(CURL_GLOBAL_DEFAULT); - curl_global_trace("ids,time,http/2"); + curl_global_trace("ids,time,http/2,http/3"); cu = curl_url(); if(!cu) { @@ -222,6 +271,8 @@ int main(int argc, char *argv[]) handles[i].idx = i; handles[i].paused = 0; handles[i].resumed = 0; + handles[i].errored = 0; + handles[i].fail_write = 1; handles[i].h = curl_easy_init(); if(!handles[i].h || curl_easy_setopt(handles[i].h, CURLOPT_WRITEFUNCTION, cb) != CURLE_OK || @@ -233,9 +284,11 @@ int main(int argc, char *argv[]) != CURLE_OK || curl_easy_setopt(handles[i].h, CURLOPT_SSL_VERIFYPEER, 0L) != CURLE_OK || curl_easy_setopt(handles[i].h, CURLOPT_RESOLVE, resolve) != CURLE_OK || + curl_easy_setopt(handles[i].h, CURLOPT_PIPEWAIT, 1L) || curl_easy_setopt(handles[i].h, CURLOPT_URL, url) != CURLE_OK) { err(); } + curl_easy_setopt(handles[i].h, CURLOPT_HTTP_VERSION, (long)http_version); } multi_handle = curl_multi_init(); @@ -269,6 +322,11 @@ int main(int argc, char *argv[]) fprintf(stderr, "ERROR: [%d] NOT resumed!\n", i); as_expected = 0; } + else if(handles[i].errored != 1) { + fprintf(stderr, "ERROR: [%d] NOT errored once, %d instead!\n", + i, handles[i].errored); + as_expected = 0; + } } if(!as_expected) { fprintf(stderr, "ERROR: handles not in expected state " @@ -308,7 +366,7 @@ int main(int argc, char *argv[]) if(all_paused) { fprintf(stderr, "INFO: all transfers paused\n"); /* give transfer some rounds to mess things up */ - resume_round = rounds + 3; + resume_round = rounds + 2; } } if(resume_round > 0 && rounds == resume_round) { diff --git a/tests/http/test_02_download.py b/tests/http/test_02_download.py index 00b4a04a4a..e0010a96ff 100644 --- a/tests/http/test_02_download.py +++ b/tests/http/test_02_download.py @@ -445,12 +445,30 @@ class TestDownload: r.check_exit_code(0) # test on paused transfers, based on issue #11982 - def test_02_27_paused_no_cl(self, env: Env, httpd, nghttpx, repeat): - proto = 'h2' + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_27a_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat): url = f'https://{env.authority_for(env.domain1, proto)}' \ - '/tweak?&chunks=2&chunk_size=16000' + '/curltest/tweak/?&chunks=6&chunk_size=8000' client = LocalClient(env=env, name='h2-pausing') - r = client.run(args=[url]) + r = client.run(args=['-V', proto, url]) + r.check_exit_code(0) + + # test on paused transfers, based on issue #11982 + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_27b_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat): + url = f'https://{env.authority_for(env.domain1, proto)}' \ + '/curltest/tweak/?error=502' + client = LocalClient(env=env, name='h2-pausing') + r = client.run(args=['-V', proto, url]) + r.check_exit_code(0) + + # test on paused transfers, based on issue #11982 + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_27c_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat): + url = f'https://{env.authority_for(env.domain1, proto)}' \ + '/curltest/tweak/?status=200&chunks=1&chunk_size=100' + client = LocalClient(env=env, name='h2-pausing') + r = client.run(args=['-V', proto, url]) r.check_exit_code(0) @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])