Add positive/negative test cases in pytest for pinned public keys.
Closes #17412
}
wolfSSL_X509_free(cert);
}
+ if(!result)
+ result = Curl_wssl_verify_pinned(cf, data, &ctx->wssl);
#endif
/* on error, remove any session we might have in the pool */
if(result)
return buf;
}
-static CURLcode wssl_verify_pinned(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct wssl_ctx *wssl)
{
- struct ssl_connect_data *connssl = cf->ctx;
#ifndef CURL_DISABLE_PROXY
const char * const pinnedpubkey = Curl_ssl_cf_is_proxy(cf) ?
data->set.str[STRING_SSL_PINNEDPUBLICKEY_PROXY] :
if(pinnedpubkey) {
#ifdef KEEP_PEER_CERT
- struct wssl_ctx *wssl = (struct wssl_ctx *)connssl->backend;
WOLFSSL_X509 *x509;
const char *x509_der;
int x509_der_len;
result = wssl->hs_result;
goto out;
}
- result = wssl_verify_pinned(cf, data);
+ result = Curl_wssl_verify_pinned(cf, data, wssl);
if(result) {
wssl->hs_result = result;
goto out;
struct Curl_easy *data,
struct wssl_ctx *wssl);
-CURLcode Curl_wssl_setup_session(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct wssl_ctx *wss,
- const char *ssl_peer_key);
-
CURLcode Curl_wssl_cache_session(struct Curl_cfilter *cf,
struct Curl_easy *data,
const char *ssl_peer_key,
unsigned char *quic_tp,
size_t quic_tp_len);
+CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct wssl_ctx *wssl);
+
#endif /* USE_WOLFSSL */
#endif /* HEADER_CURL_WOLFSSL_H */
assert r.json['SSL_CIPHER'] in ciphers, r.dump_logs()
else:
assert r.exit_code != 0, r.dump_logs()
+
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_17_19_wrong_pin(self, env: Env, proto, httpd):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ if env.curl_uses_any_libs(['bearssl', 'rustls-ffi']):
+ pytest.skip('TLS backend ignores --pinnedpubkey')
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+ r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+ '--pinnedpubkey', 'sha256//ffff'
+ ])
+ # expect NOT_IMPLEMENTED or CURLE_SSL_PINNEDPUBKEYNOTMATCH
+ assert r.exit_code in [2, 90], f'{r.dump_logs()}'
+
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_17_20_correct_pin(self, env: Env, proto, httpd):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ curl = CurlClient(env=env)
+ creds = env.get_credentials(env.domain1)
+ assert creds
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+ r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+ '--pinnedpubkey', f'sha256//{creds.pub_sha256_b64()}'
+ ])
+ # expect NOT_IMPLEMENTED or OK
+ assert r.exit_code in [0, 2], f'{r.dump_logs()}'
#
###########################################################################
#
+import base64
import ipaddress
import os
import re
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives._serialization import PublicFormat
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
def private_key(self) -> Any:
return self._pkey
+ def pub_sha256_b64(self) -> Any:
+ pubkey = self._pkey.public_key()
+ sha256 = hashes.Hash(algorithm=hashes.SHA256())
+ sha256.update(pubkey.public_bytes(
+ encoding=Encoding.DER,
+ format=PublicFormat.SubjectPublicKeyInfo
+ ))
+ return base64.b64encode(sha256.finalize()).decode('utf8')
+
@property
def certificate(self) -> Any:
return self._cert
issuer_subject: Optional[Credentials],
valid_from_delta: Optional[timedelta] = None,
valid_until_delta: Optional[timedelta] = None
- ):
+ ) -> x509.CertificateBuilder:
pubkey = pkey.public_key()
issuer_subject = issuer_subject if issuer_subject is not None else subject