cf-socket: make socket data_pending a nop
author      Stefan Eissing <stefan@eissing.org>
            Mon, 30 Jun 2025 06:53:31 +0000 (08:53 +0200)
committer   Daniel Stenberg <daniel@haxx.se>
            Mon, 30 Jun 2025 11:44:24 +0000 (13:44 +0200)
Eliminate the socket readability check that the socket connection
filters performed in their 'data_pending' callback. This improves
transfer handling performance by up to ~30%, depending on parallelism
and response size.

Whatever `data_pending()` once was, its semantics are now:
"Is there anything buffered in the connection filters that still
 needs to be received?"
Any checks of the socket's readability are done via `multi_wait()`
and friends.
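
For illustration, a minimal standalone sketch of that contract
(simplified stand-in types, not curl's actual code): a filter chain
reports pending data only when something is already buffered, and
never polls the socket itself.

  #include <stdbool.h>
  #include <stddef.h>

  /* Simplified stand-in for a connection filter with an internal buffer. */
  struct filter {
    struct filter *next;       /* next filter in the chain, NULL at the end */
    const unsigned char *buf;  /* bytes received but not yet delivered */
    size_t buflen;             /* number of buffered bytes */
  };

  /* New contract: report only what is already buffered in the chain.
   * Socket readability is not checked here; multi_wait() handles that. */
  bool chain_data_pending(const struct filter *f)
  {
    for(; f; f = f->next) {
      if(f->buflen > 0)
        return true;           /* e.g. TLS or HTTP/2 bytes waiting in a buffer */
    }
    return false;              /* the bare socket filter contributes nothing */
  }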

Fix the one place in HTTP/1 proxy code that checked `data_pending()` and
did an early return if false. Remove that check and actually try to
receive data every time.

Closes #17785

lib/cf-h1-proxy.c
lib/cf-socket.c
lib/vquic/curl_ngtcp2.c
tests/http/scorecard.py
tests/http/testenv/curl.py

index 38d6d14a9d6d51cbec429d35a7c9eafeb6e443ac..56c4d88d0b8050382c680ddece56f56b07f03160 100644 (file)
@@ -374,9 +374,6 @@ static CURLcode recv_CONNECT_resp(struct Curl_cfilter *cf,
   error = SELECT_OK;
   *done = FALSE;
 
-  if(!Curl_conn_data_pending(data, cf->sockindex))
-    return CURLE_OK;
-
   while(ts->keepon) {
     size_t nread;
     char byte;
index 87e7dd412f39974c4138334e1abd902ec4d466e3..4187a6b2cf00d833262024c486e7917ff8e0f0ac 100644 (file)
@@ -1404,17 +1404,6 @@ static void cf_socket_adjust_pollset(struct Curl_cfilter *cf,
   }
 }
 
-static bool cf_socket_data_pending(struct Curl_cfilter *cf,
-                                   const struct Curl_easy *data)
-{
-  struct cf_socket_ctx *ctx = cf->ctx;
-  int readable;
-
-  (void)data;
-  readable = SOCKET_READABLE(ctx->sock, 0);
-  return readable > 0 && (readable & CURL_CSELECT_IN);
-}
-
 #ifdef USE_WINSOCK
 
 #ifndef SIO_IDEAL_SEND_BACKLOG_QUERY
@@ -1750,7 +1739,7 @@ struct Curl_cftype Curl_cft_tcp = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -1904,7 +1893,7 @@ struct Curl_cftype Curl_cft_udp = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -1958,7 +1947,7 @@ struct Curl_cftype Curl_cft_unix = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
@@ -2178,7 +2167,7 @@ struct Curl_cftype Curl_cft_tcp_accept = {
   cf_socket_close,
   cf_socket_shutdown,
   cf_socket_adjust_pollset,
-  cf_socket_data_pending,
+  Curl_cf_def_data_pending,
   cf_socket_send,
   cf_socket_recv,
   cf_socket_cntrl,
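
With the socket-level check removed, the four socket filter types above
fall back to `Curl_cf_def_data_pending`. As a hedged sketch of what that
default amounts to (the callback field name `has_data_pending` and the
exact body are assumptions here; lib/cfilters.c has the authoritative
version), it delegates to the next filter and reports nothing pending at
the end of the chain, which makes it a nop for a bare socket filter:

  #include <stdbool.h>

  /* Minimal stand-in declarations so this sketch compiles on its own;
   * curl's real definitions live in lib/cfilters.h. */
  struct Curl_easy;
  struct Curl_cfilter;
  struct Curl_cftype {
    bool (*has_data_pending)(struct Curl_cfilter *cf,
                             const struct Curl_easy *data);
    /* other callbacks omitted in this sketch */
  };
  struct Curl_cfilter {
    const struct Curl_cftype *cft;
    struct Curl_cfilter *next;
  };

  /* Assumed shape of the default: ask the next filter, else report false.
   * This is a simplified model, not curl's verbatim implementation. */
  bool Curl_cf_def_data_pending(struct Curl_cfilter *cf,
                                const struct Curl_easy *data)
  {
    return cf->next ?
      cf->next->cft->has_data_pending(cf->next, data) : false;
  }
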
index d91cd6b083ca7a2cb11360d22d61a0b45150af08..f6046c717e71512b8a912390ee290cf14137f77b 100644 (file)
@@ -1941,18 +1941,6 @@ out:
   return CURLE_OK;
 }
 
-/*
- * Called from transfer.c:data_pending to know if we should keep looping
- * to receive more data from the connection.
- */
-static bool cf_ngtcp2_data_pending(struct Curl_cfilter *cf,
-                                   const struct Curl_easy *data)
-{
-  (void)cf;
-  (void)data;
-  return FALSE;
-}
-
 static CURLcode h3_data_pause(struct Curl_cfilter *cf,
                               struct Curl_easy *data,
                               bool pause)
@@ -2728,7 +2716,7 @@ struct Curl_cftype Curl_cft_http3 = {
   cf_ngtcp2_close,
   cf_ngtcp2_shutdown,
   cf_ngtcp2_adjust_pollset,
-  cf_ngtcp2_data_pending,
+  Curl_cf_def_data_pending,
   cf_ngtcp2_send,
   cf_ngtcp2_recv,
   cf_ngtcp2_data_event,
index 1966d9285c3c3e83c43da9cbb656e1905763cfa1..51a183a162f5d5016de75c1046c2bd467307546f 100644 (file)
@@ -185,7 +185,8 @@ class ScoreRunner:
                  verbose: int,
                  curl_verbose: int,
                  download_parallel: int = 0,
-                 server_addr: Optional[str] = None):
+                 server_addr: Optional[str] = None,
+                 with_dtrace: bool = False):
         self.verbose = verbose
         self.env = env
         self.protocol = protocol
@@ -194,12 +195,18 @@ class ScoreRunner:
         self.server_port = server_port
         self._silent_curl = not curl_verbose
         self._download_parallel = download_parallel
+        self._with_dtrace = with_dtrace
 
     def info(self, msg):
         if self.verbose > 0:
             sys.stderr.write(msg)
             sys.stderr.flush()
 
+    def mk_curl_client(self):
+        return CurlClient(env=self.env, silent=self._silent_curl,
+                          server_addr=self.server_addr,
+                          with_dtrace=self._with_dtrace)
+
     def handshakes(self) -> Dict[str, Any]:
         props = {}
         sample_size = 5
@@ -215,8 +222,7 @@ class ScoreRunner:
                 hs_samples = []
                 errors = []
                 for _ in range(sample_size):
-                    curl = CurlClient(env=self.env, silent=self._silent_curl,
-                                      server_addr=self.server_addr)
+                    curl = self.mk_curl_client()
                     args = [
                         '--http3-only' if self.protocol == 'h3' else '--http2',
                         f'--{ipv}', f'https://{authority}/'
@@ -274,8 +280,7 @@ class ScoreRunner:
         profiles = []
         self.info('single...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True, with_headers=False,
                                    with_profile=True)
@@ -295,8 +300,7 @@ class ScoreRunner:
         url = f'{url}?[0-{count - 1}]'
         self.info('serial...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True,
                                    with_headers=False, with_profile=True)
@@ -317,8 +321,7 @@ class ScoreRunner:
         url = f'{url}?[0-{count - 1}]'
         self.info('parallel...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol,
                                    no_save=True,
                                    with_headers=False,
@@ -339,9 +342,12 @@ class ScoreRunner:
     def downloads(self, count: int, fsizes: List[int], meta: Dict[str, Any]) -> Dict[str, Any]:
         nsamples = meta['samples']
         max_parallel = self._download_parallel if self._download_parallel > 0 else count
-        cols = ['size', 'single']
+        cols = ['size']
+        if not self._download_parallel:
+            cols.append('single')
+            if count > 1:
+                cols.append(f'serial({count})')
         if count > 1:
-            cols.append(f'serial({count})')
             cols.append(f'parallel({count}x{max_parallel})')
         rows = []
         for fsize in fsizes:
@@ -351,10 +357,11 @@ class ScoreRunner:
             }]
             self.info(f'{row[0]["sval"]} downloads...')
             url = f'https://{self.env.domain1}:{self.server_port}/score{row[0]["sval"]}.data'
-
-            row.append(self.dl_single(url=url, nsamples=nsamples))
+            if 'single' in cols:
+                row.append(self.dl_single(url=url, nsamples=nsamples))
             if count > 1:
-                row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
+                if 'single' in cols:
+                    row.append(self.dl_serial(url=url, count=count, nsamples=nsamples))
                 row.append(self.dl_parallel(url=url, count=count, nsamples=nsamples))
             rows.append(row)
             self.info('done.\n')
@@ -387,8 +394,7 @@ class ScoreRunner:
         profiles = []
         self.info('single...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True)
             err = self._check_uploads(r, 1)
@@ -407,8 +413,7 @@ class ScoreRunner:
         url = f'{url}?id=[0-{count - 1}]'
         self.info('serial...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True)
             err = self._check_uploads(r, count)
@@ -428,8 +433,7 @@ class ScoreRunner:
         url = f'{url}?id=[0-{count - 1}]'
         self.info('parallel...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=self.protocol,
                               with_headers=False, with_profile=True,
                               extra_args=[
@@ -494,8 +498,7 @@ class ScoreRunner:
             ])
         self.info(f'{max_parallel}...')
         for _ in range(nsamples):
-            curl = CurlClient(env=self.env, silent=self._silent_curl,
-                              server_addr=self.server_addr)
+            curl = self.mk_curl_client()
             r = curl.http_download(urls=[url], alpn_proto=self.protocol, no_save=True,
                                    with_headers=False, with_profile=True,
                                    with_stats=False, extra_args=extra_args)
@@ -517,7 +520,8 @@ class ScoreRunner:
         fsize = 10*1024
         cols = ['size', 'total']
         rows = []
-        cols.extend([f'{mp} max' for mp in [1, 6, 25, 50, 100, 300]])
+        mparallel = meta['request_parallels']
+        cols.extend([f'{mp} max' for mp in mparallel])
         row = [{
             'val': fsize,
             'sval': Card.fmt_size(fsize)
@@ -528,7 +532,7 @@ class ScoreRunner:
         self.info('requests, max parallel...')
         row.extend([self.do_requests(url=url, count=count,
                                      max_parallel=mp, nsamples=meta["samples"])
-                    for mp in [1, 6, 25, 50, 100, 300]])
+                    for mp in mparallel])
         rows.append(row)
         self.info('done.\n')
         return {
@@ -547,6 +551,7 @@ class ScoreRunner:
               uploads: Optional[List[int]] = None,
               upload_count: int = 50,
               req_count=5000,
+              request_parallels=None,
               nsamples: int = 1,
               requests: bool = True):
         self.info(f"scoring {self.protocol} against {self.server_descr}\n")
@@ -599,6 +604,9 @@ class ScoreRunner:
                                             fsizes=uploads,
                                             meta=score['meta'])
         if requests:
+            if request_parallels is None:
+                request_parallels = [1, 6, 25, 50, 100, 300]
+            score['meta']['request_parallels'] = request_parallels
             score['requests'] = self.requests(count=req_count, meta=score['meta'])
         return score
 
@@ -624,6 +632,13 @@ def run_score(args, protocol):
             uploads.extend([Card.parse_size(s) for s in x.split(',')])
 
     requests = True
+    request_parallels = None
+    if args.request_parallels:
+        request_parallels = []
+        for x in args.request_parallels:
+            request_parallels.extend([int(s) for s in x.split(',')])
+
+
     if args.downloads or args.uploads or args.requests or args.handshakes:
         handshakes = args.handshakes
         if not args.downloads:
@@ -663,7 +678,8 @@ def run_score(args, protocol):
                                server_port=remote_port,
                                verbose=args.verbose,
                                curl_verbose=args.curl_verbose,
-                               download_parallel=args.download_parallel)
+                               download_parallel=args.download_parallel,
+                               with_dtrace=args.dtrace)
             cards.append(card)
 
         if test_httpd:
@@ -687,7 +703,8 @@ def run_score(args, protocol):
                                server_descr=server_descr,
                                server_port=server_port,
                                verbose=args.verbose, curl_verbose=args.curl_verbose,
-                               download_parallel=args.download_parallel)
+                               download_parallel=args.download_parallel,
+                               with_dtrace=args.dtrace)
             card.setup_resources(server_docs, downloads)
             cards.append(card)
 
@@ -711,7 +728,8 @@ def run_score(args, protocol):
                                server_descr=server_descr,
                                server_port=server_port,
                                verbose=args.verbose, curl_verbose=args.curl_verbose,
-                               download_parallel=args.download_parallel)
+                               download_parallel=args.download_parallel,
+                               with_dtrace=args.dtrace)
             card.setup_resources(server_docs, downloads)
             cards.append(card)
 
@@ -731,6 +749,7 @@ def run_score(args, protocol):
                                    upload_count=args.upload_count,
                                    req_count=args.request_count,
                                    requests=requests,
+                                   request_parallels=request_parallels,
                                    nsamples=args.samples)
                 if args.json:
                     print(json.JSONEncoder(indent=2).encode(score))
@@ -772,8 +791,28 @@ def main():
                         help="log more output on stderr")
     parser.add_argument("-j", "--json", action='store_true',
                         default=False, help="print json instead of text")
+    parser.add_argument("--samples", action='store', type=int, metavar='number',
+                        default=1, help="how many sample runs to make")
+    parser.add_argument("--httpd", action='store_true', default=False,
+                        help="evaluate httpd server only")
+    parser.add_argument("--caddy", action='store_true', default=False,
+                        help="evaluate caddy server only")
+    parser.add_argument("--curl-verbose", action='store_true',
+                        default=False, help="run curl with `-v`")
+    parser.add_argument("--print", type=str, default=None, metavar='filename',
+                        help="print the results from a JSON file")
+    parser.add_argument("protocol", default=None, nargs='?',
+                        help="Name of protocol to score")
+    parser.add_argument("--start-only", action='store_true', default=False,
+                        help="only start the servers")
+    parser.add_argument("--remote", action='store', type=str,
+                        default=None, help="score against the remote server at <ip>:<port>")
+    parser.add_argument("--dtrace", action='store_true',
+                        default = False, help = "produce dtrace of curl")
+
     parser.add_argument("-H", "--handshakes", action='store_true',
                         default=False, help="evaluate handshakes only")
+
     parser.add_argument("-d", "--downloads", action='store_true',
                         default=False, help="evaluate downloads")
     parser.add_argument("--download-sizes", action='append', type=str,
@@ -782,11 +821,10 @@ def main():
     parser.add_argument("--download-count", action='store', type=int,
                         metavar='number',
                         default=50, help="perform that many downloads")
-    parser.add_argument("--samples", action='store', type=int, metavar='number',
-                        default=1, help="how many sample runs to make")
     parser.add_argument("--download-parallel", action='store', type=int,
                         metavar='number', default=0,
                         help="perform that many downloads in parallel (default all)")
+
     parser.add_argument("-u", "--uploads", action='store_true',
                         default=False, help="evaluate uploads")
     parser.add_argument("--upload-sizes", action='append', type=str,
@@ -795,25 +833,15 @@ def main():
     parser.add_argument("--upload-count", action='store', type=int,
                         metavar='number', default=50,
                         help="perform that many uploads")
+
     parser.add_argument("-r", "--requests", action='store_true',
                         default=False, help="evaluate requests")
     parser.add_argument("--request-count", action='store', type=int,
                         metavar='number',
                         default=5000, help="perform that many requests")
-    parser.add_argument("--httpd", action='store_true', default=False,
-                        help="evaluate httpd server only")
-    parser.add_argument("--caddy", action='store_true', default=False,
-                        help="evaluate caddy server only")
-    parser.add_argument("--curl-verbose", action='store_true',
-                        default=False, help="run curl with `-v`")
-    parser.add_argument("--print", type=str, default=None, metavar='filename',
-                        help="print the results from a JSON file")
-    parser.add_argument("protocol", default=None, nargs='?',
-                        help="Name of protocol to score")
-    parser.add_argument("--start-only", action='store_true', default=False,
-                        help="only start the servers")
-    parser.add_argument("--remote", action='store', type=str,
-                        default=None, help="score against the remote server at <ip>:<port>")
+    parser.add_argument("--request-parallels", action='append', type=str,
+                        metavar='numberlist',
+                        default=None, help="evaluate request with these max-parallel numbers")
     args = parser.parse_args()
 
     if args.verbose > 0:
index eb4d5d327f2eb471f6df521eece0c5d5188e13c0..21bc2fc762e29006124280db2230f046b0f27a83 100644 (file)
@@ -108,6 +108,28 @@ class RunProfile:
                f'stats={self.stats}]'
 
 
+class DTraceProfile:
+
+    def __init__(self, pid: int, run_dir):
+        self._pid = pid
+        self._run_dir = run_dir
+        self._proc = None
+
+    def start(self):
+        args = [
+            'sudo', 'dtrace',
+            '-x', 'ustackframes=100',
+            '-n', f'profile-97 /pid == {self._pid}/ {{ @[ustack()] = count(); }} tick-60s {{ exit(0); }}',
+            '-o', f'{self._run_dir}/curl.user_stacks'
+        ]
+        self._proc = subprocess.Popen(args, text=True, cwd=self._run_dir, shell=False)
+        assert self._proc
+
+    def finish(self):
+        if self._proc:
+            self._proc.terminate()
+
+
 class RunTcpDump:
 
     def __init__(self, env, run_dir):
@@ -467,7 +489,8 @@ class CurlClient:
                  timeout: Optional[float] = None,
                  silent: bool = False,
                  run_env: Optional[Dict[str, str]] = None,
-                 server_addr: Optional[str] = None):
+                 server_addr: Optional[str] = None,
+                 with_dtrace: bool = False):
         self.env = env
         self._timeout = timeout if timeout else env.test_timeout
         self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
@@ -476,6 +499,7 @@ class CurlClient:
         self._stderrfile = f'{self._run_dir}/curl.stderr'
         self._headerfile = f'{self._run_dir}/curl.headers'
         self._log_path = f'{self._run_dir}/curl.log'
+        self._with_dtrace = with_dtrace
         self._silent = silent
         self._run_env = run_env
         self._server_addr = server_addr if server_addr else '127.0.0.1'
@@ -784,6 +808,9 @@ class CurlClient:
                     profile = RunProfile(p.pid, started_at, self._run_dir)
                     if intext is not None and False:
                         p.communicate(input=intext.encode(), timeout=1)
+                    if self._with_dtrace:
+                        dtrace = DTraceProfile(p.pid, self._run_dir)
+                        dtrace.start()
                     ptimeout = 0.0
                     while True:
                         try:
@@ -797,6 +824,8 @@ class CurlClient:
                             ptimeout = 0.01
                     exitcode = p.returncode
                     profile.finish()
+                    if self._with_dtrace:
+                        dtrace.finish()
                     log.info(f'done: exit={exitcode}, profile={profile}')
                 else:
                     p = subprocess.run(args, stderr=cerr, stdout=cout,