From: Alan T. DeKok Date: Mon, 27 Feb 2023 19:24:30 +0000 (-0500) Subject: add framework for network to notify workers that a socket has gone away X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=91c29866f67533086c223166919abe186410908e;p=thirdparty%2Ffreeradius-server.git add framework for network to notify workers that a socket has gone away --- diff --git a/src/lib/io/control.h b/src/lib/io/control.h index 147b9c7ef41..c82782bbbfa 100644 --- a/src/lib/io/control.h +++ b/src/lib/io/control.h @@ -58,6 +58,7 @@ typedef void (*fr_control_callback_t)(void *ctx, void const *data, size_t data_s #define FR_CONTROL_ID_WORKER (3) #define FR_CONTROL_ID_DIRECTORY (4) #define FR_CONTROL_ID_INJECT (5) +#define FR_CONTROL_ID_LISTEN_DEAD (6) fr_control_t *fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq) CC_HINT(nonnull(3)); diff --git a/src/lib/io/network.c b/src/lib/io/network.c index a505f0fc094..1575dd8f1e1 100644 --- a/src/lib/io/network.c +++ b/src/lib/io/network.c @@ -790,12 +790,21 @@ size_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li) { */ static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s) { + int i; + if (s->dead) return; s->dead = true; fr_event_fd_delete(nr->el, s->listen->fd, s->filter); + + for (i = 0; i < nr->max_workers; i++) { + if (!nr->workers[i]) continue; + + (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen); + } + /* * If there are no outstanding packets, then we can free * it now. @@ -1181,6 +1190,8 @@ static int _network_socket_free(fr_network_socket_t *s) fr_network_t *nr = s->nr; fr_channel_data_t *cd; + fr_assert(s->outstanding == 0); + fr_rb_delete(nr->sockets, s); fr_rb_delete(nr->sockets_by_num, s); diff --git a/src/lib/io/worker.c b/src/lib/io/worker.c index ce89ce8a449..c3820d24fcf 100644 --- a/src/lib/io/worker.c +++ b/src/lib/io/worker.c @@ -74,6 +74,8 @@ static void worker_verify(fr_worker_t *worker); #define CACHE_LINE_SIZE 64 static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0; +static _Thread_local fr_ring_buffer_t *fr_worker_rb; + /** * A worker which takes packets from a master, and processes them. */ @@ -120,6 +122,42 @@ struct fr_worker_s { fr_channel_t **channel; //!< list of channels }; +/* + * Explicitly cleanup the memory allocated to the ring buffer, + * just in case valgrind complains about it. + */ +static int _fr_worker_rb_free(void *arg) +{ + return talloc_free(arg); +} + +/** Initialise thread local storage + * + * @return fr_ring_buffer_t for messages + */ +static inline fr_ring_buffer_t *fr_worker_rb_init(void) +{ + fr_ring_buffer_t *rb; + + rb = fr_worker_rb; + if (rb) return rb; + + rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE); + if (!rb) { + fr_perror("Failed allocating memory for worker ring buffer"); + return NULL; + } + + fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb); + + return rb; +} + +static inline bool is_worker_thread(fr_worker_t const *worker) +{ + return (pthread_equal(pthread_self(), worker->thread_id) != 0); +} + static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now); static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now); static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx); @@ -270,6 +308,31 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz } } +static int fr_worker_listen_cancel_self(UNUSED fr_worker_t *worker, UNUSED fr_listen_t const *li) +{ + + return 0; +} + + +/** A socket is going away, so clean up any requests which use this socket. + * + * @param[in] ctx the worker + * @param[in] data the message + * @param[in] data_size size of the data + * @param[in] now the current time + */ +static void worker_listen_cancel_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now) +{ + fr_listen_t const *li; + fr_worker_t *worker = ctx; + + fr_assert(data_size == sizeof(li)); + + memcpy(&li, data, sizeof(li)); + + (void) fr_worker_listen_cancel_self(worker, li); +} /** Send a NAK to the network thread * @@ -1272,6 +1335,11 @@ nomem: goto fail; } + if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN_DEAD, worker, worker_listen_cancel_callback) < 0) { + fr_strerror_const_push("Failed adding callback for listeners"); + goto fail; + } + worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0); if (!worker->runnable) { fr_strerror_const("Failed creating runnable heap"); @@ -1458,6 +1526,23 @@ fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_ return ch; } +int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li) +{ + fr_ring_buffer_t *rb; + + /* + * Skip a bunch of work if we're already in the worker thread. + */ + if (is_worker_thread(worker)) { + return fr_worker_listen_cancel_self(worker, li); + } + + rb = fr_worker_rb_init(); + if (!rb) return -1; + + return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li)); +} + #ifdef WITH_VERIFY_PTR /** Verify the worker data structures. * diff --git a/src/lib/io/worker.h b/src/lib/io/worker.h index 21ad5422563..5a8b02f2c70 100644 --- a/src/lib/io/worker.h +++ b/src/lib/io/worker.h @@ -89,6 +89,8 @@ fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_ int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats) CC_HINT(nonnull); +int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li); + #include int fr_worker_subrequest_add(request_t *request) CC_HINT(nonnull);