From: Alan T. DeKok Date: Fri, 23 Jun 2023 14:06:34 +0000 (-0400) Subject: add a dlist per channel, and associate requests with it X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8c8c4bf0cfbda02ad265e984e142bc67804d8b81;p=thirdparty%2Ffreeradius-server.git add a dlist per channel, and associate requests with it so that when a channel closes, we can force-cancel all of the requests which are associated with it. --- diff --git a/src/lib/io/listen.h b/src/lib/io/listen.h index 610d3846bb3..d8bd275a871 100644 --- a/src/lib/io/listen.h +++ b/src/lib/io/listen.h @@ -57,6 +57,8 @@ struct fr_async_s { fr_time_tracking_t tracking; fr_channel_t *channel; + fr_dlist_t entry; //!< in the list of requests associated with this channel + void *packet_ctx; fr_listen_t *listen; //!< How we received this request, //!< and how we'll send the reply. diff --git a/src/lib/io/worker.c b/src/lib/io/worker.c index 99afd0272ba..8f08644613a 100644 --- a/src/lib/io/worker.c +++ b/src/lib/io/worker.c @@ -76,6 +76,16 @@ static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0; static _Thread_local fr_ring_buffer_t *fr_worker_rb; +typedef struct { + fr_channel_t *ch; + + /* + * To save time, we don't care about num_elements here. Which means that we don't + * need to cache or lookup the fr_worker_listen_t when we free a request. + */ + fr_dlist_head_t dlist; +} fr_worker_channel_t; + /** * A worker which takes packets from a master, and processes them. */ @@ -121,7 +131,7 @@ struct fr_worker_s { fr_event_timer_t const *ev_cleanup; //!< timer for max_request_time - fr_channel_t **channel; //!< list of channels + fr_worker_channel_t *channel; //!< list of channels }; typedef struct { @@ -202,6 +212,15 @@ static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t * worker_request_bootstrap(worker, cd, fr_time()); } +static void worker_requests_cancel(fr_worker_channel_t *ch) +{ + request_t *request; + + while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) { + unlang_interpret_signal(request, FR_SIGNAL_CANCEL); + } +} + static void worker_exit(fr_worker_t *worker) { worker->exiting = true; @@ -271,11 +290,13 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz ok = false; for (i = 0; i < worker->config.max_channels; i++) { - fr_assert(worker->channel[i] != ch); + fr_assert(worker->channel[i].ch != ch); - if (worker->channel[i] != NULL) continue; + if (worker->channel[i].ch != NULL) continue; + + worker->channel[i].ch = ch; + fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry); - worker->channel[i] = ch; DEBUG3("Received channel %p into array entry %d", ch, i); ms = fr_message_set_create(worker, worker->config.message_set_size, @@ -302,9 +323,11 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz * of channels. */ for (i = 0; i < worker->config.max_channels; i++) { - if (!worker->channel[i]) continue; + if (!worker->channel[i].ch) continue; + + if (worker->channel[i].ch != ch) continue; - if (worker->channel[i] != ch) continue; + worker_requests_cancel(&worker->channel[i]); ms = fr_channel_responder_uctx_get(ch); @@ -313,8 +336,11 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz fr_message_set_gc(ms); talloc_free(ms); - worker->channel[i] = NULL; + worker->channel[i].ch = NULL; + + fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */ fr_assert(worker->num_channels > 0); + worker->num_channels--; ok = true; break; @@ -757,6 +783,7 @@ void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now) request->async = talloc_zero(request, fr_async_t); request->async->recv_time = now; request->async->el = worker->el; + fr_dlist_entry_init(&request->async->entry); } static inline CC_HINT(always_inline) @@ -1017,9 +1044,9 @@ void fr_worker_destroy(fr_worker_t *worker) * automatically freed when our talloc context is freed. */ for (i = 0; i < worker->config.max_channels; i++) { - if (!worker->channel[i]) continue; + if (!worker->channel[i].ch) continue; - fr_channel_responder_ack_close(worker->channel[i]); + fr_channel_responder_ack_close(worker->channel[i].ch); } talloc_free(worker); @@ -1090,6 +1117,13 @@ static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t */ worker_request_time_tracking_end(worker, request, now); + /* + * Remove it from the list of requests associated with this channel. + */ + if (fr_dlist_entry_in_list(&request->async->entry)) { + fr_dlist_entry_unlink(&request->async->entry); + } + /* * These conditions are true when the server is * exiting and we're stopping all the requests. @@ -1118,6 +1152,7 @@ static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t fr_assert(!fr_heap_entry_inserted(request->runnable_id)); fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id)); + fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); } /** Detached request (i.e. one generated by the interpreter with no parent) is now complete @@ -1144,6 +1179,8 @@ static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t */ (void)fr_minmax_heap_extract(worker->time_order, request); + fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); + /* * Detached requests have to be freed by us * as nothing else can free them. @@ -1335,7 +1372,7 @@ nomem: CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20)); CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120)); - worker->channel = talloc_zero_array(worker, fr_channel_t *, worker->config.max_channels); + worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels); if (!worker->channel) { talloc_free(worker); goto nomem; @@ -1611,9 +1648,9 @@ static void worker_verify(fr_worker_t *worker) (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t); for (i = 0; i < worker->config.max_channels; i++) { - if (!worker->channel[i]) continue; + if (!worker->channel[i].ch) continue; - (void) talloc_get_type_abort(worker->channel[i], fr_channel_t); + (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t); } } #endif