]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
add framework for network to notify workers that a socket has gone away
authorAlan T. DeKok <aland@freeradius.org>
Mon, 27 Feb 2023 19:24:30 +0000 (14:24 -0500)
committerAlan T. DeKok <aland@freeradius.org>
Mon, 27 Feb 2023 19:24:48 +0000 (14:24 -0500)
src/lib/io/control.h
src/lib/io/network.c
src/lib/io/worker.c
src/lib/io/worker.h

index 147b9c7ef411856a2495b44b727eda4ca6d82c65..c82782bbbfad0f10cf1b9db838006f90bb8cc4c3 100644 (file)
@@ -58,6 +58,7 @@ typedef       void (*fr_control_callback_t)(void *ctx, void const *data, size_t data_s
 #define FR_CONTROL_ID_WORKER   (3)
 #define FR_CONTROL_ID_DIRECTORY (4)
 #define FR_CONTROL_ID_INJECT   (5)
+#define FR_CONTROL_ID_LISTEN_DEAD (6)
 
 fr_control_t *fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq) CC_HINT(nonnull(3));
 
index a505f0fc094fd102a72cceb7ebdd65ad6382226e..1575dd8f1e17c1eb6e80c5dcd40f5f593bc95265 100644 (file)
@@ -790,12 +790,21 @@ size_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li) {
  */
 static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
 {
+       int i;
+
        if (s->dead) return;
 
        s->dead = true;
 
        fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
 
+
+       for (i = 0; i < nr->max_workers; i++) {
+               if (!nr->workers[i]) continue;
+
+               (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
+       }
+
        /*
         *      If there are no outstanding packets, then we can free
         *      it now.
@@ -1181,6 +1190,8 @@ static int _network_socket_free(fr_network_socket_t *s)
        fr_network_t *nr = s->nr;
        fr_channel_data_t *cd;
 
+       fr_assert(s->outstanding == 0);
+
        fr_rb_delete(nr->sockets, s);
        fr_rb_delete(nr->sockets_by_num, s);
 
index ce89ce8a449a1f5d62ef735be025cf39868ac0c3..c3820d24fcfef1e3e22b641845babe95eae7ed04 100644 (file)
@@ -74,6 +74,8 @@ static void worker_verify(fr_worker_t *worker);
 #define CACHE_LINE_SIZE        64
 static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;
 
+static _Thread_local fr_ring_buffer_t *fr_worker_rb;
+
 /**
  *  A worker which takes packets from a master, and processes them.
  */
@@ -120,6 +122,42 @@ struct fr_worker_s {
        fr_channel_t            **channel;      //!< list of channels
 };
 
+/*
+ *     Explicitly cleanup the memory allocated to the ring buffer,
+ *     just in case valgrind complains about it.
+ */
+static int _fr_worker_rb_free(void *arg)
+{
+       return talloc_free(arg);
+}
+
+/** Initialise thread local storage
+ *
+ * @return fr_ring_buffer_t for messages
+ */
+static inline fr_ring_buffer_t *fr_worker_rb_init(void)
+{
+       fr_ring_buffer_t *rb;
+
+       rb = fr_worker_rb;
+       if (rb) return rb;
+
+       rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
+       if (!rb) {
+               fr_perror("Failed allocating memory for worker ring buffer");
+               return NULL;
+       }
+
+       fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
+
+       return rb;
+}
+
+static inline bool is_worker_thread(fr_worker_t const *worker)
+{
+       return (pthread_equal(pthread_self(), worker->thread_id) != 0);
+}
+
 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
 static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx);
@@ -270,6 +308,31 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
        }
 }
 
+static int fr_worker_listen_cancel_self(UNUSED fr_worker_t *worker, UNUSED fr_listen_t const *li)
+{
+
+       return 0;
+}
+
+
+/** A socket is going away, so clean up any requests which use this socket.
+ *
+ * @param[in] ctx      the worker
+ * @param[in] data     the message
+ * @param[in] data_size        size of the data
+ * @param[in] now      the current time
+ */
+static void worker_listen_cancel_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
+{
+       fr_listen_t const       *li;
+       fr_worker_t             *worker = ctx;
+
+       fr_assert(data_size == sizeof(li));
+
+       memcpy(&li, data, sizeof(li));
+
+       (void) fr_worker_listen_cancel_self(worker, li);
+}
 
 /** Send a NAK to the network thread
  *
@@ -1272,6 +1335,11 @@ nomem:
                goto fail;
        }
 
+       if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN_DEAD, worker, worker_listen_cancel_callback) < 0) {
+               fr_strerror_const_push("Failed adding callback for listeners");
+               goto fail;
+       }
+
        worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0);
        if (!worker->runnable) {
                fr_strerror_const("Failed creating runnable heap");
@@ -1458,6 +1526,23 @@ fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_
        return ch;
 }
 
+int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
+{
+       fr_ring_buffer_t *rb;
+
+       /*
+        *      Skip a bunch of work if we're already in the worker thread.
+        */
+       if (is_worker_thread(worker)) {
+               return fr_worker_listen_cancel_self(worker, li);
+       }
+
+       rb = fr_worker_rb_init();
+       if (!rb) return -1;
+
+       return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
+}
+
 #ifdef WITH_VERIFY_PTR
 /** Verify the worker data structures.
  *
index 21ad54225633e7a15d5f5ac7a6d176c9ca160197..5a8b02f2c70912155e3a5a359117dfc30c8bd6db 100644 (file)
@@ -89,6 +89,8 @@ fr_channel_t  *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_
 
 int            fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats) CC_HINT(nonnull);
 
+int            fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li);
+
 #include <freeradius-devel/server/module.h>
 
 int            fr_worker_subrequest_add(request_t *request) CC_HINT(nonnull);