From 9b6148e9d95db54a752b03b571296c40d66e97fe Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Wed, 2 Apr 2025 12:25:21 +0200 Subject: [PATCH] async-threaded resolver: use ref counter Allocate the data shared between a transfer and an aync resolver thread separately and use a reference counter to determine its release. Change `Curl_thread_destroy()` to clear the thread handle, so that the thread is considered "gone" and we do not try to join (and fail to) afterwards. Retake of the revert in fb15a986c0d947ae6b9dd6 Closes #16916 --- lib/asyn-ares.c | 9 ++ lib/asyn-thread.c | 189 +++++++++++++++++++++++----------------- lib/asyn.h | 12 ++- lib/curl_threads.c | 21 ++--- lib/curl_threads.h | 2 +- lib/multi.c | 6 +- lib/socks.c | 12 +-- tests/libtest/lib3207.c | 2 +- 8 files changed, 146 insertions(+), 107 deletions(-) diff --git a/lib/asyn-ares.c b/lib/asyn-ares.c index 2bd0ffa027..a8bcaddab8 100644 --- a/lib/asyn-ares.c +++ b/lib/asyn-ares.c @@ -976,6 +976,15 @@ CURLcode Curl_set_dns_local_ip6(struct Curl_easy *data, return CURLE_NOT_BUILT_IN; #endif } + +void Curl_resolver_set_result(struct Curl_easy *data, + struct Curl_dns_entry *dnsentry) +{ + Curl_resolver_cancel(data); + data->state.async.dns = dnsentry; + data->state.async.done = TRUE; +} + #endif /* CURLRES_ARES */ #endif /* USE_ARES */ diff --git a/lib/asyn-thread.c b/lib/asyn-thread.c index 177cfc0c7d..128ee7d34a 100644 --- a/lib/asyn-thread.c +++ b/lib/asyn-thread.c @@ -143,28 +143,21 @@ void Curl_resolver_cancel(struct Curl_easy *data) destroy_async_data(data); } -/* This function is used to init a threaded resolve */ -static bool init_resolve_thread(struct Curl_easy *data, - const char *hostname, int port, - const struct addrinfo *hints); - - static struct thread_sync_data *conn_thread_sync_data(struct Curl_easy *data) { - return &(data->state.async.thdata.tsd); + return data->state.async.thdata.tsd; } /* Destroy resolver thread synchronization data */ static void destroy_thread_sync_data(struct thread_sync_data *tsd) { - Curl_mutex_destroy(&tsd->mutx); - - free(tsd->hostname); - - if(tsd->res) - Curl_freeaddrinfo(tsd->res); - + if(tsd) { + DEBUGASSERT(!tsd->ref_count); + Curl_mutex_destroy(&tsd->mutx); + free(tsd->hostname); + if(tsd->res) + Curl_freeaddrinfo(tsd->res); #ifndef CURL_DISABLE_SOCKETPAIR /* * close one end of the socket pair (may be done in resolver thread); @@ -176,7 +169,8 @@ void destroy_thread_sync_data(struct thread_sync_data *tsd) } #endif #endif - memset(tsd, 0, sizeof(*tsd)); + free(tsd); + } } /* Initialize resolver thread synchronization data */ @@ -186,16 +180,18 @@ int init_thread_sync_data(struct thread_data *td, int port, const struct addrinfo *hints) { - struct thread_sync_data *tsd = &td->tsd; + struct thread_sync_data *tsd; - memset(tsd, 0, sizeof(*tsd)); + DEBUGASSERT(!td->tsd); + tsd = calloc(1, sizeof(*tsd)); + if(!tsd) + return 0; - td->init = TRUE; tsd->port = port; - /* Treat the request as done until the thread actually starts so any early - * cleanup gets done properly. - */ - tsd->done = TRUE; + tsd->sock_pair[0] = CURL_SOCKET_BAD; + tsd->sock_pair[1] = CURL_SOCKET_BAD; + tsd->ref_count = 0; + #ifdef HAVE_GETADDRINFO DEBUGASSERT(hints); tsd->hints = *hints; @@ -222,6 +218,9 @@ int init_thread_sync_data(struct thread_data *td, if(!tsd->hostname) goto err_exit; + td->init = TRUE; + td->tsd = tsd; + tsd->ref_count = 1; return 1; err_exit: @@ -266,10 +265,10 @@ unsigned int #endif CURL_STDCALL getaddrinfo_thread(void *arg) { - struct thread_data *td = arg; - struct thread_sync_data *tsd = &td->tsd; + struct thread_sync_data *tsd = arg; char service[12]; int rc; + bool all_gone; msnprintf(service, sizeof(service), "%d", tsd->port); @@ -285,12 +284,8 @@ CURL_STDCALL getaddrinfo_thread(void *arg) } Curl_mutex_acquire(&tsd->mutx); - if(tsd->done) { - /* too late, gotta clean up the mess */ - Curl_mutex_release(&tsd->mutx); - destroy_thread_sync_data(tsd); - } - else { + if(tsd->ref_count > 1) { + /* Someone still waiting on our results. */ #ifndef CURL_DISABLE_SOCKETPAIR if(tsd->sock_pair[1] != CURL_SOCKET_BAD) { #ifdef USE_EVENTFD @@ -305,9 +300,13 @@ CURL_STDCALL getaddrinfo_thread(void *arg) } } #endif - tsd->done = TRUE; - Curl_mutex_release(&tsd->mutx); } + /* thread gives up its reference to the shared data now. */ + --tsd->ref_count; + all_gone = !tsd->ref_count; + Curl_mutex_release(&tsd->mutx); + if(all_gone) + destroy_thread_sync_data(tsd); return 0; } @@ -325,8 +324,8 @@ unsigned int #endif CURL_STDCALL gethostbyname_thread(void *arg) { - struct thread_data *td = arg; - struct thread_sync_data *tsd = &td->tsd; + struct thread_sync_data *tsd = arg; + bool all_gone; tsd->res = Curl_ipv4_resolve_r(tsd->hostname, tsd->port); @@ -337,15 +336,12 @@ CURL_STDCALL gethostbyname_thread(void *arg) } Curl_mutex_acquire(&tsd->mutx); - if(tsd->done) { - /* too late, gotta clean up the mess */ - Curl_mutex_release(&tsd->mutx); + /* thread gives up its reference to the shared data now. */ + --tsd->ref_count; + all_gone = !tsd->ref_count;; + Curl_mutex_release(&tsd->mutx); + if(all_gone) destroy_thread_sync_data(tsd); - } - else { - tsd->done = TRUE; - Curl_mutex_release(&tsd->mutx); - } return 0; } @@ -359,11 +355,9 @@ static void destroy_async_data(struct Curl_easy *data) { struct Curl_async *async = &data->state.async; struct thread_data *td = &async->thdata; + if(td->init) { bool done; -#ifndef CURL_DISABLE_SOCKETPAIR - curl_socket_t sock_rd = td->tsd.sock_pair[0]; -#endif #ifdef USE_HTTPSRR_ARES if(td->channel) { @@ -371,24 +365,32 @@ static void destroy_async_data(struct Curl_easy *data) td->channel = NULL; } #endif - /* - * if the thread is still blocking in the resolve syscall, detach it and - * let the thread do the cleanup... - */ - Curl_mutex_acquire(&td->tsd.mutx); - done = td->tsd.done; - td->tsd.done = TRUE; - Curl_mutex_release(&td->tsd.mutx); - - if(!done) { - Curl_thread_destroy(td->thread_hnd); - } - else { - if(td->thread_hnd != curl_thread_t_null) - Curl_thread_join(&td->thread_hnd); - destroy_thread_sync_data(&td->tsd); - } + if(td->tsd) { +#ifndef CURL_DISABLE_SOCKETPAIR + curl_socket_t sock_rd = td->tsd->sock_pair[0]; +#endif + /* Release our reference to the data shared with the thread. */ + Curl_mutex_acquire(&td->tsd->mutx); + --td->tsd->ref_count; + CURL_TRC_DNS(data, "resolve, destroy async data, shared ref=%d", + td->tsd->ref_count); + done = !td->tsd->ref_count; + Curl_mutex_release(&td->tsd->mutx); + + if(!done) { + /* thread is still running. Detach the thread, it will + * trigger the cleanup when it releases its reference. */ + Curl_thread_destroy(&td->thread_hnd); + } + else { + /* thread has released its reference, join it and + * release the memory we shared with it. */ + if(td->thread_hnd != curl_thread_t_null) + Curl_thread_join(&td->thread_hnd); + destroy_thread_sync_data(td->tsd); + } + td->tsd = NULL; #ifndef CURL_DISABLE_SOCKETPAIR /* * ensure CURLMOPT_SOCKETFUNCTION fires CURL_POLL_REMOVE @@ -397,10 +399,10 @@ static void destroy_async_data(struct Curl_easy *data) Curl_multi_will_close(data, sock_rd); wakeup_close(sock_rd); #endif + } td->init = FALSE; } - } #ifdef USE_HTTPSRR_ARES @@ -437,41 +439,51 @@ static bool init_resolve_thread(struct Curl_easy *data, int err = ENOMEM; struct Curl_async *async = &data->state.async; + if(async->done && td->tsd) { + CURL_TRC_DNS(data, "starting new resolve, with previous not cleaned up" + " for '%s:%d'", td->tsd->hostname, td->tsd->port); + destroy_async_data(data); + DEBUGASSERT(!td->tsd); + } + async->port = port; async->done = FALSE; async->dns = NULL; td->thread_hnd = curl_thread_t_null; td->start = Curl_now(); - if(!init_thread_sync_data(td, hostname, port, hints)) { - goto errno_exit; - } - - /* The thread will set this TRUE when complete. */ - td->tsd.done = FALSE; + if(!init_thread_sync_data(td, hostname, port, hints)) + goto err_exit; + DEBUGASSERT(td->tsd); + Curl_mutex_acquire(&td->tsd->mutx); + DEBUGASSERT(td->tsd->ref_count == 1); + /* passing td->tsd to the thread adds a reference */ + ++td->tsd->ref_count; #ifdef HAVE_GETADDRINFO - td->thread_hnd = Curl_thread_create(getaddrinfo_thread, td); + td->thread_hnd = Curl_thread_create(getaddrinfo_thread, td->tsd); #else - td->thread_hnd = Curl_thread_create(gethostbyname_thread, td); + td->thread_hnd = Curl_thread_create(gethostbyname_thread, td->tsd); #endif - if(td->thread_hnd == curl_thread_t_null) { - /* The thread never started, so mark it as done here for proper cleanup. */ - td->tsd.done = TRUE; + /* The thread never started, remove its reference that never happened. */ + --td->tsd->ref_count; err = errno; + Curl_mutex_release(&td->tsd->mutx); goto err_exit; } + #ifdef USE_HTTPSRR_ARES if(resolve_httpsrr(data, async)) infof(data, "Failed HTTPS RR operation"); #endif + Curl_mutex_release(&td->tsd->mutx); + CURL_TRC_DNS(data, "resolve thread started for of %s:%d", hostname, port); return TRUE; err_exit: + CURL_TRC_DNS(data, "resolve thread failed init: %d", err); destroy_async_data(data); - -errno_exit: CURL_SETERRNO(err); return FALSE; } @@ -491,6 +503,7 @@ static CURLcode thread_wait_resolv(struct Curl_easy *data, DEBUGASSERT(td); DEBUGASSERT(td->thread_hnd != curl_thread_t_null); + CURL_TRC_DNS(data, "resolve, wait for thread to finish"); /* wait for the thread to resolve the name */ if(Curl_thread_join(&td->thread_hnd)) { if(entry) @@ -571,10 +584,15 @@ CURLcode Curl_resolver_is_resolved(struct Curl_easy *data, (void)Curl_ares_perform(td->channel, 0); /* ignore errors */ #endif - Curl_mutex_acquire(&td->tsd.mutx); - done = td->tsd.done; - Curl_mutex_release(&td->tsd.mutx); + DEBUGASSERT(td->tsd); + if(!td->tsd) + return CURLE_FAILED_INIT; + + Curl_mutex_acquire(&td->tsd->mutx); + done = (td->tsd->ref_count == 1); + Curl_mutex_release(&td->tsd->mutx); + CURL_TRC_DNS(data, "resolve, thread %sfinished", done ? "" : "not "); if(done) { CURLcode result = td->result; getaddrinfo_complete(data); @@ -644,9 +662,9 @@ int Curl_resolver_getsock(struct Curl_easy *data, curl_socket_t *socks) } #endif #ifndef CURL_DISABLE_SOCKETPAIR - if(td->init) { + if(td->init && td->tsd) { /* return read fd to client for polling the DNS resolution status */ - socks[socketi] = td->tsd.sock_pair[0]; + socks[socketi] = td->tsd->sock_pair[0]; ret_val |= GETSOCK_READSOCK(socketi); } else @@ -704,6 +722,7 @@ struct Curl_addrinfo *Curl_resolver_getaddrinfo(struct Curl_easy *data, int pf = PF_INET; *waitp = 0; /* default to synchronous response */ + CURL_TRC_DNS(data, "init threaded resolve of %s:%d", hostname, port); #ifdef CURLRES_IPV6 if((data->conn->ip_version != CURL_IPRESOLVE_V4) && Curl_ipv6works(data)) { /* The stack seems to be IPv6-enabled */ @@ -732,6 +751,14 @@ struct Curl_addrinfo *Curl_resolver_getaddrinfo(struct Curl_easy *data, #endif /* !HAVE_GETADDRINFO */ +void Curl_resolver_set_result(struct Curl_easy *data, + struct Curl_dns_entry *dnsentry) +{ + destroy_async_data(data); + data->state.async.dns = dnsentry; + data->state.async.done = TRUE; +} + CURLcode Curl_set_dns_servers(struct Curl_easy *data, char *servers) { diff --git a/lib/asyn.h b/lib/asyn.h index 481c869bdb..ab7a973d15 100644 --- a/lib/asyn.h +++ b/lib/asyn.h @@ -51,7 +51,7 @@ struct thread_sync_data { #endif int port; int sock_error; - bool done; + int ref_count; }; struct thread_data { @@ -59,7 +59,7 @@ struct thread_data { unsigned int poll_interval; timediff_t interval_end; struct curltime start; - struct thread_sync_data tsd; + struct thread_sync_data *tsd; CURLcode result; /* CURLE_OK or error handling response */ #if defined(USE_HTTPSRR) && defined(USE_ARES) struct Curl_https_rrinfo hinfo; @@ -227,6 +227,13 @@ struct Curl_addrinfo *Curl_resolver_getaddrinfo(struct Curl_easy *data, int port, int *waitp); +/* + * Set `dnsentry` as result of resolve operation, replacing any + * ongoing resolve attempts. + */ +void Curl_resolver_set_result(struct Curl_easy *data, + struct Curl_dns_entry *dnsentry); + #ifndef CURLRES_ASYNCH /* convert these functions if an asynch resolver is not used */ #define Curl_resolver_cancel(x) Curl_nop_stmt @@ -237,6 +244,7 @@ struct Curl_addrinfo *Curl_resolver_getaddrinfo(struct Curl_easy *data, #define Curl_resolver_init(x,y) CURLE_OK #define Curl_resolver_global_init() CURLE_OK #define Curl_resolver_global_cleanup() Curl_nop_stmt +#define Curl_resolver_set_result(x,y) Curl_nop_stmt #define Curl_resolver_cleanup(x) Curl_nop_stmt #endif diff --git a/lib/curl_threads.c b/lib/curl_threads.c index 54f0dcee49..36ae32c11a 100644 --- a/lib/curl_threads.c +++ b/lib/curl_threads.c @@ -82,11 +82,12 @@ err: return curl_thread_t_null; } -void Curl_thread_destroy(curl_thread_t hnd) +void Curl_thread_destroy(curl_thread_t *hnd) { - if(hnd != curl_thread_t_null) { - pthread_detach(*hnd); - free(hnd); + if(*hnd != curl_thread_t_null) { + pthread_detach(**hnd); + free(*hnd); + *hnd = curl_thread_t_null; } } @@ -138,10 +139,12 @@ curl_thread_t Curl_thread_create( return t; } -void Curl_thread_destroy(curl_thread_t hnd) +void Curl_thread_destroy(curl_thread_t *hnd) { - if(hnd != curl_thread_t_null) - CloseHandle(hnd); + if(*hnd != curl_thread_t_null) { + CloseHandle(*hnd); + *hnd = curl_thread_t_null; + } } int Curl_thread_join(curl_thread_t *hnd) @@ -153,9 +156,7 @@ int Curl_thread_join(curl_thread_t *hnd) int ret = (WaitForSingleObjectEx(*hnd, INFINITE, FALSE) == WAIT_OBJECT_0); #endif - Curl_thread_destroy(*hnd); - - *hnd = curl_thread_t_null; + Curl_thread_destroy(hnd); return ret; } diff --git a/lib/curl_threads.h b/lib/curl_threads.h index 0b334034ce..b060d4acd3 100644 --- a/lib/curl_threads.h +++ b/lib/curl_threads.h @@ -61,7 +61,7 @@ curl_thread_t Curl_thread_create( (CURL_STDCALL *func) (void *), void *arg); -void Curl_thread_destroy(curl_thread_t hnd); +void Curl_thread_destroy(curl_thread_t *hnd); int Curl_thread_join(curl_thread_t *hnd); diff --git a/lib/multi.c b/lib/multi.c index 02ec6b374f..db2bfdcfaa 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -2070,10 +2070,8 @@ static CURLMcode state_resolving(struct Curl_multi *multi, dns = Curl_fetch_addr(data, hostname, conn->primary.remote_port); if(dns) { -#ifdef USE_CURL_ASYNC - data->state.async.dns = dns; - data->state.async.done = TRUE; -#endif + /* Tell a possibly async resolver we no longer need the results. */ + Curl_resolver_set_result(data, dns); result = CURLE_OK; infof(data, "Hostname '%s' was found in DNS cache", hostname); } diff --git a/lib/socks.c b/lib/socks.c index 3a88f0b386..431f9d57bd 100644 --- a/lib/socks.c +++ b/lib/socks.c @@ -344,10 +344,8 @@ static CURLproxycode do_SOCKS4(struct Curl_cfilter *cf, dns = Curl_fetch_addr(data, sx->hostname, conn->primary.remote_port); if(dns) { -#ifdef USE_CURL_ASYNC - data->state.async.dns = dns; - data->state.async.done = TRUE; -#endif + /* Tell a possibly async resolver we no longer need the results. */ + Curl_resolver_set_result(data, dns); infof(data, "Hostname '%s' was found", sx->hostname); sxstate(sx, data, CONNECT_RESOLVED); } @@ -814,10 +812,8 @@ CONNECT_REQ_INIT: dns = Curl_fetch_addr(data, sx->hostname, sx->remote_port); if(dns) { -#ifdef USE_CURL_ASYNC - data->state.async.dns = dns; - data->state.async.done = TRUE; -#endif + /* Tell a possibly async resolver we no longer need the results. */ + Curl_resolver_set_result(data, dns); infof(data, "SOCKS5: hostname '%s' found", sx->hostname); } diff --git a/tests/libtest/lib3207.c b/tests/libtest/lib3207.c index adb31aa264..ca87e0061f 100644 --- a/tests/libtest/lib3207.c +++ b/tests/libtest/lib3207.c @@ -161,7 +161,7 @@ static void execute(CURLSH *share, struct Ctx *ctx) for(i = 0; i < THREAD_SIZE; i++) { if(thread[i]) { Curl_thread_join(&thread[i]); - Curl_thread_destroy(thread[i]); + Curl_thread_destroy(&thread[i]); } } curl_share_setopt(share, CURLSHOPT_LOCKFUNC, NULL); -- 2.47.2