VRF
VRFY
VSE
+vsftpd
vsprintf
vt
vtls
- if: contains(matrix.build.install_steps, 'pytest')
run: |
- sudo apt-get install apache2 apache2-dev libnghttp2-dev
+ sudo apt-get install apache2 apache2-dev libnghttp2-dev vsftpd
sudo python3 -m pip install -r tests/http/requirements.txt
name: 'install pytest and apach2-dev'
)
AC_SUBST(TEST_NGHTTPX)
-CADDY=caddy
+CADDY=/usr/bin/caddy
AC_ARG_WITH(test-caddy,dnl
AS_HELP_STRING([--with-test-caddy=PATH],[where to find caddy for testing]),
CADDY=$withval
)
AC_SUBST(CADDY)
+VSFTPD=/usr/sbin/vsftpd
+AC_ARG_WITH(test-vsftpd,dnl
+AS_HELP_STRING([--with-test-vsftpd=PATH],[where to find vsftpd for testing]),
+ VSFTPD=$withval
+ if test X"$OPT_VSFTPD" = "Xno" ; then
+ VSFTPD=""
+ fi
+)
+AC_SUBST(VSFTPD)
+
dnl we'd like a httpd+apachectl as test server
dnl
HTTPD_ENABLED="maybe"
report.extend([
f' Caddy: {env.caddy_version()}, http:{env.caddy_http_port} https:{env.caddy_https_port}'
])
+ if env.has_vsftpd():
+ report.extend([
+ f' VsFTPD: {env.vsftpd_version()}, ftp:{env.ftp_port}'
+ ])
return '\n'.join(report)
* `--with-test-nghttpx=<path-of-nghttpx>` if you have nghttpx to use somewhere outside your `$PATH`.
* `--with-test-httpd=<httpd-install-path>` if you have an Apache httpd installed somewhere else. On Debian/Ubuntu it will otherwise look into `/usr/bin` and `/usr/sbin` to find those.
+ * `--with-test-caddy=<caddy-install-path>` if you have a Caddy web server installed somewhere else.
+ * `--with-test-vsftpd=<vsftpd-install-path>` if you have a vsftpd ftp server installed somewhere else.
## Usage Tips
[caddy]
caddy = @CADDY@
+
+[vsftpd]
+vsftpd = @VSFTPD@
@pytest.mark.parametrize("pause_offset", [0, 10*1024, 100*1023, 640000])
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_21_lib_serial(self, env: Env, httpd, nghttpx, proto, pause_offset, repeat):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
count = 2 if proto == 'http/1.1' else 10
docname = 'data-10m'
url = f'https://localhost:{env.https_port}/{docname}'
@pytest.mark.parametrize("pause_offset", [100*1023])
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_22_lib_parallel_resume(self, env: Env, httpd, nghttpx, proto, pause_offset, repeat):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
count = 2 if proto == 'http/1.1' else 10
max_parallel = 5
docname = 'data-10m'
# download, several at a time, pause and abort paused
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_23a_lib_abort_paused(self, env: Env, httpd, nghttpx, proto, repeat):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_ossl_quic():
pytest.skip('OpenSSL QUIC fails here')
if proto == 'h3' and env.ci_run and env.curl_uses_lib('quiche'):
# download, several at a time, abort after n bytes
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_23b_lib_abort_offset(self, env: Env, httpd, nghttpx, proto, repeat):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_ossl_quic():
pytest.skip('OpenSSL QUIC fails here')
if proto == 'h3' and env.ci_run and env.curl_uses_lib('quiche'):
# download, several at a time, abort after n bytes
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_23c_lib_fail_offset(self, env: Env, httpd, nghttpx, proto, repeat):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_ossl_quic():
pytest.skip('OpenSSL QUIC fails here')
if proto == 'h3' and env.ci_run and env.curl_uses_lib('quiche'):
assert nghttpx.reload(timeout=timedelta(seconds=2))
t.join()
r: ExecResult = self.r
- # this should take `count` seconds to retrieve
- assert r.duration >= timedelta(seconds=count)
+ # this should take `count` seconds to retrieve, maybe a little less
+ assert r.duration >= timedelta(seconds=count-1)
r.check_response(count=count, http_status=200, connect_count=2)
# reload will shut down the connection gracefully with GOAWAY
# we expect to see a second connection opened afterwards
r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[
'--parallel',
])
- if proto == 'http/1.0':
+ if proto == 'http/1.0' and \
+ (env.curl_is_debug() or not env.curl_uses_lib('openssl')):
+ # we are inconsistent if we fail or not in missing TLS shutdown
+ # openssl code ignore such errors intentionally in non-debug builds
r.check_exit_code(56)
else:
r.check_exit_code(0)
except ConnectionAbortedError:
self._done = True
+ except OSError:
+ self._done = True
class TestUnix:
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+# _ _ ____ _
+# Project ___| | | | _ \| |
+# / __| | | | |_) | |
+# | (__| |_| | _ <| |___
+# \___|\___/|_| \_\_____|
+#
+# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import difflib
+import filecmp
+import logging
+import os
+import shutil
+import pytest
+
+from testenv import Env, CurlClient, VsFTPD
+
+
+log = logging.getLogger(__name__)
+
+
+@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
+class TestVsFTPD:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def vsftpd(self, env):
+ vsftpd = VsFTPD(env=env)
+ assert vsftpd.start()
+ yield vsftpd
+ vsftpd.stop()
+
+ def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+ fpath = os.path.join(docs_dir, fname)
+ data1k = 1024*'x'
+ flen = 0
+ with open(fpath, 'w') as fd:
+ while flen < fsize:
+ fd.write(data1k)
+ flen += len(data1k)
+ return flen
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, vsftpd):
+ if os.path.exists(vsftpd.docs_dir):
+ shutil.rmtree(vsftpd.docs_dir)
+ if not os.path.exists(vsftpd.docs_dir):
+ os.makedirs(vsftpd.docs_dir)
+ self._make_docs_file(docs_dir=vsftpd.docs_dir, fname='data-1k', fsize=1024)
+ self._make_docs_file(docs_dir=vsftpd.docs_dir, fname='data-10k', fsize=10*1024)
+ self._make_docs_file(docs_dir=vsftpd.docs_dir, fname='data-1m', fsize=1024*1024)
+ self._make_docs_file(docs_dir=vsftpd.docs_dir, fname='data-10m', fsize=10*1024*1024)
+
+ def test_30_01_list_dir(self, env: Env, vsftpd: VsFTPD, repeat):
+ curl = CurlClient(env=env)
+ url = f'ftp://{env.ftp_domain}:{vsftpd.port}/'
+ r = curl.ftp_get(urls=[url], with_stats=True)
+ r.check_stats(count=1, http_status=226)
+ lines = open(os.path.join(curl.run_dir, 'download_#1.data')).readlines()
+ assert len(lines) == 4, f'list: {lines}'
+
+ # download 1 file, no SSL
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_30_02_download_1(self, env: Env, vsftpd: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpd.docs_dir, f'{docname}')
+ count = 1
+ url = f'ftp://{env.ftp_domain}:{vsftpd.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_get(urls=[url], with_stats=True)
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_30_03_download_10_serial(self, env: Env, vsftpd: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpd.docs_dir, f'{docname}')
+ count = 10
+ url = f'ftp://{env.ftp_domain}:{vsftpd.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_get(urls=[url], with_stats=True)
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_30_04_download_10_parallel(self, env: Env, vsftpd: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpd.docs_dir, f'{docname}')
+ count = 10
+ url = f'ftp://{env.ftp_domain}:{vsftpd.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_get(urls=[url], with_stats=True, extra_args=[
+ '--parallel'
+ ])
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ def check_downloads(self, client, srcfile: str, count: int,
+ complete: bool = True):
+ for i in range(count):
+ dfile = client.download_file(i)
+ assert os.path.exists(dfile)
+ if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
+ diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
+ b=open(dfile).readlines(),
+ fromfile=srcfile,
+ tofile=dfile,
+ n=1))
+ assert False, f'download {dfile} differs:\n{diff}'
+
+
+
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+# _ _ ____ _
+# Project ___| | | | _ \| |
+# / __| | | | |_) | |
+# | (__| |_| | _ <| |___
+# \___|\___/|_| \_\_____|
+#
+# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import difflib
+import filecmp
+import logging
+import os
+import shutil
+import pytest
+
+from testenv import Env, CurlClient, VsFTPD
+
+
+log = logging.getLogger(__name__)
+
+
+@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
+# rustsl: transfers sometimes fail with "received corrupt message of type InvalidContentType"
+# sporadic, never seen when filter tracing is on
+@pytest.mark.skipif(condition=Env.curl_uses_lib('rustls-ffi'), reason=f"rustls unreliable here")
+class TestVsFTPD:
+
+ SUPPORTS_SSL = True
+
+ @pytest.fixture(autouse=True, scope='class')
+ def vsftpds(self, env):
+ if not TestVsFTPD.SUPPORTS_SSL:
+ pytest.skip('vsftpd does not seem to support SSL')
+ vsftpds = VsFTPD(env=env, with_ssl=True)
+ if not vsftpds.start():
+ vsftpds.stop()
+ TestVsFTPD.SUPPORTS_SSL = False
+ pytest.skip('vsftpd does not seem to support SSL')
+ yield vsftpds
+ vsftpds.stop()
+
+ def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+ fpath = os.path.join(docs_dir, fname)
+ data1k = 1024*'x'
+ flen = 0
+ with open(fpath, 'w') as fd:
+ while flen < fsize:
+ fd.write(data1k)
+ flen += len(data1k)
+ return flen
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, vsftpds):
+ if os.path.exists(vsftpds.docs_dir):
+ shutil.rmtree(vsftpds.docs_dir)
+ if not os.path.exists(vsftpds.docs_dir):
+ os.makedirs(vsftpds.docs_dir)
+ self._make_docs_file(docs_dir=vsftpds.docs_dir, fname='data-1k', fsize=1024)
+ self._make_docs_file(docs_dir=vsftpds.docs_dir, fname='data-10k', fsize=10*1024)
+ self._make_docs_file(docs_dir=vsftpds.docs_dir, fname='data-1m', fsize=1024*1024)
+ self._make_docs_file(docs_dir=vsftpds.docs_dir, fname='data-10m', fsize=10*1024*1024)
+
+ def test_31_01_list_dir(self, env: Env, vsftpds: VsFTPD, repeat):
+ curl = CurlClient(env=env)
+ url = f'ftp://{env.ftp_domain}:{vsftpds.port}/'
+ r = curl.ftp_ssl_get(urls=[url], with_stats=True)
+ r.check_stats(count=1, http_status=226)
+ lines = open(os.path.join(curl.run_dir, 'download_#1.data')).readlines()
+ assert len(lines) == 4, f'list: {lines}'
+
+ # download 1 file, no SSL
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_31_02_download_1(self, env: Env, vsftpds: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpds.docs_dir, f'{docname}')
+ count = 1
+ url = f'ftp://{env.ftp_domain}:{vsftpds.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_ssl_get(urls=[url], with_stats=True)
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_31_03_download_10_serial(self, env: Env, vsftpds: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpds.docs_dir, f'{docname}')
+ count = 10
+ url = f'ftp://{env.ftp_domain}:{vsftpds.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_ssl_get(urls=[url], with_stats=True)
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ @pytest.mark.parametrize("docname", [
+ 'data-1k', 'data-1m', 'data-10m'
+ ])
+ def test_31_04_download_10_parallel(self, env: Env, vsftpds: VsFTPD, docname, repeat):
+ curl = CurlClient(env=env)
+ srcfile = os.path.join(vsftpds.docs_dir, f'{docname}')
+ count = 10
+ url = f'ftp://{env.ftp_domain}:{vsftpds.port}/{docname}?[0-{count-1}]'
+ r = curl.ftp_ssl_get(urls=[url], with_stats=True, extra_args=[
+ '--parallel'
+ ])
+ r.check_stats(count=count, http_status=226)
+ self.check_downloads(curl, srcfile, count)
+
+ def check_downloads(self, client, srcfile: str, count: int,
+ complete: bool = True):
+ for i in range(count):
+ dfile = client.download_file(i)
+ assert os.path.exists(dfile)
+ if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
+ diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
+ b=open(dfile).readlines(),
+ fromfile=srcfile,
+ tofile=dfile,
+ n=1))
+ assert False, f'download {dfile} differs:\n{diff}'
+
+
+
from .client import LocalClient
from .nghttpx import Nghttpx
from .nghttpx import Nghttpx, NghttpxQuic, NghttpxFwd
+from .vsftpd import VsFTPD
import ipaddress
import os
import re
-from datetime import timedelta, datetime
+from datetime import timedelta, datetime, timezone
from typing import List, Any, Optional
from cryptography import x509
if os.path.isfile(cert_file) and os.path.isfile(pkey_file):
cert = self.load_pem_cert(cert_file)
pkey = self.load_pem_pkey(pkey_file)
- if check_valid and \
- ((cert.not_valid_after < datetime.now()) or
- (cert.not_valid_before > datetime.now())):
- return None
+ try:
+ now = datetime.now(tz=timezone.utc)
+ if check_valid and \
+ ((cert.not_valid_after_utc < now) or
+ (cert.not_valid_before_utc > now)):
+ return None
+ except AttributeError: # older python
+ now = datetime.now()
+ if check_valid and \
+ ((cert.not_valid_after < now) or
+ (cert.not_valid_before > now)):
+ return None
creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
creds.set_store(self)
creds.set_files(cert_file, pkey_file, comb_file)
with_stats=with_stats,
with_headers=with_headers)
+ def ftp_get(self, urls: List[str],
+ with_stats: bool = True,
+ with_profile: bool = False,
+ no_save: bool = False,
+ extra_args: List[str] = None):
+ if extra_args is None:
+ extra_args = []
+ if no_save:
+ extra_args.extend([
+ '-o', '/dev/null',
+ ])
+ else:
+ extra_args.extend([
+ '-o', 'download_#1.data',
+ ])
+ # remove any existing ones
+ for i in range(100):
+ self._rmf(self.download_file(i))
+ if with_stats:
+ extra_args.extend([
+ '-w', '%{json}\\n'
+ ])
+ return self._raw(urls, options=extra_args,
+ with_stats=with_stats,
+ with_headers=False,
+ with_profile=with_profile)
+
+ def ftp_ssl_get(self, urls: List[str],
+ with_stats: bool = True,
+ with_profile: bool = False,
+ no_save: bool = False,
+ extra_args: List[str] = None):
+ if extra_args is None:
+ extra_args = []
+ extra_args.extend([
+ '--ssl-reqd',
+ ])
+ return self.ftp_get(urls=urls, with_stats=with_stats,
+ with_profile=with_profile, no_save=no_save,
+ extra_args=extra_args)
+
def response_file(self, idx: int):
return os.path.join(self._run_dir, f'download_{idx}.data')
'libs': [],
'lib_versions': [],
}
+ self.curl_is_debug = False
self.curl_protos = []
p = subprocess.run(args=[self.curl, '-V'],
capture_output=True, text=True)
if p.returncode != 0:
assert False, f'{self.curl} -V failed with exit code: {p.returncode}'
+ if p.stderr.startswith('WARNING:'):
+ self.curl_is_debug = True
for l in p.stdout.splitlines(keepends=False):
if l.startswith('curl '):
m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', l)
]
self.ports = alloc_ports(port_specs={
+ 'ftp': socket.SOCK_STREAM,
+ 'ftps': socket.SOCK_STREAM,
'http': socket.SOCK_STREAM,
'https': socket.SOCK_STREAM,
'proxy': socket.SOCK_STREAM,
self.domain1 = f"one.{self.tld}"
self.domain1brotli = f"brotli.one.{self.tld}"
self.domain2 = f"two.{self.tld}"
+ self.ftp_domain = f"ftp.{self.tld}"
self.proxy_domain = f"proxy.{self.tld}"
self.cert_specs = [
CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
+ CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
CertificateSpec(name="clientsX", sub_specs=[
CertificateSpec(name="user1", client=True),
if p.returncode != 0:
# not a working caddy
self.caddy = None
- m = re.match(r'v?(\d+\.\d+\.\d+) .*', p.stdout)
+ m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
if m:
self._caddy_version = m.group(1)
else:
except:
self.caddy = None
+ self.vsftpd = self.config['vsftpd']['vsftpd']
+ self._vsftpd_version = None
+ if self.vsftpd is not None:
+ try:
+ p = subprocess.run(args=[self.vsftpd, '-v'],
+ capture_output=True, text=True)
+ if p.returncode != 0:
+ # not a working vsftpd
+ self.vsftpd = None
+ m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', p.stderr)
+ if m:
+ self._vsftpd_version = m.group(1)
+ elif len(p.stderr) == 0:
+ # vsftp does not use stdout or stderr for printing its version... -.-
+ self._vsftpd_version = 'unknown'
+ else:
+ raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
+ except Exception as e:
+ self.vsftpd = None
+
@property
def httpd_version(self):
if self._httpd_version is None and self.apxs is not None:
def caddy_version(self):
return self._caddy_version
+ @property
+ def vsftpd_version(self):
+ return self._vsftpd_version
+
class Env:
def curl_version() -> str:
return Env.CONFIG.curl_props['version']
+ @staticmethod
+ def curl_is_debug() -> bool:
+ return Env.CONFIG.curl_is_debug
+
@staticmethod
def have_h3() -> bool:
return Env.have_h3_curl() and Env.have_h3_server()
def has_caddy() -> bool:
return Env.CONFIG.caddy is not None
+ @staticmethod
+ def has_vsftpd() -> bool:
+ return Env.CONFIG.vsftpd is not None
+
+ @staticmethod
+ def vsftpd_version() -> str:
+ return Env.CONFIG.vsftpd_version
+
def __init__(self, pytestconfig=None):
self._verbose = pytestconfig.option.verbose \
if pytestconfig is not None else 0
def domain2(self) -> str:
return self.CONFIG.domain2
+ @property
+ def ftp_domain(self) -> str:
+ return self.CONFIG.ftp_domain
+
@property
def proxy_domain(self) -> str:
return self.CONFIG.proxy_domain
def proxys_port(self) -> int:
return self.CONFIG.ports['proxys']
+ @property
+ def ftp_port(self) -> int:
+ return self.CONFIG.ports['ftp']
+
+ @property
+ def ftps_port(self) -> int:
+ return self.CONFIG.ports['ftps']
+
@property
def h2proxys_port(self) -> int:
return self.CONFIG.ports['h2proxys']
def caddy_http_port(self) -> int:
return self.CONFIG.ports['caddy']
+ @property
+ def vsftpd(self) -> str:
+ return self.CONFIG.vsftpd
+
@property
def ws_port(self) -> int:
return self.CONFIG.ports['ws']
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+# _ _ ____ _
+# Project ___| | | | _ \| |
+# / __| | | | |_) | |
+# | (__| |_| | _ <| |___
+# \___|\___/|_| \_\_____|
+#
+# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import inspect
+import logging
+import os
+import subprocess
+from datetime import timedelta, datetime
+from json import JSONEncoder
+import time
+from typing import List, Union, Optional
+
+from .curl import CurlClient, ExecResult
+from .env import Env
+
+
+log = logging.getLogger(__name__)
+
+
+class VsFTPD:
+
+ def __init__(self, env: Env, with_ssl=False):
+ self.env = env
+ self._cmd = env.vsftpd
+ self._scheme = 'ftp'
+ self._with_ssl = with_ssl
+ if self._with_ssl:
+ self._port = self.env.ftps_port
+ name = 'vsftpds'
+ else:
+ self._port = self.env.ftp_port
+ name = 'vsftpd'
+ self._vsftpd_dir = os.path.join(env.gen_dir, name)
+ self._run_dir = os.path.join(self._vsftpd_dir, 'run')
+ self._docs_dir = os.path.join(self._vsftpd_dir, 'docs')
+ self._tmp_dir = os.path.join(self._vsftpd_dir, 'tmp')
+ self._conf_file = os.path.join(self._vsftpd_dir, 'test.conf')
+ self._pid_file = os.path.join(self._vsftpd_dir, 'vsftpd.pid')
+ self._error_log = os.path.join(self._vsftpd_dir, 'vsftpd.log')
+ self._process = None
+
+ self.clear_logs()
+
+ @property
+ def domain(self):
+ return self.env.ftp_domain
+
+ @property
+ def docs_dir(self):
+ return self._docs_dir
+
+ @property
+ def port(self) -> str:
+ return self._port
+
+ def clear_logs(self):
+ self._rmf(self._error_log)
+
+ def exists(self):
+ return os.path.exists(self._cmd)
+
+ def is_running(self):
+ if self._process:
+ self._process.poll()
+ return self._process.returncode is None
+ return False
+
+ def start_if_needed(self):
+ if not self.is_running():
+ return self.start()
+ return True
+
+ def start(self, wait_live=True):
+ pass
+
+ def stop_if_running(self):
+ if self.is_running():
+ return self.stop()
+ return True
+
+ def stop(self, wait_dead=True):
+ self._mkpath(self._tmp_dir)
+ if self._process:
+ self._process.terminate()
+ self._process.wait(timeout=2)
+ self._process = None
+ return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
+ return True
+
+ def restart(self):
+ self.stop()
+ return self.start()
+
+ def start(self, wait_live=True):
+ self._mkpath(self._tmp_dir)
+ if self._process:
+ self.stop()
+ self._write_config()
+ args = [
+ self._cmd,
+ f'{self._conf_file}',
+ ]
+ procerr = open(self._error_log, 'a')
+ self._process = subprocess.Popen(args=args, stderr=procerr)
+ if self._process.returncode is not None:
+ return False
+ return not wait_live or self.wait_live(timeout=timedelta(seconds=5))
+
+ def wait_dead(self, timeout: timedelta):
+ curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
+ try_until = datetime.now() + timeout
+ while datetime.now() < try_until:
+ check_url = f'{self._scheme}://{self.domain}:{self.port}/'
+ r = curl.ftp_get(urls=[check_url], extra_args=['-v'])
+ if r.exit_code != 0:
+ return True
+ log.debug(f'waiting for vsftpd to stop responding: {r}')
+ time.sleep(.1)
+ log.debug(f"Server still responding after {timeout}")
+ return False
+
+ def wait_live(self, timeout: timedelta):
+ curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
+ try_until = datetime.now() + timeout
+ while datetime.now() < try_until:
+ check_url = f'{self._scheme}://{self.domain}:{self.port}/'
+ r = curl.ftp_get(urls=[check_url], extra_args=[
+ '--trace', 'curl-start.trace', '--trace-time'
+ ])
+ if r.exit_code == 0:
+ return True
+ log.debug(f'waiting for vsftpd to become responsive: {r}')
+ time.sleep(.1)
+ log.error(f"Server still not responding after {timeout}")
+ return False
+
+ def _run(self, args, intext=''):
+ env = {}
+ for key, val in os.environ.items():
+ env[key] = val
+ with open(self._error_log, 'w') as cerr:
+ self._process = subprocess.run(args, stderr=cerr, stdout=cerr,
+ cwd=self._vsftpd_dir,
+ input=intext.encode() if intext else None,
+ env=env)
+ start = datetime.now()
+ return ExecResult(args=args, exit_code=self._process.returncode,
+ duration=datetime.now() - start)
+
+ def _rmf(self, path):
+ if os.path.exists(path):
+ return os.remove(path)
+
+ def _mkpath(self, path):
+ if not os.path.exists(path):
+ return os.makedirs(path)
+
+ def _write_config(self):
+ self._mkpath(self._docs_dir)
+ self._mkpath(self._tmp_dir)
+ conf = [ # base server config
+ f'listen=YES',
+ f'run_as_launching_user=YES',
+ f'#listen_address=127.0.0.1',
+ f'listen_port={self.port}',
+ f'local_enable=NO',
+ f'anonymous_enable=YES',
+ f'anon_root={self._docs_dir}',
+ f'dirmessage_enable=YES',
+ f'log_ftp_protocol=YES',
+ f'xferlog_enable=YES',
+ f'xferlog_std_format=YES',
+ f'xferlog_file={self._error_log}',
+ f'\n',
+ ]
+ if self._with_ssl:
+ creds = self.env.get_credentials(self.domain)
+ conf.extend([
+ f'ssl_enable=YES',
+ f'allow_anon_ssl=YES',
+ f'rsa_cert_file={creds.cert_file}',
+ f'rsa_private_key_file={creds.pkey_file}',
+ # require_ssl_reuse=YES means ctrl and data connection need to use the same session
+ f'require_ssl_reuse=NO',
+ ])
+
+ with open(self._conf_file, 'w') as fd:
+ fd.write("\n".join(conf))