From 21ecc7e376e449685bd40ccd6b91564af7637402 Mon Sep 17 00:00:00 2001
From: Stefan Eissing
Date: Mon, 30 Jun 2025 08:53:31 +0200
Subject: [PATCH] cf-socket: make socket data_pending a nop

Eliminate the socket readability check that the socket connection
filters performed in their 'data_pending' callback. This improves
transfer handling performance by up to ~30%, depending on parallelism
and response size.

Whatever `data_pending()` once was, its semantics are now:
"Is there anything buffered in the connection filters that still
needs to be received?" Any checks of the socket's readability are
done via `multi_wait()` and friends.

Fix the one place in the HTTP/1 proxy code that checked
`data_pending()` and did an early return if it was false. Remove that
check and actually try to receive data every time.

Closes #17785
---
 lib/cf-h1-proxy.c          |   3 -
 lib/cf-socket.c            |  19 ++-----
 lib/vquic/curl_ngtcp2.c    |  14 +----
 tests/http/scorecard.py    | 114 +++++++++++++++++++++++--------------
 tests/http/testenv/curl.py |  31 +++++++++-
 5 files changed, 106 insertions(+), 75 deletions(-)

diff --git a/lib/cf-h1-proxy.c b/lib/cf-h1-proxy.c
index 38d6d14a9d..56c4d88d0b 100644
--- a/lib/cf-h1-proxy.c
+++ b/lib/cf-h1-proxy.c
@@ -374,9 +374,6 @@ static CURLcode recv_CONNECT_resp(struct Curl_cfilter *cf,
   error = SELECT_OK;
   *done = FALSE;
 
-  if(!Curl_conn_data_pending(data, cf->sockindex))
-    return CURLE_OK;
-
   while(ts->keepon) {
     size_t nread;
     char byte;
diff --git a/lib/cf-socket.c b/lib/cf-socket.c
index 87e7dd412f..4187a6b2cf 100644
--- a/lib/cf-socket.c
+++ b/lib/cf-socket.c
@@ -1404,17 +1404,6 @@ static void cf_socket_adjust_pollset(struct Curl_cfilter *cf,
   }
 }
 
-static bool cf_socket_data_pending(struct Curl_cfilter *cf,
-                                   const struct Curl_easy *data)
-{
-  struct cf_socket_ctx *ctx = cf->ctx;
-  int readable;
-
-  (void)data;
-  readable = SOCKET_READABLE(ctx->sock, 0);
-  return readable > 0 && (readable & CURL_CSELECT_IN);
-}
-
 #ifdef USE_WINSOCK
 
 #ifndef SIO_IDEAL_SEND_BACKLOG_QUERY
@@ -1750,7 +1739,7 @@ struct Curl_cftype Curl_cft_tcp = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -1904,7 +1893,7 @@ struct Curl_cftype Curl_cft_udp = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -1958,7 +1947,7 @@ struct Curl_cftype Curl_cft_unix = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -2178,7 +2167,7 @@ struct Curl_cftype Curl_cft_tcp_accept = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c
index d91cd6b083..f6046c717e 100644
--- a/lib/vquic/curl_ngtcp2.c
+++ b/lib/vquic/curl_ngtcp2.c
@@ -1941,18 +1941,6 @@ out:
   return CURLE_OK;
 }
 
-/*
- * Called from transfer.c:data_pending to know if we should keep looping
- * to receive more data from the connection.
- */
-static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
-                                   const struct Curl_easy *data)
-{
-  (void)cf;
-  (void)data;
-  return FALSE;
-}
-
 static CURLcode h3_data_pause(struct Curl_cfilter *cf,
                               struct Curl_easy *data,
                               bool pause)
@@ -2728,7 +2716,7 @@ struct Curl_cftype Curl_cft_http3 = {
   cf_ngtcp2_close,
   cf_ngtcp2_shutdown,
   cf_ngtcp2_adjust_pollset,
-  cf_ngtcp2_data_pending,
+  Curl_cf_def_data_pending,
   cf_ngtcp2_send,
   cf_ngtcp2_recv,
   cf_ngtcp2_data_event,
diff --git a/tests/http/scorecard.py b/tests/http/scorecard.py
index 1966d9285c..51a183a162 100644
--- a/tests/http/scorecard.py
+++ b/tests/http/scorecard.py
@@ -185,7 +185,8 @@ class ScoreRunner:
                  verbose: int,
                  curl_verbose: int,
                  download_parallel: int = 0,
-                 server_addr: Optional[str] = None):
+                 server_addr: Optional[str] = None,
+                 with_dtrace: bool = False):
         self.verbose = verbose
         self.env = env
         self.protocol = protocol
@@ -194,12 +195,18 @@ class ScoreRunner:
         self.server_port = server_port
         self._silent_curl = not curl_verbose
         self._download_parallel = download_parallel
+        self._with_dtrace = with_dtrace
 
     def info(self, msg):
         if self.verbose > 0:
             sys.stderr.write(msg)
             sys.stderr.flush()
 
+    def mk_curl_client(self):
+        return CurlClient(env=self.env, silent=self._silent_curl,
+                          server_addr=self.server_addr,
+                          with_dtrace=self._with_dtrace)
+
     def handshakes(self) -> Dict[str, Any]:
         props = {}
         sample_size = 5
@@ -215,8 +222,7 @@ class ScoreRunner:
             hs_samples = []
            errors = []
             for _ in range(sample_size):
-                curl = CurlClient(env=self.env, silent=self._silent_curl,
-                                  server_addr=self.server_addr)
+                curl = self.mk_curl_client()
                 args = [
                     '--http3-only' if self.protocol == 'h3' else '--http2',
                     f'--{ipv}', f'https://{authority}/'
@@ -274,8 +280,7 @@ class ScoreRunner:
         profiles = []
         self.info('single...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True,
                                    with_headers=False, with_profile=True)
@@ -295,8 +300,7 @@ class ScoreRunner:
         url = f'{url}?[0-{count - 1}]'
         self.info('serial...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True,
                                    with_headers=False, with_profile=True)
@@ -317,8 +321,7 @@ class ScoreRunner:
         url = f'{url}?[0-{count - 1}]'
         self.info('parallel...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True,
                                    with_headers=False,
@@ -339,9 +342,12 @@ class ScoreRunner:
     def downloads(self, count: int, fsizes: List[int], meta: Dict[str, Any]) -> Dict[str, Any]:
         nsamples = meta['samples']
         max_parallel = self._download_parallel if self._download_parallel > 0 else count
-        cols = ['size', 'single']
+        cols = ['size']
+        if not self._download_parallel:
+            cols.append('single')
+        if count > 1:
+            cols.append(f'serial({count})')
         if count > 1:
-            cols.append(f'serial({count})')
             cols.append(f'parallel({count}x{max_parallel})')
         rows = []
         for fsize in fsizes:
@@ -351,10 +357,11 @@ class ScoreRunner:
             }]
             self.info(f'{row[0]["sval"]} downloads...')
             url = f'https://{self.env.domain1}:{self.server_port}/score{row[0]["sval"]}.data'
-
-            row.append(self.dl_single(url=url, nsamples=nsamples))
+            if 'single' in cols:
+                row.append(self.dl_single(url=url, nsamples=nsamples))
             if count > 1:
-                row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
+                if 'single' in cols:
+                    row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
                 row.append(self.dl_parallel(url=url, count=count, nsamples=nsamples))
             rows.append(row)
         self.info('done.\n')
@@ -387,8 +394,7 @@ class ScoreRunner:
         profiles = []
         self.info('single...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True)
             err = self._check_uploads(r, 1)
@@ -407,8 +413,7 @@ class ScoreRunner:
         url = f'{url}?id=[0-{count - 1}]'
         self.info('serial...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True)
             err = self._check_uploads(r, count)
@@ -428,8 +433,7 @@ class ScoreRunner:
         url = f'{url}?id=[0-{count - 1}]'
         self.info('parallel...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True,
                               extra_args=[
@@ -494,8 +498,7 @@ class ScoreRunner:
             ])
         self.info(f'{max_parallel}...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True, with_headers=False, with_profile=True,
                                    with_stats=False, extra_args=extra_args)
@@ -517,7 +520,8 @@ class ScoreRunner:
         fsize = 10*1024
         cols = ['size', 'total']
         rows = []
-        cols.extend([f'{mp} max' for mp in [1, 6, 25, 50, 100, 300]])
+        mparallel = meta['request_parallels']
+        cols.extend([f'{mp} max' for mp in mparallel])
         row = [{
             'val': fsize,
             'sval': Card.fmt_size(fsize)
@@ -528,7 +532,7 @@ class ScoreRunner:
         self.info('requests, max parallel...')
         row.extend([self.do_requests(url=url, count=count, max_parallel=mp,
                                      nsamples=meta["samples"])
-                    for mp in [1, 6, 25, 50, 100, 300]])
+                    for mp in mparallel])
         rows.append(row)
         self.info('done.\n')
         return {
@@ -547,6 +551,7 @@ class ScoreRunner:
               uploads: Optional[List[int]] = None,
               upload_count: int = 50,
               req_count=5000,
+              request_parallels=None,
               nsamples: int = 1,
               requests: bool = True):
         self.info(f"scoring {self.protocol} against {self.server_descr}\n")
@@ -599,6 +604,9 @@ class ScoreRunner:
                                             fsizes=uploads,
                                             meta=score['meta'])
         if requests:
+            if request_parallels is None:
+                request_parallels = [1, 6, 25, 50, 100, 300]
+            score['meta']['request_parallels'] = request_parallels
             score['requests'] = self.requests(count=req_count,
                                               meta=score['meta'])
         return score
@@ -624,6 +632,13 @@ def run_score(args, protocol):
             uploads.extend([Card.parse_size(s) for s in x.split(',')])
 
     requests = True
+    request_parallels = None
+    if args.request_parallels:
+        request_parallels = []
+        for x in args.request_parallels:
+            request_parallels.extend([int(s) for s in x.split(',')])
+
+
     if args.downloads or args.uploads or args.requests or args.handshakes:
         handshakes = args.handshakes
         if not args.downloads:
@@ -663,7 +678,8 @@ def run_score(args, protocol):
                           server_port=remote_port,
                           verbose=args.verbose,
                           curl_verbose=args.curl_verbose,
-                          download_parallel=args.download_parallel)
+                          download_parallel=args.download_parallel,
+                          with_dtrace=args.dtrace)
         cards.append(card)
 
     if test_httpd:
@@ -687,7 +703,8 @@ def run_score(args, protocol):
                           server_descr=server_descr,
                           server_port=server_port,
                           verbose=args.verbose, curl_verbose=args.curl_verbose,
-                          download_parallel=args.download_parallel)
+                          download_parallel=args.download_parallel,
+                          with_dtrace=args.dtrace)
         card.setup_resources(server_docs, downloads)
         cards.append(card)
 
@@ -711,7 +728,8 @@ def run_score(args, protocol):
                           server_descr=server_descr,
                           server_port=server_port,
                           verbose=args.verbose, curl_verbose=args.curl_verbose,
-                          download_parallel=args.download_parallel)
+                          download_parallel=args.download_parallel,
+                          with_dtrace=args.dtrace)
         card.setup_resources(server_docs, downloads)
         cards.append(card)
 
@@ -731,6 +749,7 @@ def run_score(args, protocol):
                            upload_count=args.upload_count,
                            req_count=args.request_count,
                            requests=requests,
+                           request_parallels=request_parallels,
                            nsamples=args.samples)
         if args.json:
             print(json.JSONEncoder(indent=2).encode(score))
@@ -772,8 +791,28 @@ def main():
                         help="log more output on stderr")
     parser.add_argument("-j", "--json", action='store_true', default=False,
                         help="print json instead of text")
+    parser.add_argument("--samples", action='store', type=int, metavar='number',
+                        default=1, help="how many sample runs to make")
+    parser.add_argument("--httpd", action='store_true', default=False,
+                        help="evaluate httpd server only")
+    parser.add_argument("--caddy", action='store_true', default=False,
+                        help="evaluate caddy server only")
+    parser.add_argument("--curl-verbose", action='store_true',
+                        default=False, help="run curl with `-v`")
+    parser.add_argument("--print", type=str, default=None, metavar='filename',
+                        help="print the results from a JSON file")
+    parser.add_argument("protocol", default=None, nargs='?',
+                        help="Name of protocol to score")
+    parser.add_argument("--start-only", action='store_true', default=False,
+                        help="only start the servers")
+    parser.add_argument("--remote", action='store', type=str,
+                        default=None, help="score against the remote server at :")
+    parser.add_argument("--dtrace", action='store_true',
+                        default = False, help = "produce dtrace of curl")
+
     parser.add_argument("-H", "--handshakes", action='store_true', default=False,
                         help="evaluate handshakes only")
+
     parser.add_argument("-d", "--downloads", action='store_true', default=False,
                         help="evaluate downloads")
     parser.add_argument("--download-sizes", action='append', type=str,
@@ -782,11 +821,10 @@ def main():
     parser.add_argument("--download-count", action='store', type=int, metavar='number',
                         default=50,
                         help="perform that many downloads")
-    parser.add_argument("--samples", action='store', type=int, metavar='number',
-                        default=1, help="how many sample runs to make")
     parser.add_argument("--download-parallel", action='store', type=int, metavar='number',
                         default=0,
                         help="perform that many downloads in parallel (default all)")
+
     parser.add_argument("-u", "--uploads", action='store_true', default=False,
                         help="evaluate uploads")
     parser.add_argument("--upload-sizes", action='append', type=str,
@@ -795,25 +833,15 @@ def main():
     parser.add_argument("--upload-count", action='store', type=int, metavar='number',
                         default=50,
                         help="perform that many uploads")
+
     parser.add_argument("-r", "--requests", action='store_true', default=False,
                         help="evaluate requests")
     parser.add_argument("--request-count", action='store', type=int, metavar='number',
                         default=5000,
                         help="perform that many requests")
-    parser.add_argument("--httpd", action='store_true', default=False,
-                        help="evaluate httpd server only")
-    parser.add_argument("--caddy", action='store_true', default=False,
-                        help="evaluate caddy server only")
-    parser.add_argument("--curl-verbose", action='store_true',
-                        default=False, help="run curl with `-v`")
-    parser.add_argument("--print", type=str, default=None, metavar='filename',
-                        help="print the results from a JSON file")
-    parser.add_argument("protocol", default=None, nargs='?',
-                        help="Name of protocol to score")
-    parser.add_argument("--start-only", action='store_true', default=False,
-                        help="only start the servers")
-    parser.add_argument("--remote", action='store', type=str,
-                        default=None, help="score against the remote server at :")
+    parser.add_argument("--request-parallels", action='append', type=str,
+                        metavar='numberlist',
+                        default=None, help="evaluate request with these max-parallel numbers")
     args = parser.parse_args()
 
     if args.verbose > 0:
diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py
index eb4d5d327f..21bc2fc762 100644
--- a/tests/http/testenv/curl.py
+++ b/tests/http/testenv/curl.py
@@ -108,6 +108,28 @@ class RunProfile:
             f'stats={self.stats}]'
 
 
+class DTraceProfile:
+
+    def __init__(self, pid: int, run_dir):
+        self._pid = pid
+        self._run_dir = run_dir
+        self._proc = None
+
+    def start(self):
+        args = [
+            'sudo', 'dtrace',
+            '-x', 'ustackframes=100',
+            '-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
+            '-o', f'{self._run_dir}/curl.user_stacks'
+        ]
+        self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
+        assert self._proc
+
+    def finish(self):
+        if self._proc:
+            self._proc.terminate()
+
+
 class RunTcpDump:
 
     def __init__(self, env, run_dir):
@@ -467,7 +489,8 @@ class CurlClient:
                  timeout: Optional[float] = None,
                  silent: bool = False,
                  run_env: Optional[Dict[str, str]] = None,
-                 server_addr: Optional[str] = None):
+                 server_addr: Optional[str] = None,
+                 with_dtrace: bool = False):
         self.env = env
         self._timeout = timeout if timeout else env.test_timeout
         self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
@@ -476,6 +499,7 @@ class CurlClient:
         self._stderrfile = f'{self._run_dir}/curl.stderr'
         self._headerfile = f'{self._run_dir}/curl.headers'
         self._log_path = f'{self._run_dir}/curl.log'
+        self._with_dtrace = with_dtrace
         self._silent = silent
         self._run_env = run_env
         self._server_addr = server_addr if server_addr else '127.0.0.1'
@@ -784,6 +808,9 @@ class CurlClient:
             profile = RunProfile(p.pid, started_at, self._run_dir)
             if intext is not None and False:
                 p.communicate(input=intext.encode(), timeout=1)
+            if self._with_dtrace:
+                dtrace = DTraceProfile(p.pid, self._run_dir)
+                dtrace.start()
             ptimeout = 0.0
             while True:
                 try:
@@ -797,6 +824,8 @@ class CurlClient:
                     ptimeout = 0.01
             exitcode = p.returncode
             profile.finish()
+            if self._with_dtrace:
+                dtrace.finish()
             log.info(f'done: exit={exitcode}, profile={profile}')
         else:
             p = subprocess.run(args, stderr=cerr, stdout=cout,
-- 
2.47.2
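
Note on the new `data_pending` semantics described in the commit message above: the C program below is a small, self-contained model with made-up names, not curl code. It shows the behavior the patch settles on: a filter only reports data that is already buffered inside the filter chain, and the socket end of the chain never probes the file descriptor, since readability is left to `multi_wait()` and the pollset. The ask-the-next-filter behavior attributed here to `Curl_cf_def_data_pending()` is an assumption based on how this patch uses it, not a copy of curl's implementation.

/* standalone model of the data_pending semantics -- not curl code */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct filter {
  struct filter *next;      /* next filter towards the socket */
  size_t buffered;          /* bytes already received and buffered here */
  bool (*data_pending)(const struct filter *f);
};

/* assumed shape of a "default" callback: delegate to the next filter,
   report false at the end of the chain (the plain socket filter) */
static bool def_data_pending(const struct filter *f)
{
  return f->next ? f->next->data_pending(f->next) : false;
}

/* a buffering filter (think TLS) may hold already-decrypted bytes */
static bool buf_data_pending(const struct filter *f)
{
  return f->buffered > 0 || def_data_pending(f);
}

int main(void)
{
  struct filter sock = { NULL, 0, def_data_pending };  /* no SOCKET_READABLE() probe */
  struct filter tls = { &sock, 0, buf_data_pending };

  printf("pending: %d\n", tls.data_pending(&tls));     /* 0: nothing buffered */
  tls.buffered = 100;
  printf("pending: %d\n", tls.data_pending(&tls));     /* 1: buffered bytes exist */
  return 0;
}

Read this way, the cf-h1-proxy.c hunk follows directly: a plain socket filter now always answers "nothing pending", so the CONNECT response handler can no longer gate its receive attempt on that answer and instead simply tries to read, treating a would-block result as "wait for the socket to become readable".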