]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
pytest: add pinnedpubkey test cases
authorStefan Eissing <stefan@eissing.org>
Wed, 21 May 2025 15:40:11 +0000 (17:40 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Wed, 21 May 2025 20:45:42 +0000 (22:45 +0200)
Add positive/negative test cases in pytest for pinned public keys.

Closes #17412

lib/vquic/vquic-tls.c
lib/vtls/wolfssl.c
lib/vtls/wolfssl.h
tests/http/test_17_ssl_use.py
tests/http/testenv/certs.py

index c4b56e4ff10da2c580dbc6cd2a8055bba4e1f5cd..2a5be138fc68c9f5ac0b4535a5c8815d29eb938e 100644 (file)
@@ -187,6 +187,8 @@ CURLcode Curl_vquic_tls_verify_peer(struct curl_tls_ctx *ctx,
     }
     wolfSSL_X509_free(cert);
   }
+  if(!result)
+    result = Curl_wssl_verify_pinned(cf, data, &ctx->wssl);
 #endif
   /* on error, remove any session we might have in the pool */
   if(result)
index ee3f650fce932e2b20e8cb90592c559674bfd338..fb27b543e4b6affac898af4d6bae405aec5e7ff2 100644 (file)
@@ -1526,10 +1526,10 @@ static char *wssl_strerror(unsigned long error, char *buf,
   return buf;
 }
 
-static CURLcode wssl_verify_pinned(struct Curl_cfilter *cf,
-                                   struct Curl_easy *data)
+CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
+                                 struct Curl_easy *data,
+                                 struct wssl_ctx *wssl)
 {
-  struct ssl_connect_data *connssl = cf->ctx;
 #ifndef CURL_DISABLE_PROXY
   const char * const pinnedpubkey = Curl_ssl_cf_is_proxy(cf) ?
     data->set.str[STRING_SSL_PINNEDPUBLICKEY_PROXY] :
@@ -1540,7 +1540,6 @@ static CURLcode wssl_verify_pinned(struct Curl_cfilter *cf,
 
   if(pinnedpubkey) {
 #ifdef KEEP_PEER_CERT
-    struct wssl_ctx *wssl = (struct wssl_ctx *)connssl->backend;
     WOLFSSL_X509 *x509;
     const char *x509_der;
     int x509_der_len;
@@ -2138,7 +2137,7 @@ static CURLcode wssl_connect(struct Curl_cfilter *cf,
       result = wssl->hs_result;
       goto out;
     }
-    result = wssl_verify_pinned(cf, data);
+    result = Curl_wssl_verify_pinned(cf, data, wssl);
     if(result) {
       wssl->hs_result = result;
       goto out;
index c741f01bd9f91ea3f3848604e0cdad0cff5a637a..0ddbee9ed9e60850d5ad6b044400dc9a3ee3fa17 100644 (file)
@@ -75,11 +75,6 @@ CURLcode Curl_wssl_setup_x509_store(struct Curl_cfilter *cf,
                                     struct Curl_easy *data,
                                     struct wssl_ctx *wssl);
 
-CURLcode Curl_wssl_setup_session(struct Curl_cfilter *cf,
-                                 struct Curl_easy *data,
-                                 struct wssl_ctx *wss,
-                                 const char *ssl_peer_key);
-
 CURLcode Curl_wssl_cache_session(struct Curl_cfilter *cf,
                                  struct Curl_easy *data,
                                  const char *ssl_peer_key,
@@ -89,6 +84,10 @@ CURLcode Curl_wssl_cache_session(struct Curl_cfilter *cf,
                                  unsigned char *quic_tp,
                                  size_t quic_tp_len);
 
+CURLcode Curl_wssl_verify_pinned(struct Curl_cfilter *cf,
+                                 struct Curl_easy *data,
+                                 struct wssl_ctx *wssl);
+
 
 #endif /* USE_WOLFSSL */
 #endif /* HEADER_CURL_WOLFSSL_H */
index d76f255da9903a9f83a244b98f7ceaf6ab708b16..9c93013b360c5e57939a2781309773493c06df5b 100644 (file)
@@ -513,3 +513,31 @@ class TestSSLUse:
             assert r.json['SSL_CIPHER'] in ciphers, r.dump_logs()
         else:
             assert r.exit_code != 0, r.dump_logs()
+
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_17_19_wrong_pin(self, env: Env, proto, httpd):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        if env.curl_uses_any_libs(['bearssl', 'rustls-ffi']):
+            pytest.skip('TLS backend ignores --pinnedpubkey')
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+        r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+            '--pinnedpubkey', 'sha256//ffff'
+        ])
+        # expect NOT_IMPLEMENTED or CURLE_SSL_PINNEDPUBKEYNOTMATCH
+        assert r.exit_code in [2, 90], f'{r.dump_logs()}'
+
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_17_20_correct_pin(self, env: Env, proto, httpd):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        curl = CurlClient(env=env)
+        creds = env.get_credentials(env.domain1)
+        assert creds
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/sslinfo'
+        r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
+            '--pinnedpubkey', f'sha256//{creds.pub_sha256_b64()}'
+        ])
+        # expect NOT_IMPLEMENTED or OK
+        assert r.exit_code in [0, 2], f'{r.dump_logs()}'
index 3795ba947a47804747c7805ec6619d60aed144f7..bfb7287eeee5f0f15a8694ddf2f60dbd6cc90c79 100644 (file)
@@ -24,6 +24,7 @@
 #
 ###########################################################################
 #
+import base64
 import ipaddress
 import os
 import re
@@ -33,6 +34,7 @@ from typing import List, Any, Optional
 from cryptography import x509
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives._serialization import PublicFormat
 from cryptography.hazmat.primitives.asymmetric import ec, rsa
 from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
 from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
@@ -149,6 +151,15 @@ class Credentials:
     def private_key(self) -> Any:
         return self._pkey
 
+    def pub_sha256_b64(self) -> Any:
+        pubkey = self._pkey.public_key()
+        sha256 = hashes.Hash(algorithm=hashes.SHA256())
+        sha256.update(pubkey.public_bytes(
+            encoding=Encoding.DER,
+            format=PublicFormat.SubjectPublicKeyInfo
+        ))
+        return base64.b64encode(sha256.finalize()).decode('utf8')
+
     @property
     def certificate(self) -> Any:
         return self._cert
@@ -393,7 +404,7 @@ class TestCA:
             issuer_subject: Optional[Credentials],
             valid_from_delta: Optional[timedelta] = None,
             valid_until_delta: Optional[timedelta] = None
-    ):
+    ) -> x509.CertificateBuilder:
         pubkey = pkey.public_key()
         issuer_subject = issuer_subject if issuer_subject is not None else subject