]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Handle main thread connecting to coordinators
authorNick Porter <nick@portercomputing.co.uk>
Mon, 23 Mar 2026 11:08:31 +0000 (11:08 +0000)
committerNick Porter <nick@portercomputing.co.uk>
Mon, 23 Mar 2026 11:14:52 +0000 (11:14 +0000)
Which is happening now that the main thread is creating thread instance
data.

The main thread will return -1 when calling fr_schedule_worker_id().

This case is now handled by increasing the size of the coordinator
arrays to handle the extra "worker" and calculating array indexes
appropriately.

src/lib/io/coord.c
src/lib/io/coord.h
src/lib/io/coord_priv.h

index 0edbc50f3b3cac3915ea9e107a9e56e02a591d8b..c52012e10bfa7f0561594ffe0813c0a536e78a9c 100644 (file)
@@ -38,6 +38,8 @@ RCSID("$Id$")
 #define FR_CONTROL_ID_COORD_WORKER_ACK         (3)     //!< Message sent to worker to acknowledge attach / detach
 #define FR_CONTROL_ID_COORD_DATA               (4)     //!< Worker <-> coordinator message to pass data to a callback
 
+#define MIN_WORKER_ID -1       //!< The minimum value we expect as worker id.  -1 is the main thread.
+
 static fr_dlist_head_t *coord_regs = NULL;
 static fr_dlist_head_t *coord_threads = NULL;
 static fr_rb_tree_t    coords = (fr_rb_tree_t){ .num_elements = 0 };
@@ -103,13 +105,13 @@ typedef struct {
 /** Control plane message used for workers attaching / detaching to coordinators
  */
 typedef struct {
-       uint32_t                        worker;                 //!< Worker ID
+       int32_t                         worker;                 //!< Worker ID
        fr_control_t                    *worker_recv_control;   //!< Control plane to send messages to this worker
        fr_atomic_queue_t               *worker_recv_aq;        //!< Atomic queue to send data to this worker
 } fr_coord_worker_attach_msg_t;
 
 typedef struct {
-       uint32_t                        worker;                 //!< Worker ID
+       int32_t                         worker;                 //!< Worker ID
        bool                            exiting;                //!< Is the server exiting
 } fr_coord_worker_detach_msg_t;
 
@@ -209,7 +211,7 @@ static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_ti
 
        fr_assert(data_size == sizeof(cm));
        memcpy(&cm, data, data_size);
-       fr_assert(cm.worker <= coord->max_workers);
+       fr_assert((cm.worker >= MIN_WORKER_ID) && (cm.worker < (int32_t)coord->max_workers));
 
        if (unlikely(!fr_atomic_queue_pop(coord->coord_recv_aq, (void **)&cd))) return;
 
@@ -264,17 +266,19 @@ static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_
        fr_coord_t                              *coord = talloc_get_type_abort(ctx, fr_coord_t);
        fr_coord_worker_attach_msg_t const      *msg = data;
        fr_coord_msg_t                          ack;
+       uint32_t                                thread_id;
 
        fr_assert(data_size == sizeof(fr_coord_worker_attach_msg_t));
-       fr_assert(msg->worker < coord->max_workers);
+       fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
 
        DEBUG2("Worker %d attached to %s", msg->worker, coord->coord_reg->name);
        coord->num_workers++;
-       coord->coord_send_control[msg->worker] = msg->worker_recv_control;
-       coord->coord_send_aq[msg->worker] = msg->worker_recv_aq;
+       thread_id = msg->worker - MIN_WORKER_ID;
+       coord->coord_send_control[thread_id] = msg->worker_recv_control;
+       coord->coord_send_aq[thread_id] = msg->worker_recv_aq;
 
        ack.worker = msg->worker;
-       fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
+       fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
                                FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(ack));
 }
 
@@ -285,19 +289,21 @@ static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_
        fr_coord_t                              *coord = talloc_get_type_abort(ctx, fr_coord_t);
        fr_coord_worker_detach_msg_t const      *msg = data;
        fr_coord_msg_t                          ack;
+       uint32_t                                thread_id;
 
        fr_assert(data_size == sizeof(fr_coord_worker_detach_msg_t));
-       fr_assert(msg->worker < coord->max_workers);
+       fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
+       thread_id = msg->worker - MIN_WORKER_ID;
 
        DEBUG2("Worker %d detached from %s", msg->worker, coord->coord_reg->name);
        coord->num_workers--;
 
        ack.worker = msg->worker;
-       fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
+       fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
                                FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(fr_coord_msg_t));
 
-       coord->coord_send_control[msg->worker] = NULL;
-       coord->coord_send_aq[msg->worker] = NULL;
+       coord->coord_send_control[thread_id] = NULL;
+       coord->coord_send_aq[thread_id] = NULL;
        if (msg->exiting) coord->exiting = true;
 }
 
@@ -316,7 +322,7 @@ static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
                                   bool single_thread, uint32_t max_workers)
 {
        fr_coord_t              *coord;
-       uint32_t                i;
+       uint32_t                i, num_threads = max_workers - MIN_WORKER_ID;
        fr_coord_cb_reg_t       *cb = coord_reg->coord_cb;
        fr_atomic_queue_t       *aq;
 
@@ -368,9 +374,16 @@ static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
                goto fail;
        }
 
-       MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, coord->max_workers));
-       MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, coord->max_workers));
-       for (i = 0; i < coord->max_workers; i++) {
+       /*
+        *      Coordinator side arrays for holding pointers to worker
+        *      specific communication structures.  The array sizes are the
+        *      number of threads expected to attach which is the number of
+        *      workers plus any additional threads, currently just the main
+        *      thread (worker id -1)
+        */
+       MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, num_threads));
+       MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, num_threads));
+       for (i = 0; i < num_threads; i++) {
                coord->coord_send_rb[i] = fr_ring_buffer_create(coord, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
                if (!coord->coord_send_rb[i]) goto fail;
 
@@ -378,8 +391,8 @@ static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
                                                                coord_reg->coord_send_size, true);
                if (!coord->coord_send_ms[i]) goto fail;
        }
-       MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, coord->max_workers));
-       MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, coord->max_workers));
+       MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, num_threads));
+       MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, num_threads));
 
        MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
 
@@ -613,7 +626,7 @@ static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *da
        fr_coord_msg_t const            *cm = data;
 
        fr_assert(data_size == sizeof(fr_coord_msg_t));
-       fr_assert(cm->worker == (uint32_t)fr_schedule_worker_id());
+       fr_assert(cm->worker == fr_schedule_worker_id());
 #endif
 }
 
@@ -689,26 +702,29 @@ fr_coord_worker_t *fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coor
  *     - 0 on success
  *     - <0 on failure
  */
-int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
+int fr_coord_to_worker_send(fr_coord_t *coord, int32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
 {
        fr_coord_msg_t          cm;
        fr_coord_data_t         *cd = NULL;
+       uint32_t                thread_id = worker_id - MIN_WORKER_ID;
+
+       fr_assert((worker_id >= MIN_WORKER_ID) && (worker_id < (int32_t)coord->max_workers));
 
        cm = (fr_coord_msg_t) {
                .worker = worker_id
        };
 
-       cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[worker_id], (fr_message_t *)cd,
+       cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[thread_id], (fr_message_t *)cd,
                                                 fr_dbuff_used(dbuff));
        if (!cd) return -1;
 
        memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
        cd->coord_cb_id = cb_id;
-       if (!fr_atomic_queue_push(coord->coord_send_aq[worker_id], cd)) {
+       if (!fr_atomic_queue_push(coord->coord_send_aq[thread_id], cd)) {
                fr_message_done((fr_message_t *)cd);
                return -1;
        }
-       return fr_control_message_send(coord->coord_send_control[worker_id], coord->coord_send_rb[worker_id],
+       return fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
                                       FR_CONTROL_ID_COORD_DATA,
                                       &cm, sizeof(fr_coord_msg_t));
 }
index f93afd986e308bf1c181410b61234ea58b8be6ec..20a0ecfbc45fedae2dd96831b747bcd16f976398 100644 (file)
@@ -89,7 +89,7 @@ void          fr_coords_destroy(void);
 fr_coord_worker_t      *fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg);
 int            fr_coord_detach(fr_coord_worker_t *cw, bool exiting);
 
-int            fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff);
+int            fr_coord_to_worker_send(fr_coord_t *coord, int32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff);
 
 int            fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff);
 
index 6f4485e967a55922b0eb53b0e974973ec413e3d9..a9633391a3b36ab4aa74f95cbcdb7acb0974a762 100644 (file)
@@ -30,7 +30,7 @@ RCSIDH(coord_priv_h, "$Id$")
 /** Generic control message used between workers and coordinators
  */
 typedef struct {
-       uint32_t                        worker;                 //!< Worker ID
+       int32_t                         worker;                 //!< Worker ID
 } fr_coord_msg_t;
 
 /** List / data message used between workers and coordinators