def long_run():
curl = CurlClient(env=env)
# send 10 chunks of 1024 bytes in a response body with 100ms delay in between
+ # pause 2 seconds between requests
urln = f'https://{env.authority_for(env.domain1, proto)}' \
f'/curltest/tweak?id=[0-{count - 1}]'\
'&chunks=10&chunk_size=1024&chunk_delay=100ms'
- self.r = curl.http_download(urls=[urln], alpn_proto=proto)
+ self.r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+ '--rate', '30/m',
+ ])
t = Thread(target=long_run)
t.start()
t.join()
r: ExecResult = self.r
r.check_response(count=count, http_status=200, connect_count=2)
- # reload will shut down the connection gracefully with GOAWAY
+ # reload will shut down the connection gracefully
# we expect to see a second connection opened afterwards
for idx, s in enumerate(r.stats):
if s['num_connects'] > 0:
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+# _ _ ____ _
+# Project ___| | | | _ \| |
+# / __| | | | |_) | |
+# | (__| |_| | _ <| |___
+# \___|\___/|_| \_\_____|
+#
+# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import difflib
+import filecmp
+import logging
+import os
+from datetime import timedelta
+import pytest
+
+from testenv import Env, CurlClient, LocalClient
+
+
+log = logging.getLogger(__name__)
+
+
+class TestMethods:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, httpd, nghttpx):
+ if env.have_h3():
+ nghttpx.start_if_needed()
+ httpd.clear_extra_configs()
+ httpd.reload()
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, httpd):
+ indir = httpd.docs_dir
+ env.make_data_file(indir=indir, fname="data-10k", fsize=10*1024)
+ env.make_data_file(indir=indir, fname="data-100k", fsize=100*1024)
+ env.make_data_file(indir=indir, fname="data-1m", fsize=1024*1024)
+
+ # download 1 file
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_18_01_delete(self, env: Env, httpd, nghttpx, repeat, proto):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ count = 1
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/tweak?id=[0-{count-1}]'
+ r = curl.http_delete(urls=[url], alpn_proto=proto)
+ r.check_stats(count=count, http_status=204, exitcode=0)
+
+ # make HTTP/2 in the server send
+ # - HEADER frame with 204 and eos=0
+ # - 10ms later DATA frame length=0 and eos=1
+ # should be accepted
+ def test_18_02_delete_h2_special(self, env: Env, httpd, nghttpx, repeat):
+ proto = 'h2'
+ count = 1
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/tweak?id=[0-{count-1}]'\
+ '&chunks=1&chunk_size=0&chunk_delay=10ms'
+ r = curl.http_delete(urls=[url], alpn_proto=proto)
+ r.check_stats(count=count, http_status=204, exitcode=0)
+
with_headers=with_headers,
with_profile=with_profile)
+ def http_delete(self, urls: List[str],
+ alpn_proto: Optional[str] = None,
+ with_stats: bool = True,
+ with_profile: bool = False,
+ extra_args: Optional[List[str]] = None):
+ if extra_args is None:
+ extra_args = []
+ extra_args.extend([
+ '-X', 'DELETE', '-o', '/dev/null',
+ ])
+ if with_stats:
+ extra_args.extend([
+ '-w', '%{json}\\n'
+ ])
+ return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
+ with_stats=with_stats,
+ with_headers=False,
+ with_profile=with_profile)
+
def http_put(self, urls: List[str], data=None, fdata=None,
alpn_proto: Optional[str] = None,
with_stats: bool = True,