From: Stefan Eissing Date: Thu, 6 Jun 2024 10:40:38 +0000 (+0200) Subject: multi: prepare multi_wait() for future shutdown usage X-Git-Tag: curl-8_9_0~274 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=374d178f141b2b2ad2676e0700da7aa32a713990;p=thirdparty%2Fcurl.git multi: prepare multi_wait() for future shutdown usage - new struct curl_pollfds and struct curl_waitfds - add structs and methods to init/add/cleanup an array of pollfd and struct curl_waitfd. Use in multi_wait() and multi_waitfds() to populate the sets for polling. - place USE_WINSOCK WSAEventSelect() setting into a separate loop over all collected pfds Closes #13900 --- diff --git a/lib/multi.c b/lib/multi.c index 4f6f6d3f77..88841c71c7 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -1227,12 +1227,8 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi, unsigned int *fd_count) { struct Curl_easy *data; - unsigned int nfds = 0; + struct curl_waitfds cwfds; struct easy_pollset ps; - unsigned int i; - CURLMcode result = CURLM_OK; - struct curl_waitfd *ufd; - unsigned int j; if(!ufds) return CURLM_BAD_FUNCTION_ARGUMENT; @@ -1243,44 +1239,17 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi, if(multi->in_callback) return CURLM_RECURSIVE_API_CALL; + Curl_waitfds_init(&cwfds, ufds, size); memset(&ps, 0, sizeof(ps)); for(data = multi->easyp; data; data = data->next) { multi_getsock(data, &ps); - - for(i = 0; i < ps.num; i++) { - if(nfds < size) { - curl_socket_t fd = ps.sockets[i]; - int fd_idx = -1; - - /* Simple linear search to skip an already added descriptor */ - for(j = 0; j < nfds; j++) { - if(ufds[j].fd == fd) { - fd_idx = (int)j; - break; - } - } - - if(fd_idx < 0) { - ufd = &ufds[nfds++]; - ufd->fd = ps.sockets[i]; - ufd->events = 0; - } - else - ufd = &ufds[fd_idx]; - - if(ps.actions[i] & CURL_POLL_IN) - ufd->events |= CURL_WAIT_POLLIN; - if(ps.actions[i] & CURL_POLL_OUT) - ufd->events |= CURL_WAIT_POLLOUT; - } - else - return CURLM_OUT_OF_MEMORY; - } + if(Curl_waitfds_add_ps(&cwfds, &ps)) + return CURLM_OUT_OF_MEMORY; } if(fd_count) - *fd_count = nfds; - return result; + *fd_count = cwfds.n; + return CURLM_OK; } #ifdef USE_WINSOCK @@ -1299,29 +1268,6 @@ static void reset_socket_fdwrite(curl_socket_t s) } #endif -static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len, - unsigned int inc, bool *is_malloced) -{ - struct pollfd *new_fds, *old_fds = *pfds; - unsigned int new_len = *pfds_len + inc; - - new_fds = calloc(new_len, sizeof(struct pollfd)); - if(!new_fds) { - if(*is_malloced) - free(old_fds); - *pfds = NULL; - *pfds_len = 0; - return CURLM_OUT_OF_MEMORY; - } - memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd)); - if(*is_malloced) - free(old_fds); - *pfds = new_fds; - *pfds_len = new_len; - *is_malloced = TRUE; - return CURLM_OK; -} - #define NUM_POLLS_ON_STACK 10 static CURLMcode multi_wait(struct Curl_multi *multi, @@ -1338,10 +1284,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi, long timeout_internal; int retcode = 0; struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; - struct pollfd *ufds = &a_few_on_stack[0]; - unsigned int ufds_len = NUM_POLLS_ON_STACK; - unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */ - bool ufds_malloc = FALSE; + struct curl_pollfds cpfds; + unsigned int curl_nfds = 0; /* how many pfds are for curl transfers */ #ifdef USE_WINSOCK WSANETWORKEVENTS wsa_events; DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT); @@ -1359,105 +1303,62 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if(timeout_ms < 0) return CURLM_BAD_FUNCTION_ARGUMENT; - memset(ufds, 0, ufds_len * sizeof(struct pollfd)); + Curl_pollfds_init(&cpfds, a_few_on_stack, NUM_POLLS_ON_STACK); memset(&ps, 0, sizeof(ps)); /* Add the curl handles to our pollfds first */ for(data = multi->easyp; data; data = data->next) { multi_getsock(data, &ps); - - for(i = 0; i < ps.num; i++) { - short events = 0; -#ifdef USE_WINSOCK - long mask = 0; -#endif - if(ps.actions[i] & CURL_POLL_IN) { -#ifdef USE_WINSOCK - mask |= FD_READ|FD_ACCEPT|FD_CLOSE; -#endif - events |= POLLIN; - } - if(ps.actions[i] & CURL_POLL_OUT) { -#ifdef USE_WINSOCK - mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; - reset_socket_fdwrite(ps.sockets[i]); -#endif - events |= POLLOUT; - } - if(events) { - if(nfds && ps.sockets[i] == ufds[nfds-1].fd) { - ufds[nfds-1].events |= events; - } - else { - if(nfds >= ufds_len) { - if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) - return CURLM_OUT_OF_MEMORY; - } - DEBUGASSERT(nfds < ufds_len); - ufds[nfds].fd = ps.sockets[i]; - ufds[nfds].events = events; - ++nfds; - } - } -#ifdef USE_WINSOCK - if(mask) { - if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) { - if(ufds_malloc) - free(ufds); - return CURLM_INTERNAL_ERROR; - } - } -#endif + if(Curl_pollfds_add_ps(&cpfds, &ps)) { + Curl_pollfds_cleanup(&cpfds); + return CURLM_OUT_OF_MEMORY; } } - curl_nfds = nfds; /* what curl internally used in ufds */ - + curl_nfds = cpfds.n; /* what curl internally uses in cpfds */ /* Add external file descriptions from poll-like struct curl_waitfd */ for(i = 0; i < extra_nfds; i++) { + unsigned short events = 0; + if(extra_fds[i].events & CURL_WAIT_POLLIN) + events |= POLLIN; + if(extra_fds[i].events & CURL_WAIT_POLLPRI) + events |= POLLPRI; + if(extra_fds[i].events & CURL_WAIT_POLLOUT) + events |= POLLOUT; + if(Curl_pollfds_add_sock(&cpfds, extra_fds[i].fd, events)) { + Curl_pollfds_cleanup(&cpfds); + return CURLM_OUT_OF_MEMORY; + } + } + #ifdef USE_WINSOCK + /* Set the WSA events based on the collected pollds */ + for(i = 0; i < cpfds.n; i++) { long mask = 0; - if(extra_fds[i].events & CURL_WAIT_POLLIN) + if(cpfds.pfds[i].events & POLLIN) mask |= FD_READ|FD_ACCEPT|FD_CLOSE; - if(extra_fds[i].events & CURL_WAIT_POLLPRI) + if(cpfds.pfds[i].events & POLLPRI) mask |= FD_OOB; - if(extra_fds[i].events & CURL_WAIT_POLLOUT) { + if(cpfds.pfds[i].events & POLLOUT) { mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; - reset_socket_fdwrite(extra_fds[i].fd); - } - if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, mask) != 0) { - if(ufds_malloc) - free(ufds); - return CURLM_INTERNAL_ERROR; + reset_socket_fdwrite(cpfds.pfds[i].fd); } -#endif - if(nfds >= ufds_len) { - if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) - return CURLM_OUT_OF_MEMORY; + if(mask) { + if(WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, mask) != 0) { + Curl_pollfds_cleanup(&cpfds); + return CURLM_INTERNAL_ERROR; + } } - DEBUGASSERT(nfds < ufds_len); - ufds[nfds].fd = extra_fds[i].fd; - ufds[nfds].events = 0; - if(extra_fds[i].events & CURL_WAIT_POLLIN) - ufds[nfds].events |= POLLIN; - if(extra_fds[i].events & CURL_WAIT_POLLPRI) - ufds[nfds].events |= POLLPRI; - if(extra_fds[i].events & CURL_WAIT_POLLOUT) - ufds[nfds].events |= POLLOUT; - ++nfds; } +#endif #ifdef ENABLE_WAKEUP #ifndef USE_WINSOCK if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { - if(nfds >= ufds_len) { - if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) - return CURLM_OUT_OF_MEMORY; + if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) { + Curl_pollfds_cleanup(&cpfds); + return CURLM_OUT_OF_MEMORY; } - DEBUGASSERT(nfds < ufds_len); - ufds[nfds].fd = multi->wakeup_pair[0]; - ufds[nfds].events = POLLIN; - ++nfds; } #endif #endif @@ -1471,21 +1372,23 @@ static CURLMcode multi_wait(struct Curl_multi *multi, timeout_ms = (int)timeout_internal; #if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK) - if(nfds || use_wakeup) { + if(cpfds.n || use_wakeup) { #else - if(nfds) { + if(cpfds.n) { #endif int pollrc; #ifdef USE_WINSOCK - if(nfds) - pollrc = Curl_poll(ufds, nfds, 0); /* just pre-check with WinSock */ + if(cpfds.n) /* just pre-check with WinSock */ + pollrc = Curl_poll(cpfds.pfds, cpfds.n, 0); else pollrc = 0; #else - pollrc = Curl_poll(ufds, nfds, timeout_ms); /* wait... */ + pollrc = Curl_poll(cpfds.pfds, cpfds.n, timeout_ms); /* wait... */ #endif - if(pollrc < 0) + if(pollrc < 0) { + Curl_pollfds_cleanup(&cpfds); return CURLM_UNRECOVERABLE_POLL; + } if(pollrc > 0) { retcode = pollrc; @@ -1503,7 +1406,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, struct, the bit values of the actual underlying poll() implementation may not be the same as the ones in the public libcurl API! */ for(i = 0; i < extra_nfds; i++) { - unsigned r = (unsigned)ufds[curl_nfds + i].revents; + unsigned r = (unsigned)cpfds.pfds[curl_nfds + i].revents; unsigned short mask = 0; #ifdef USE_WINSOCK curl_socket_t s = extra_fds[i].fd; @@ -1557,7 +1460,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #else #ifdef ENABLE_WAKEUP if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { - if(ufds[curl_nfds + extra_nfds].revents & POLLIN) { + if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) { char buf[64]; ssize_t nread; while(1) { @@ -1581,14 +1484,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi, } } - if(ufds_malloc) - free(ufds); if(ret) *ret = retcode; #if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK) - if(extrawait && !nfds && !use_wakeup) { + if(extrawait && !cpfds.n && !use_wakeup) { #else - if(extrawait && !nfds) { + if(extrawait && !cpfds.n) { #endif long sleep_ms = 0; @@ -1604,6 +1505,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, } } + Curl_pollfds_cleanup(&cpfds); return CURLM_OK; } diff --git a/lib/select.c b/lib/select.c index 1a5ab8572c..d5127d7432 100644 --- a/lib/select.c +++ b/lib/select.c @@ -47,6 +47,10 @@ #include "select.h" #include "timediff.h" #include "warnless.h" +/* The last 3 #include files should be in this order */ +#include "curl_printf.h" +#include "curl_memory.h" +#include "memdebug.h" /* * Internal function used for waiting a specific amount of ms @@ -401,3 +405,147 @@ int Curl_poll(struct pollfd ufds[], unsigned int nfds, timediff_t timeout_ms) return r; } + +void Curl_pollfds_init(struct curl_pollfds *cpfds, + struct pollfd *static_pfds, + unsigned int static_count) +{ + DEBUGASSERT(cpfds); + memset(cpfds, 0, sizeof(*cpfds)); + if(static_pfds && static_count) { + cpfds->pfds = static_pfds; + cpfds->count = static_count; + } +} + +void Curl_pollfds_cleanup(struct curl_pollfds *cpfds) +{ + DEBUGASSERT(cpfds); + if(cpfds->allocated_pfds) { + free(cpfds->pfds); + } + memset(cpfds, 0, sizeof(*cpfds)); +} + +static CURLcode cpfds_increase(struct curl_pollfds *cpfds, unsigned int inc) +{ + struct pollfd *new_fds; + unsigned int new_count = cpfds->count + inc; + + new_fds = calloc(new_count, sizeof(struct pollfd)); + if(!new_fds) + return CURLE_OUT_OF_MEMORY; + + memcpy(new_fds, cpfds->pfds, cpfds->count * sizeof(struct pollfd)); + if(cpfds->allocated_pfds) + free(cpfds->pfds); + cpfds->pfds = new_fds; + cpfds->count = new_count; + cpfds->allocated_pfds = TRUE; + return CURLE_OK; +} + +static CURLcode cpfds_add_sock(struct curl_pollfds *cpfds, + curl_socket_t sock, short events, bool fold) +{ + int i; + + if(fold && cpfds->n <= INT_MAX) { + for(i = (int)cpfds->n - 1; i >= 0; --i) { + if(sock == cpfds->pfds[i].fd) { + cpfds->pfds[i].events |= events; + return CURLE_OK; + } + } + } + /* not folded, add new entry */ + if(cpfds->n >= cpfds->count) { + if(cpfds_increase(cpfds, 100)) + return CURLE_OUT_OF_MEMORY; + } + cpfds->pfds[cpfds->n].fd = sock; + cpfds->pfds[cpfds->n].events = events; + ++cpfds->n; + return CURLE_OK; +} + +CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds, + curl_socket_t sock, short events) +{ + return cpfds_add_sock(cpfds, sock, events, FALSE); +} + +CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds, + struct easy_pollset *ps) +{ + size_t i; + + DEBUGASSERT(cpfds); + DEBUGASSERT(ps); + for(i = 0; i < ps->num; i++) { + short events = 0; + if(ps->actions[i] & CURL_POLL_IN) + events |= POLLIN; + if(ps->actions[i] & CURL_POLL_OUT) + events |= POLLOUT; + if(events) { + if(cpfds_add_sock(cpfds, ps->sockets[i], events, TRUE)) + return CURLE_OUT_OF_MEMORY; + } + } + return CURLE_OK; +} + +void Curl_waitfds_init(struct curl_waitfds *cwfds, + struct curl_waitfd *static_wfds, + unsigned int static_count) +{ + DEBUGASSERT(cwfds); + DEBUGASSERT(static_wfds); + memset(cwfds, 0, sizeof(*cwfds)); + cwfds->wfds = static_wfds; + cwfds->count = static_count; +} + +static CURLcode cwfds_add_sock(struct curl_waitfds *cwfds, + curl_socket_t sock, short events) +{ + int i; + + if(cwfds->n <= INT_MAX) { + for(i = (int)cwfds->n - 1; i >= 0; --i) { + if(sock == cwfds->wfds[i].fd) { + cwfds->wfds[i].events |= events; + return CURLE_OK; + } + } + } + /* not folded, add new entry */ + if(cwfds->n >= cwfds->count) + return CURLE_OUT_OF_MEMORY; + cwfds->wfds[cwfds->n].fd = sock; + cwfds->wfds[cwfds->n].events = events; + ++cwfds->n; + return CURLE_OK; +} + +CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds, + struct easy_pollset *ps) +{ + size_t i; + + DEBUGASSERT(cwfds); + DEBUGASSERT(ps); + for(i = 0; i < ps->num; i++) { + short events = 0; + if(ps->actions[i] & CURL_POLL_IN) + events |= CURL_WAIT_POLLIN; + if(ps->actions[i] & CURL_POLL_OUT) + events |= CURL_WAIT_POLLOUT; + if(events) { + if(cwfds_add_sock(cwfds, ps->sockets[i], events)) + return CURLE_OUT_OF_MEMORY; + } + } + return CURLE_OK; +} diff --git a/lib/select.h b/lib/select.h index 5b1ca23eb1..f01acbdefc 100644 --- a/lib/select.h +++ b/lib/select.h @@ -111,4 +111,37 @@ int Curl_wait_ms(timediff_t timeout_ms); } while(0) #endif +struct curl_pollfds { + struct pollfd *pfds; + unsigned int n; + unsigned int count; + BIT(allocated_pfds); +}; + +void Curl_pollfds_init(struct curl_pollfds *cpfds, + struct pollfd *static_pfds, + unsigned int static_count); + +void Curl_pollfds_cleanup(struct curl_pollfds *cpfds); + +CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds, + struct easy_pollset *ps); + +CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds, + curl_socket_t sock, short events); + +struct curl_waitfds { + struct curl_waitfd *wfds; + unsigned int n; + unsigned int count; +}; + +void Curl_waitfds_init(struct curl_waitfds *cwfds, + struct curl_waitfd *static_wfds, + unsigned int static_count); + +CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds, + struct easy_pollset *ps); + + #endif /* HEADER_CURL_SELECT_H */