]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
add a dlist per channel, and associate requests with it
authorAlan T. DeKok <aland@freeradius.org>
Fri, 23 Jun 2023 14:06:34 +0000 (10:06 -0400)
committerAlan T. DeKok <aland@freeradius.org>
Fri, 23 Jun 2023 14:06:34 +0000 (10:06 -0400)
so that when a channel closes, we can force-cancel all of the
requests which are associated with it.

src/lib/io/listen.h
src/lib/io/worker.c

index 610d3846bb35b5cc620f9319eae8d1fc42b6a2e7..d8bd275a871c4c01a6b1856f686b94e54c19c210 100644 (file)
@@ -57,6 +57,8 @@ struct fr_async_s {
        fr_time_tracking_t      tracking;
        fr_channel_t            *channel;
 
+       fr_dlist_t              entry;          //!< in the list of requests associated with this channel
+
        void                    *packet_ctx;
        fr_listen_t             *listen;        //!< How we received this request,
                                                //!< and how we'll send the reply.
index 99afd0272ba1acbeff7ed007fc5d5adcd1505f74..8f08644613a84b261e4ee0fe8ab7d4aa7f522b6e 100644 (file)
@@ -76,6 +76,16 @@ static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;
 
 static _Thread_local fr_ring_buffer_t *fr_worker_rb;
 
+typedef struct {
+       fr_channel_t            *ch;
+
+       /*
+        *      To save time, we don't care about num_elements here.  Which means that we don't
+        *      need to cache or lookup the fr_worker_listen_t when we free a request.
+        */
+       fr_dlist_head_t         dlist;
+} fr_worker_channel_t;
+
 /**
  *  A worker which takes packets from a master, and processes them.
  */
@@ -121,7 +131,7 @@ struct fr_worker_s {
 
        fr_event_timer_t const  *ev_cleanup;    //!< timer for max_request_time
 
-       fr_channel_t            **channel;      //!< list of channels
+       fr_worker_channel_t     *channel;       //!< list of channels
 };
 
 typedef struct {
@@ -202,6 +212,15 @@ static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *
        worker_request_bootstrap(worker, cd, fr_time());
 }
 
+static void worker_requests_cancel(fr_worker_channel_t *ch)
+{
+       request_t *request;
+
+       while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
+               unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
+       }
+}
+
 static void worker_exit(fr_worker_t *worker)
 {
        worker->exiting = true;
@@ -271,11 +290,13 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
 
                ok = false;
                for (i = 0; i < worker->config.max_channels; i++) {
-                       fr_assert(worker->channel[i] != ch);
+                       fr_assert(worker->channel[i].ch != ch);
 
-                       if (worker->channel[i] != NULL) continue;
+                       if (worker->channel[i].ch != NULL) continue;
+
+                       worker->channel[i].ch = ch;
+                       fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
 
-                       worker->channel[i] = ch;
                        DEBUG3("Received channel %p into array entry %d", ch, i);
 
                        ms = fr_message_set_create(worker, worker->config.message_set_size,
@@ -302,9 +323,11 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
                 *      of channels.
                 */
                for (i = 0; i < worker->config.max_channels; i++) {
-                       if (!worker->channel[i]) continue;
+                       if (!worker->channel[i].ch) continue;
+
+                       if (worker->channel[i].ch != ch) continue;
 
-                       if (worker->channel[i] != ch) continue;
+                       worker_requests_cancel(&worker->channel[i]);
 
                        ms = fr_channel_responder_uctx_get(ch);
 
@@ -313,8 +336,11 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
                        fr_message_set_gc(ms);
                        talloc_free(ms);
 
-                       worker->channel[i] = NULL;
+                       worker->channel[i].ch = NULL;
+
+                       fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
                        fr_assert(worker->num_channels > 0);
+
                        worker->num_channels--;
                        ok = true;
                        break;
@@ -757,6 +783,7 @@ void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
        request->async = talloc_zero(request, fr_async_t);
        request->async->recv_time = now;
        request->async->el = worker->el;
+       fr_dlist_entry_init(&request->async->entry);
 }
 
 static inline CC_HINT(always_inline)
@@ -1017,9 +1044,9 @@ void fr_worker_destroy(fr_worker_t *worker)
         *      automatically freed when our talloc context is freed.
         */
        for (i = 0; i < worker->config.max_channels; i++) {
-               if (!worker->channel[i]) continue;
+               if (!worker->channel[i].ch) continue;
 
-               fr_channel_responder_ack_close(worker->channel[i]);
+               fr_channel_responder_ack_close(worker->channel[i].ch);
        }
 
        talloc_free(worker);
@@ -1090,6 +1117,13 @@ static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t
         */
        worker_request_time_tracking_end(worker, request, now);
 
+       /*
+        *      Remove it from the list of requests associated with this channel.
+        */
+       if (fr_dlist_entry_in_list(&request->async->entry)) {
+               fr_dlist_entry_unlink(&request->async->entry);
+       }
+
        /*
         *      These conditions are true when the server is
         *      exiting and we're stopping all the requests.
@@ -1118,6 +1152,7 @@ static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t
 
        fr_assert(!fr_heap_entry_inserted(request->runnable_id));
        fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
+       fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
 }
 
 /** Detached request (i.e. one generated by the interpreter with no parent) is now complete
@@ -1144,6 +1179,8 @@ static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t
         */
        (void)fr_minmax_heap_extract(worker->time_order, request);
 
+       fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
+
        /*
         *      Detached requests have to be freed by us
         *      as nothing else can free them.
@@ -1335,7 +1372,7 @@ nomem:
        CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
        CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));
 
-       worker->channel = talloc_zero_array(worker, fr_channel_t *, worker->config.max_channels);
+       worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
        if (!worker->channel) {
                talloc_free(worker);
                goto nomem;
@@ -1611,9 +1648,9 @@ static void worker_verify(fr_worker_t *worker)
        (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
 
        for (i = 0; i < worker->config.max_channels; i++) {
-               if (!worker->channel[i]) continue;
+               if (!worker->channel[i].ch) continue;
 
-               (void) talloc_get_type_abort(worker->channel[i], fr_channel_t);
+               (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
        }
 }
 #endif