def pytest_report_header(config, startdir):
env = HttpdTestEnv()
- return f"[apache httpd: {env.get_httpd_version()}, mpm: {env.mpm_type}, {env.prefix}]"
+ return f"[apache httpd: {env.get_httpd_version()}, mpm: {env.mpm_module}, {env.prefix}]"
def pytest_report_header(config, startdir):
env = CoreTestEnv(setup_dirs=False)
- return f"core [apache: {env.get_httpd_version()}, mpm: {env.mpm_type}, {env.prefix}]"
+ return f"core [apache: {env.get_httpd_version()}, mpm: {env.mpm_module}, {env.prefix}]"
@pytest.fixture(scope="package")
logging.getLogger('').setLevel(level=level)
env = CoreTestEnv(pytestconfig=pytestconfig)
env.apache_access_log_clear()
- env.apache_error_log_clear()
+ env.httpd_error_log.clear_log()
return env
@pytest.fixture(autouse=True, scope="package")
def _session_scope(env):
+ env.httpd_error_log.set_ignored_lognos([
+ 'AH10244', # core: invalid URI path
+ 'AH01264', # mod_cgid script not found
+ ])
yield
assert env.apache_stop() == 0
- errors, warnings = env.apache_errors_and_warnings()
+ errors, warnings = env.httpd_error_log.get_missed()
assert (len(errors), len(warnings)) == (0, 0),\
f"apache logged {len(errors)} errors and {len(warnings)} warnings: \n"\
"{0}\n{1}\n".format("\n".join(errors), "\n".join(warnings))
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- conf = HttpdConf(env)
- conf.add(f"""
+ conf = HttpdConf(env, extras={
+ 'base': f"""
<Directory "{env.gen_dir}">
AllowOverride None
Options +ExecCGI -MultiViews +SymLinksIfOwnerMatch
Require all granted
</Directory>
- """)
- conf.add_vhost_test1()
- conf.add_vhost_test2(extras={
+ """,
f"test2.{env.http_tld}": "AllowEncodedSlashes on",
- })
- conf.add_vhost_cgi(extras={
f"cgi.{env.http_tld}": f"ScriptAlias /cgi-bin/ {env.gen_dir}",
})
+ conf.add_vhost_test1()
+ conf.add_vhost_test2()
+ conf.add_vhost_cgi()
conf.install()
assert env.apache_restart() == 0
- yield
- errors, warnings = env.apache_errors_and_warnings()
- nl = "\n"
- assert (len(errors), len(warnings)) == (TestEncoding.EXP_AH10244_ERRS, 0),\
- f"apache logged {len(errors)} errors and {len(warnings)} warnings: \n"\
- f"{nl.join(errors)}\n{nl.join(warnings)}\n"
- env.apache_error_log_clear()
# check handling of url encodings that are accepted
@pytest.mark.parametrize("path", [
def pytest_report_header(config, startdir):
env = H2TestEnv(setup_dirs=False)
- return f"mod_h2 [apache: {env.get_httpd_version()}, mpm: {env.mpm_type}, {env.prefix}]"
+ return f"mod_h2 [apache: {env.get_httpd_version()}, mpm: {env.mpm_module}, {env.prefix}]"
def pytest_addoption(parser):
logging.getLogger('').setLevel(level=level)
env = H2TestEnv(pytestconfig=pytestconfig)
env.apache_access_log_clear()
- env.apache_error_log_clear()
+ env.httpd_error_log.clear_log()
return env
def _session_scope(env):
yield
assert env.apache_stop() == 0
- errors, warnings = env.apache_errors_and_warnings()
+ errors, warnings = env.httpd_error_log.get_missed()
assert (len(errors), len(warnings)) == (0, 0),\
f"apache logged {len(errors)} errors and {len(warnings)} warnings: \n"\
"{0}\n{1}\n".format("\n".join(errors), "\n".join(warnings))
import inspect
import logging
import os
+import re
import subprocess
+from typing import Dict, Any
from pyhttpd.certs import CertificateSpec
from pyhttpd.conf import HttpdConf
def __init__(self, pytestconfig=None, setup_dirs=True):
super().__init__(pytestconfig=pytestconfig,
local_dir=os.path.dirname(inspect.getfile(H2TestEnv)),
- add_base_conf="""
- H2MinWorkers 1
- H2MaxWorkers 64
- """,
+ add_base_conf=[
+ "H2MinWorkers 1",
+ "H2MaxWorkers 64",
+ "Protocols h2 http/1.1 h2c",
+ ],
interesting_modules=["http2", "proxy_http2", "h2test"])
self.add_cert_specs([
CertificateSpec(domains=[
]),
CertificateSpec(domains=[f"noh2.{self.http_tld}"], key_type='rsa2048'),
])
+
+ self.httpd_error_log.set_ignored_lognos([
+ 'AH02032',
+ 'AH01276',
+ 'AH01630',
+ 'AH00135',
+ 'AH02261', # Re-negotiation handshake failed (our test_101)
+ ])
+ self.httpd_error_log.add_ignored_patterns([
+ re.compile(r'.*malformed header from script \'hecho.py\': Bad header: x.*'),
+ ])
+
if setup_dirs:
self._setup = H2TestSetup(env=self)
self._setup.make()
class H2Conf(HttpdConf):
- def __init__(self, env: HttpdTestEnv, path=None):
- super().__init__(env=env, path=path)
-
+ def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None):
+ super().__init__(env=env, extras=HttpdConf.merge_extras(extras, {
+ f"cgi.{env.http_tld}": [
+ "SSLOptions +StdEnvVars",
+ "AddHandler cgi-script .py",
+ ]
+ }))
+
+ def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=False):
+ super().start_vhost(domains=domains, port=port, doc_root=doc_root, with_ssl=with_ssl)
+ if f"noh2.{self.env.http_tld}" in domains:
+ protos = ["http/1.1"]
+ elif port == self.env.https_port or with_ssl is True:
+ protos = ["h2", "http/1.1"]
+ else:
+ protos = ["h2c", "http/1.1"]
+ if f"test2.{self.env.http_tld}" in domains:
+ protos = reversed(protos)
+ self.add(f"Protocols {' '.join(protos)}")
+ return self
def add_vhost_noh2(self):
- self.start_vhost(self.env.https_port, "noh2", aliases=["noh2-alias"], doc_root="htdocs/noh2", with_ssl=True)
- self.add(f"""
- Protocols http/1.1
- SSLOptions +StdEnvVars""")
+ domains = [f"noh2.{self.env.http_tld}", f"noh2-alias.{self.env.http_tld}"]
+ self.start_vhost(domains=domains, port=self.env.https_port, doc_root="htdocs/noh2")
+ self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"])
self.end_vhost()
- self.start_vhost(self.env.http_port, "noh2", aliases=["noh2-alias"], doc_root="htdocs/noh2", with_ssl=False)
- self.add(" Protocols http/1.1")
- self.add(" SSLOptions +StdEnvVars")
+ self.start_vhost(domains=domains, port=self.env.http_port, doc_root="htdocs/noh2")
+ self.add(["Protocols http/1.1", "SSLOptions +StdEnvVars"])
self.end_vhost()
return self
+
+ def add_vhost_test1(self, proxy_self=False, h2proxy_self=False):
+ return super().add_vhost_test1(proxy_self=proxy_self, h2proxy_self=h2proxy_self)
+
+ def add_vhost_test2(self):
+ return super().add_vhost_test2()
+
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- H2Conf(env).add_vhost_test1().add_vhost_test2().install()
+ conf = H2Conf(env)
+ conf.add_vhost_test1()
+ conf.add_vhost_test2()
+ conf.install()
assert env.apache_restart() == 0
# check that we see the correct documents when using the test1 server name over http:
def test_h2_002_01(self, env):
url = env.mkurl("http", "test1", "/alive.json")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/1.1" == r.response["protocol"]
- assert True == r.response["json"]["alive"]
- assert "test1" == r.response["json"]["host"]
+ assert r.response["json"]["alive"] is True
+ assert r.response["json"]["host"] == "test1"
# check that we see the correct documents when using the test1 server name over https:
def test_h2_002_02(self, env):
url = env.mkurl("https", "test1", "/alive.json")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert r.response["json"]["alive"] is True
assert "test1" == r.response["json"]["host"]
- assert "application/json" == r.response["header"]["content-type"]
+ assert r.response["header"]["content-type"] == "application/json"
# enforce HTTP/1.1
def test_h2_002_03(self, env):
url = env.mkurl("https", "test1", "/alive.json")
- r = env.curl_get(url, 5, [ "--http1.1" ])
- assert 200 == r.response["status"]
- assert "HTTP/1.1" == r.response["protocol"]
+ r = env.curl_get(url, 5, options=["--http1.1"])
+ assert r.response["status"] == 200
+ assert r.response["protocol"] == "HTTP/1.1"
# enforce HTTP/2
def test_h2_002_04(self, env):
url = env.mkurl("https", "test1", "/alive.json")
- r = env.curl_get(url, 5, [ "--http2" ])
- assert 200 == r.response["status"]
- assert "HTTP/2" == r.response["protocol"]
+ r = env.curl_get(url, 5, options=["--http2"])
+ assert r.response["status"] == 200
+ assert r.response["protocol"] == "HTTP/2"
# default is HTTP/2 on this host
def test_h2_002_04b(self, env):
url = env.mkurl("https", "test1", "/alive.json")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
- assert "HTTP/2" == r.response["protocol"]
- assert "test1" == r.response["json"]["host"]
+ assert r.response["status"] == 200
+ assert r.response["protocol"] == "HTTP/2"
+ assert r.response["json"]["host"] == "test1"
# although, without ALPN, we cannot select it
def test_h2_002_05(self, env):
url = env.mkurl("https", "test1", "/alive.json")
- r = env.curl_get(url, 5, [ "--no-alpn" ])
- assert 200 == r.response["status"]
- assert "HTTP/1.1" == r.response["protocol"]
- assert "test1" == r.response["json"]["host"]
+ r = env.curl_get(url, 5, options=["--no-alpn"])
+ assert r.response["status"] == 200
+ assert r.response["protocol"] == "HTTP/1.1"
+ assert r.response["json"]["host"] == "test1"
# default is HTTP/1.1 on the other
def test_h2_002_06(self, env):
url = env.mkurl("https", "test2", "/alive.json")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
- assert "HTTP/1.1" == r.response["protocol"]
- assert "test2" == r.response["json"]["host"]
-
+ assert r.response["status"] == 200
+ assert r.response["protocol"] == "HTTP/1.1"
+ assert r.response["json"]["host"] == "test2"
# check SSL environment variables from CGI script
def test_h2_003_01(self, env):
url = env.mkurl("https", "cgi", "/hello.py")
- r = env.curl_get(url, 5, ["--tlsv1.2"])
- assert 200 == r.response["status"]
- assert "HTTP/2.0" == r.response["json"]["protocol"]
- assert "on" == r.response["json"]["https"]
+ r = env.curl_get(url, 5, options=["--tlsv1.2"])
+ assert r.response["status"] == 200
+ assert r.response["json"]["protocol"] == "HTTP/2.0"
+ assert r.response["json"]["https"] == "on"
tls_version = r.response["json"]["ssl_protocol"]
assert tls_version in ["TLSv1.2", "TLSv1.3"]
- assert "on" == r.response["json"]["h2"]
- assert "off" == r.response["json"]["h2push"]
+ assert r.response["json"]["h2"] == "on"
+ assert r.response["json"]["h2push"] == "off"
- r = env.curl_get(url, 5, ["--http1.1", "--tlsv1.2"])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=["--http1.1", "--tlsv1.2"])
+ assert r.response["status"] == 200
assert "HTTP/1.1" == r.response["json"]["protocol"]
assert "on" == r.response["json"]["https"]
tls_version = r.response["json"]["ssl_protocol"]
url = env.mkurl("https", "test1", "/index.html")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
assert src == r.response["body"]
url = env.mkurl("https", "test1", "/index.html")
- r = env.curl_get(url, 5, ["--http1.1"])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=["--http1.1"])
+ assert r.response["status"] == 200
assert "HTTP/1.1" == r.response["protocol"]
assert src == r.response["body"]
# retrieve chunked content from a cgi script
def check_necho(self, env, n, text):
url = env.mkurl("https", "cgi", "/necho.py")
- r = env.curl_get(url, 5, ["-F", f"count={n}", "-F", f"text={text}"])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=["-F", f"count={n}", "-F", f"text={text}"])
+ assert r.response["status"] == 200
exp = ""
for i in range(n):
exp += text + "\n"
def test_h2_003_20(self, env):
url = env.mkurl("https", "test1", "/006/")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
body = r.response["body"].decode('utf-8')
# our doctype varies between branches and in time, lets not compare
body = re.sub(r'^<!DOCTYPE[^>]+>', '', body)
def test_h2_003_21(self, env):
url = env.mkurl("https", "test1", "/index.html")
- r = env.curl_get(url, 5, ["-I"])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=["-I"])
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
s = self.clean_header(r.response["body"].decode('utf-8'))
assert '''HTTP/2 200
''' == s
- r = env.curl_get(url, 5, ["-I", url])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=["-I", url])
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
s = self.clean_header(r.response["body"].decode('utf-8'))
assert '''HTTP/2 200
def test_h2_003_30(self, env, path):
url = env.mkurl("https", "test1", path)
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
h = r.response["header"]
assert "last-modified" in h
lastmod = h["last-modified"]
- r = env.curl_get(url, 5, ['-H', ("if-modified-since: %s" % lastmod)])
+ r = env.curl_get(url, 5, options=['-H', ("if-modified-since: %s" % lastmod)])
assert 304 == r.response["status"]
# test conditionals: if-etag
def test_h2_003_31(self, env, path):
url = env.mkurl("https", "test1", path)
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
h = r.response["header"]
assert "etag" in h
etag = h["etag"]
- r = env.curl_get(url, 5, ['-H', ("if-none-match: %s" % etag)])
+ r = env.curl_get(url, 5, options=['-H', ("if-none-match: %s" % etag)])
assert 304 == r.response["status"]
# test various response body lengths to work correctly
while n <= 1025024:
url = env.mkurl("https", "cgi", f"/mnot164.py?count={n}&text=X")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
assert n == len(r.response["body"])
n *= 2
def test_h2_003_41(self, env, n):
url = env.mkurl("https", "cgi", f"/mnot164.py?count={n}&text=X")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
assert n == len(r.response["body"])
# check that the resource supports ranges and we see its raw content-length
url = env.mkurl("https", "test1", path)
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
h = r.response["header"]
assert "accept-ranges" in h
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
env.setup_data_1k_1m()
- H2Conf(env).add("Timeout 10").add_vhost_cgi().install()
+ H2Conf(env).add_vhost_cgi().install()
assert env.apache_restart() == 0
# upload and GET again using curl, compare to original content
url = env.mkurl("https", "cgi", "/upload.py")
fpath = os.path.join(env.gen_dir, fname)
r = env.curl_upload(url, fpath, options=options)
- assert r.exit_code == 0, r.stderr
- assert r.response["status"] >= 200 and r.response["status"] < 300
+ assert r.exit_code == 0, f"{r}"
+ assert 200 <= r.response["status"] < 300
r2 = env.curl_get(r.response["header"]["location"])
assert r2.exit_code == 0
assert src == r2.response["body"]
def test_h2_004_01(self, env):
- self.curl_upload_and_verify(env, "data-1k", ["--http1.1"])
+ self.curl_upload_and_verify(env, "data-1k", ["--http1.1"])
self.curl_upload_and_verify(env, "data-1k", ["--http2"])
def test_h2_004_02(self, env):
assert r.response["body"] == src, f"expected '{src}', got '{r.response['body']}'"
@pytest.mark.parametrize("name", [
- # "data-1k", "data-10k", "data-100k", "data-1m"
- "data-1m"
+ "data-1k", "data-10k", "data-100k", "data-1m"
])
- def test_h2_004_21(self, env, name, repeat):
+ def test_h2_004_21(self, env, name):
self.nghttp_post_and_verify(env, name, [])
@pytest.mark.parametrize("name", [
""").add_vhost_cgi().install()
assert env.apache_restart() == 0
url = env.mkurl("https", "cgi", "/files/{0}".format(resource))
- r = env.curl_get(url, 5, ["--http2"])
- assert 200 == r.response["status"]
- r = env.curl_get(url, 5, ["--http1.1", "-H", "Range: bytes=0-{0}".format(chunk-1)])
+ r = env.curl_get(url, 5, options=["--http2"])
+ assert r.response["status"] == 200
+ r = env.curl_get(url, 5, options=["--http1.1", "-H", "Range: bytes=0-{0}".format(chunk-1)])
assert 206 == r.response["status"]
assert chunk == len(r.response["body"].decode('utf-8'))
- r = env.curl_get(url, 5, ["--http2", "-H", "Range: bytes=0-{0}".format(chunk-1)])
+ r = env.curl_get(url, 5, options=["--http2", "-H", "Range: bytes=0-{0}".format(chunk-1)])
assert 206 == r.response["status"]
assert chunk == len(r.response["body"].decode('utf-8'))
# now check what response lengths have actually been reported
url = env.mkurl("https", "cgi", self.URI_PATHS[2])
r = env.curl_get(url)
assert r.response, r.stderr + r.stdout
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
def test_h2_100_02(self, env):
url = env.mkurl("https", "cgi", "/hello.py")
hostname = ("cgi-alias.%s" % env.http_tld)
- r = env.curl_get(url, 5, [ "-H", "Host:%s" % hostname ])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=[ "-H", "Host:%s" % hostname ])
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
assert hostname == r.response["json"]["host"]
def test_h2_100_03(self, env):
url = env.mkurl("https", "cgi", "/")
hostname = ("test1.%s" % env.http_tld)
- r = env.curl_get(url, 5, [ "-H", "Host:%s" % hostname ])
- assert 200 == r.response["status"]
+ r = env.curl_get(url, 5, options=[ "-H", "Host:%s" % hostname ])
+ assert r.response["status"] == 200
assert "HTTP/2" == r.response["protocol"]
assert "text/html" == r.response["header"]["content-type"]
def test_h2_100_04(self, env):
url = env.mkurl("https", "cgi", "/hello.py")
hostname = ("noh2.%s" % env.http_tld)
- r = env.curl_get(url, 5, [ "-H", "Host:%s" % hostname ])
+ r = env.curl_get(url, 5, options=[ "-H", "Host:%s" % hostname ])
assert 421 == r.response["status"]
# access an unknown vhost, after using ServerName in SNI
def test_h2_100_05(self, env):
url = env.mkurl("https", "cgi", "/hello.py")
hostname = ("unknown.%s" % env.http_tld)
- r = env.curl_get(url, 5, [ "-H", "Host:%s" % hostname ])
+ r = env.curl_get(url, 5, options=[ "-H", "Host:%s" % hostname ])
assert 421 == r.response["status"]
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- H2Conf(env).add(
- f"""
- SSLCipherSuite ECDHE-RSA-AES256-GCM-SHA384
- <Directory \"{env.server_dir}/htdocs/ssl-client-verify\">
- Require all granted
- SSLVerifyClient require
- SSLVerifyDepth 0
- </Directory>"""
- ).start_vhost(
- env.https_port, "ssl", with_ssl=True
- ).add(
- f"""
- Protocols h2 http/1.1"
- <Location /renegotiate/cipher>
- SSLCipherSuite ECDHE-RSA-CHACHA20-POLY1305
- </Location>
- <Location /renegotiate/err-doc-cipher>
- SSLCipherSuite ECDHE-RSA-CHACHA20-POLY1305
- ErrorDocument 403 /forbidden.html
- </Location>
- <Location /renegotiate/verify>
- SSLVerifyClient require
- </Location>
- <Directory \"{env.server_dir}/htdocs/sslrequire\">
- SSLRequireSSL
- </Directory>
- <Directory \"{env.server_dir}/htdocs/requiressl\">
- Require ssl
- </Directory>"""
- ).end_vhost().install()
+ domain = f"ssl.{env.http_tld}"
+ conf = H2Conf(env, extras={
+ 'base': [
+ "SSLCipherSuite ECDHE-RSA-AES256-GCM-SHA384",
+ f"<Directory \"{env.server_dir}/htdocs/ssl-client-verify\">",
+ " Require all granted",
+ " SSLVerifyClient require",
+ " SSLVerifyDepth 0",
+ "</Directory>"
+ ],
+ domain: [
+ "Protocols h2 http/1.1",
+ "<Location /renegotiate/cipher>",
+ " SSLCipherSuite ECDHE-RSA-CHACHA20-POLY1305",
+ "</Location>",
+ "<Location /renegotiate/err-doc-cipher>",
+ " SSLCipherSuite ECDHE-RSA-CHACHA20-POLY1305",
+ " ErrorDocument 403 /forbidden.html",
+ "</Location>",
+ "<Location /renegotiate/verify>",
+ " SSLVerifyClient require",
+ "</Location>",
+ f"<Directory \"{env.server_dir}/htdocs/sslrequire\">",
+ " SSLRequireSSL",
+ "</Directory>",
+ f"<Directory \"{env.server_dir}/htdocs/requiressl\">",
+ " Require ssl",
+ "</Directory>",
+ ]})
+ conf.add_vhost(domains=[domain], port=env.https_port,
+ doc_root=f"{env.server_dir}/htdocs")
+ conf.install()
# the dir needs to exists for the configuration to have effect
env.mkpath("%s/htdocs/ssl-client-verify" % env.server_dir)
env.mkpath("%s/htdocs/renegotiate/cipher" % env.server_dir)
def test_h2_101_01(self, env):
url = env.mkurl("https", "ssl", "/renegotiate/cipher/")
r = env.curl_get(url, options=["-v", "--http1.1", "--tlsv1.2", "--tls-max", "1.2"])
- assert 0 == r.exit_code
+ assert 0 == r.exit_code, f"{r}"
assert r.response
assert 403 == r.response["status"]
def test_h2_101_04(self, env):
url = env.mkurl("https", "ssl", "/ssl-client-verify/index.html")
r = env.curl_get(url, options=["-vvv", "--tlsv1.2", "--tls-max", "1.2"])
- assert 0 != r.exit_code
+ assert 0 != r.exit_code, f"{r}"
assert not r.response
assert re.search(r'HTTP_1_1_REQUIRED \(err 13\)', r.stderr)
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- conf = H2Conf(env).start_vhost(env.https_port, "ssl", with_ssl=True)
+ domain = f"ssl.{env.http_tld}"
+ conf = H2Conf(env)
+ conf.start_vhost(domains=[domain], port=env.https_port)
conf.add("""
Protocols h2 http/1.1
SSLOptions +StdEnvVars
conf.end_vhost()
conf.install()
# the dir needs to exists for the configuration to have effect
- env.mkpath("%s/htdocs/ssl-client-verify" % env.server_dir)
+ env.mkpath(f"{env.server_dir}/htdocs/ssl-client-verify")
assert env.apache_restart() == 0
def test_h2_102_01(self, env):
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
H2Conf(env).add_vhost_test1().add_vhost_test2().add_vhost_noh2(
- ).start_vhost(
- env.https_port, "test3", doc_root="htdocs/test1", with_ssl=True
+ ).start_vhost(domains=[f"test3.{env.http_tld}"], port=env.https_port, doc_root="htdocs/test1"
).add(
"""
Protocols h2 http/1.1
Header unset Upgrade"""
).end_vhost(
- ).start_vhost(
- env.http_port, "test1b", doc_root="htdocs/test1", with_ssl=False
+ ).start_vhost(domains=[f"test1b.{env.http_tld}"], port=env.http_port, doc_root="htdocs/test1"
).add(
"""
Protocols h2c http/1.1
def test_h2_103_20(self, env):
url = env.mkurl("http", "test1", "/index.html")
r = env.nghttp().get(url, options=["-u"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
# upgrade to h2c for a request where http/1.1 is preferred, but the clients upgrade
# wish is honored nevertheless
def test_h2_103_24(self, env):
url = env.mkurl("http", "test1b", "/006.html")
r = env.nghttp().get(url, options=["-u"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
conf = H2Conf(env)
- conf.start_vhost(env.https_port, "ssl", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"ssl.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
- conf.start_vhost(env.https_port, "pad0", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"pad0.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("H2Padding 0")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
- conf.start_vhost(env.https_port, "pad1", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"pad1.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("H2Padding 1")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
- conf.start_vhost(env.https_port, "pad2", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"pad2.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("H2Padding 2")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
- conf.start_vhost(env.https_port, "pad3", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"pad3.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("H2Padding 3")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
- conf.start_vhost(env.https_port, "pad8", doc_root="htdocs/cgi", with_ssl=True)
- conf.add("Protocols h2 http/1.1")
+ conf.start_vhost(domains=[f"pad8.{env.http_tld}"], port=env.https_port, doc_root="htdocs/cgi")
conf.add("H2Padding 8")
conf.add("AddHandler cgi-script .py")
conf.end_vhost()
# check the number of padding bytes is as expected
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert r.results["paddings"] == [
frame_padding(len(data)+1, 0),
frame_padding(0, 0)
url = env.mkurl("https", "pad0", "/echo.py")
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert r.results["paddings"] == [0, 0]
# 1 bit of padding
url = env.mkurl("https", "pad1", "/echo.py")
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
for i in r.results["paddings"]:
assert i in range(0, 2)
url = env.mkurl("https", "pad2", "/echo.py")
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
for i in r.results["paddings"]:
assert i in range(0, 4)
url = env.mkurl("https", "pad3", "/echo.py")
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
for i in r.results["paddings"]:
assert i in range(0, 8)
url = env.mkurl("https", "pad8", "/echo.py")
for data in ["x", "xx", "xxx", "xxxx", "xxxxx", "xxxxxx", "xxxxxxx", "xxxxxxxx"]:
r = env.nghttp().post_data(url, data, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
for i in r.results["paddings"]:
assert i in range(0, 256)
conf.install()
assert env.apache_restart() == 0
url = env.mkurl("https", "cgi", "/necho.py")
- r = env.curl_get(url, 5, [
+ r = env.curl_get(url, 5, options=[
"-vvv",
"-F", ("count=%d" % 100),
"-F", ("text=%s" % "abcdefghijklmnopqrstuvwxyz"),
"-F", ("wait1=%f" % 1.5),
])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
def test_h2_105_10(self, env):
# just a check without delays if all is fine
"-F", f"text={text}",
"-F", f"wait2={wait2}",
]
- self.r = env.curl_get(url, 5, args)
+ self.r = env.curl_get(url, 5, options=args)
t = Thread(target=long_request)
t.start()
assert env.apache_reload() == 0
t.join()
# noinspection PyTypeChecker
+ time.sleep(1)  # NOTE(review): t.join() above already waits for the request thread; confirm this extra delay (and an `import time` in this module) is really needed
r: ExecResult = self.r
- assert r.response["status"] == 200
- assert len(r.response["body"]) == (lines * (len(text)+1))
+ assert r.exit_code == 0
+ assert r.response, f"no response via {r.args} in {r.stderr}\nstdout: {len(r.stdout)} bytes"
+ assert r.response["status"] == 200, f"{r}"
+ assert len(r.response["body"]) == (lines * (len(text)+1)), f"{r}"
val = "%s%s%s%s%s%s%s%s%s%s" % (val, val, val, val, val, val, val, val, val, val)
# LimitRequestLine 8190 ok, one more char -> 431
r = env.curl_get(url, options=["-H", "x: %s" % (val[:8187])])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
r = env.curl_get(url, options=["-H", "x: %sx" % (val[:8188])])
assert 431 == r.response["status"]
# same with field name
r = env.curl_get(url, options=["-H", "y%s: 1" % (val[:8186])])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
r = env.curl_get(url, options=["-H", "y%s: 1" % (val[:8188])])
assert 431 == r.response["status"]
# LimitRequestFieldSize 8190 ok, one more char -> 400 in HTTP/1.1
# (we send 4000+4185 since they are concatenated by ", " and start with "x: "
r = env.curl_get(url, options=["-H", "x: %s" % (val[:4000]), "-H", "x: %s" % (val[:4185])])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
r = env.curl_get(url, options=["--http1.1", "-H", "x: %s" % (val[:4000]), "-H", "x: %s" % (val[:4189])])
assert 400 == r.response["status"]
r = env.curl_get(url, options=["-H", "x: %s" % (val[:4000]), "-H", "x: %s" % (val[:4191])])
for i in range(98): # curl sends 2 headers itself (user-agent and accept)
opt += ["-H", "x: 1"]
r = env.curl_get(url, options=opt)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
r = env.curl_get(url, options=(opt + ["-H", "y: 2"]))
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
# test header field count, LimitRequestFields (default 100)
# different header names count each
for i in range(98): # curl sends 2 headers itself (user-agent and accept)
opt += ["-H", "x{0}: 1".format(i)]
r = env.curl_get(url, options=opt)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
r = env.curl_get(url, options=(opt + ["-H", "y: 2"]))
assert 431 == r.response["status"]
for i in range(100):
opt += ["-H", "x{0}: 1".format(i)]
r = env.curl_get(url, options=opt)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
# the uri limits
def test_h2_200_15(self, env):
assert env.apache_restart() == 0
url = env.mkurl("https", "cgi", "/")
r = env.curl_get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
url = env.mkurl("https", "cgi", "/" + (48*"x"))
r = env.curl_get(url)
assert 414 == r.response["status"]
def test_h2_201_01(self, env):
url = env.mkurl("https", "test1", "/006/006.css")
r = env.curl_get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
lm = r.response["header"]["last-modified"]
assert lm
r = env.curl_get(url, options=["-H", "if-modified-since: %s" % lm])
assert 304 == r.response["status"]
r = env.curl_get(url, options=["-H", "if-modified-since: Tue, 04 Sep 2010 11:51:59 GMT"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
# check handling of 'if-none-match' header
def test_h2_201_02(self, env):
url = env.mkurl("https", "test1", "/006/006.css")
r = env.curl_get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
etag = r.response["header"]["etag"]
assert etag
r = env.curl_get(url, options=["-H", "if-none-match: %s" % etag])
assert 304 == r.response["status"]
r = env.curl_get(url, options=["-H", "if-none-match: dummy"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
@pytest.mark.skipif(True, reason="304 misses the Vary header in trunk and 2.4.x")
def test_h2_201_03(self, env):
url = env.mkurl("https", "test1", "/006.html")
r = env.curl_get(url, options=["-H", "Accept-Encoding: gzip"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
for h in r.response["header"]:
print("%s: %s" % (h, r.response["header"][h]))
lm = r.response["header"]["last-modified"]
def test_h2_201_04(self, env):
url = env.mkurl("https", "test1", "/006.html")
r = env.curl_get(url, options=["--http1.1", "-H", "Connection: keep-alive"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "timeout=30, max=30" == r.response["header"]["keep-alive"]
r = env.curl_get(url, options=["-H", "Connection: keep-alive"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "keep-alive" not in r.response["header"]
def test_h2_300_01(self, env):
url = env.mkurl("https", "test1", "/index.html")
r = env.curl_post_data(url, 'XYZ')
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "previous" not in r.response
# check that we see an interim response when we ask for it
def test_h2_300_02(self, env):
url = env.mkurl("https", "cgi", "/echo.py")
r = env.curl_post_data(url, 'XYZ', options=["-H", "expect: 100-continue"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "previous" in r.response
assert 100 == r.response["previous"]["status"]
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- H2Conf(env).start_vhost(
- env.https_port, "push", doc_root="htdocs/test1", with_ssl=True
- ).add(r""" Protocols h2 http/1.1"
+ H2Conf(env).start_vhost(domains=[f"push.{env.http_tld}"],
+ port=env.https_port, doc_root="htdocs/test1"
+ ).add(r"""
RewriteEngine on
RewriteRule ^/006-push(.*)?\.html$ /006.html
<Location /006-push.html>
def test_h2_400_00(self, env):
url = env.mkurl("https", "push", "/006.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
def test_h2_400_01(self, env):
url = env.mkurl("https", "push", "/006-push.html")
r = env.nghttp().get(url, options=["-Haccept-encoding: none"])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.css' == promises[0]["request"]["header"][":path"]
def test_h2_400_02(self, env):
url = env.mkurl("https", "push", "/006-push2.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.js' == promises[0]["request"]["header"][":path"]
def test_h2_400_03(self, env):
url = env.mkurl("https", "push", "/006-push3.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.js' == promises[0]["request"]["header"][":path"]
def test_h2_400_04(self, env):
url = env.mkurl("https", "push", "/006-push4.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
def test_h2_400_05(self, env):
url = env.mkurl("https", "push", "/006-push5.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.css' == promises[0]["request"]["header"][":path"]
def test_h2_400_06(self, env):
url = env.mkurl("https", "push", "/006-push6.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.css' == promises[0]["request"]["header"][":path"]
def test_h2_400_07(self, env):
url = env.mkurl("https", "push", "/006-push7.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.css' == promises[0]["request"]["header"][":path"]
def test_h2_400_08(self, env):
url = env.mkurl("https", "push", "/006-push8.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
# 2 H2PushResource config trigger on GET, but not on POST
- def test_h2_400_20(self, env, repeat):
+ def test_h2_400_20(self, env):
url = env.mkurl("https", "push", "/006-push20.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 2 == len(promises)
with open(fpath, 'w') as f:
f.write("test upload data")
r = env.nghttp().upload(url, fpath)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
def test_h2_400_30(self, env):
url = env.mkurl("https", "push", "/006-push30.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
def test_h2_400_50(self, env):
url = env.mkurl("https", "push", "/006-push.html")
r = env.nghttp().get(url, options=['-H', 'accept-push-policy: none'])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 0 == len(promises)
def test_h2_400_51(self, env):
url = env.mkurl("https", "push", "/006-push.html")
r = env.nghttp().get(url, options=['-H', 'accept-push-policy: default'])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
def test_h2_400_52(self, env):
url = env.mkurl("https", "push", "/006-push.html")
r = env.nghttp().get(url, options=['-H', 'accept-push-policy: head'])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert '/006/006.css' == promises[0]["request"]["header"][":path"]
def test_h2_400_53(self, env):
url = env.mkurl("https", "push", "/006-push.html")
r = env.nghttp().get(url, options=['-H', 'accept-push-policy: fast-load'])
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- H2Conf(env).start_vhost(
- env.https_port, "hints", doc_root="htdocs/test1", with_ssl=True
- ).add(""" Protocols h2 http/1.1"
+ H2Conf(env).start_vhost(domains=[f"hints.{env.http_tld}"],
+ port=env.https_port, doc_root="htdocs/test1"
+ ).add("""
H2EarlyHints on
RewriteEngine on
RewriteRule ^/006-(.*)?\\.html$ /006.html
assert env.apache_restart() == 0
# H2EarlyHints enabled in general, check that it works for H2PushResource
- def test_h2_401_31(self, env, repeat):
+ def test_h2_401_31(self, env):
url = env.mkurl("https", "hints", "/006-hints.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
early = r.response["previous"]
def test_h2_401_32(self, env):
url = env.mkurl("https", "hints", "/006-nohints.html")
r = env.nghttp().get(url)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
promises = r.results["streams"][r.response["id"]]["promises"]
assert 1 == len(promises)
assert "previous" not in r.response
def test_h2_500_01(self, env):
url = env.mkurl("https", "cgi", "/proxy/hello.py")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/1.1" == r.response["json"]["protocol"]
assert "" == r.response["json"]["https"]
assert "" == r.response["json"]["ssl_protocol"]
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env):
- conf = H2Conf(env)
- conf.add_vhost_cgi(proxy_self=True, h2proxy_self=False, extras={
- f'cgi.{env.http_tld}': f"""
- Header unset Server
- Header always set Server cgi
- """
+ conf = H2Conf(env, extras={
+ f'cgi.{env.http_tld}': [
+ "Header unset Server",
+ "Header always set Server cgi",
+ ]
})
+ conf.add_vhost_cgi(proxy_self=True, h2proxy_self=False)
conf.install()
assert env.apache_restart() == 0
def test_h2_501_01(self, env):
url = env.mkurl("https", "cgi", "/proxy/hello.py")
r = env.curl_get(url, 5)
- assert 200 == r.response["status"]
+ assert r.response["status"] == 200
assert "HTTP/1.1" == r.response["json"]["protocol"]
assert "" == r.response["json"]["https"]
assert "" == r.response["json"]["ssl_protocol"]
+from typing import Dict, Any
+
from pyhttpd.env import HttpdTestEnv
class HttpdConf(object):
- def __init__(self, env: HttpdTestEnv, path=None):
+ def __init__(self, env: HttpdTestEnv, extras: Dict[str, Any] = None):
+ """ Create a new httpd configuration.
+ :param env: then environment this operates in
+ :param extras: extra configuration directive with ServerName as key and
+ 'base' as special key for global configuration additions.
+ """
self.env = env
+ self._indents = 0
self._lines = []
- self._has_ssl_vhost = False
+ self._extras = extras.copy() if extras else {}
+ if 'base' in self._extras:
+ self.add(self._extras['base'])
+
+ def __repr__(self):
+ s = '\n'.join(self._lines)
+ return f"HttpdConf[{s}]"
def install(self):
- if not self._has_ssl_vhost:
- self.add_vhost_test1()
self.env.install_test_conf(self._lines)
- def add(self, line):
- if isinstance(line, list):
- self._lines.extend(line)
- else:
+ def add(self, line: Any):
+ if isinstance(line, str):
+ if self._indents > 0:
+ line = f"{' ' * self._indents}{line}"
self._lines.append(line)
+ else:
+ if self._indents > 0:
+ line = [f"{' ' * self._indents}{l}" for l in line]
+ self._lines.extend(line)
return self
- def add_vhost(self, port, name, aliases=None, doc_root="htdocs", with_ssl=True):
- self.start_vhost(port, name, aliases, doc_root, with_ssl)
+ def add_certificate(self, cert_file, key_file):
+ if self.env.ssl_module == "ssl":
+ self.add([
+ f"SSLCertificateFile {cert_file}",
+ f"SSLCertificateKeyFile {key_file}",
+ ])
+ elif self.env.ssl_module == "tls":
+ self.add(f"""
+ TLSCertificate {cert_file} {key_file}
+ """)
+
+ def add_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=True):
+ self.start_vhost(domains=domains, port=port, doc_root=doc_root, with_ssl=with_ssl)
self.end_vhost()
return self
- def start_vhost(self, port, name, aliases=None, doc_root="htdocs", with_ssl=True):
- server_domain = f"{name}.{self.env.http_tld}"
- lines = [
- f"<VirtualHost *:{port}>",
- f" ServerName {server_domain}"
- ]
- if aliases:
- lines.extend([
- f" ServerAlias {alias}.{self.env.http_tld}" for alias in aliases])
- lines.append(f" DocumentRoot {doc_root}")
- if with_ssl:
- self._has_ssl_vhost = True
- lines.append(" SSLEngine on")
- for cred in self.env.get_credentials_for_name(server_domain):
- lines.extend([
- f"SSLCertificateFile {cred.cert_file}",
- f"SSLCertificateKeyFile {cred.pkey_file}",
- ])
- return self.add(lines)
+ def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=False):
+ if not isinstance(domains, list):
+ domains = [domains]
+ if port is None:
+ port = self.env.https_port
+ self.add("")
+ self.add(f"<VirtualHost *:{port}>")
+ self._indents += 1
+ self.add(f"ServerName {domains[0]}")
+ for alias in domains[1:]:
+ self.add(f"ServerAlias {alias}")
+ self.add(f"DocumentRoot {doc_root}")
+ if self.env.https_port == port or with_ssl:
+ if self.env.ssl_module == "ssl":
+ self.add("SSLEngine on")
+ for cred in self.env.get_credentials_for_name(domains[0]):
+ self.add_certificate(cred.cert_file, cred.pkey_file)
+ if domains[0] in self._extras:
+ self.add(self._extras[domains[0]])
+ return self
def end_vhost(self):
+ self._indents -= 1
self.add("</VirtualHost>")
+ self.add("")
return self
def add_proxies(self, host, proxy_self=False, h2proxy_self=False):
if proxy_self or h2proxy_self:
- self.add(" ProxyPreserveHost on")
+ self.add("ProxyPreserveHost on")
if proxy_self:
- self.add(f"""
- ProxyPass /proxy/ http://127.0.0.1:{self.env.http_port}/
- ProxyPassReverse /proxy/ http://{host}.{self.env.http_tld}:{self.env.http_port}/
- """)
+ self.add([
+ f"ProxyPass /proxy/ http://127.0.0.1:{self.env.http_port}/",
+ f"ProxyPassReverse /proxy/ http://{host}.{self.env.http_tld}:{self.env.http_port}/",
+ ])
if h2proxy_self:
- self.add(f"""
- ProxyPass /h2proxy/ h2://127.0.0.1:{self.env.https_port}/
- ProxyPassReverse /h2proxy/ https://{host}.{self.env.http_tld}:self.env.https_port/
- """)
+ self.add([
+ f"ProxyPass /h2proxy/ h2://127.0.0.1:{self.env.https_port}/",
+ f"ProxyPassReverse /h2proxy/ https://{host}.{self.env.http_tld}:{self.env.https_port}/",
+ ])
return self
- def add_proxy_setup(self):
- self.add("ProxyStatus on")
- self.add("ProxyTimeout 5")
- self.add("SSLProxyEngine on")
- self.add("SSLProxyVerify none")
- return self
-
- def add_vhost_test1(self, proxy_self=False, h2proxy_self=False, extras=None):
+ def add_vhost_test1(self, proxy_self=False, h2proxy_self=False):
domain = f"test1.{self.env.http_tld}"
- if extras and 'base' in extras:
- self.add(extras['base'])
- self.start_vhost(
- self.env.http_port, "test1", aliases=["www1"], doc_root="htdocs/test1", with_ssl=False
- ).add(
- " Protocols h2c http/1.1"
- ).end_vhost()
- self.start_vhost(
- self.env.https_port, "test1", aliases=["www1"], doc_root="htdocs/test1", with_ssl=True)
- self.add(f"""
- Protocols h2 http/1.1
- <Location /006>
- Options +Indexes
- HeaderName /006/header.html
- </Location>
- {extras[domain] if extras and domain in extras else ""}
- """)
+ self.start_vhost(domains=[domain, f"www1.{self.env.http_tld}"],
+ port=self.env.http_port, doc_root="htdocs/test1")
+ self.end_vhost()
+ self.start_vhost(domains=[domain, f"www1.{self.env.http_tld}"],
+ port=self.env.https_port, doc_root="htdocs/test1")
+ self.add([
+ "<Location /006>",
+ " Options +Indexes",
+ " HeaderName /006/header.html",
+ "</Location>",
+ ])
self.add_proxies("test1", proxy_self, h2proxy_self)
self.end_vhost()
return self
- def add_vhost_test2(self, extras=None):
+ def add_vhost_test2(self):
domain = f"test2.{self.env.http_tld}"
- if extras and 'base' in extras:
- self.add(extras['base'])
- self.start_vhost(self.env.http_port, "test2", aliases=["www2"], doc_root="htdocs/test2", with_ssl=False)
- self.add(" Protocols http/1.1 h2c")
+ self.start_vhost(domains=[domain, f"www2.{self.env.http_tld}"],
+ port=self.env.http_port, doc_root="htdocs/test2")
self.end_vhost()
- self.start_vhost(self.env.https_port, "test2", aliases=["www2"], doc_root="htdocs/test2", with_ssl=True)
- self.add(f"""
- Protocols http/1.1 h2
- <Location /006>
- Options +Indexes
- HeaderName /006/header.html
- </Location>
- {extras[domain] if extras and domain in extras else ""}
- """)
+ self.start_vhost(domains=[domain, f"www2.{self.env.http_tld}"],
+ port=self.env.https_port, doc_root="htdocs/test2")
+ self.add([
+ "<Location /006>",
+ " Options +Indexes",
+ " HeaderName /006/header.html",
+ "</Location>",
+ ])
self.end_vhost()
return self
- def add_vhost_cgi(self, proxy_self=False, h2proxy_self=False, extras=None):
+ def add_vhost_cgi(self, proxy_self=False, h2proxy_self=False):
domain = f"cgi.{self.env.http_tld}"
- if extras and 'base' in extras:
- self.add(extras['base'])
if proxy_self:
- self.add_proxy_setup()
+ self.add(["ProxyStatus on", "ProxyTimeout 5",
+ "SSLProxyEngine on", "SSLProxyVerify none"])
if h2proxy_self:
- self.add(" SSLProxyEngine on")
- self.add(" SSLProxyCheckPeerName off")
- self.start_vhost(self.env.https_port, "cgi", aliases=["cgi-alias"], doc_root="htdocs/cgi", with_ssl=True)
- self.add("""
- Protocols h2 http/1.1
- SSLOptions +StdEnvVars
- AddHandler cgi-script .py
- <Location \"/.well-known/h2/state\">
- SetHandler http2-status
- </Location>""")
+ self.add(["SSLProxyEngine on", "SSLProxyCheckPeerName off"])
+ self.start_vhost(domains=[domain, f"cgi-alias.{self.env.http_tld}"],
+ port=self.env.https_port, doc_root="htdocs/cgi")
self.add_proxies("cgi", proxy_self=proxy_self, h2proxy_self=h2proxy_self)
- self.add(" <Location \"/h2test/echo\">")
- self.add(" SetHandler h2test-echo")
- self.add(" </Location>")
- self.add(" <Location \"/h2test/delay\">")
- self.add(" SetHandler h2test-delay")
- self.add(" </Location>")
- if extras and domain in extras:
- self.add(extras[domain])
+ self.add("<Location \"/h2test/echo\">")
+ self.add(" SetHandler h2test-echo")
+ self.add("</Location>")
+ self.add("<Location \"/h2test/delay\">")
+ self.add(" SetHandler h2test-delay")
+ self.add("</Location>")
+ if domain in self._extras:
+ self.add(self._extras[domain])
self.end_vhost()
- self.start_vhost(self.env.http_port, "cgi", aliases=["cgi-alias"], doc_root="htdocs/cgi", with_ssl=False)
- self.add(" AddHandler cgi-script .py")
+ self.start_vhost(domains=[domain, f"cgi-alias.{self.env.http_tld}"],
+ port=self.env.http_port, doc_root="htdocs/cgi")
+ self.add("AddHandler cgi-script .py")
self.add_proxies("cgi", proxy_self=proxy_self, h2proxy_self=h2proxy_self)
- if extras and domain in extras:
- self.add(extras[domain])
self.end_vhost()
- self.add(" LogLevel proxy:info")
- self.add(" LogLevel proxy_http:info")
+ self.add("LogLevel proxy:info")
+ self.add("LogLevel proxy_http:info")
return self
+
+ @staticmethod
+ def merge_extras(e1: Dict[str, Any], e2: Dict[str, Any]) -> Dict[str, Any]:
+ def _concat(v1, v2):
+ if isinstance(v1, str):
+ v1 = [v1]
+ if isinstance(v2, str):
+ v2 = [v2]
+ v1.extend(v2)
+ return v1
+
+ if e1 is None:
+ return e2.copy() if e2 else None
+ if e2 is None:
+ return e1.copy()
+ e3 = e1.copy()
+ for name, val in e2.items():
+ if name in e3:
+ e3[name] = _concat(e3[name], val)
+ else:
+ e3[name] = val
+ return e3
Listen ${http_port}
Listen ${https_port}
+<IfModule mod_ssl.c>
+ # provide some default
+ SSLSessionCache "shmcb:ssl_gcache_data(32000)"
+</IfModule>
+
# Insert our test specific configuration before the first vhost,
# so that its vhosts can be the default one. This is relevant in
# certain behaviours, such as protocol selection during SSL ALPN
</IfModule>
<VirtualHost *:${http_port}>
- ServerName not-forbidden.org
- ServerAlias www.not-forbidden.org
-
+ ServerName ${http_tld}
+ ServerAlias www.${http_tld}
+ <IfModule ssl_module>
+ SSLEngine off
+ </IfModule>
DocumentRoot "${server_dir}/htdocs"
</VirtualHost>
apr_bindir = @APR_BINDIR@
apxs = @bindir@/apxs
apachectl = @sbindir@/apachectl
-dso_modules = @DSO_MODULES@
[httpd]
version = @HTTPD_VERSION@
name = @progname@
+dso_modules = @DSO_MODULES@
+static_modules = @STATIC_MODULES@
[test]
-http_port = 40001
-https_port = 40002
-http_tld = tests.httpd.apache.org
-test_dir = @abs_srcdir@/..
-server_dir = @abs_srcdir@/../gen/apache
gen_dir = @abs_srcdir@/../gen
+http_port = 5002
+https_port = 5001
+proxy_port = 5003
+http_tld = tests.httpd.apache.org
+test_dir = @abs_srcdir@
+test_src_dir = @abs_srcdir@
import time
from datetime import datetime, timedelta
from string import Template
-from typing import List
-
-import requests
+from typing import List, Optional
from configparser import ConfigParser, ExtendedInterpolation
from urllib.parse import urlparse
from .certs import Credentials, HttpdTestCA, CertificateSpec
+from .log import HttpdErrorLog
from .nghttp import Nghttp
from .result import ExecResult
"headers",
"setenvif",
"slotmem_shm",
- "ssl",
"status",
"autoindex",
"cgid",
mod_names = modules.copy() if modules else self.MODULES.copy()
if add_modules:
mod_names.extend(add_modules)
+ if self.env.mpm_module is not None and self.env.mpm_module not in mod_names:
+ mod_names.append(self.env.mpm_module)
+ if self.env.ssl_module is not None and self.env.ssl_module not in mod_names:
+ mod_names.append(self.env.ssl_module)
self._make_modules_conf(modules=mod_names)
self._make_htdocs()
+ self.env.clear_curl_headerfiles()
def _make_dirs(self):
if os.path.exists(self.env.gen_dir):
modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf')
with open(modules_conf, 'w') as fd:
# issue load directives for all modules we want that are shared
+ missing_mods = list()
for m in modules:
mod_path = os.path.join(self.env.libexec_dir, f"mod_{m}.so")
if os.path.isfile(mod_path):
fd.write(f"LoadModule {m}_module \"{mod_path}\"\n")
+ elif m in self.env.static_modules:
+ fd.write(f"#built static: LoadModule {m}_module \"{mod_path}\"\n")
+ else:
+ missing_mods.append(m)
+ if len(missing_mods) > 0:
+ raise Exception(f"Unable to find modules: {missing_mods} "
+ f"DSOs: {self.env.dso_modules}")
def _make_htdocs(self):
our_dir = os.path.dirname(inspect.getfile(Dummy))
class HttpdTestEnv:
+ @classmethod
+ def get_ssl_module(cls):
+ return os.environ['SSL'] if 'SSL' in os.environ else 'ssl'
+
def __init__(self, pytestconfig=None,
- local_dir=None, add_base_conf: str = None,
+ local_dir=None, add_base_conf: List[str] = None,
interesting_modules: List[str] = None):
self._our_dir = os.path.dirname(inspect.getfile(Dummy))
self._local_dir = local_dir if local_dir else self._our_dir
self.config = ConfigParser(interpolation=ExtendedInterpolation())
self.config.read(os.path.join(self._our_dir, 'config.ini'))
+ self._bin_dir = self.config.get('global', 'bindir')
self._apxs = self.config.get('global', 'apxs')
self._prefix = self.config.get('global', 'prefix')
self._apachectl = self.config.get('global', 'apachectl')
self._http_port = int(self.config.get('test', 'http_port'))
self._https_port = int(self.config.get('test', 'https_port'))
+ self._proxy_port = int(self.config.get('test', 'proxy_port'))
self._http_tld = self.config.get('test', 'http_tld')
self._test_dir = self.config.get('test', 'test_dir')
self._gen_dir = self.config.get('test', 'gen_dir')
self._server_docs_dir = os.path.join(self._server_dir, "htdocs")
self._server_logs_dir = os.path.join(self.server_dir, "logs")
self._server_access_log = os.path.join(self._server_logs_dir, "access_log")
- self._server_error_log = os.path.join(self._server_logs_dir, "error_log")
+ self._error_log = HttpdErrorLog(os.path.join(self._server_logs_dir, "error_log"))
+ self._apachectl_stderr = None
- self._dso_modules = self.config.get('global', 'dso_modules').split(' ')
- self._mpm_type = os.environ['MPM'] if 'MPM' in os.environ else 'event'
+ self._dso_modules = self.config.get('httpd', 'dso_modules').split(' ')
+ self._static_modules = self.config.get('httpd', 'static_modules').split(' ')
+ self._mpm_module = f"mpm_{os.environ['MPM']}" if 'MPM' in os.environ else 'mpm_event'
+ self._ssl_module = self.get_ssl_module()
+ if len(self._ssl_module.strip()) == 0:
+ self._ssl_module = None
self._httpd_addr = "127.0.0.1"
self._http_base = f"http://{self._httpd_addr}:{self.http_port}"
self._https_base = f"https://{self._httpd_addr}:{self.https_port}"
self._test_conf = os.path.join(self._server_conf_dir, "test.conf")
- self._httpd_base_conf = f"""
- LoadModule mpm_{self.mpm_type}_module \"{self.libexec_dir}/mod_mpm_{self.mpm_type}.so\"
- <IfModule mod_ssl.c>
- SSLSessionCache "shmcb:ssl_gcache_data(32000)"
- </IfModule>
- """
+ self._httpd_base_conf = []
if add_base_conf:
- self._httpd_base_conf += f"\n{add_base_conf}"
+ self._httpd_base_conf.extend(add_base_conf)
self._verbosity = pytestconfig.option.verbose if pytestconfig is not None else 0
if self._verbosity >= 2:
log_level = "trace2"
- self._httpd_base_conf += f"""
- LogLevel core:trace5 mpm_{self.mpm_type}:trace5
- """
+ self._httpd_base_conf.append(f"LogLevel core:trace5 {self.mpm_module}:trace5")
elif self._verbosity >= 1:
log_level = "debug"
else:
log_level = "info"
if interesting_modules:
- self._httpd_base_conf += "\nLogLevel"
+ l = "LogLevel"
for name in interesting_modules:
- self._httpd_base_conf += f" {name}:{log_level}"
- self._httpd_base_conf += "\n"
+ l += f" {name}:{log_level}"
+ self._httpd_base_conf.append(l)
self._ca = None
self._cert_specs = [CertificateSpec(domains=[
], key_type='rsa4096')]
self._verify_certs = False
+ self._curl_headerfiles_n = 0
@property
def apxs(self) -> str:
return self._prefix
@property
- def mpm_type(self) -> str:
- return self._mpm_type
+ def mpm_module(self) -> str:
+ return self._mpm_module
+
+ @property
+ def ssl_module(self) -> str:
+ return self._ssl_module
+
+ @property
+ def http_addr(self) -> str:
+ return self._httpd_addr
@property
def http_port(self) -> int:
def https_port(self) -> int:
return self._https_port
+ @property
+ def proxy_port(self) -> int:
+ return self._proxy_port
+
@property
def http_tld(self) -> str:
return self._http_tld
def https_base_url(self) -> str:
return self._https_base
+ @property
+ def bin_dir(self) -> str:
+ return self._bin_dir
+
@property
def gen_dir(self) -> str:
return self._gen_dir
def dso_modules(self) -> List[str]:
return self._dso_modules
+ @property
+ def static_modules(self) -> List[str]:
+ return self._static_modules
+
@property
def server_conf_dir(self) -> str:
return self._server_conf_dir
return self._server_docs_dir
@property
- def httpd_base_conf(self) -> str:
+ def httpd_base_conf(self) -> List[str]:
return self._httpd_base_conf
+ @property
+ def httpd_error_log(self) -> HttpdErrorLog:
+ return self._error_log
+
def local_src(self, path):
return os.path.join(self.local_dir, path)
def ca(self) -> Credentials:
return self._ca
+ @property
+ def apachectl_stderr(self):
+ return self._apachectl_stderr
+
def add_cert_specs(self, specs: List[CertificateSpec]):
self._cert_specs.extend(specs)
def issue_certs(self):
if self._ca is None:
self._ca = HttpdTestCA.create_root(name=self.http_tld,
- store_dir=os.path.join(self.server_dir, 'ca'), key_type="rsa4096")
+ store_dir=os.path.join(self.server_dir, 'ca'),
+ key_type="rsa4096")
self._ca.issue_certs(self._cert_specs)
def get_credentials_for_name(self, dns_name) -> List['Credentials']:
return self.ca.get_credentials_for_name(spec.domains[0])
return []
+ def _versiontuple(self, v):
+ return tuple(map(int, v.split('.')))
+
+ def httpd_is_at_least(self, minv):
+ hv = self._versiontuple(self.get_httpd_version())
+ return hv >= self._versiontuple(minv)
+
def has_h2load(self):
return self._h2load != ""
if not os.path.exists(path):
return os.makedirs(path)
- def run(self, args) -> ExecResult:
- log.debug("execute: %s", " ".join(args))
+ def run(self, args, input=None, debug_log=True):
+ if debug_log:
+ log.debug(f"run: {args}")
start = datetime.now()
- p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
- return ExecResult(exit_code=p.returncode, stdout=p.stdout, stderr=p.stderr,
+ p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
+ input=input.encode() if input else None)
+ return ExecResult(args=args, exit_code=p.returncode,
+ stdout=p.stdout, stderr=p.stderr,
duration=datetime.now() - start)
def mkurl(self, scheme, hostname, path='/'):
port = self.https_port if scheme == 'https' else self.http_port
- return "%s://%s.%s:%s%s" % (scheme, hostname, self.http_tld, port, path)
+ return f"{scheme}://{hostname}.{self.http_tld}:{port}{path}"
- def install_test_conf(self, conf: List[str]):
+ def install_test_conf(self, lines: List[str]):
with open(self._test_conf, 'w') as fd:
- fd.write(f"{self.httpd_base_conf}\n")
- for line in conf:
- fd.write(f"{line}\n")
-
- def is_live(self, url, timeout: timedelta = None):
- s = requests.Session()
- if not timeout:
- timeout = timedelta(seconds=10)
+ fd.write('\n'.join(self._httpd_base_conf))
+ fd.write('\n')
+ fd.write('\n'.join(lines))
+ fd.write('\n')
+
+ def is_live(self, url: str = None, timeout: timedelta = None):
+ if url is None:
+ url = self._http_base
+ if timeout is None:
+ timeout = timedelta(seconds=5)
try_until = datetime.now() + timeout
- log.debug("checking reachability of %s", url)
+ last_err = ""
while datetime.now() < try_until:
+ # noinspection PyBroadException
try:
- req = requests.Request('HEAD', url).prepare()
- s.send(req, verify=self._verify_certs, timeout=timeout.total_seconds())
- return True
- except IOError:
- log.debug("connect error: %s", sys.exc_info()[0])
- time.sleep(.2)
+ r = self.curl_get(url, insecure=True, debug_log=False)
+ if r.exit_code == 0:
+ return True
+ time.sleep(.1)
+ except ConnectionRefusedError:
+ log.debug("connection refused")
+ time.sleep(.1)
except:
- log.warning("Unexpected error: %s", sys.exc_info()[0])
- time.sleep(.2)
- log.debug(f"Unable to contact '{url}' after {timeout} sec")
+ if last_err != str(sys.exc_info()[0]):
+ last_err = str(sys.exc_info()[0])
+ log.debug("Unexpected error: %s", last_err)
+ time.sleep(.1)
+ log.debug(f"Unable to contact server after {timeout}")
return False
- def is_dead(self, url, timeout: timedelta = None):
- s = requests.Session()
- if not timeout:
- timeout = timedelta(seconds=10)
+ def is_dead(self, url: str = None, timeout: timedelta = None):
+ if url is None:
+ url = self._http_base
+ if timeout is None:
+ timeout = timedelta(seconds=5)
try_until = datetime.now() + timeout
- log.debug("checking reachability of %s", url)
+ last_err = None
while datetime.now() < try_until:
+ # noinspection PyBroadException
try:
- req = requests.Request('HEAD', url).prepare()
- s.send(req, verify=self._verify_certs, timeout=int(timeout.total_seconds()))
- time.sleep(.2)
- except IOError:
+ r = self.curl_get(url, debug_log=False)
+ if r.exit_code != 0:
+ return True
+ time.sleep(.1)
+ except ConnectionRefusedError:
+ log.debug("connection refused")
return True
- log.debug("Server still responding after %d sec", timeout)
+ except:
+ if last_err != str(sys.exc_info()[0]):
+ last_err = str(sys.exc_info()[0])
+ log.debug("Unexpected error: %s", last_err)
+ time.sleep(.1)
+ log.debug(f"Server still responding after {timeout}")
return False
- def _run_apachectl(self, cmd):
+ def _run_apachectl(self, cmd) -> ExecResult:
args = [self._apachectl,
"-d", self.server_dir,
"-f", os.path.join(self._server_dir, 'conf/httpd.conf'),
"-k", cmd]
- log.debug("execute: %s", " ".join(args))
- p = subprocess.run(args, capture_output=True, text=True)
- rv = p.returncode
- if rv != 0:
- log.warning(f"exit {rv}, stdout: {p.stdout}, stderr: {p.stderr}")
- return rv
+ r = self.run(args)
+ self._apachectl_stderr = r.stderr
+ if r.exit_code != 0:
+ log.warning(f"failed: {r}")
+ return r
def apache_reload(self):
- rv = self._run_apachectl("graceful")
- if rv == 0:
+ r = self._run_apachectl("graceful")
+ if r.exit_code == 0:
timeout = timedelta(seconds=10)
- rv = 0 if self.is_live(self._http_base, timeout=timeout) else -1
- return rv
+ return 0 if self.is_live(self._http_base, timeout=timeout) else -1
+ return r.exit_code
def apache_restart(self):
self.apache_stop()
- rv = self._run_apachectl("start")
- if rv == 0:
+ r = self._run_apachectl("start")
+ if r.exit_code == 0:
timeout = timedelta(seconds=10)
- rv = 0 if self.is_live(self._http_base, timeout=timeout) else -1
- return rv
+ return 0 if self.is_live(self._http_base, timeout=timeout) else -1
+ return r.exit_code
def apache_stop(self):
- rv = self._run_apachectl("stop")
- if rv == 0:
+ r = self._run_apachectl("stop")
+ if r.exit_code == 0:
timeout = timedelta(seconds=10)
- rv = 0 if self.is_dead(self._http_base, timeout=timeout) else -1
- log.debug("waited for a apache.is_dead, rv=%d", rv)
+ return 0 if self.is_dead(self._http_base, timeout=timeout) else -1
+ return r.exit_code
+
+ def apache_graceful_stop(self):
+ log.debug("stop apache")
+ self._run_apachectl("graceful-stop")
+ return 0 if self.is_dead() else -1
+
+ def apache_fail(self):
+ log.debug("expect apache fail")
+ self._run_apachectl("stop")
+ rv = self._run_apachectl("start").exit_code
+ if rv == 0:
+ rv = 0 if self.is_dead() else -1
+ else:
+ rv = 0
return rv
def apache_access_log_clear(self):
if os.path.isfile(self._server_access_log):
os.remove(self._server_access_log)
+ # Error-log inspection now lives in the HttpdErrorLog class
+ # (reachable as env.httpd_error_log); the ad-hoc scanning below,
+ # with its hard-wired AH*/SSL ignore lists, is removed.
- def apache_error_log_clear(self):
- if os.path.isfile(self._server_error_log):
- os.remove(self._server_error_log)
-
- RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
- RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
- RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
- RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
-
- def apache_errors_and_warnings(self):
- errors = []
- warnings = []
-
- if os.path.isfile(self._server_error_log):
- for line in open(self._server_error_log):
- m = self.RE_APLOGNO.match(line)
- if m and m.group('aplogno') in [
- 'AH02032',
- 'AH01276',
- 'AH01630',
- 'AH00135',
- 'AH02261', # Re-negotiation handshake failed (our test_101
- ]:
- # we know these happen normally in our tests
- continue
- m = self.RE_SSL_LIB_ERR.match(line)
- if m and m.group('errno') in [
- '1417A0C1', # cipher suite mismatch, test_101
- '1417C0C7', # client cert not accepted, test_101
- ]:
- # we know these happen normally in our tests
- continue
- m = self.RE_ERRLOG_ERROR.match(line)
- if m and m.group('module') not in ['cgid']:
- errors.append(line)
- continue
- m = self.RE_ERRLOG_WARN.match(line)
- if m:
- warnings.append(line)
- continue
- return errors, warnings
-
- def curl_complete_args(self, urls, timeout, options):
+ def get_ca_pem_file(self, hostname: str) -> Optional[str]:
+ # Our CA cert file, when we issued credentials for this hostname;
+ # None means curl should not be given a --cacert for it.
+ if len(self.get_credentials_for_name(hostname)) > 0:
+ return self.ca.cert_file
+ return None
+
+ def clear_curl_headerfiles(self):
+ # Remove accumulated 'curl.headers.<n>' files from the gen dir
+ # and reset the counter used to name new ones.
+ for fname in os.listdir(path=self.gen_dir):
+ if re.match(r'curl\.headers\.\d+', fname):
+ os.remove(os.path.join(self.gen_dir, fname))
+ self._curl_headerfiles_n = 0
+
+ def curl_complete_args(self, urls, timeout=None, options=None,
+ insecure=False, force_resolve=True):
if not isinstance(urls, list):
urls = [urls]
u = urlparse(urls[0])
assert u.hostname, f"hostname not in url: {urls[0]}"
- assert u.port, f"port not in url: {urls[0]}"
- headerfile = ("%s/curl.headers" % self.gen_dir)
- if os.path.isfile(headerfile):
- os.remove(headerfile)
-
- args = [
- self._curl,
- "--cacert", self.ca.cert_file,
- "-s", "-D", headerfile,
- "--resolve", ("%s:%s:%s" % (u.hostname, u.port, self._httpd_addr)),
- "--connect-timeout", ("%d" % timeout),
- "--path-as-is"
+ headerfile = f"{self.gen_dir}/curl.headers.{self._curl_headerfiles_n}"
+ self._curl_headerfiles_n += 1
+
+ args = [
+ self._curl, "-s", "--path-as-is", "-D", headerfile,
]
+ if u.scheme == 'http':
+ pass
+ elif insecure:
+ args.append('--insecure')
+ elif options and "--cacert" in options:
+ pass
+ else:
+ ca_pem = self.get_ca_pem_file(u.hostname)
+ if ca_pem:
+ args.extend(["--cacert", ca_pem])
+
+ if force_resolve and u.hostname != 'localhost' \
+ and u.hostname != self._httpd_addr \
+ and not re.match(r'^(\d+|\[|:).*', u.hostname):
+ assert u.port, f"port not in url: {urls[0]}"
+ args.extend(["--resolve", f"{u.hostname}:{u.port}:{self._httpd_addr}"])
+ if timeout is not None and int(timeout) > 0:
+ args.extend(["--connect-timeout", str(int(timeout))])
if options:
args.extend(options)
args += urls
lines = open(headerfile).readlines()
exp_stat = True
if r is None:
- r = ExecResult(exit_code=0, stdout=b'', stderr=b'')
+ r = ExecResult(args=[], exit_code=0, stdout=b'', stderr=b'')
header = {}
for line in lines:
if exp_stat:
m = re.match(r'^([^:]+):\s*(.*)$', line)
assert m
header[m.group(1).lower()] = m.group(2)
- r.response["header"] = header
+ if r.response:
+ r.response["header"] = header
return r
- def curl_raw(self, urls, timeout, options):
+ def curl_raw(self, urls, timeout=10, options=None, insecure=False,
+ debug_log=True, force_resolve=True):
+ # Run curl against the given url(s) and parse the dumped headers.
+ # debug_log is currently unused; kept for interface compatibility.
xopt = ['-vvvv']
if options:
xopt.extend(options)
- args, headerfile = self.curl_complete_args(urls, timeout, xopt)
+ # pass xopt (not options): otherwise the '-vvvv' list built above is
+ # dead code and the verbose trace the old code produced is lost
+ args, headerfile = self.curl_complete_args(
+ urls=urls, timeout=timeout, options=xopt, insecure=insecure,
+ force_resolve=force_resolve)
r = self.run(args)
if r.exit_code == 0:
self.curl_parse_headerfile(headerfile, r=r)
if r.json:
r.response["json"] = r.json
+ # guard the cleanup: curl may exit before ever creating the -D file
+ if os.path.isfile(headerfile):
+ os.remove(headerfile)
return r
- def curl_get(self, url, timeout=5, options=None):
- return self.curl_raw([url], timeout=timeout, options=options)
+ # keep a 'timeout' kwarg: the old signature had one, and existing
+ # callers passing timeout= would otherwise break (trailing position
+ # so keyword use stays backward compatible)
+ def curl_get(self, url, insecure=False, debug_log=True, options=None,
+ timeout=10):
+ return self.curl_raw([url], insecure=insecure, timeout=timeout,
+ options=options, debug_log=debug_log)
def curl_upload(self, url, fpath, timeout=5, options=None):
+ # Upload fpath as a multipart form field. Always append --form:
+ # the previous guard both crashed on options=None (None.extend())
+ # and silently skipped the form when options were supplied.
- if not options:
- options.extend([
- "--form", ("file=@%s" % fpath)
- ])
+ if not options:
+ options = []
+ options.extend([
+ "--form", ("file=@%s" % fpath)
+ ])
- return self.curl_raw([url], timeout, options)
+ return self.curl_raw(urls=[url], timeout=timeout, options=options)
def curl_post_data(self, url, data="", timeout=5, options=None):
if not options:
--- /dev/null
+import os
+import re
+import time
+from datetime import datetime, timedelta
+from io import SEEK_END
+from typing import List, Tuple, Any
+
+
+class HttpdErrorLog:
+ """Checking the httpd error log for errors and warnings, including
+ limiting checks from a last known position forward.
+ """
+
+ RE_ERRLOG_ERROR = re.compile(r'.*\[(?P<module>[^:]+):error].*')
+ RE_ERRLOG_WARN = re.compile(r'.*\[(?P<module>[^:]+):warn].*')
+ RE_APLOGNO = re.compile(r'.*\[(?P<module>[^:]+):(error|warn)].* (?P<aplogno>AH\d+): .+')
+ RE_SSL_LIB_ERR = re.compile(r'.*\[ssl:error].* SSL Library Error: error:(?P<errno>\S+):.+')
+
+ def __init__(self, path: str):
+ self._path = path
+ self._ignored_modules = []
+ self._ignored_lognos = set()
+ self._ignored_patterns = []
+ # remember the file position we started with
+ self._start_pos = 0
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ self._start_pos = fd.seek(0, SEEK_END)
+ self._last_pos = self._start_pos
+ self._last_errors = []
+ self._last_warnings = []
+ # lines a test already saw (or triggered on purpose); get_missed()
+ # reports only lines NOT in these sets. (fixes the '_observed_erros'
+ # misspelling used consistently in the first version)
+ self._observed_errors = set()
+ self._observed_warnings = set()
+
+ def __repr__(self):
+ return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._last_errors)}, " \
+ f"warnings: {' '.join(self._last_warnings)}]"
+
+ @property
+ def path(self) -> str:
+ return self._path
+
+ def clear_log(self):
+ """Remove the log file and reset positions and all bookkeeping."""
+ if os.path.isfile(self.path):
+ os.remove(self.path)
+ self._start_pos = 0
+ self._last_pos = self._start_pos
+ self._last_errors = []
+ self._last_warnings = []
+ self._observed_errors = set()
+ self._observed_warnings = set()
+
+ def set_ignored_modules(self, modules: List[str]):
+ """Replace the list of httpd modules whose errors/warnings we ignore."""
+ self._ignored_modules = modules.copy() if modules else []
+
+ def set_ignored_lognos(self, lognos: List[str]):
+ """Add APLOGNO ids (e.g. 'AH10244') to the ignore set."""
+ if lognos:
+ for logno in lognos:
+ self._ignored_lognos.add(logno)
+
+ def add_ignored_patterns(self, patterns: List[Any]):
+ """Add pre-compiled regexes; any log line they match is ignored."""
+ self._ignored_patterns.extend(patterns)
+
+ def _is_ignored(self, line: str) -> bool:
+ # ignored when an explicit pattern matches or the line carries an
+ # ignored APLOGNO id
+ for p in self._ignored_patterns:
+ if p.match(line):
+ return True
+ m = self.RE_APLOGNO.match(line)
+ if m and m.group('aplogno') in self._ignored_lognos:
+ return True
+ return False
+
+ def get_recent(self, advance=True) -> Tuple[List[str], List[str]]:
+ """Collect errors and warnings from the log since the last remembered position.
+ :param advance: advance the position to the end of the log afterwards
+ :return: list of errors and list of warnings as tuple
+ """
+ self._last_errors = []
+ self._last_warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._last_errors.append(line)
+ continue
+ # flattened from the redundant 'if m: if m and ...' nesting
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._last_warnings.append(line)
+ if advance:
+ self._last_pos = fd.tell()
+ self._observed_errors.update(set(self._last_errors))
+ self._observed_warnings.update(set(self._last_warnings))
+ return self._last_errors, self._last_warnings
+
+ def get_recent_count(self, advance=True):
+ """Return (number of errors, number of warnings) since the last position."""
+ errors, warnings = self.get_recent(advance=advance)
+ return len(errors), len(warnings)
+
+ def ignore_recent(self):
+ """After a test case triggered errors/warnings on purpose, add
+ those to our 'observed' list so they do not get reported as 'missed'.
+ """
+ self._last_errors = []
+ self._last_warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._observed_errors.add(line)
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m and m.group('module') not in self._ignored_modules:
+ self._observed_warnings.add(line)
+ self._last_pos = fd.tell()
+
+ def get_missed(self) -> Tuple[List[str], List[str]]:
+ """Return all errors/warnings since start that were never observed/ignored."""
+ errors = []
+ warnings = []
+ if os.path.isfile(self._path):
+ with open(self._path) as fd:
+ fd.seek(self._start_pos, os.SEEK_SET)
+ for line in fd:
+ if self._is_ignored(line):
+ continue
+ m = self.RE_ERRLOG_ERROR.match(line)
+ if m and m.group('module') not in self._ignored_modules \
+ and line not in self._observed_errors:
+ errors.append(line)
+ continue
+ m = self.RE_ERRLOG_WARN.match(line)
+ if m and m.group('module') not in self._ignored_modules \
+ and line not in self._observed_warnings:
+ warnings.append(line)
+ return errors, warnings
+
+ def scan_recent(self, pattern: re.Pattern, timeout=10):
+ """Poll the log from the last position until 'pattern' matches a line.
+ :param pattern: a compiled regex ('re' alone was not a valid annotation)
+ :param timeout: seconds to keep polling before giving up
+ :raises TimeoutError: when no match appears within 'timeout' seconds
+ :return: True on match, False when the log file does not exist
+ """
+ if not os.path.isfile(self.path):
+ return False
+ with open(self.path) as fd:
+ end = datetime.now() + timedelta(seconds=timeout)
+ while True:
+ fd.seek(self._last_pos, os.SEEK_SET)
+ for line in fd:
+ if pattern.match(line):
+ return True
+ if datetime.now() > end:
+ raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
+ time.sleep(.1)
if len(l) == 0:
body += '\n'
continue
- m = re.match(r'\[(.*)] recv \(stream_id=(\d+)\) (\S+): (\S*)', l)
+ m = re.match(r'\[.*] recv \(stream_id=(\d+)\) (\S+): (\S*)', l)
if m:
- s = self.get_stream(streams, m.group(2))
- hname = m.group(3)
- hval = m.group(4)
- print(f"{m.group(1)}: stream {s['id']} header {hname}: {hval}")
+ s = self.get_stream(streams, m.group(1))
+ hname = m.group(2)
+ hval = m.group(3)
+ print("stream %d header %s: %s" % (s["id"], hname, hval))
header = s["header"]
if hname in header:
header[hname] += ", %s" % hval
body = ''
continue
- m = re.match(r'\[(.*)] recv HEADERS frame <.* stream_id=(\d+)>', l)
+ m = re.match(r'\[.*] recv HEADERS frame <.* stream_id=(\d+)>', l)
if m:
- s = self.get_stream(streams, m.group(2))
+ s = self.get_stream(streams, m.group(1))
if s:
- print(f"{m.group(1)}: recv HEADERS on stream {s['id']} with {len(s['header'])} fields")
+ print("stream %d: recv %d header" % (s["id"], len(s["header"])))
response = s["response"]
hkey = "header"
if "header" in response:
body = ''
continue
- m = re.match(r'(.*)\[(.*)] recv DATA frame <length=(\d+), .*stream_id=(\d+)>', l)
+ m = re.match(r'(.*)\[.*] recv DATA frame <length=(\d+), .*stream_id=(\d+)>', l)
if m:
- s = self.get_stream(streams, m.group(4))
+ s = self.get_stream(streams, m.group(3))
body += m.group(1)
- blen = int(m.group(3))
+ blen = int(m.group(2))
if s:
- print(f"{m.group(2)}: recv DATA on stream {s['id']} with {blen} bytes")
+ print("stream %d: %d DATA bytes added" % (s["id"], blen))
padlen = 0
if len(lines) > lidx + 2:
mpad = re.match(r' +\(padlen=(\d+)\)', lines[lidx+2])
skip_indents = True
continue
- m = re.match(r'\[(.*)] recv PUSH_PROMISE frame <.* stream_id=(\d+)>', l)
+ m = re.match(r'\[.*] recv PUSH_PROMISE frame <.* stream_id=(\d+)>', l)
if m:
- s = self.get_stream(streams, m.group(2))
+ s = self.get_stream(streams, m.group(1))
if s:
# headers we have are request headers for the PUSHed stream
# these have been received on the originating stream, the promised
# stream id it mentioned in the following lines
- print(f"{m.group(1)}: recv PUSH_PROMISE on stream {s['id']} with {len(s['header'])} header")
+ print("stream %d: %d PUSH_PROMISE header" % (s["id"], len(s["header"])))
if len(lines) > lidx+2:
m2 = re.match(r'\s+\(.*promised_stream_id=(\d+)\)', lines[lidx+2])
if m2:
s["header"] = {}
continue
- m = re.match(r'(.*)\[(.*)] recv (\S+) frame <length=(\d+), .*stream_id=(\d+)>', l)
+ m = re.match(r'(.*)\[.*] recv (\S+) frame <length=(\d+), .*stream_id=(\d+)>', l)
if m:
- print(f"{m.group(2)}: recv frame {m.group(3)} on stream {m.group(5)}")
+ print("recv frame %s on stream %s" % (m.group(2), m.group(4)))
body += m.group(1)
skip_indents = True
continue
- m = re.match(r'(.*)\[(.*)] send (\S+) frame <length=(\d+), .*stream_id=(\d+)>', l)
+ m = re.match(r'(.*)\[.*] send (\S+) frame <length=(\d+), .*stream_id=(\d+)>', l)
if m:
- print(f"{m.group(2)}: send frame {m.group(3)} on stream {m.group(5)}")
+ print("send frame %s on stream %s" % (m.group(2), m.group(4)))
body += m.group(1)
skip_indents = True
continue
print(("execute: %s" % " ".join(args)))
start = datetime.now()
p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
- return ExecResult(exit_code=p.returncode, stdout=p.stdout, stderr=p.stderr,
+ return ExecResult(args=args, exit_code=p.returncode,
+ stdout=p.stdout, stderr=p.stderr,
duration=datetime.now() - start)
class ExecResult:
- def __init__(self, exit_code: int, stdout: bytes, stderr: bytes = None, duration: timedelta = None):
+ def __init__(self, args: List[str], exit_code: int,
+ stdout: bytes, stderr: bytes = None, duration: timedelta = None):
+ self._args = args
self._exit_code = exit_code
self._raw = stdout if stdout else b''
self._stdout = stdout.decode() if stdout is not None else ""
except:
self._json_out = None
+ def __repr__(self):
+ # compact debug representation, including the invoked command args
+ return f"ExecResult[code={self.exit_code}, args={self._args}, stdout={self.stdout}, stderr={self.stderr}]"
+
@property
def exit_code(self) -> int:
return self._exit_code
+ @property
+ def args(self) -> List[str]:
+ return self._args
+
@property
def outraw(self) -> bytes:
return self._raw