};
#if defined(DEBUGBUILD) && defined(USE_LIBUV)
+
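+/* set to 1 to trace event-loop activity on stderr */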
+#define DEBUG_UV 0
+
/* object to pass to the callbacks */
struct datauv {
uv_timer_t timeout;
static CURLcode check_finished(struct parastate *s);
-static void check_multi_info(struct contextuv *context)
+static void check_multi_info(struct datauv *uv)
{
- (void)check_finished(context->uv->s);
+ CURLcode result;
+
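+ /* reap completed transfers; remember only the first error seen */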
+ result = check_finished(uv->s);
+ if(result && !uv->s->result)
+ uv->s->result = result;
+
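+ /* if transfers are still queued, add more to the multi handle now */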
+ if(uv->s->more_transfers) {
+ result = add_parallel_transfers(uv->s->global, uv->s->multi,
+ uv->s->share,
+ &uv->s->more_transfers,
+ &uv->s->added_transfers);
+ if(result && !uv->s->result)
+ uv->s->result = result;
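+ /* on failure, stop the event loop so the error can propagate */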
+ if(result)
+ uv_stop(uv->loop);
+ }
}
/* callback from libuv on socket activity */
curl_multi_socket_action(c->uv->s->multi, c->sockfd, flags,
&c->uv->s->still_running);
- check_multi_info(c);
}
/* callback from libuv when timeout expires */
static void on_uv_timeout(uv_timer_t *req)
{
- struct contextuv *c = (struct contextuv *) req->data;
- if(c) {
- curl_multi_socket_action(c->uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
- &c->uv->s->still_running);
- check_multi_info(c);
+ struct datauv *uv = (struct datauv *) req->data;
+#if DEBUG_UV
+ fprintf(tool_stderr, "parallel_event: on_uv_timeout\n");
+#endif
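+ /* tell libcurl the timeout expired, then check for completions */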
+ if(uv && uv->s) {
+ curl_multi_socket_action(uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
+ &uv->s->still_running);
+ check_multi_info(uv);
}
}
struct datauv *uv)
{
(void)multi;
+#if DEBUG_UV
+ fprintf(tool_stderr, "parallel_event: cb_timeout=%ld\n", timeout_ms);
+#endif
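+ /* a timeout of -1 means libcurl wants the timer stopped */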
if(timeout_ms < 0)
uv_timer_stop(&uv->timeout);
else {
uv_poll_stop(&c->poll_handle);
destroy_context(c);
curl_multi_assign(uv->s->multi, s, NULL);
+ /* check if we can do more now */
+ check_multi_info(uv);
}
break;
default:
CURLcode result = CURLE_OK;
struct datauv uv = { 0 };
+ s->result = CURLE_OK;
+ uv.s = s;
uv.loop = uv_default_loop();
uv_timer_init(uv.loop, &uv.timeout);
- uv.s = s;
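+ /* let the timer callback reach our state via req->data */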
+ uv.timeout.data = &uv;
/* setup event callbacks */
curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, cb_socket);
/* kickstart the thing */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0,
- &uv.s->still_running);
- uv_run(uv.loop, UV_RUN_DEFAULT);
- return result;
+ &s->still_running);
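+ /* run the event loop until all transfers are done, an error occurs
+ or an early exit is requested */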
+ while(!s->mcode && (s->still_running || s->more_transfers)) {
+#if DEBUG_UV
+ fprintf(tool_stderr, "parallel_event: uv_run(), mcode=%d, %d running, "
+ "%d more\n", s->mcode, s->still_running, s->more_transfers);
+#endif
+ uv_run(uv.loop, UV_RUN_DEFAULT);
+#if DEBUG_UV
+ fprintf(tool_stderr, "parallel_event: uv_run() returned\n");
+#endif
+
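+ /* collect transfers that finished while uv_run() was active */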
+ result = check_finished(s);
+ if(result && !s->result)
+ s->result = result;
+
+ /* an early exit was requested */
+ if(s->wrapitup) {
+ if(s->still_running && !s->wrapitup_processed) {
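+ /* flag all added transfers for abort, but only once */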
+ struct per_transfer *per;
+ for(per = transfers; per; per = per->next) {
+ if(per->added)
+ per->abort = TRUE;
+ }
+ s->wrapitup_processed = TRUE;
+ }
+ break;
+ }
+
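+ /* the event loop returned: queue any transfers still waiting */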
+ if(s->more_transfers) {
+ result = add_parallel_transfers(s->global, s->multi, s->share,
+ &s->more_transfers, &s->added_transfers);
+ if(result && !s->result)
+ s->result = result;
+ }
+ }
+
+#if DEBUG_UV
+ fprintf(tool_stderr, "DONE parallel_event -> %d, mcode=%d, %d running, "
+ "%d more\n",
+ s->result, s->mcode, s->still_running, s->more_transfers);
+#endif
+ return s->result;
}
#endif
r.check_response(http_status=200, count=100, connect_count=1)
- # download 100 files parallel
+ # download 20 files in parallel
- @pytest.mark.parametrize("proto", ['h2', 'h3'])
- def test_02_04_download_100_parallel(self, env: Env,
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_04_download_20_parallel(self, env: Env,
httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
- max_parallel = 50
+ count = 20
+ max_parallel = 10
curl = CurlClient(env=env)
- urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-99]'
+ urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]'
r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
'--parallel', '--parallel-max', f'{max_parallel}'
])
- r.check_response(http_status=200, count=100)
+ r.check_response(http_status=200, count=count)
if proto == 'http/1.1':
# http/1.1 parallel transfers will open multiple connections
assert r.total_connects > 1, r.dump_logs()
self.name = name
self.path = os.path.join(env.project_dir, f'tests/http/clients/{name}')
self.env = env
- self._run_env= run_env
+ self._run_env = run_env
self._timeout = timeout if timeout else env.test_timeout
self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, name)
exception = None
myargs = [self.path]
myargs.extend(args)
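+ # copy the run environment and pass CURL_DEBUG through from the
+ # parent environment unless the caller already set it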
+ run_env = None
+ if self._run_env:
+ run_env = self._run_env.copy()
+ for key in ['CURL_DEBUG']:
+ if key in os.environ and key not in run_env:
+ run_env[key] = os.environ[key]
try:
with open(self._stdoutfile, 'w') as cout:
with open(self._stderrfile, 'w') as cerr:
p = subprocess.run(myargs, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
- input=None, env=self._run_env,
+ input=None, env=run_env,
timeout=self._timeout)
exitcode = p.returncode
except subprocess.TimeoutExpired: