struct cw_out_ctx {
struct Curl_cwriter super;
struct cw_out_buf *buf;
+ BIT(paused);
+ BIT(errored);
};
static CURLcode cw_out_write(struct Curl_easy *data,
size_t max_write, min_write;
size_t wlen, nwritten;
- (void)ctx;
+ /* If we errored once, we do not invoke the client callback again */
+ if(ctx->errored)
+ return CURLE_WRITE_ERROR;
+
/* write callbacks may get NULLed by the client between calls. */
cw_get_writefunc(data, otype, &wcb, &wcb_data, &max_write, &min_write);
if(!wcb) {
}
*pconsumed = 0;
- while(blen && !(data->req.keepon & KEEP_RECV_PAUSE)) {
+ while(blen && !ctx->paused) {
if(!flush_all && blen < min_write)
break;
wlen = max_write? CURLMIN(blen, max_write) : blen;
}
/* mark the connection as RECV paused */
data->req.keepon |= KEEP_RECV_PAUSE;
+ ctx->paused = TRUE;
CURL_TRC_WRITE(data, "cw_out, PAUSE requested by client");
break;
}
- if(nwritten != wlen) {
+ else if(CURL_WRITEFUNC_ERROR == nwritten) {
+ failf(data, "client returned ERROR on write of %zu bytes", wlen);
+ return CURLE_WRITE_ERROR;
+ }
+ else if(nwritten != wlen) {
failf(data, "Failure writing output to destination, "
"passed %zu returned %zd", wlen, nwritten);
return CURLE_WRITE_ERROR;
if(!cwbuf)
return CURLE_OK;
- if(data->req.keepon & KEEP_RECV_PAUSE)
+ if(ctx->paused)
return CURLE_OK;
/* write the end of the chain until it blocks or gets empty */
return result;
if(*plast) {
/* could not write last, paused again? */
- DEBUGASSERT(data->req.keepon & KEEP_RECV_PAUSE);
+ DEBUGASSERT(ctx->paused);
return CURLE_OK;
}
}
bool flush_all,
const char *buf, size_t blen)
{
- CURLcode result;
+ CURLcode result = CURLE_OK;
/* if we have buffered data and it is a different type than what
* we are writing now, try to flush all */
if(ctx->buf && ctx->buf->type != otype) {
result = cw_out_flush_chain(ctx, data, &ctx->buf, TRUE);
if(result)
- return result;
+ goto out;
}
if(ctx->buf) {
return result;
result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
if(result)
- return result;
+ goto out;
}
else {
/* nothing buffered, try direct write */
/* did not write all, append the rest */
result = cw_out_append(ctx, otype, buf + consumed, blen - consumed);
if(result)
- return result;
+ goto out;
}
}
- return CURLE_OK;
+
+out:
+ if(result) {
+ /* We do not want to invoked client callbacks a second time after
+ * encountering an error. See issue #13337 */
+ ctx->errored = TRUE;
+ cw_out_bufs_free(ctx);
+ }
+ return result;
}
static CURLcode cw_out_write(struct Curl_easy *data,
return FALSE;
ctx = (struct cw_out_ctx *)cw_out;
- return cw_out_bufs_len(ctx) > 0;
+ CURL_TRC_WRITE(data, "cw-out is%spaused", ctx->paused? "" : " not");
+ return ctx->paused;
}
-static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all)
+static CURLcode cw_out_flush(struct Curl_easy *data,
+ bool unpause, bool flush_all)
{
struct Curl_cwriter *cw_out;
CURLcode result = CURLE_OK;
cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
if(cw_out) {
struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
+ if(ctx->errored)
+ return CURLE_WRITE_ERROR;
+ if(unpause && ctx->paused)
+ ctx->paused = FALSE;
+ if(ctx->paused)
+ return CURLE_OK; /* not doing it */
result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
+ if(result) {
+ ctx->errored = TRUE;
+ cw_out_bufs_free(ctx);
+ return result;
+ }
}
return result;
}
-CURLcode Curl_cw_out_flush(struct Curl_easy *data)
+CURLcode Curl_cw_out_unpause(struct Curl_easy *data)
{
- return cw_out_flush(data, FALSE);
+ CURL_TRC_WRITE(data, "cw-out unpause");
+ return cw_out_flush(data, TRUE, FALSE);
}
CURLcode Curl_cw_out_done(struct Curl_easy *data)
{
- return cw_out_flush(data, TRUE);
+ CURL_TRC_WRITE(data, "cw-out done");
+ return cw_out_flush(data, FALSE, TRUE);
}
/**
* Flush any buffered date to the client, chunk collation still applies.
*/
-CURLcode Curl_cw_out_flush(struct Curl_easy *data);
+CURLcode Curl_cw_out_unpause(struct Curl_easy *data);
/**
* Mark EndOfStream reached and flush ALL data to the client.
#include "multiif.h"
#include "select.h"
#include "cfilters.h"
-#include "cw-out.h"
#include "sendf.h" /* for failf function prototype */
#include "connect.h" /* for Curl_getconnectinfo */
#include "slist.h"
int oldstate;
int newstate;
bool recursive = FALSE;
+ bool keep_changed, unpause_read, not_all_paused;
if(!GOOD_EASY_HANDLE(data) || !data->conn)
/* crazy input, don't continue */
((action & CURLPAUSE_RECV)?KEEP_RECV_PAUSE:0) |
((action & CURLPAUSE_SEND)?KEEP_SEND_PAUSE:0);
- if((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) == oldstate) {
- /* Not changing any pause state, return */
- DEBUGF(infof(data, "pause: no change, early return"));
- return CURLE_OK;
- }
-
- /* Unpause parts in active mime tree. */
- if((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
- (data->mstate == MSTATE_PERFORMING ||
- data->mstate == MSTATE_RATELIMITING)) {
- result = Curl_creader_unpause(data);
- if(result)
- return result;
- }
-
- /* put it back in the keepon */
+ keep_changed = ((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) != oldstate);
+ not_all_paused = (newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
+ (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE);
+ unpause_read = ((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
+ (data->mstate == MSTATE_PERFORMING ||
+ data->mstate == MSTATE_RATELIMITING));
+ /* Unpausing writes is detected on the next run in
+ * transfer.c:Curl_readwrite(). This is because this may result
+ * in a transfer error if the application's callbacks fail */
+
+ /* Set the new keepon state, so it takes effect no matter what error
+ * may happen afterwards. */
k->keepon = newstate;
- if(!(newstate & KEEP_RECV_PAUSE)) {
- Curl_conn_ev_data_pause(data, FALSE);
- result = Curl_cw_out_flush(data);
- if(result)
- return result;
- }
-
- /* if there's no error and we're not pausing both directions, we want
- to have this handle checked soon */
- if((newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
- (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) {
- Curl_expire(data, 0, EXPIRE_RUN_NOW); /* get this handle going again */
-
+ /* If not completely pausing both directions now, run again in any case. */
+ if(not_all_paused) {
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* reset the too-slow time keeper */
data->state.keeps_speed.tv_sec = 0;
-
- if(!Curl_cw_out_is_paused(data))
- /* if not pausing again, force a recv/send check of this connection as
- the data might've been read off the socket already */
- data->state.select_bits = CURL_CSELECT_IN | CURL_CSELECT_OUT;
- if(data->multi) {
- if(Curl_update_timer(data->multi))
- return CURLE_ABORTED_BY_CALLBACK;
+ /* Simulate socket events on next run for unpaused directions */
+ if(!(newstate & KEEP_SEND_PAUSE))
+ data->state.select_bits |= CURL_CSELECT_OUT;
+ if(!(newstate & KEEP_RECV_PAUSE))
+ data->state.select_bits |= CURL_CSELECT_IN;
+ /* On changes, tell application to update its timers. */
+ if(keep_changed && data->multi) {
+ if(Curl_update_timer(data->multi)) {
+ result = CURLE_ABORTED_BY_CALLBACK;
+ goto out;
+ }
}
}
- if(!data->state.done)
+ if(unpause_read) {
+ result = Curl_creader_unpause(data);
+ if(result)
+ goto out;
+ }
+
+out:
+ if(!result && !data->state.done && keep_changed)
/* This transfer may have been moved in or out of the bundle, update the
corresponding socket callback, if used */
result = Curl_updatesocket(data);
Curl_posttransfer(data);
multi_done(data, result, TRUE);
}
- else if(data->req.done) {
+ else if(data->req.done && !Curl_cwriter_is_paused(data)) {
/* call this even if the readwrite function returned error */
Curl_posttransfer(data);
}
}
+bool Curl_cwriter_is_paused(struct Curl_easy *data)
+{
+ return Curl_cw_out_is_paused(data);
+}
+
+CURLcode Curl_cwriter_unpause(struct Curl_easy *data)
+{
+ return Curl_cw_out_unpause(data);
+}
+
CURLcode Curl_creader_read(struct Curl_easy *data,
struct Curl_creader *reader,
char *buf, size_t blen, size_t *nread, bool *eos)
struct Curl_cwriter *writer, int type,
const char *buf, size_t nbytes);
+/**
+ * Return TRUE iff client writer is paused.
+ */
+bool Curl_cwriter_is_paused(struct Curl_easy *data);
+
+/**
+ * Unpause client writer and flush any buffered date to the client.
+ */
+CURLcode Curl_cwriter_unpause(struct Curl_easy *data);
+
/**
* Default implementations for do_init, do_write, do_close that
* do nothing and pass the data through.
DEBUGF(infof(data, "nread == 0, stream closed, bailing"));
else
DEBUGF(infof(data, "nread <= 0, server closed connection, bailing"));
- k->keepon = 0; /* stop sending as well */
+ k->keepon &= ~(KEEP_RECV|KEEP_SEND); /* stop sending as well */
if(k->eos_written) /* already did write this to client, leave */
break;
}
int didwhat = 0;
int select_bits;
+ /* Check if client writes had been paused and can resume now. */
+ if(!(k->keepon & KEEP_RECV_PAUSE) && Curl_cwriter_is_paused(data)) {
+ Curl_conn_ev_data_pause(data, FALSE);
+ result = Curl_cwriter_unpause(data);
+ if(result)
+ goto out;
+ }
+
if(data->state.select_bits) {
if(select_bits_paused(data, data->state.select_bits)) {
/* leave the bits unchanged, so they'll tell us what to do when
*/
/* This is based on the poc client of issue #11982
*/
+#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
exit(2);
}
+static void usage(const char *msg)
+{
+ if(msg)
+ fprintf(stderr, "%s\n", msg);
+ fprintf(stderr,
+ "usage: [options] url\n"
+ " pause downloads with following options:\n"
+ " -V http_version (http/1.1, h2, h3) http version to use\n"
+ );
+}
+
struct handle
{
int idx;
int paused;
int resumed;
+ int errored;
+ int fail_write;
CURL *h;
};
++handle->paused;
fprintf(stderr, "INFO: [%d] write, PAUSING %d time on %lu bytes\n",
handle->idx, handle->paused, (long)realsize);
+ assert(handle->paused == 1);
return CURL_WRITEFUNC_PAUSE;
}
+ if(handle->fail_write) {
+ ++handle->errored;
+ fprintf(stderr, "INFO: [%d] FAIL write of %lu bytes, %d time\n",
+ handle->idx, (long)realsize, handle->errored);
+ return CURL_WRITEFUNC_ERROR;
+ }
fprintf(stderr, "INFO: [%d] write, accepting %lu bytes\n",
handle->idx, (long)realsize);
return realsize;
char *url, *host = NULL, *port = NULL;
int all_paused = 0;
int resume_round = -1;
+ int http_version = CURL_HTTP_VERSION_2_0;
+ int ch;
+
+ while((ch = getopt(argc, argv, "hV:")) != -1) {
+ switch(ch) {
+ case 'h':
+ usage(NULL);
+ return 2;
+ case 'V': {
+ if(!strcmp("http/1.1", optarg))
+ http_version = CURL_HTTP_VERSION_1_1;
+ else if(!strcmp("h2", optarg))
+ http_version = CURL_HTTP_VERSION_2_0;
+ else if(!strcmp("h3", optarg))
+ http_version = CURL_HTTP_VERSION_3ONLY;
+ else {
+ usage("invalid http version");
+ return 1;
+ }
+ break;
+ }
+ default:
+ usage("invalid option");
+ return 1;
+ }
+ }
+ argc -= optind;
+ argv += optind;
- if(argc != 2) {
+ if(argc != 1) {
fprintf(stderr, "ERROR: need URL as argument\n");
return 2;
}
- url = argv[1];
+ url = argv[0];
curl_global_init(CURL_GLOBAL_DEFAULT);
- curl_global_trace("ids,time,http/2");
+ curl_global_trace("ids,time,http/2,http/3");
cu = curl_url();
if(!cu) {
handles[i].idx = i;
handles[i].paused = 0;
handles[i].resumed = 0;
+ handles[i].errored = 0;
+ handles[i].fail_write = 1;
handles[i].h = curl_easy_init();
if(!handles[i].h ||
curl_easy_setopt(handles[i].h, CURLOPT_WRITEFUNCTION, cb) != CURLE_OK ||
!= CURLE_OK ||
curl_easy_setopt(handles[i].h, CURLOPT_SSL_VERIFYPEER, 0L) != CURLE_OK ||
curl_easy_setopt(handles[i].h, CURLOPT_RESOLVE, resolve) != CURLE_OK ||
+ curl_easy_setopt(handles[i].h, CURLOPT_PIPEWAIT, 1L) ||
curl_easy_setopt(handles[i].h, CURLOPT_URL, url) != CURLE_OK) {
err();
}
+ curl_easy_setopt(handles[i].h, CURLOPT_HTTP_VERSION, (long)http_version);
}
multi_handle = curl_multi_init();
fprintf(stderr, "ERROR: [%d] NOT resumed!\n", i);
as_expected = 0;
}
+ else if(handles[i].errored != 1) {
+ fprintf(stderr, "ERROR: [%d] NOT errored once, %d instead!\n",
+ i, handles[i].errored);
+ as_expected = 0;
+ }
}
if(!as_expected) {
fprintf(stderr, "ERROR: handles not in expected state "
if(all_paused) {
fprintf(stderr, "INFO: all transfers paused\n");
/* give transfer some rounds to mess things up */
- resume_round = rounds + 3;
+ resume_round = rounds + 2;
}
}
if(resume_round > 0 && rounds == resume_round) {
r.check_exit_code(0)
# test on paused transfers, based on issue #11982
- def test_02_27_paused_no_cl(self, env: Env, httpd, nghttpx, repeat):
- proto = 'h2'
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_27a_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
url = f'https://{env.authority_for(env.domain1, proto)}' \
- '/tweak?&chunks=2&chunk_size=16000'
+ '/curltest/tweak/?&chunks=6&chunk_size=8000'
client = LocalClient(env=env, name='h2-pausing')
- r = client.run(args=[url])
+ r = client.run(args=['-V', proto, url])
+ r.check_exit_code(0)
+
+ # test on paused transfers, based on issue #11982
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_27b_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
+ url = f'https://{env.authority_for(env.domain1, proto)}' \
+ '/curltest/tweak/?error=502'
+ client = LocalClient(env=env, name='h2-pausing')
+ r = client.run(args=['-V', proto, url])
+ r.check_exit_code(0)
+
+ # test on paused transfers, based on issue #11982
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_27c_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
+ url = f'https://{env.authority_for(env.domain1, proto)}' \
+ '/curltest/tweak/?status=200&chunks=1&chunk_size=100'
+ client = LocalClient(env=env, name='h2-pausing')
+ r = client.run(args=['-V', proto, url])
r.check_exit_code(0)
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])