From: Stefan Eissing Date: Thu, 15 Aug 2024 11:16:53 +0000 (+0200) Subject: transfer: Curl_sendrecv() and event related improvements X-Git-Tag: curl-8_10_0~224 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a58b50fca6eb543896a7c1cb05bb21aa1fc49251;p=thirdparty%2Fcurl.git transfer: Curl_sendrecv() and event related improvements - Renames Curl_readwrite() to Curl_sendrecv() to reflect that it is mainly about talking to the server, not reads or writes to the client. Add a `nowp` parameter since the single caller already has this. - Curl_sendrecv() now runs all possible operations whenever it is called and either it had been polling sockets or the 'select_bits' are set. POLL_IN/POLL_OUT are not always directly related to send/recv operations. Filters like HTTP/2, QUIC or TLS may monitor reverse directions. If a transfer does not want to send (KEEP_SEND), it will not do so, as before. Same for receives. - Curl_update_timer() now checks the absolute timestamp of an expiry and the last/new timeout to determine if the application needs to stop/start/restart its timer. This fixes edge cases where updates did not happen as they should have. - improved --test-event curl_easy_perform() simulation to handle situations where no sockets are registered but a timeout is in place. - fixed bug in events_socket() that complained about removing a socket that was unknown, when indeed it had removed the socket just before, only it was the last in the list - fixed conncache's internal handle to carry the multi instance (where the cache has one) so that operations on the closure handle trigger event callbacks correctly. - fixed conncache to not POLL_REMOVE a socket twice when a conneciton was closed. Closes #14561 --- diff --git a/lib/cf-socket.c b/lib/cf-socket.c index 9e7cc6e981..7f64a2dba7 100644 --- a/lib/cf-socket.c +++ b/lib/cf-socket.c @@ -404,6 +404,9 @@ CURLcode Curl_socket_open(struct Curl_easy *data, static int socket_close(struct Curl_easy *data, struct connectdata *conn, int use_callback, curl_socket_t sock) { + if(CURL_SOCKET_BAD == sock) + return 0; + if(use_callback && conn && conn->fclosesocket) { int rc; Curl_multi_closed(data, sock); diff --git a/lib/conncache.c b/lib/conncache.c index 3591b4f8ef..0b00d57558 100644 --- a/lib/conncache.c +++ b/lib/conncache.c @@ -131,6 +131,12 @@ int Curl_conncache_init(struct conncache *connc, if(!connc->closure_handle) return 1; /* bad */ connc->closure_handle->state.internal = true; + /* TODO: this is quirky. We need an internal handle for certain + * operations, but we do not add it to the multi (if there is one). + * But we give it the multi so that socket event operations can work. + * Probably better to have an internal handle owned by the multi that + * can be used for conncache operations. */ + connc->closure_handle->multi = multi; #ifdef DEBUGBUILD if(getenv("CURL_DEBUG")) connc->closure_handle->set.verbose = true; @@ -146,6 +152,10 @@ int Curl_conncache_init(struct conncache *connc, void Curl_conncache_destroy(struct conncache *connc) { if(connc) { + if(connc->closure_handle) { + connc->closure_handle->multi = NULL; + Curl_close(&connc->closure_handle); + } Curl_hash_destroy(&connc->hash); connc->multi = NULL; } @@ -611,7 +621,8 @@ static void connc_close_all(struct conncache *connc) sigpipe_apply(data, &pipe_st); Curl_hostcache_clean(data, data->dns.hostcache); - Curl_close(&data); + connc->closure_handle->multi = NULL; + Curl_close(&connc->closure_handle); sigpipe_restore(&pipe_st); } @@ -980,14 +991,6 @@ static void connc_disconnect(struct Curl_easy *data, Curl_attach_connection(data, conn); - if(connc && connc->multi && connc->multi->socket_cb) { - struct easy_pollset ps; - /* With an empty pollset, all previously polled sockets will be removed - * via the multi_socket API callback. */ - memset(&ps, 0, sizeof(ps)); - (void)Curl_multi_pollset_ev(connc->multi, data, &ps, &conn->shutdown_poll); - } - connc_run_conn_shutdown_handler(data, conn); if(do_shutdown) { /* Make a last attempt to shutdown handlers and filters, if diff --git a/lib/easy.c b/lib/easy.c index b96736542a..9a3d1b8450 100644 --- a/lib/easy.c +++ b/lib/easy.c @@ -391,25 +391,22 @@ struct events { int running_handles; /* store the returned number */ }; +#define DEBUG_EV_POLL 0 + /* events_timer * * Callback that gets called with a new value when the timeout should be * updated. */ - static int events_timer(struct Curl_multi *multi, /* multi handle */ long timeout_ms, /* see above */ void *userp) /* private callback pointer */ { struct events *ev = userp; (void)multi; - if(timeout_ms == -1) - /* timeout removed */ - timeout_ms = 0; - else if(timeout_ms == 0) - /* timeout is already reached! */ - timeout_ms = 1; /* trigger asap */ - +#if DEBUG_EV_POLL + fprintf(stderr, "events_timer: set timeout %ldms\n", timeout_ms); +#endif ev->ms = timeout_ms; ev->msbump = TRUE; return 0; @@ -463,6 +460,7 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */ struct events *ev = userp; struct socketmonitor *m; struct socketmonitor *prev = NULL; + bool found = FALSE; #if defined(CURL_DISABLE_VERBOSE_STRINGS) (void) easy; @@ -472,7 +470,7 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */ m = ev->list; while(m) { if(m->socket.fd == s) { - + found = TRUE; if(what == CURL_POLL_REMOVE) { struct socketmonitor *nxt = m->next; /* remove this node from the list of monitored sockets */ @@ -481,7 +479,6 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */ else ev->list = nxt; free(m); - m = nxt; infof(easy, "socket cb: socket %" CURL_FORMAT_SOCKET_T " REMOVED", s); } @@ -499,12 +496,13 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */ prev = m; m = m->next; /* move to next node */ } - if(!m) { + + if(!found) { if(what == CURL_POLL_REMOVE) { - /* this happens a bit too often, libcurl fix perhaps? */ - /* fprintf(stderr, - "%s: socket %d asked to be REMOVED but not present!\n", - __func__, s); */ + /* should not happen if our logic is correct, but is no drama. */ + DEBUGF(infof(easy, "socket cb: asked to REMOVE socket %" + CURL_FORMAT_SOCKET_T "but not present!", s)); + DEBUGASSERT(0); } else { m = malloc(sizeof(struct socketmonitor)); @@ -565,14 +563,15 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev) int pollrc; int i; struct curltime before; - struct curltime after; /* populate the fds[] array */ for(m = ev->list, f = &fds[0]; m; m = m->next) { f->fd = m->socket.fd; f->events = m->socket.events; f->revents = 0; - /* fprintf(stderr, "poll() %d check socket %d\n", numfds, f->fd); */ +#if DEBUG_EV_POLL + fprintf(stderr, "poll() %d check socket %d\n", numfds, f->fd); +#endif f++; numfds++; } @@ -580,12 +579,27 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev) /* get the time stamp to use to figure out how long poll takes */ before = Curl_now(); - /* wait for activity or timeout */ - pollrc = Curl_poll(fds, (unsigned int)numfds, ev->ms); - if(pollrc < 0) - return CURLE_UNRECOVERABLE_POLL; - - after = Curl_now(); + if(numfds) { + /* wait for activity or timeout */ +#if DEBUG_EV_POLL + fprintf(stderr, "poll(numfds=%d, timeout=%ldms)\n", numfds, ev->ms); +#endif + pollrc = Curl_poll(fds, (unsigned int)numfds, ev->ms); +#if DEBUG_EV_POLL + fprintf(stderr, "poll(numfds=%d, timeout=%ldms) -> %d\n", + numfds, ev->ms, pollrc); +#endif + if(pollrc < 0) + return CURLE_UNRECOVERABLE_POLL; + } + else { +#if DEBUG_EV_POLL + fprintf(stderr, "poll, but no fds, wait timeout=%ldms\n", ev->ms); +#endif + pollrc = 0; + if(ev->ms > 0) + Curl_wait_ms(ev->ms); + } ev->msbump = FALSE; /* reset here */ @@ -618,12 +632,17 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev) } } - if(!ev->msbump) { + + if(!ev->msbump && ev->ms >= 0) { /* If nothing updated the timeout, we decrease it by the spent time. * If it was updated, it has the new timeout time stored already. */ - timediff_t timediff = Curl_timediff(after, before); + timediff_t timediff = Curl_timediff(Curl_now(), before); if(timediff > 0) { +#if DEBUG_EV_POLL + fprintf(stderr, "poll timeout %ldms not updated, decrease by " + "time spent %ldms\n", ev->ms, (long)timediff); +#endif if(timediff > ev->ms) ev->ms = 0; else @@ -656,7 +675,7 @@ static CURLcode easy_events(struct Curl_multi *multi) { /* this struct is made static to allow it to be used after this function returns and curl_multi_remove_handle() is called */ - static struct events evs = {2, FALSE, 0, NULL, 0}; + static struct events evs = {-1, FALSE, 0, NULL, 0}; /* if running event-based, do some further multi inits */ events_setup(multi, &evs); @@ -1121,7 +1140,7 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action) (data->mstate == MSTATE_PERFORMING || data->mstate == MSTATE_RATELIMITING)); /* Unpausing writes is detected on the next run in - * transfer.c:Curl_readwrite(). This is because this may result + * transfer.c:Curl_sendrecv(). This is because this may result * in a transfer error if the application's callbacks fail */ /* Set the new keepon state, so it takes effect no matter what error diff --git a/lib/http.c b/lib/http.c index 9b1fc71630..6b2755d74f 100644 --- a/lib/http.c +++ b/lib/http.c @@ -4441,7 +4441,8 @@ static CURLcode cr_exp100_read(struct Curl_easy *data, } /* We are now waiting for a reply from the server or * a timeout on our side IFF the request has been fully sent. */ - DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE")); + DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE, " + "timeout %ldms", data->set.expect_100_timeout)); ctx->state = EXP100_AWAITING_CONTINUE; ctx->start = Curl_now(); Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT); diff --git a/lib/multi.c b/lib/multi.c index f286325c23..6f881f6292 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -94,9 +94,12 @@ static CURLMcode add_next_timeout(struct curltime now, struct Curl_multi *multi, struct Curl_easy *d); static CURLMcode multi_timeout(struct Curl_multi *multi, + struct curltime *expire_time, long *timeout_ms); static void process_pending_handles(struct Curl_multi *multi); static void multi_xfer_bufs_free(struct Curl_multi *multi); +static void Curl_expire_ex(struct Curl_easy *data, const struct curltime *nowp, + timediff_t milli, expire_id id); #ifdef DEBUGBUILD static const char * const multi_statename[]={ @@ -418,6 +421,7 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */ multi->multiplexing = TRUE; multi->max_concurrent_streams = 100; + multi->last_timeout_ms = -1; #ifdef USE_WINSOCK multi->wsa_event = WSACreateEvent(); @@ -527,18 +531,6 @@ CURLMcode curl_multi_add_handle(struct Curl_multi *multi, happen. */ Curl_expire(data, 0, EXPIRE_RUN_NOW); - /* A somewhat crude work-around for a little glitch in Curl_update_timer() - that happens if the lastcall time is set to the same time when the handle - is removed as when the next handle is added, as then the check in - Curl_update_timer() that prevents calling the application multiple times - with the same timer info will not trigger and then the new handle's - timeout will not be notified to the app. - - The work-around is thus simply to clear the 'lastcall' variable to force - Curl_update_timer() to always trigger a callback to the app when a new - easy handle is added */ - memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall)); - rc = Curl_update_timer(multi); if(rc) { data->multi = NULL; /* not anymore */ @@ -799,6 +791,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, bool premature; struct Curl_llist_node *e; CURLMcode rc; + bool removed_timer = FALSE; /* First, make some basic checks that the CURLM handle is a good handle */ if(!GOOD_MULTI_HANDLE(multi)) @@ -849,7 +842,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, /* The timer must be shut down before data->multi is set to NULL, else the timenode will remain in the splay tree after curl_easy_cleanup is called. Do it after multi_done() in case that sets another time! */ - Curl_expire_clear(data); + removed_timer = Curl_expire_clear(data); /* the handle is in a list, remove it from whichever it is */ Curl_node_remove(&data->multi_queue); @@ -930,9 +923,11 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, process_pending_handles(multi); - rc = Curl_update_timer(multi); - if(rc) - return rc; + if(removed_timer) { + rc = Curl_update_timer(multi); + if(rc) + return rc; + } return CURLM_OK; } @@ -1167,7 +1162,6 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, Some easy handles may not have connected to the remote host yet, and then we must make sure that is done. */ int this_max_fd = -1; - struct easy_pollset ps; struct Curl_llist_node *e; (void)exc_fd_set; /* not used */ @@ -1177,23 +1171,22 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi, if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; - memset(&ps, 0, sizeof(ps)); for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); unsigned int i; - multi_getsock(data, &ps); + multi_getsock(data, &data->last_poll); - for(i = 0; i < ps.num; i++) { - if(!FDSET_SOCK(ps.sockets[i])) + for(i = 0; i < data->last_poll.num; i++) { + if(!FDSET_SOCK(data->last_poll.sockets[i])) /* pretend it does not exist */ continue; - if(ps.actions[i] & CURL_POLL_IN) - FD_SET(ps.sockets[i], read_fd_set); - if(ps.actions[i] & CURL_POLL_OUT) - FD_SET(ps.sockets[i], write_fd_set); - if((int)ps.sockets[i] > this_max_fd) - this_max_fd = (int)ps.sockets[i]; + if(data->last_poll.actions[i] & CURL_POLL_IN) + FD_SET(data->last_poll.sockets[i], read_fd_set); + if(data->last_poll.actions[i] & CURL_POLL_OUT) + FD_SET(data->last_poll.sockets[i], write_fd_set); + if((int)data->last_poll.sockets[i] > this_max_fd) + this_max_fd = (int)data->last_poll.sockets[i]; } } @@ -1208,7 +1201,6 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi, unsigned int *fd_count) { struct curl_waitfds cwfds; - struct easy_pollset ps; CURLMcode result = CURLM_OK; struct Curl_llist_node *e; @@ -1222,11 +1214,10 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi, return CURLM_RECURSIVE_API_CALL; Curl_waitfds_init(&cwfds, ufds, size); - memset(&ps, 0, sizeof(ps)); for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); - multi_getsock(data, &ps); - if(Curl_waitfds_add_ps(&cwfds, &ps)) { + multi_getsock(data, &data->last_poll); + if(Curl_waitfds_add_ps(&cwfds, &data->last_poll)) { result = CURLM_OUT_OF_MEMORY; goto out; } @@ -1269,8 +1260,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi, bool extrawait, /* when no socket, wait */ bool use_wakeup) { - struct easy_pollset ps; size_t i; + struct curltime expire_time; long timeout_internal; int retcode = 0; struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; @@ -1297,14 +1288,13 @@ static CURLMcode multi_wait(struct Curl_multi *multi, return CURLM_BAD_FUNCTION_ARGUMENT; Curl_pollfds_init(&cpfds, a_few_on_stack, NUM_POLLS_ON_STACK); - memset(&ps, 0, sizeof(ps)); /* Add the curl handles to our pollfds first */ for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); - multi_getsock(data, &ps); - if(Curl_pollfds_add_ps(&cpfds, &ps)) { + multi_getsock(data, &data->last_poll); + if(Curl_pollfds_add_ps(&cpfds, &data->last_poll)) { result = CURLM_OUT_OF_MEMORY; goto out; } @@ -1367,7 +1357,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, * poll. Collecting the sockets may install new timers by protocols * and connection filters. * Use the shorter one of the internal and the caller requested timeout. */ - (void)multi_timeout(multi, &timeout_internal); + (void)multi_timeout(multi, &expire_time, &timeout_internal); if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms)) timeout_ms = (int)timeout_internal; @@ -1443,16 +1433,15 @@ static CURLMcode multi_wait(struct Curl_multi *multi, for(e = Curl_llist_head(&multi->process); e && !result; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); - multi_getsock(data, &ps); - for(i = 0; i < ps.num; i++) { + for(i = 0; i < data->last_poll.num; i++) { wsa_events.lNetworkEvents = 0; - if(WSAEnumNetworkEvents(ps.sockets[i], NULL, + if(WSAEnumNetworkEvents(data->last_poll.sockets[i], NULL, &wsa_events) == 0) { if(ret && !pollrc && wsa_events.lNetworkEvents) retcode++; } - WSAEventSelect(ps.sockets[i], multi->wsa_event, 0); + WSAEventSelect(data->last_poll.sockets[i], multi->wsa_event, 0); } } } @@ -2397,7 +2386,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, } /* read/write data if it is ready to do so */ - result = Curl_readwrite(data); + result = Curl_sendrecv(data, nowp); if(data->req.done || (result == CURLE_RECV_ERROR)) { /* If CURLE_RECV_ERROR happens early enough, we assume it was a race @@ -3064,11 +3053,15 @@ void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s) if(data) { /* if there is still an easy handle associated with this connection */ struct Curl_multi *multi = data->multi; + DEBUGF(infof(data, "Curl_multi_closed, fd=%" CURL_FORMAT_SOCKET_T + " multi is %p", s, (void *)multi)); if(multi) { /* this is set if this connection is part of a handle that is added to a multi handle, and only then this is necessary */ struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s); + DEBUGF(infof(data, "Curl_multi_closed, fd=%" CURL_FORMAT_SOCKET_T + " entry is %p", s, (void *)entry)); if(entry) { int rc = 0; if(multi->socket_cb) { @@ -3145,6 +3138,59 @@ static CURLMcode add_next_timeout(struct curltime now, return CURLM_OK; } +struct multi_run_ctx { + struct Curl_multi *multi; + struct curltime now; + size_t run_xfers; + SIGPIPE_MEMBER(pipe_st); + bool run_conn_cache; +}; + +static CURLMcode multi_run_expired(struct multi_run_ctx *mrc) +{ + struct Curl_multi *multi = mrc->multi; + struct Curl_easy *data = NULL; + struct Curl_tree *t = NULL; + CURLMcode result = CURLM_OK; + + /* + * The loop following here will go on as long as there are expire-times left + * to process (compared to mrc->now) in the splay and 'data' will be + * re-assigned for every expired handle we deal with. + */ + while(1) { + /* Check if there is one (more) expired timer to deal with! This function + extracts a matching node if there is one */ + multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t); + if(!t) + goto out; + + data = Curl_splayget(t); /* assign this for next loop */ + if(!data) + continue; + + (void)add_next_timeout(mrc->now, multi, data); + if(data == multi->conn_cache.closure_handle) { + mrc->run_conn_cache = TRUE; + continue; + } + + mrc->run_xfers++; + sigpipe_apply(data, &mrc->pipe_st); + result = multi_runsingle(multi, &mrc->now, data); + + if(CURLM_OK >= result) { + /* get the socket(s) and check if the state has been changed since + last */ + result = singlesocket(multi, data); + if(result) + goto out; + } + } + +out: + return result; +} static CURLMcode multi_socket(struct Curl_multi *multi, bool checkall, curl_socket_t s, @@ -3153,10 +3199,13 @@ static CURLMcode multi_socket(struct Curl_multi *multi, { CURLMcode result = CURLM_OK; struct Curl_easy *data = NULL; - struct Curl_tree *t = NULL; - struct curltime now = Curl_now(); - bool run_conn_cache = FALSE; - SIGPIPE_VARIABLE(pipe_st); + struct multi_run_ctx mrc; + + (void)ev_bitmask; + memset(&mrc, 0, sizeof(mrc)); + mrc.multi = multi; + mrc.now = Curl_now(); + sigpipe_init(&mrc.pipe_st); if(checkall) { struct Curl_llist_node *e; @@ -3171,10 +3220,10 @@ static CURLMcode multi_socket(struct Curl_multi *multi, result = singlesocket(multi, Curl_node_elem(e)); } } - - /* or should we fall-through and do the timer-based stuff? */ - return result; + mrc.run_conn_cache = TRUE; + goto out; } + if(s != CURL_SOCKET_TIMEOUT) { struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s); @@ -3201,76 +3250,42 @@ static CURLMcode multi_socket(struct Curl_multi *multi, DEBUGASSERT(data->magic == CURLEASY_MAGIC_NUMBER); if(data == multi->conn_cache.closure_handle) - run_conn_cache = TRUE; + mrc.run_conn_cache = TRUE; else { - if(data->conn && !(data->conn->handler->flags & PROTOPT_DIRLOCK)) - /* set socket event bitmask if they are not locked */ - data->state.select_bits |= (unsigned char)ev_bitmask; - - Curl_expire(data, 0, EXPIRE_RUN_NOW); + /* Expire with out current now, so we will get it below when + * asking the splaytree for expired transfers. */ + Curl_expire_ex(data, &mrc.now, 0, EXPIRE_RUN_NOW); } } - - /* Now we fall-through and do the timer-based stuff, since we do not want - to force the user to have to deal with timeouts as long as at least - one connection in fact has traffic. */ - - now = Curl_now(); /* get a newer time since the multi_runsingle() loop - may have taken some time */ } } - else { - /* Asked to run due to time-out. Clear the 'lastcall' variable to force - Curl_update_timer() to trigger a callback to the app again even if the - same timeout is still the one to run after this call. That handles the - case when the application asks libcurl to run the timeout - prematurely. */ - memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall)); - } - /* - * The loop following here will go on as long as there are expire-times left - * to process in the splay and 'data' will be re-assigned for every expired - * handle we deal with. - */ - sigpipe_init(&pipe_st); - do { - if(data == multi->conn_cache.closure_handle) - run_conn_cache = TRUE; - /* the first loop lap 'data' can be NULL */ - else if(data) { - sigpipe_apply(data, &pipe_st); - result = multi_runsingle(multi, &now, data); - - if(CURLM_OK >= result) { - /* get the socket(s) and check if the state has been changed since - last */ - result = singlesocket(multi, data); - if(result) - break; - } - } - - /* Check if there is one (more) expired timer to deal with! This function - extracts a matching node if there is one */ - - multi->timetree = Curl_splaygetbest(now, multi->timetree, &t); - if(t) { - data = Curl_splayget(t); /* assign this for next loop */ - (void)add_next_timeout(now, multi, data); - } + result = multi_run_expired(&mrc); + if(result) + goto out; - } while(t); + if(mrc.run_xfers) { + /* Running transfers takes time. With a new timestamp, we might catch + * other expires which are due now. Instead of telling the application + * to set a 0 timeout and call us again, we run them here. + * Do that only once or it might be unfair to transfers on other + * sockets. */ + mrc.now = Curl_now(); + result = multi_run_expired(&mrc); + } - if(run_conn_cache) { - sigpipe_apply(multi->conn_cache.closure_handle, &pipe_st); +out: + if(mrc.run_conn_cache) { + sigpipe_apply(multi->conn_cache.closure_handle, &mrc.pipe_st); Curl_conncache_multi_perform(multi); } - - sigpipe_restore(&pipe_st); + sigpipe_restore(&mrc.pipe_st); if(running_handles) *running_handles = (int)multi->num_alive; + + if(CURLM_OK >= result) + result = Curl_update_timer(multi); return result; } @@ -3359,39 +3374,28 @@ CURLMcode curl_multi_setopt(struct Curl_multi *multi, CURLMcode curl_multi_socket(struct Curl_multi *multi, curl_socket_t s, int *running_handles) { - CURLMcode result; if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; - result = multi_socket(multi, FALSE, s, 0, running_handles); - if(CURLM_OK >= result) - result = Curl_update_timer(multi); - return result; + return multi_socket(multi, FALSE, s, 0, running_handles); } CURLMcode curl_multi_socket_action(struct Curl_multi *multi, curl_socket_t s, int ev_bitmask, int *running_handles) { - CURLMcode result; if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; - result = multi_socket(multi, FALSE, s, ev_bitmask, running_handles); - if(CURLM_OK >= result) - result = Curl_update_timer(multi); - return result; + return multi_socket(multi, FALSE, s, ev_bitmask, running_handles); } CURLMcode curl_multi_socket_all(struct Curl_multi *multi, int *running_handles) { - CURLMcode result; if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; - result = multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles); - if(CURLM_OK >= result) - result = Curl_update_timer(multi); - return result; + return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles); } static CURLMcode multi_timeout(struct Curl_multi *multi, + struct curltime *expire_time, long *timeout_ms) { static const struct curltime tv_zero = {0, 0}; @@ -3407,6 +3411,9 @@ static CURLMcode multi_timeout(struct Curl_multi *multi, /* splay the lowest to the bottom */ multi->timetree = Curl_splay(tv_zero, multi->timetree); + /* this will not return NULL from a non-emtpy tree, but some compilers + * are not convinced of that. Analyzers are hard. */ + *expire_time = multi->timetree? multi->timetree->key : tv_zero; /* 'multi->timetree' will be non-NULL here but the compilers sometimes yell at us if we assume so */ @@ -3418,12 +3425,15 @@ static CURLMcode multi_timeout(struct Curl_multi *multi, overly long timeouts */ *timeout_ms = (long)diff; } - else + else { /* 0 means immediately */ *timeout_ms = 0; + } } - else + else { + *expire_time = tv_zero; *timeout_ms = -1; + } return CURLM_OK; } @@ -3431,6 +3441,8 @@ static CURLMcode multi_timeout(struct Curl_multi *multi, CURLMcode curl_multi_timeout(struct Curl_multi *multi, long *timeout_ms) { + struct curltime expire_time; + /* First, make some basic checks that the CURLM handle is a good handle */ if(!GOOD_MULTI_HANDLE(multi)) return CURLM_BAD_HANDLE; @@ -3438,56 +3450,79 @@ CURLMcode curl_multi_timeout(struct Curl_multi *multi, if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; - return multi_timeout(multi, timeout_ms); + return multi_timeout(multi, &expire_time, timeout_ms); } +#define DEBUG_UPDATE_TIMER 0 + /* * Tell the application it should update its timers, if it subscribes to the * update timer callback. */ CURLMcode Curl_update_timer(struct Curl_multi *multi) { + struct curltime expire_ts; long timeout_ms; int rc; + bool set_value = FALSE; if(!multi->timer_cb || multi->dead) return CURLM_OK; - if(multi_timeout(multi, &timeout_ms)) { + if(multi_timeout(multi, &expire_ts, &timeout_ms)) { return CURLM_OK; } - if(timeout_ms < 0) { - static const struct curltime none = {0, 0}; - if(Curl_timediff_us(none, multi->timer_lastcall)) { - multi->timer_lastcall = none; - /* there is no timeout now but there was one previously, tell the app to - disable it */ - set_in_callback(multi, TRUE); - rc = multi->timer_cb(multi, -1, multi->timer_userp); - set_in_callback(multi, FALSE); - if(rc == -1) { - multi->dead = TRUE; - return CURLM_ABORTED_BY_CALLBACK; - } - return CURLM_OK; - } - return CURLM_OK; - } - - /* When multi_timeout() is done, multi->timetree points to the node with the - * timeout we got the (relative) time-out time for. We can thus easily check - * if this is the same (fixed) time as we got in a previous call and then - * avoid calling the callback again. */ - if(Curl_timediff_us(multi->timetree->key, multi->timer_lastcall) == 0) - return CURLM_OK; - multi->timer_lastcall = multi->timetree->key; + if(timeout_ms < 0 && multi->last_timeout_ms < 0) { +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), still no timeout, no change\n"); +#endif + } + else if(timeout_ms < 0) { + /* there is no timeout now but there was one previously */ +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), remove timeout, " + " last_timeout=%ldms\n", multi->last_timeout_ms); +#endif + timeout_ms = -1; /* normalize */ + set_value = TRUE; + } + else if(multi->last_timeout_ms < 0) { +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), had no timeout, set now\n"); +#endif + set_value = TRUE; + } + else if(Curl_timediff_us(multi->last_expire_ts, expire_ts)) { + /* We had a timeout before and have one now, the absolute timestamp + * differs. The relative timeout_ms may be the same, but the starting + * point differs. Let the application restart its timer. */ +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), expire timestamp changed\n"); +#endif + set_value = TRUE; + } + else { + /* We have same expire time as previously. Our relative 'timeout_ms' + * may be different now, but the application has the timer running + * and we do not to tell it to start this again. */ +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), same expire timestamp, no change\n"); +#endif + } - set_in_callback(multi, TRUE); - rc = multi->timer_cb(multi, timeout_ms, multi->timer_userp); - set_in_callback(multi, FALSE); - if(rc == -1) { - multi->dead = TRUE; - return CURLM_ABORTED_BY_CALLBACK; + if(set_value) { +#if DEBUG_UPDATE_TIMER + fprintf(stderr, "Curl_update_timer(), set timeout %ldms\n", timeout_ms); +#endif + multi->last_expire_ts = expire_ts; + multi->last_timeout_ms = timeout_ms; + set_in_callback(multi, TRUE); + rc = multi->timer_cb(multi, timeout_ms, multi->timer_userp); + set_in_callback(multi, FALSE); + if(rc == -1) { + multi->dead = TRUE; + return CURLM_ABORTED_BY_CALLBACK; + } } return CURLM_OK; } @@ -3566,10 +3601,12 @@ multi_addtimeout(struct Curl_easy *data, * * Expire replaces a former timeout using the same id if already set. */ -void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) +static void Curl_expire_ex(struct Curl_easy *data, + const struct curltime *nowp, + timediff_t milli, expire_id id) { struct Curl_multi *multi = data->multi; - struct curltime *nowp = &data->state.expiretime; + struct curltime *curr_expire = &data->state.expiretime; struct curltime set; /* this is only interesting while there is still an associated multi struct @@ -3579,7 +3616,7 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) DEBUGASSERT(id < EXPIRE_LAST); - set = Curl_now(); + set = *nowp; set.tv_sec += (time_t)(milli/1000); /* might be a 64 to 32 bits conversion */ set.tv_usec += (int)(milli%1000)*1000; @@ -3595,11 +3632,11 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) in case we need to recompute the minimum timer later. */ multi_addtimeout(data, &set, id); - if(nowp->tv_sec || nowp->tv_usec) { + if(curr_expire->tv_sec || curr_expire->tv_usec) { /* This means that the struct is added as a node in the splay tree. Compare if the new time is earlier, and only remove-old/add-new if it is. */ - timediff_t diff = Curl_timediff(set, *nowp); + timediff_t diff = Curl_timediff(set, *curr_expire); int rc; if(diff > 0) { @@ -3618,12 +3655,18 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) /* Indicate that we are in the splay tree and insert the new timer expiry value since it is our local minimum. */ - *nowp = set; + *curr_expire = set; Curl_splayset(&data->state.timenode, data); - multi->timetree = Curl_splayinsert(*nowp, multi->timetree, + multi->timetree = Curl_splayinsert(*curr_expire, multi->timetree, &data->state.timenode); } +void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) +{ + struct curltime now = Curl_now(); + Curl_expire_ex(data, &now, milli, id); +} + /* * Curl_expire_done() * @@ -3641,7 +3684,7 @@ void Curl_expire_done(struct Curl_easy *data, expire_id id) * * Clear ALL timeout values for this handle. */ -void Curl_expire_clear(struct Curl_easy *data) +bool Curl_expire_clear(struct Curl_easy *data) { struct Curl_multi *multi = data->multi; struct curltime *nowp = &data->state.expiretime; @@ -3649,7 +3692,7 @@ void Curl_expire_clear(struct Curl_easy *data) /* this is only interesting while there is still an associated multi struct remaining! */ if(!multi) - return; + return FALSE; if(nowp->tv_sec || nowp->tv_usec) { /* Since this is an cleared time, we must remove the previous entry from @@ -3670,12 +3713,11 @@ void Curl_expire_clear(struct Curl_easy *data) #endif nowp->tv_sec = 0; nowp->tv_usec = 0; + return TRUE; } + return FALSE; } - - - CURLMcode curl_multi_assign(struct Curl_multi *multi, curl_socket_t s, void *hashp) { diff --git a/lib/multihandle.h b/lib/multihandle.h index ad78c8c013..a6efe2769f 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -151,8 +151,9 @@ struct Curl_multi { /* timer callback and user data pointer for the *socket() API */ curl_multi_timer_callback timer_cb; void *timer_userp; - struct curltime timer_lastcall; /* the fixed time for the timeout for the - previous callback */ + long last_timeout_ms; /* the last timeout value set via timer_cb */ + struct curltime last_expire_ts; /* timestamp of last expiry */ + #ifdef USE_WINSOCK WSAEVENT wsa_event; /* Winsock event used for waits */ #else diff --git a/lib/multiif.h b/lib/multiif.h index 6722e4b2d6..451d8c730e 100644 --- a/lib/multiif.h +++ b/lib/multiif.h @@ -30,7 +30,7 @@ CURLcode Curl_updatesocket(struct Curl_easy *data); void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id); -void Curl_expire_clear(struct Curl_easy *data); +bool Curl_expire_clear(struct Curl_easy *data); void Curl_expire_done(struct Curl_easy *data, expire_id id); CURLMcode Curl_update_timer(struct Curl_multi *multi) WARN_UNUSED_RESULT; void Curl_attach_connection(struct Curl_easy *data, diff --git a/lib/sigpipe.h b/lib/sigpipe.h index d78afd905d..c57580f434 100644 --- a/lib/sigpipe.h +++ b/lib/sigpipe.h @@ -35,6 +35,7 @@ struct sigpipe_ignore { }; #define SIGPIPE_VARIABLE(x) struct sigpipe_ignore x +#define SIGPIPE_MEMBER(x) struct sigpipe_ignore x static void sigpipe_init(struct sigpipe_ignore *ig) { @@ -92,6 +93,7 @@ static void sigpipe_apply(struct Curl_easy *data, #define sigpipe_init(x) Curl_nop_stmt #define sigpipe_restore(x) Curl_nop_stmt #define SIGPIPE_VARIABLE(x) +#define SIGPIPE_MEMBER(x) bool x #endif #endif /* HEADER_CURL_SIGPIPE_H */ diff --git a/lib/transfer.c b/lib/transfer.c index e5d886baaf..46fde236d8 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -196,18 +196,6 @@ CURLcode Curl_xfer_send_shutdown(struct Curl_easy *data, bool *done) return Curl_conn_shutdown(data, sockindex, done); } -static bool xfer_send_shutdown_started(struct Curl_easy *data) -{ - int sockindex; - - if(!data || !data->conn) - return CURLE_FAILED_INIT; - if(data->conn->writesockfd == CURL_SOCKET_BAD) - return CURLE_FAILED_INIT; - sockindex = (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]); - return Curl_shutdown_started(data, sockindex); -} - /** * Receive raw response data for the transfer. * @param data the transfer @@ -261,7 +249,7 @@ static ssize_t Curl_xfer_recv_resp(struct Curl_easy *data, return -1; } } - DEBUGF(infof(data, "readwrite_data: we are done")); + DEBUGF(infof(data, "sendrecv_dl: we are done")); } DEBUGASSERT(nread >= 0); return nread; @@ -272,9 +260,9 @@ static ssize_t Curl_xfer_recv_resp(struct Curl_easy *data, * the stream was rewound (in which case we have data in a * buffer) */ -static CURLcode readwrite_data(struct Curl_easy *data, - struct SingleRequest *k, - int *didwhat) +static CURLcode sendrecv_dl(struct Curl_easy *data, + struct SingleRequest *k, + int *didwhat) { struct connectdata *conn = data->conn; CURLcode result = CURLE_OK; @@ -381,14 +369,14 @@ static CURLcode readwrite_data(struct Curl_easy *data, out: Curl_multi_xfer_buf_release(data, xfer_buf); if(result) - DEBUGF(infof(data, "readwrite_data() -> %d", result)); + DEBUGF(infof(data, "sendrecv_dl() -> %d", result)); return result; } /* * Send data to upload to the server, when the socket is writable. */ -static CURLcode readwrite_upload(struct Curl_easy *data, int *didwhat) +static CURLcode sendrecv_ul(struct Curl_easy *data, int *didwhat) { /* We should not get here when the sending is already done. It * probably means that someone set `data-req.keepon |= KEEP_SEND` @@ -421,66 +409,44 @@ static int select_bits_paused(struct Curl_easy *data, int select_bits) } /* - * Curl_readwrite() is the low-level function to be called when data is to + * Curl_sendrecv() is the low-level function to be called when data is to * be read and written to/from the connection. */ -CURLcode Curl_readwrite(struct Curl_easy *data) +CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp) { - struct connectdata *conn = data->conn; struct SingleRequest *k = &data->req; - CURLcode result; - struct curltime now; + CURLcode result = CURLE_OK; int didwhat = 0; - int select_bits; + int select_bits = 0; + DEBUGASSERT(nowp); if(data->state.select_bits) { if(select_bits_paused(data, data->state.select_bits)) { /* leave the bits unchanged, so they'll tell us what to do when * this transfer gets unpaused. */ - DEBUGF(infof(data, "readwrite, select_bits, early return on PAUSED")); + /* DEBUGF(infof(data, "sendrecv, select_bits, early return on PAUSED")); + */ result = CURLE_OK; goto out; } - select_bits = data->state.select_bits; data->state.select_bits = 0; + /* DEBUGF(infof(data, "sendrecv, select_bits %x, RUN", select_bits)); */ + select_bits = (CURL_CSELECT_OUT|CURL_CSELECT_IN); } - else if(((k->keepon & KEEP_RECVBITS) == KEEP_RECV) && - xfer_recv_shutdown_started(data)) { - DEBUGF(infof(data, "readwrite, recv for finishing shutdown")); - select_bits = CURL_CSELECT_IN; + else if(data->last_poll.num) { + /* The transfer wanted something polled. Let's run all available + * send/receives. Worst case we EAGAIN on some. */ + /* DEBUGF(infof(data, "sendrecv, had poll sockets, RUN")); */ + select_bits = (CURL_CSELECT_OUT|CURL_CSELECT_IN); } - else if(((k->keepon & KEEP_SENDBITS) == KEEP_SEND) && - xfer_send_shutdown_started(data)) { - DEBUGF(infof(data, "readwrite, send for finishing shutdown")); + else if(data->req.keepon & KEEP_SEND_TIMED) { + /* DEBUGF(infof(data, "sendrecv, KEEP_SEND_TIMED, RUN ul")); */ select_bits = CURL_CSELECT_OUT; } - else { - curl_socket_t fd_read; - curl_socket_t fd_write; - /* only use the proper socket if the *_HOLD bit is not set simultaneously - as then we are in rate limiting state in that transfer direction */ - if((k->keepon & KEEP_RECVBITS) == KEEP_RECV) - fd_read = conn->sockfd; - else - fd_read = CURL_SOCKET_BAD; - - if(Curl_req_want_send(data)) - fd_write = conn->writesockfd; - else - fd_write = CURL_SOCKET_BAD; - - select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0); - } - - if(select_bits == CURL_CSELECT_ERR) { - failf(data, "select/poll returned error"); - result = CURLE_SEND_ERROR; - goto out; - } #ifdef USE_HYPER - if(conn->datastream) { - result = conn->datastream(data, conn, &didwhat, select_bits); + if(data->conn->datastream) { + result = data->conn->datastream(data, data->conn, &didwhat, select_bits); if(result || data->req.done) goto out; } @@ -490,17 +456,15 @@ CURLcode Curl_readwrite(struct Curl_easy *data) the stream was rewound (in which case we have data in a buffer) */ if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) { - result = readwrite_data(data, k, &didwhat); + result = sendrecv_dl(data, k, &didwhat); if(result || data->req.done) goto out; } /* If we still have writing to do, we check if we have a writable socket. */ - if((Curl_req_want_send(data) && (select_bits & CURL_CSELECT_OUT)) || - (k->keepon & KEEP_SEND_TIMED)) { - /* write */ - - result = readwrite_upload(data, &didwhat); + if((Curl_req_want_send(data) || (data->req.keepon & KEEP_SEND_TIMED)) && + (select_bits & CURL_CSELECT_OUT)) { + result = sendrecv_ul(data, &didwhat); if(result) goto out; } @@ -508,8 +472,8 @@ CURLcode Curl_readwrite(struct Curl_easy *data) } #endif - now = Curl_now(); - if(!didwhat) { + if(select_bits && !didwhat) { + /* Transfer wanted to send/recv, but nothing was possible. */ result = Curl_conn_ev_data_idle(data); if(result) goto out; @@ -518,23 +482,23 @@ CURLcode Curl_readwrite(struct Curl_easy *data) if(Curl_pgrsUpdate(data)) result = CURLE_ABORTED_BY_CALLBACK; else - result = Curl_speedcheck(data, now); + result = Curl_speedcheck(data, *nowp); if(result) goto out; if(k->keepon) { - if(0 > Curl_timeleft(data, &now, FALSE)) { + if(0 > Curl_timeleft(data, nowp, FALSE)) { if(k->size != -1) { failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds with %" CURL_FORMAT_CURL_OFF_T " out of %" CURL_FORMAT_CURL_OFF_T " bytes received", - Curl_timediff(now, data->progress.t_startsingle), + Curl_timediff(*nowp, data->progress.t_startsingle), k->bytecount, k->size); } else { failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds with %" CURL_FORMAT_CURL_OFF_T " bytes received", - Curl_timediff(now, data->progress.t_startsingle), + Curl_timediff(*nowp, data->progress.t_startsingle), k->bytecount); } result = CURLE_OPERATION_TIMEDOUT; @@ -573,7 +537,7 @@ CURLcode Curl_readwrite(struct Curl_easy *data) out: if(result) - DEBUGF(infof(data, "Curl_readwrite() -> %d", result)); + DEBUGF(infof(data, "Curl_sendrecv() -> %d", result)); return result; } diff --git a/lib/transfer.h b/lib/transfer.h index c41bdf3f74..8d6f98d750 100644 --- a/lib/transfer.h +++ b/lib/transfer.h @@ -44,7 +44,7 @@ typedef enum { CURLcode Curl_follow(struct Curl_easy *data, char *newurl, followtype type); -CURLcode Curl_readwrite(struct Curl_easy *data); +CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp); int Curl_single_getsock(struct Curl_easy *data, struct connectdata *conn, curl_socket_t *socks); CURLcode Curl_retry_request(struct Curl_easy *data, char **url); diff --git a/lib/url.c b/lib/url.c index abf9bc14ac..50e2074b8e 100644 --- a/lib/url.c +++ b/lib/url.c @@ -234,8 +234,6 @@ CURLcode Curl_close(struct Curl_easy **datap) data = *datap; *datap = NULL; - Curl_expire_clear(data); /* shut off timers */ - /* Detach connection if any is left. This should not be normal, but can be the case for example with CONNECT_ONLY + recv/send (test 556) */ Curl_detach_connection(data); @@ -253,6 +251,8 @@ CURLcode Curl_close(struct Curl_easy **datap) } } + Curl_expire_clear(data); /* shut off any timers left */ + data->magic = 0; /* force a clear AFTER the possibly enforced removal from the multi handle, since that function uses the magic field! */ diff --git a/tests/valgrind.supp b/tests/valgrind.supp index 6e570b1bf7..422dadf71a 100644 --- a/tests/valgrind.supp +++ b/tests/valgrind.supp @@ -14,7 +14,7 @@ fun:zstd_unencode_write fun:Curl_unencode_write fun:readwrite_data - fun:Curl_readwrite + fun:Curl_sendrecv fun:multi_runsingle fun:curl_multi_perform fun:easy_transfer @@ -31,7 +31,7 @@ Memcheck:Cond fun:ZSTD_decompressStream fun:zstd_unencode_write - fun:Curl_readwrite + fun:Curl_sendrecv fun:multi_runsingle fun:curl_multi_perform fun:curl_easy_perform