if(result)
return result;
}
- sslc->primary.CApath = data->set.str[STRING_SSL_CAPATH];
#endif
#ifdef CURL_CA_BUNDLE
if(!sslc->custom_cafile && !set->str[STRING_SSL_CAFILE]) {
}
sslc->primary.CAfile = data->set.str[STRING_SSL_CAFILE];
sslc->primary.CRLfile = data->set.str[STRING_SSL_CRLFILE];
+ sslc->primary.CApath = data->set.str[STRING_SSL_CAPATH];
sslc->primary.issuercert = data->set.str[STRING_SSL_ISSUERCERT];
sslc->primary.issuercert_blob = data->set.blobs[BLOB_SSL_ISSUERCERT];
sslc->primary.cipher_list = data->set.str[STRING_SSL_CIPHER_LIST];
if(result)
return result;
}
- sslc->primary.CApath = data->set.str[STRING_SSL_CAPATH_PROXY];
#endif
#ifdef CURL_CA_BUNDLE
if(!sslc->custom_cafile && !set->str[STRING_SSL_CAFILE_PROXY]) {
#endif
}
sslc->primary.CAfile = data->set.str[STRING_SSL_CAFILE_PROXY];
+ sslc->primary.CApath = data->set.str[STRING_SSL_CAPATH_PROXY];
sslc->primary.cipher_list = data->set.str[STRING_SSL_CIPHER_LIST_PROXY];
sslc->primary.cipher_list13 = data->set.str[STRING_SSL_CIPHER13_LIST_PROXY];
sslc->primary.pinned_key = data->set.str[STRING_SSL_PINNEDPUBLICKEY_PROXY];
])
# expect NOT_IMPLEMENTED or OK
assert r.exit_code in [0, 2], f'{r.dump_logs()}'
+
+ @pytest.mark.skipif(condition=not Env.have_openssl(), reason="needs openssl command")
+ def test_17_21_capath_valid(self, env: Env, httpd):
+ if env.curl_uses_lib('rustls-ffi'):
+ pytest.skip('rustls does not support CURLOPT_CAPATH')
+ proto = 'http/1.1'
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+ r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+ '--capath', env.ca.hashdir
+ ])
+ assert r.exit_code == 0, f'{r.dump_logs()}'
+ assert r.json['HTTPS'] == 'on', f'{r.json}'
+
+ @pytest.mark.skipif(condition=not Env.have_openssl(), reason="needs openssl command")
+ def test_17_22_capath_invalid(self, env: Env, httpd):
+ # we can test all TLS backends here. the ones not supporting CAPATH
+ # need to fail as well as the ones which do, but get an invalid path.
+ proto = 'http/1.1'
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+ r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+ '--capath', os.path.join(env.gen_dir, 'ca/invalid')
+ ])
+ # CURLE_PEER_FAILED_VERIFICATION or CURLE_SSL_CACERT_BADFILE
+ assert r.exit_code in [60, 77], f'{r.dump_logs()}'
import ipaddress
import os
import re
+import shutil
+import subprocess
from datetime import timedelta, datetime, timezone
from typing import List, Any, Optional
def combined_file(self) -> Optional[str]:
return self._combined_file
+ @property
+ def hashdir(self) -> Optional[str]:
+ return os.path.join(self._store.path, 'hashdir')
+
def get_first(self, name) -> Optional['Credentials']:
creds = self._store.get_credentials_for_name(name) if self._store else []
return creds[0] if len(creds) else None
creds.issue_certs(spec.sub_specs, chain=subchain)
return creds
+ def create_hashdir(self, openssl):
+ os.makedirs(self.hashdir, exist_ok=True)
+ p = subprocess.run(args=[
+ openssl, 'x509', '-hash', '-noout', '-in', self.cert_file
+ ], capture_output=True, text=True)
+ if p.returncode != 0:
+ raise Exception(f'openssl failed to compute cert hash: {p}')
+ cert_hname = f'{p.stdout.strip()}.0'
+ shutil.copy(self.cert_file, os.path.join(self.hashdir, cert_hname))
+
class CertStore:
pass
elif insecure:
args.append('--insecure')
- elif active_options and "--cacert" in active_options:
+ elif active_options and ("--cacert" in active_options or \
+ "--capath" in active_options):
pass
elif u.hostname:
args.extend(["--cacert", self.env.ca.cert_file])
]),
]
+ self.openssl = 'openssl'
+ p = subprocess.run(args=[self.openssl, 'version'],
+ capture_output=True, text=True)
+ if p.returncode != 0:
+ # no openssl in path
+ self.openssl = None
+ self.openssl_version = None
+ else:
+ self.openssl_version = p.stdout.strip()
+
self.nghttpx = self.config['nghttpx']['nghttpx']
if len(self.nghttpx.strip()) == 0:
self.nghttpx = None
def incomplete_reason() -> Optional[str]:
return Env.CONFIG.get_incomplete_reason()
+ @staticmethod
+ def have_openssl() -> bool:
+ return Env.CONFIG.openssl is not None
+
@staticmethod
def have_nghttpx() -> bool:
return Env.CONFIG.nghttpx is not None
store_dir=ca_dir,
key_type="rsa2048")
self._ca.issue_certs(self.CONFIG.cert_specs)
+ if self.have_openssl():
+ self._ca.create_hashdir(self.openssl)
def setup(self):
os.makedirs(self.gen_dir, exist_ok=True)
def curl(self) -> str:
return self.CONFIG.curl
+ @property
+ def openssl(self) -> Optional[str]:
+ return self.CONFIG.openssl
+
@property
def httpd(self) -> str:
return self.CONFIG.httpd