]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
* testsuite: possible now to issue client certificates and the chain file for them
authorStefan Eissing <icing@apache.org>
Wed, 10 Nov 2021 15:54:27 +0000 (15:54 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 10 Nov 2021 15:54:27 +0000 (15:54 +0000)
 * testsuite: handling of cert+key in same file improved
 * testsuite: using 'stop' configuration to terminate server in case test cases
   leave borked test configs lying around.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1894919 13f79535-47bb-0310-9956-ffa450edef68

test/modules/http2/env.py
test/modules/md/conftest.py
test/modules/md/test_310_conf_store.py
test/pyhttpd/certs.py
test/pyhttpd/conf.py
test/pyhttpd/conf/stop.conf.template [new file with mode: 0644]
test/pyhttpd/env.py

index d6d96d50a3fba6c274fe94f48582c1fe5c87571c..dae91657764572ff94412b82a364a53c31574baf 100644 (file)
@@ -106,7 +106,7 @@ class H2Conf(HttpdConf):
             ]
         }))
 
-    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=False):
+    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None):
         super().start_vhost(domains=domains, port=port, doc_root=doc_root, with_ssl=with_ssl)
         if f"noh2.{self.env.http_tld}" in domains:
             protos = ["http/1.1"]
index 63dab24c178135bdf397b3fa9f52fc8eaddda151..fc3206db108d15fa5f337959d51191d66e4acdb3 100755 (executable)
@@ -66,7 +66,6 @@ def _session_scope(env):
             re.compile(r'.*certificate with serial \S+ has no OCSP responder URL.*'),
         ])
     yield
-    HttpdConf(env).install()
     assert env.apache_stop() == 0
     errors, warnings = env.httpd_error_log.get_missed()
     assert (len(errors), len(warnings)) == (0, 0),\
index 4a791cbd2b86f140e57f3f1f441c5b70221a2227..69a086ebd92f21df01c5bf8f14a232cf182ff6e5 100644 (file)
@@ -250,7 +250,7 @@ class TestConf:
             """)
         conf.add_vhost(port=12346, domains=[
             "testdomain.org", "test.testdomain.org", "mail.testdomain.org",
-        ])
+        ], with_ssl=True)
         conf.install()
         assert env.apache_restart() == 0
         assert env.a2md(["list"]).json['output'][0]['domains'] == \
index 2ce93c07b22baef55b79427ae9550edd1b780cdc..5519f16188bf46866da6b45ac680450a0b55071e 100644 (file)
@@ -70,13 +70,24 @@ class CertificateSpec:
             return self.domains[0]
         return None
 
+    @property
+    def type(self) -> Optional[str]:
+        if self.domains and len(self.domains):
+            return "server"
+        elif self.client:
+            return "client"
+        elif self.name:
+            return "ca"
+        return None
+
 
 class Credentials:
 
-    def __init__(self, name: str, cert: Any, pkey: Any):
+    def __init__(self, name: str, cert: Any, pkey: Any, issuer: 'Credentials' = None):
         self._name = name
         self._cert = cert
         self._pkey = pkey
+        self._issuer = issuer
         self._cert_file = None
         self._pkey_file = None
         self._store = None
@@ -117,6 +128,10 @@ class Credentials:
             PrivateFormat.TraditionalOpenSSL if self.key_type.startswith('rsa') else PrivateFormat.PKCS8,
             NoEncryption())
 
+    @property
+    def issuer(self) -> Optional['Credentials']:
+        return self._issuer
+
     def set_store(self, store: 'CertStore'):
         self._store = store
 
@@ -145,13 +160,17 @@ class Credentials:
 
     def issue_cert(self, spec: CertificateSpec, chain: List['Credentials'] = None) -> 'Credentials':
         key_type = spec.key_type if spec.key_type else self.key_type
-        creds = self._store.load_credentials(name=spec.name, key_type=key_type, single_file=spec.single_file) \
-            if self._store else None
+        creds = None
+        if self._store:
+            creds = self._store.load_credentials(
+                name=spec.name, key_type=key_type, single_file=spec.single_file, issuer=self)
         if creds is None:
             creds = HttpdTestCA.create_credentials(spec=spec, issuer=self, key_type=key_type,
                                                    valid_from=spec.valid_from, valid_to=spec.valid_to)
             if self._store:
                 self._store.save(creds, single_file=spec.single_file)
+                if spec.type == "ca":
+                    self._store.save_chain(creds, "ca", with_root=True)
 
         if spec.sub_specs:
             if self._store:
@@ -196,6 +215,19 @@ class CertStore:
         creds.set_files(cert_file, pkey_file)
         self._add_credentials(name, creds)
 
+    def save_chain(self, creds: Credentials, infix: str, with_root=False):
+        name = creds.name
+        chain = [creds]
+        while creds.issuer is not None:
+            creds = creds.issuer
+            chain.append(creds)
+        if not with_root and len(chain) > 1:
+            chain = chain[:-1]
+        chain_file = os.path.join(self._store_dir, f'{name}-{infix}.pem')
+        with open(chain_file, "wb") as fd:
+            for c in chain:
+                fd.write(c.cert_pem)
+
     def _add_credentials(self, name: str, creds: Credentials):
         if name not in self._creds_by_name:
             self._creds_by_name[name] = []
@@ -220,13 +252,13 @@ class CertStore:
         with open(fpath) as fd:
             return load_pem_private_key("".join(fd.readlines()).encode(), password=None)
 
-    def load_credentials(self, name: str, key_type=None, single_file: bool = False):
+    def load_credentials(self, name: str, key_type=None, single_file: bool = False, issuer: Credentials = None):
         cert_file = self.get_cert_file(name=name, key_type=key_type)
         pkey_file = cert_file if single_file else self.get_pkey_file(name=name, key_type=key_type)
         if os.path.isfile(cert_file) and os.path.isfile(pkey_file):
             cert = self.load_pem_cert(cert_file)
             pkey = self.load_pem_pkey(pkey_file)
-            creds = Credentials(name=name, cert=cert, pkey=pkey)
+            creds = Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
             creds.set_store(self)
             creds.set_files(cert_file, pkey_file)
             self._add_credentials(name, creds)
@@ -239,7 +271,7 @@ class HttpdTestCA:
     @classmethod
     def create_root(cls, name: str, store_dir: str, key_type: str = "rsa2048") -> Credentials:
         store = CertStore(fpath=store_dir)
-        creds = store.load_credentials(name="ca", key_type=key_type)
+        creds = store.load_credentials(name="ca", key_type=key_type, issuer=None)
         if creds is None:
             creds = HttpdTestCA._make_ca_credentials(name=name, key_type=key_type)
             store.save(creds, name="ca")
@@ -405,7 +437,7 @@ class HttpdTestCA:
         cert = csr.sign(private_key=issuer_key,
                         algorithm=hashes.SHA256(),
                         backend=default_backend())
-        return Credentials(name=name, cert=cert, pkey=pkey)
+        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
 
     @staticmethod
     def _make_server_credentials(name: str, domains: List[str], issuer: Credentials,
@@ -423,7 +455,7 @@ class HttpdTestCA:
         cert = csr.sign(private_key=issuer.private_key,
                         algorithm=hashes.SHA256(),
                         backend=default_backend())
-        return Credentials(name=name, cert=cert, pkey=pkey)
+        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
 
     @staticmethod
     def _make_client_credentials(name: str,
@@ -441,4 +473,4 @@ class HttpdTestCA:
         cert = csr.sign(private_key=issuer.private_key,
                         algorithm=hashes.SHA256(),
                         backend=default_backend())
-        return Credentials(name=name, cert=cert, pkey=pkey)
+        return Credentials(name=name, cert=cert, pkey=pkey, issuer=issuer)
index 383b82d590b5a339d33d762376d4e50681b082ce..3fefffaa8c0261e01f1cf024ffff9cb1f96141c1 100644 (file)
@@ -40,23 +40,25 @@ class HttpdConf(object):
         if self.env.ssl_module == "ssl":
             self.add([
                 f"SSLCertificateFile {cert_file}",
-                f"SSLCertificateKeyFile {key_file}",
+                f"SSLCertificateKeyFile {key_file if key_file else cert_file}",
             ])
         elif self.env.ssl_module == "tls":
             self.add(f"""
                 TLSCertificate {cert_file} {key_file}
             """)
 
-    def add_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=True):
+    def add_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None):
         self.start_vhost(domains=domains, port=port, doc_root=doc_root, with_ssl=with_ssl)
         self.end_vhost()
         return self
 
-    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=False):
+    def start_vhost(self, domains, port=None, doc_root="htdocs", with_ssl=None):
         if not isinstance(domains, list):
             domains = [domains]
         if port is None:
             port = self.env.https_port
+        if with_ssl is None:
+            with_ssl = (self.env.https_port == port)
         self.add("")
         self.add(f"<VirtualHost *:{port}>")
         self._indents += 1
@@ -64,7 +66,7 @@ class HttpdConf(object):
         for alias in domains[1:]:
             self.add(f"ServerAlias {alias}")
         self.add(f"DocumentRoot {doc_root}")
-        if self.env.https_port == port or with_ssl:
+        if with_ssl:
             if self.env.ssl_module == "ssl":
                 self.add("SSLEngine on")
             for cred in self.env.get_credentials_for_name(domains[0]):
diff --git a/test/pyhttpd/conf/stop.conf.template b/test/pyhttpd/conf/stop.conf.template
new file mode 100644 (file)
index 0000000..21bae84
--- /dev/null
@@ -0,0 +1,46 @@
+# a config safe to use for stopping the server
+# this allows us to stop the server even when+
+# the config in the file is borked (as test cases may try to do that)
+#
+ServerName localhost
+ServerRoot "${server_dir}"
+
+Include "conf/modules.conf"
+
+DocumentRoot "${server_dir}/htdocs"
+
+<IfModule log_config_module>
+    LogFormat "%h %l %u %t \"%r\" %>s %O \"%{Referer}i\" \"%{User-Agent}i\" %k" combined
+    LogFormat "%h %l %u %t \"%r\" %>s %b" common
+    CustomLog "logs/access_log" combined
+
+</IfModule>
+
+TypesConfig "${gen_dir}/apache/conf/mime.types"
+
+Listen ${http_port}
+Listen ${https_port}
+
+<IfModule mod_ssl.c>
+    # provide some default
+    SSLSessionCache "shmcb:ssl_gcache_data(32000)"
+</IfModule>
+
+<VirtualHost *:${http_port}>
+    ServerName ${http_tld}
+    ServerAlias www.${http_tld}
+    <IfModule ssl_module>
+      SSLEngine off
+    </IfModule>
+    DocumentRoot "${server_dir}/htdocs"
+</VirtualHost>
+
+<Directory "${server_dir}/htdocs/cgi">
+    Options Indexes FollowSymLinks
+    AllowOverride None
+    Require all granted
+
+    AddHandler cgi-script .py
+    AddHandler cgi-script .cgi
+    Options +ExecCGI
+</Directory>
index eaee5c497ba91248ddd2558e75b9952b6f4a7adc..73044ae40b122de82fc48aae34d913cb97b54cba 100644 (file)
@@ -362,7 +362,7 @@ class HttpdTestEnv:
         self._cert_specs.extend(specs)
 
     def get_credentials_for_name(self, dns_name) -> List['Credentials']:
-        for spec in self._cert_specs:
+        for spec in [s for s in self._cert_specs if s.domains is not None]:
             if dns_name in spec.domains:
                 return self.ca.get_credentials_for_name(spec.domains[0])
         return []
@@ -420,6 +420,7 @@ class HttpdTestEnv:
     def install_test_conf(self, lines: List[str]):
         with open(self._test_conf, 'w') as fd:
             fd.write('\n'.join(self._httpd_base_conf))
+            fd.write('\n')
             if self._verbosity >= 2:
                 fd.write(f"LogLevel core:trace5 {self.mpm_module}:trace5\n")
             if self._log_interesting:
@@ -479,9 +480,10 @@ class HttpdTestEnv:
         return False
 
     def _run_apachectl(self, cmd) -> ExecResult:
+        conf_file = 'stop.conf' if cmd == 'stop' else 'httpd.conf'
         args = [self._apachectl,
                 "-d", self.server_dir,
-                "-f", os.path.join(self._server_dir, 'conf/httpd.conf'),
+                "-f", os.path.join(self._server_dir, f'conf/{conf_file}'),
                 "-k", cmd]
         r = self.run(args)
         self._apachectl_stderr = r.stderr