/* Set an exception of given type using the given OpenSSL error code. */
static void
-set_ssl_exception_from_errcode(PyObject *exc, unsigned long errcode)
+set_ssl_exception_from_errcode(PyObject *exc_type, unsigned long errcode)
{
+ assert(exc_type != NULL);
assert(errcode != 0);
/* ERR_ERROR_STRING(3) ensures that the messages below are ASCII */
const char *reason = ERR_reason_error_string(errcode);
if (lib && func) {
- PyErr_Format(exc, "[%s: %s] %s", lib, func, reason);
+ PyErr_Format(exc_type, "[%s: %s] %s", lib, func, reason);
}
else if (lib) {
- PyErr_Format(exc, "[%s] %s", lib, reason);
+ PyErr_Format(exc_type, "[%s] %s", lib, reason);
}
else {
- PyErr_SetString(exc, reason);
+ PyErr_SetString(exc_type, reason);
+ }
+}
+
+/*
+ * Get an appropriate exception type for the given OpenSSL error code.
+ *
+ * The exception type depends on the error code reason.
+ */
+static PyObject *
+get_smart_ssl_exception_type(unsigned long errcode, PyObject *default_exc_type)
+{
+ switch (ERR_GET_REASON(errcode)) {
+ case ERR_R_MALLOC_FAILURE:
+ return PyExc_MemoryError;
+ default:
+ return default_exc_type;
}
}
* Set an exception of given type.
*
* By default, the exception's message is constructed by using the last SSL
- * error that occurred. If no error occurred, the 'fallback_format' is used
- * to create a C-style formatted fallback message.
+ * error that occurred. If no error occurred, the 'fallback_message' is used
+ * to create an exception message.
*/
static void
-raise_ssl_error(PyObject *exc, const char *fallback_format, ...)
+raise_ssl_error(PyObject *exc_type, const char *fallback_message)
+{
+ assert(fallback_message != NULL);
+ unsigned long errcode = ERR_peek_last_error();
+ if (errcode) {
+ ERR_clear_error();
+ set_ssl_exception_from_errcode(exc_type, errcode);
+ }
+ else {
+ PyErr_SetString(exc_type, fallback_message);
+ }
+}
+
+/* Same as raise_ssl_error() but with a C-style formatted message. */
+static void
+raise_ssl_error_f(PyObject *exc_type, const char *fallback_format, ...)
{
assert(fallback_format != NULL);
unsigned long errcode = ERR_peek_last_error();
if (errcode) {
ERR_clear_error();
- set_ssl_exception_from_errcode(exc, errcode);
+ set_ssl_exception_from_errcode(exc_type, errcode);
}
else {
va_list vargs;
va_start(vargs, fallback_format);
- PyErr_FormatV(exc, fallback_format, vargs);
+ PyErr_FormatV(exc_type, fallback_format, vargs);
+ va_end(vargs);
+ }
+}
+
+/* Same as raise_ssl_error_f() with smart exception types. */
+static void
+raise_smart_ssl_error_f(PyObject *exc_type, const char *fallback_format, ...)
+{
+ unsigned long errcode = ERR_peek_last_error();
+ if (errcode) {
+ ERR_clear_error();
+ exc_type = get_smart_ssl_exception_type(errcode, exc_type);
+ set_ssl_exception_from_errcode(exc_type, errcode);
+ }
+ else {
+ va_list vargs;
+ va_start(vargs, fallback_format);
+ PyErr_FormatV(exc_type, fallback_format, vargs);
va_end(vargs);
}
}
/*
- * Set an exception with a generic default message after an error occurred.
- *
- * It can also be used without previous calls to SSL built-in functions,
- * in which case a generic error message is provided.
+ * Raise a ValueError with a default message after an error occurred.
+ * It can also be used without previous calls to SSL built-in functions.
*/
static inline void
-notify_ssl_error_occurred(void)
+notify_ssl_error_occurred(const char *message)
+{
+ raise_ssl_error(PyExc_ValueError, message);
+}
+
+/* Same as notify_ssl_error_occurred() for failed OpenSSL functions. */
+static inline void
+notify_ssl_error_occurred_in(const char *funcname)
{
- raise_ssl_error(PyExc_ValueError, "no reason supplied");
+ raise_ssl_error_f(PyExc_ValueError,
+ "error in OpenSSL function %s()", funcname);
+}
+
+/* Same as notify_ssl_error_occurred_in() with smart exception types. */
+static inline void
+notify_smart_ssl_error_occurred_in(const char *funcname)
+{
+ raise_smart_ssl_error_f(PyExc_ValueError,
+ "error in OpenSSL function %s()", funcname);
}
/* LCOV_EXCL_STOP */
// In OpenSSL 3.0 and later, OBJ_nid*() are thread-safe and may raise.
assert(ERR_peek_last_error() != 0);
if (ERR_GET_REASON(ERR_peek_last_error()) != OBJ_R_UNKNOWN_NID) {
- notify_ssl_error_occurred();
- return NULL;
+ goto error;
}
// fallback to short name and unconditionally propagate errors
name = OBJ_nid2sn(nid);
if (name == NULL) {
- raise_ssl_error(PyExc_ValueError, "cannot resolve NID %d", nid);
+ goto error;
}
}
return name;
+
+error:
+ raise_ssl_error_f(PyExc_ValueError, "cannot resolve NID %d", nid);
+ return NULL;
}
/*
get_openssl_evp_md_by_utf8name(PyObject *module, const char *name,
Py_hash_type py_ht)
{
- PY_EVP_MD *digest = NULL;
- PY_EVP_MD *other_digest = NULL;
+ PY_EVP_MD *digest = NULL, *other_digest = NULL;
_hashlibstate *state = get_hashlib_state(module);
py_hashentry_t *entry = (py_hashentry_t *)_Py_hashtable_get(
state->hashtable, (const void*)name
#endif
}
break;
+ default:
+ goto invalid_hash_type;
}
// if another thread same thing at same time make sure we got same ptr
assert(other_digest == NULL || other_digest == digest);
- if (digest != NULL) {
- if (other_digest == NULL) {
- PY_EVP_MD_up_ref(digest);
- }
+ if (digest != NULL && other_digest == NULL) {
+ PY_EVP_MD_up_ref(digest);
}
- } else {
+ }
+ else {
// Fall back for looking up an unindexed OpenSSL specific name.
switch (py_ht) {
case Py_ht_evp:
case Py_ht_evp_nosecurity:
digest = PY_EVP_MD_fetch(name, "-fips");
break;
+ default:
+ goto invalid_hash_type;
}
}
if (digest == NULL) {
- raise_ssl_error(state->unsupported_digestmod_error,
- "unsupported hash type %s", name);
+ raise_ssl_error_f(state->unsupported_digestmod_error,
+ "unsupported digest name: %s", name);
return NULL;
}
return digest;
+
+invalid_hash_type:
+ assert(digest == NULL);
+ PyErr_Format(PyExc_SystemError, "unsupported hash type %d", py_ht);
+ return NULL;
}
/*
return get_openssl_evp_md_by_utf8name(module, name, py_ht);
}
+// --- OpenSSL HASH wrappers --------------------------------------------------
+
+/* Thin wrapper around EVP_MD_CTX_new() which sets an exception on failure. */
+static EVP_MD_CTX *
+py_wrapper_EVP_MD_CTX_new(void)
+{
+ EVP_MD_CTX *ctx = EVP_MD_CTX_new();
+ if (ctx == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ return ctx;
+}
+
+// --- HASH interface ---------------------------------------------------------
+
static HASHobject *
new_hash_object(PyTypeObject *type)
{
}
HASHLIB_INIT_MUTEX(retval);
- retval->ctx = EVP_MD_CTX_new();
+ retval->ctx = py_wrapper_EVP_MD_CTX_new();
if (retval->ctx == NULL) {
Py_DECREF(retval);
- PyErr_NoMemory();
return NULL;
}
else
process = Py_SAFE_DOWNCAST(len, Py_ssize_t, unsigned int);
if (!EVP_DigestUpdate(self->ctx, (const void*)cp, process)) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_DigestUpdate));
return -1;
}
len -= process;
ENTER_HASHLIB(self);
result = EVP_MD_CTX_copy(new_ctx_p, self->ctx);
LEAVE_HASHLIB(self);
- return result;
+ if (result == 0) {
+ notify_smart_ssl_error_occurred_in(Py_STRINGIFY(EVP_MD_CTX_copy));
+ return -1;
+ }
+ return 0;
}
/* External methods for a hash object */
if ((newobj = new_hash_object(Py_TYPE(self))) == NULL)
return NULL;
- if (!_hashlib_HASH_copy_locked(self, newobj->ctx)) {
+ if (_hashlib_HASH_copy_locked(self, newobj->ctx) < 0) {
Py_DECREF(newobj);
- notify_ssl_error_occurred();
return NULL;
}
return (PyObject *)newobj;
}
+static Py_ssize_t
+_hashlib_HASH_digest_compute(HASHobject *self, unsigned char *digest)
+{
+ EVP_MD_CTX *ctx = py_wrapper_EVP_MD_CTX_new();
+ if (ctx == NULL) {
+ return -1;
+ }
+ if (_hashlib_HASH_copy_locked(self, ctx) < 0) {
+ goto error;
+ }
+ Py_ssize_t digest_size = EVP_MD_CTX_size(ctx);
+ if (!EVP_DigestFinal(ctx, digest, NULL)) {
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_DigestFinal));
+ goto error;
+ }
+ EVP_MD_CTX_free(ctx);
+ return digest_size;
+
+error:
+ EVP_MD_CTX_free(ctx);
+ return -1;
+}
+
/*[clinic input]
_hashlib.HASH.digest
/*[clinic end generated code: output=3fc6f9671d712850 input=d8d528d6e50af0de]*/
{
unsigned char digest[EVP_MAX_MD_SIZE];
- EVP_MD_CTX *temp_ctx;
- PyObject *retval;
- unsigned int digest_size;
-
- temp_ctx = EVP_MD_CTX_new();
- if (temp_ctx == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
-
- if (!_hashlib_HASH_copy_locked(self, temp_ctx)) {
- goto error;
- }
- digest_size = EVP_MD_CTX_size(temp_ctx);
- if (!EVP_DigestFinal(temp_ctx, digest, NULL)) {
- goto error;
- }
-
- retval = PyBytes_FromStringAndSize((const char *)digest, digest_size);
- EVP_MD_CTX_free(temp_ctx);
- return retval;
-
-error:
- EVP_MD_CTX_free(temp_ctx);
- notify_ssl_error_occurred();
- return NULL;
+ Py_ssize_t n = _hashlib_HASH_digest_compute(self, digest);
+ return n < 0 ? NULL : PyBytes_FromStringAndSize((const char *)digest, n);
}
/*[clinic input]
/*[clinic end generated code: output=1b8e60d9711e7f4d input=ae7553f78f8372d8]*/
{
unsigned char digest[EVP_MAX_MD_SIZE];
- EVP_MD_CTX *temp_ctx;
- unsigned int digest_size;
-
- temp_ctx = EVP_MD_CTX_new();
- if (temp_ctx == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
-
- /* Get the raw (binary) digest value */
- if (!_hashlib_HASH_copy_locked(self, temp_ctx)) {
- goto error;
- }
- digest_size = EVP_MD_CTX_size(temp_ctx);
- if (!EVP_DigestFinal(temp_ctx, digest, NULL)) {
- goto error;
- }
-
- EVP_MD_CTX_free(temp_ctx);
-
- return _Py_strhex((const char *)digest, (Py_ssize_t)digest_size);
-
-error:
- EVP_MD_CTX_free(temp_ctx);
- notify_ssl_error_occurred();
- return NULL;
+ Py_ssize_t n = _hashlib_HASH_digest_compute(self, digest);
+ return n < 0 ? NULL : _Py_strhex((const char *)digest, n);
}
/*[clinic input]
HASHobject *self = HASHobject_CAST(op);
const EVP_MD *md = EVP_MD_CTX_md(self->ctx);
if (md == NULL) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred("missing EVP_MD for HASH context");
return NULL;
}
const char *name = get_hashlib_utf8name_by_evp_md(md);
return NULL;
}
- temp_ctx = EVP_MD_CTX_new();
+ temp_ctx = py_wrapper_EVP_MD_CTX_new();
if (temp_ctx == NULL) {
Py_DECREF(retval);
- PyErr_NoMemory();
return NULL;
}
- if (!_hashlib_HASH_copy_locked(self, temp_ctx)) {
+ if (_hashlib_HASH_copy_locked(self, temp_ctx) < 0) {
goto error;
}
if (!EVP_DigestFinalXOF(temp_ctx,
(unsigned char*)PyBytes_AS_STRING(retval),
length))
{
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_DigestFinalXOF));
goto error;
}
error:
Py_DECREF(retval);
EVP_MD_CTX_free(temp_ctx);
- notify_ssl_error_occurred();
return NULL;
}
return NULL;
}
- temp_ctx = EVP_MD_CTX_new();
+ temp_ctx = py_wrapper_EVP_MD_CTX_new();
if (temp_ctx == NULL) {
PyMem_Free(digest);
- PyErr_NoMemory();
return NULL;
}
/* Get the raw (binary) digest value */
- if (!_hashlib_HASH_copy_locked(self, temp_ctx)) {
+ if (_hashlib_HASH_copy_locked(self, temp_ctx) < 0) {
goto error;
}
if (!EVP_DigestFinalXOF(temp_ctx, digest, length)) {
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_DigestFinalXOF));
goto error;
}
error:
PyMem_Free(digest);
EVP_MD_CTX_free(temp_ctx);
- notify_ssl_error_occurred();
return NULL;
}
int result = EVP_DigestInit_ex(self->ctx, digest, NULL);
if (!result) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_DigestInit_ex));
Py_CLEAR(self);
goto exit;
}
if (!retval) {
Py_CLEAR(key_obj);
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(PKCS5_PBKDF2_HMAC));
goto end;
}
/* let OpenSSL validate the rest */
retval = EVP_PBE_scrypt(NULL, 0, NULL, 0, n, r, p, maxmem, NULL, 0);
if (!retval) {
- raise_ssl_error(PyExc_ValueError,
- "Invalid parameter combination for n, r, p, maxmem.");
+ notify_ssl_error_occurred(
+ "Invalid parameter combination for n, r, p, maxmem.");
return NULL;
}
if (!retval) {
Py_CLEAR(key_obj);
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(EVP_PBE_scrypt));
return NULL;
}
return key_obj;
PY_EVP_MD_free(evp);
if (result == NULL) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(HMAC));
return NULL;
}
return PyBytes_FromStringAndSize((const char*)md, md_len);
/* OpenSSL-based HMAC implementation
*/
+/* Thin wrapper around HMAC_CTX_new() which sets an exception on failure. */
+static HMAC_CTX *
+py_openssl_wrapper_HMAC_CTX_new(void)
+{
+ HMAC_CTX *ctx = HMAC_CTX_new();
+ if (ctx == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ return ctx;
+}
+
static int _hmac_update(HMACobject*, PyObject*);
static const EVP_MD *
{
const EVP_MD *md = HMAC_CTX_get_md(self->ctx);
if (md == NULL) {
- raise_ssl_error(PyExc_ValueError, "missing EVP_MD for HMAC context");
+ notify_ssl_error_occurred("missing EVP_MD for HMAC context");
}
return md;
}
return NULL;
}
- ctx = HMAC_CTX_new();
+ ctx = py_openssl_wrapper_HMAC_CTX_new();
if (ctx == NULL) {
PY_EVP_MD_free(digest);
- PyErr_NoMemory();
goto error;
}
r = HMAC_Init_ex(ctx, key->buf, (int)key->len, digest, NULL /* impl */);
PY_EVP_MD_free(digest);
if (r == 0) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(HMAC_Init_ex));
goto error;
}
ENTER_HASHLIB(self);
result = HMAC_CTX_copy(new_ctx_p, self->ctx);
LEAVE_HASHLIB(self);
- return result;
+ if (result == 0) {
+ notify_smart_ssl_error_occurred_in(Py_STRINGIFY(HMAC_CTX_copy));
+ return -1;
+ }
+ return 0;
}
/* returning 0 means that an error occurred and an exception is set */
unsigned int digest_size = EVP_MD_size(md);
assert(digest_size <= EVP_MAX_MD_SIZE);
if (digest_size == 0) {
- raise_ssl_error(PyExc_ValueError, "invalid digest size");
+ notify_ssl_error_occurred("invalid digest size");
}
return digest_size;
}
PyBuffer_Release(&view);
if (r == 0) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(HMAC_Update));
return 0;
}
return 1;
{
HMACobject *retval;
- HMAC_CTX *ctx = HMAC_CTX_new();
+ HMAC_CTX *ctx = py_openssl_wrapper_HMAC_CTX_new();
if (ctx == NULL) {
- return PyErr_NoMemory();
+ return NULL;
}
- if (!locked_HMAC_CTX_copy(ctx, self)) {
+ if (locked_HMAC_CTX_copy(ctx, self) < 0) {
HMAC_CTX_free(ctx);
- notify_ssl_error_occurred();
return NULL;
}
static int
_hmac_digest(HMACobject *self, unsigned char *buf, unsigned int len)
{
- HMAC_CTX *temp_ctx = HMAC_CTX_new();
+ HMAC_CTX *temp_ctx = py_openssl_wrapper_HMAC_CTX_new();
if (temp_ctx == NULL) {
- (void)PyErr_NoMemory();
return 0;
}
- if (!locked_HMAC_CTX_copy(temp_ctx, self)) {
+ if (locked_HMAC_CTX_copy(temp_ctx, self) < 0) {
HMAC_CTX_free(temp_ctx);
- notify_ssl_error_occurred();
return 0;
}
int r = HMAC_Final(temp_ctx, buf, &len);
HMAC_CTX_free(temp_ctx);
if (r == 0) {
- notify_ssl_error_occurred();
+ notify_ssl_error_occurred_in(Py_STRINGIFY(HMAC_Final));
return 0;
}
return 1;
#else
ERR_clear_error();
int result = FIPS_mode();
- if (result == 0) {
+ if (result == 0 && ERR_peek_last_error()) {
// "If the library was built without support of the FIPS Object Module,
// then the function will return 0 with an error code of
// CRYPTO_R_FIPS_MODE_NOT_SUPPORTED (0x0f06d065)."
// But 0 is also a valid result value.
- unsigned long errcode = ERR_peek_last_error();
- if (errcode) {
- notify_ssl_error_occurred();
- return -1;
- }
+ notify_ssl_error_occurred_in(Py_STRINGIFY(FIPS_mode));
+ return -1;
}
return result;
#endif