* @file atomic_queue.h
* @brief Thread-safe queues.
*
- * @copyright 2016 Alan DeKok <aland@freeradius.org>
+ * @copyright 2016 Alan DeKok (aland@freeradius.org)
*/
RCSIDH(atomic_queue_h, "$Id$")
# include <freeradius-devel/stdatomic.h>
#endif
-#ifdef __cplusplus
-extern "C" {
-#endif
-
/*
* Some macros to make our life easier.
*/
+#define atomic_int64_t _Atomic(int64_t)
#define atomic_uint32_t _Atomic(uint32_t)
+#define atomic_uint64_t _Atomic(uint64_t)
#define cas_incr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var + 1, memory_order_release, memory_order_relaxed)
#define cas_decr(_store, _var) atomic_compare_exchange_strong_explicit(&_store, &_var, _var - 1, memory_order_release, memory_order_relaxed)
#define aquire(_var) atomic_load_explicit(&_var, memory_order_acquire)
#define store(_store, _var) atomic_store_explicit(&_store, _var, memory_order_release);
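
A minimal usage sketch (illustrative only, not part of this change) of the CAS helpers above: C11 compare-exchange writes the current value back into the expected variable on failure, so a retry loop simply tries again with the refreshed value.

/*
 *	Hypothetical example: atomically increment a shared counter using
 *	the helpers defined above.  On CAS failure 'seen' is refreshed with
 *	the current value, and the loop retries with seen + 1.
 */
static atomic_uint32_t active;

static void active_incr(void)
{
	uint32_t seen = aquire(active);		/* aquire() as defined above */

	while (!cas_incr(active, seen)) {
		/* seen now holds the latest value; retry */
	}
}
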
-typedef struct fr_atomic_queue_t fr_atomic_queue_t;
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct fr_atomic_queue_s fr_atomic_queue_t;
-fr_atomic_queue_t *fr_atomic_queue_create(TALLOC_CTX *ctx, int size);
+fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size);
+void fr_atomic_queue_free(fr_atomic_queue_t **aq);
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data);
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data);
+size_t fr_atomic_queue_size(fr_atomic_queue_t *aq);
+
+#ifdef WITH_VERIFY_PTR
+void fr_atomic_queue_verify(fr_atomic_queue_t *aq);
+#endif
#ifndef NDEBUG
void fr_atomic_queue_debug(fr_atomic_queue_t *aq, FILE *fp);
--- /dev/null
+#pragma once
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+/** Functions which we wish were included in the standard talloc distribution
+ *
+ * @file src/lib/util/talloc.h
+ *
+ * @copyright 2017 The FreeRADIUS server project
+ * @copyright 2017 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
+ */
+RCSIDH(talloc_h, "$Id$")
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <ctype.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#ifdef HAVE_WDOCUMENTATION
+DIAG_OFF(documentation)
+#endif
+#include <talloc.h>
+#ifdef HAVE_WDOCUMENTATION
+DIAG_ON(documentation)
+#endif
+
+#include <freeradius-devel/autoconf.h> /* Very easy to miss including in special builds */
+#include <freeradius-devel/build.h>
+#include <freeradius-devel/missing.h>
+
+TALLOC_CTX *talloc_aligned_array(TALLOC_CTX *ctx, void **start, size_t alignment, size_t size);
+
+#ifdef __cplusplus
+}
+#endif
tcp.c \
base64.c \
version.c \
- atomic_queue.c
+ atomic_queue.c \
+ talloc.c
SRC_CFLAGS := -D_LIBRADIUS -I$(top_builddir)/src
* @brief Thread-safe queues.
* @file atomic_queue.c
*
- * @copyright 2016 Alan DeKok <aland@freeradius.org>
+ * @copyright 2016 Alan DeKok (aland@freeradius.org)
* @copyright 2016 Alister Winfield
*/
RCSID("$Id$")
#include <freeradius-devel/autoconf.h>
#include <freeradius-devel/atomic_queue.h>
+#include <freeradius-devel/talloc.h>
-/*
- * Some macros to make our life easier.
- */
-#define atomic_int64_t _Atomic(int64_t)
+#define CACHE_LINE_SIZE 64			/* Cache line size on current AMD/Intel CPUs */
-typedef struct fr_atomic_queue_entry_t {
- alignas(128) void *data;
- atomic_int64_t seq;
+/** Entry in the queue
+ *
+ * @note This structure is cache line aligned for modern AMD/Intel CPUs.
+ * This is to avoid contention when the producer and consumer are executing
+ * on different CPU cores.
+ */
+typedef struct CC_HINT(packed, aligned(CACHE_LINE_SIZE)) {
+ atomic_int64_t seq; //!< Must come before data so that
+ ///< seq is 64-bit aligned, even on
+ ///< platforms with 32-bit pointers.
+ void *data;
} fr_atomic_queue_entry_t;
-struct fr_atomic_queue_t {
- alignas(128) atomic_int64_t head;
- atomic_int64_t tail;
+/** Structure to hold the atomic queue
+ *
+ */
+struct fr_atomic_queue_s {
+ alignas(CACHE_LINE_SIZE) atomic_int64_t head; //!< Head, cache line aligned to ensure
+ ///< it's in a different cache line from tail,
+ ///< reducing memory contention.
+ atomic_int64_t tail;
+
+ size_t size;
- int size;
+ void *chunk; //!< To pass to free. The non-aligned address.
- fr_atomic_queue_entry_t entry[1];
+ alignas(CACHE_LINE_SIZE) fr_atomic_queue_entry_t entry[]; //!< The entry array, also aligned
+ ///< to ensure it's not in the same cache
+ ///< line as tail and size.
};
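
To make the layout assumptions above checkable, a compile-time sketch (illustrative only, not part of this change; assumes C11 static_assert() via <assert.h> and offsetof() via <stddef.h>) could look like this:

/*
 *	Illustrative compile-time checks for the cache line layout described
 *	in the comments above.
 */
static_assert((sizeof(fr_atomic_queue_entry_t) % CACHE_LINE_SIZE) == 0,
	      "each entry must occupy whole cache lines to avoid false sharing");
static_assert(offsetof(fr_atomic_queue_entry_t, seq) == 0,
	      "seq must come first so it's naturally 64-bit aligned");
static_assert((offsetof(struct fr_atomic_queue_s, entry) % CACHE_LINE_SIZE) == 0,
	      "the entry array must start on its own cache line");
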
/** Create fixed-size atomic queue
+ *
+ * @note The queue must be freed explicitly, either by freeing the talloc ctx,
+ * or by calling #fr_atomic_queue_free.
*
* @param[in] ctx The talloc ctx to allocate the queue in.
* @param[in] size The number of entries in the queue.
* - NULL on error.
* - fr_atomic_queue_t *, a pointer to the allocated and initialized queue.
*/
-fr_atomic_queue_t *fr_atomic_queue_create(TALLOC_CTX *ctx, int size)
+fr_atomic_queue_t *fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
{
- int i;
- int64_t seq;
- fr_atomic_queue_t *aq;
+ size_t i;
+ int64_t seq;
+ fr_atomic_queue_t *aq;
+ TALLOC_CTX *chunk;
- if (size <= 0) return NULL;
+ if (size == 0) return NULL;
/*
* Allocate a contiguous blob for the header and queue.
* Since we're allocating a blob, we should also set the
* name of the data, too.
*/
- aq = talloc_size(ctx, sizeof(*aq) + (size - 1) * sizeof(aq->entry[0]));
- if (!aq) return NULL;
+ chunk = talloc_aligned_array(ctx, (void **)&aq, CACHE_LINE_SIZE,
+ sizeof(*aq) + (size) * sizeof(aq->entry[0]));
+ if (!chunk) return NULL;
+ aq->chunk = chunk;
- talloc_set_name(aq, "fr_atomic_queue_t");
+ talloc_set_name_const(chunk, "fr_atomic_queue_t");
/*
* Initialize the array. Data is NULL, and indexes are
return aq;
}
+/** Free an atomic queue if it's not freed by the ctx
+ *
+ * This function is needed because the queue pointer is offset into an
+ * over-allocated, cache line aligned talloc chunk.  It's that chunk, not
+ * the queue pointer itself, which must be passed to talloc_free().
+ */
+void fr_atomic_queue_free(fr_atomic_queue_t **aq)
+{
+ if (!*aq) return;
+
+ talloc_free((*aq)->chunk);
+ *aq = NULL;
+}
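
As a usage sketch (hypothetical caller; the push/pop return values are assumed to indicate full/empty, which isn't shown in this hunk), the alloc/free pair is used like this:

/*
 *	Hypothetical caller illustrating the intended lifecycle.
 */
static int example_queue_usage(TALLOC_CTX *ctx)
{
	fr_atomic_queue_t *aq;
	static int work = 42;
	void *item;

	aq = fr_atomic_queue_alloc(ctx, 1024);	/* size must be non-zero */
	if (!aq) return -1;

	if (!fr_atomic_queue_push(aq, &work)) {	/* assumed false if the queue is full */
		fr_atomic_queue_free(&aq);
		return -1;
	}

	if (fr_atomic_queue_pop(aq, &item)) {	/* assumed false if the queue is empty */
		/* ... use item ... */
	}

	fr_atomic_queue_free(&aq);		/* frees the aligned chunk, NULLs aq */
	return 0;
}
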
/** Push a pointer into the atomic queue
*
*/
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
{
- int64_t tail, seq;
- fr_atomic_queue_entry_t *entry;
+ int64_t tail, seq;
+ fr_atomic_queue_entry_t *entry;
if (!p_data) return false;
return true;
}
+/** Return the number of entry slots in the queue
+ *
+ */
+size_t fr_atomic_queue_size(fr_atomic_queue_t *aq)
+{
+ return aq->size;
+}
+
+#ifdef WITH_VERIFY_PTR
+/** Check the talloc chunk is still valid
+ *
+ */
+void fr_atomic_queue_verify(fr_atomic_queue_t *aq)
+{
+ (void)talloc_get_type_abort(aq->chunk, fr_atomic_queue_t);
+}
+#endif
+
#ifndef NDEBUG
#if 0
-typedef struct fr_control_message_t {
+typedef struct {
int status; //!< status of this message
- size_t data_size; //!< size of the data we're sending
+ size_t data_size; //!< size of the data we're sending
int signal; //!< the signal to send
uint64_t ack; //!< or the endpoint..
*/
void fr_atomic_queue_debug(fr_atomic_queue_t *aq, FILE *fp)
{
- int i;
+ size_t i;
int64_t head, tail;
head = load(aq->head);
- tail = load(aq->head);
+ tail = load(aq->tail);
- fprintf(fp, "AQ %p size %d, head %" PRId64 ", tail %" PRId64 "\n",
+ fprintf(fp, "AQ %p size %zu, head %" PRId64 ", tail %" PRId64 "\n",
aq, aq->size, head, tail);
for (i = 0; i < aq->size; i++) {
entry = &aq->entry[i];
- fprintf(fp, "\t[%d] = { %p, %" PRId64 " }",
+ fprintf(fp, "\t[%zu] = { %p, %" PRId64 " }",
i, entry->data, load(entry->seq));
#if 0
if (entry->data) {
}
#endif
-#endif /* HAVE_STDALIGN_H */
+#endif /* HAVE_STDALIGN_H */
--- /dev/null
+/*
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+/** Functions which we wish were included in the standard talloc distribution
+ *
+ * @file src/lib/talloc.c
+ *
+ * @copyright 2017 The FreeRADIUS server project
+ * @copyright 2017 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
+ */
+RCSID("$Id$")
+
+#include <freeradius-devel/math.h>
+#include <freeradius-devel/libradius.h>
+#include <freeradius-devel/talloc.h>
+
+/** Return a page aligned talloc memory array
+ *
+ * Because we can't intercept talloc's malloc() calls, we need to do some tricks
+ * in order to get the first allocation in the array page aligned, and to limit
+ * the size of the array to a multiple of the page size.
+ *
+ * The reason for wanting a page aligned talloc array is that it allows us to
+ * mprotect() the pages that belong to the array.
+ *
+ * Talloc chunks appear to be allocated within the protected region, so this should
+ * catch frees too.
+ *
+ * @param[in] ctx to allocate array memory in.
+ * @param[out] start The first aligned address in the array.
+ * @param[in] alignment What alignment the memory chunk should have.
+ * @param[in] size How big to make the array. Will be rounded up to a multiple
+ * of the alignment; the actual allocation is that rounded size
+ * plus one extra alignment's worth of bytes.
+ * @return
+ * - A talloc chunk on success.
+ * - NULL on failure.
+ */
+TALLOC_CTX *talloc_aligned_array(TALLOC_CTX *ctx, void **start, size_t alignment, size_t size)
+{
+ size_t rounded;
+ size_t array_size;
+ void *next;
+ TALLOC_CTX *array;
+
+ rounded = ROUND_UP(size, alignment); /* Round up to a multiple of the alignment */
+ if (rounded == 0) rounded = alignment;
+
+ array_size = rounded + alignment;
+ array = talloc_array(ctx, uint8_t, array_size); /* Over allocate */
+ if (!array) {
+ fr_strerror_printf("Out of memory");
+ return NULL;
+ }
+
+ next = (void *)ROUND_UP((uintptr_t)array, alignment); /* Round up address to the next multiple */
+ *start = next;
+
+ return array;
+}
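
A short usage sketch (hypothetical function and values, assuming ROUND_UP(x, y) from math.h rounds x up to the next multiple of y) shows why the extra alignment bytes are over-allocated:

/*
 *	Hypothetical usage: carve a 64 byte aligned region out of a talloc
 *	chunk.  The chunk returned by talloc_aligned_array() is what gets
 *	freed; 'start' merely points into it.
 *
 *	e.g. size 1000, alignment 64: rounded = 1024, allocation = 1088.
 *	If the chunk starts at 0x1008, 'start' is rounded up to 0x1040,
 *	wasting 56 bytes, and 1032 usable bytes still remain.
 */
static void aligned_region_example(TALLOC_CTX *ctx)
{
	void *start;
	TALLOC_CTX *chunk;

	chunk = talloc_aligned_array(ctx, &start, 64, 1000);
	if (!chunk) return;

	/* ... use the 1000 bytes at 'start', which is 64 byte aligned ... */

	talloc_free(chunk);		/* free the chunk, never 'start' */
}
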
+
*
* FIXME: What to do on a SIGHUP???
*/
+DIAG_OFF(deprecated-declarations)
int thread_pool_init(CONF_SECTION *cs, bool *spawn_flag)
{
#ifndef WITH_GCD
time_t now;
#ifdef HAVE_STDATOMIC_H
int num;
+ TALLOC_CTX *autofree;
+
+ autofree = talloc_autofree_context();
#endif
now = time(NULL);
*/
for (i = 0; i < NUM_FIFOS; i++) {
#ifdef HAVE_STDATOMIC_H
- thread_pool.queue[i] = fr_atomic_queue_create(NULL, thread_pool.max_queue_size);
+ thread_pool.queue[i] = fr_atomic_queue_alloc(autofree, thread_pool.max_queue_size);
if (!thread_pool.queue[i]) {
ERROR("FATAL: Failed to set up request fifo");
return -1;
pool_initialized = true;
return 0;
}
-
+DIAG_ON(deprecated-declarations)
/*
* Stop all threads in the pool.