#define FR_CONTROL_ID_COORD_WORKER_ACK (3) //!< Message sent to worker to acknowledge attach / detach
#define FR_CONTROL_ID_COORD_DATA (4) //!< Worker <-> coordinator message to pass data to a callback
+#define MIN_WORKER_ID -1 //!< The minimum value we expect as worker id. -1 is the main thread.
+
static fr_dlist_head_t *coord_regs = NULL;
static fr_dlist_head_t *coord_threads = NULL;
static fr_rb_tree_t coords = (fr_rb_tree_t){ .num_elements = 0 };
/** Control plane message used for workers attaching / detaching to coordinators
*/
typedef struct {
- uint32_t worker; //!< Worker ID
+ int32_t worker; //!< Worker ID
fr_control_t *worker_recv_control; //!< Control plane to send messages to this worker
fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue to send data to this worker
} fr_coord_worker_attach_msg_t;
typedef struct {
- uint32_t worker; //!< Worker ID
+ int32_t worker; //!< Worker ID
bool exiting; //!< Is the server exiting
} fr_coord_worker_detach_msg_t;
fr_assert(data_size == sizeof(cm));
memcpy(&cm, data, data_size);
- fr_assert(cm.worker <= coord->max_workers);
+ fr_assert((cm.worker >= MIN_WORKER_ID) && (cm.worker < (int32_t)coord->max_workers));
if (unlikely(!fr_atomic_queue_pop(coord->coord_recv_aq, (void **)&cd))) return;
fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
fr_coord_worker_attach_msg_t const *msg = data;
fr_coord_msg_t ack;
+ uint32_t thread_id;
fr_assert(data_size == sizeof(fr_coord_worker_attach_msg_t));
- fr_assert(msg->worker < coord->max_workers);
+ fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
DEBUG2("Worker %d attached to %s", msg->worker, coord->coord_reg->name);
coord->num_workers++;
- coord->coord_send_control[msg->worker] = msg->worker_recv_control;
- coord->coord_send_aq[msg->worker] = msg->worker_recv_aq;
+ thread_id = msg->worker - MIN_WORKER_ID;
+ coord->coord_send_control[thread_id] = msg->worker_recv_control;
+ coord->coord_send_aq[thread_id] = msg->worker_recv_aq;
ack.worker = msg->worker;
- fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
+ fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(ack));
}
fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
fr_coord_worker_detach_msg_t const *msg = data;
fr_coord_msg_t ack;
+ uint32_t thread_id;
fr_assert(data_size == sizeof(fr_coord_worker_detach_msg_t));
- fr_assert(msg->worker < coord->max_workers);
+ fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
+ thread_id = msg->worker - MIN_WORKER_ID;
DEBUG2("Worker %d detached from %s", msg->worker, coord->coord_reg->name);
coord->num_workers--;
ack.worker = msg->worker;
- fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
+ fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(fr_coord_msg_t));
- coord->coord_send_control[msg->worker] = NULL;
- coord->coord_send_aq[msg->worker] = NULL;
+ coord->coord_send_control[thread_id] = NULL;
+ coord->coord_send_aq[thread_id] = NULL;
if (msg->exiting) coord->exiting = true;
}
bool single_thread, uint32_t max_workers)
{
fr_coord_t *coord;
- uint32_t i;
+ uint32_t i, num_threads = max_workers - MIN_WORKER_ID;
fr_coord_cb_reg_t *cb = coord_reg->coord_cb;
fr_atomic_queue_t *aq;
goto fail;
}
- MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, coord->max_workers));
- MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, coord->max_workers));
- for (i = 0; i < coord->max_workers; i++) {
+ /*
+ * Coordinator side arrays for holding pointers to worker
+ * specific communication structures. The array sizes are the
+ * number of threads expected to attach which is the number of
+ * workers plus any additional threads, currently just the main
+ * thread (worker id -1)
+ */
+ MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, num_threads));
+ MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, num_threads));
+ for (i = 0; i < num_threads; i++) {
coord->coord_send_rb[i] = fr_ring_buffer_create(coord, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
if (!coord->coord_send_rb[i]) goto fail;
coord_reg->coord_send_size, true);
if (!coord->coord_send_ms[i]) goto fail;
}
- MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, coord->max_workers));
- MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, coord->max_workers));
+ MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, num_threads));
+ MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, num_threads));
MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
fr_coord_msg_t const *cm = data;
fr_assert(data_size == sizeof(fr_coord_msg_t));
- fr_assert(cm->worker == (uint32_t)fr_schedule_worker_id());
+ fr_assert(cm->worker == fr_schedule_worker_id());
#endif
}
* - 0 on success
* - <0 on failure
*/
-int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
+int fr_coord_to_worker_send(fr_coord_t *coord, int32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
{
fr_coord_msg_t cm;
fr_coord_data_t *cd = NULL;
+ uint32_t thread_id = worker_id - MIN_WORKER_ID;
+
+ fr_assert((worker_id >= MIN_WORKER_ID) && (worker_id < (int32_t)coord->max_workers));
cm = (fr_coord_msg_t) {
.worker = worker_id
};
- cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[worker_id], (fr_message_t *)cd,
+ cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[thread_id], (fr_message_t *)cd,
fr_dbuff_used(dbuff));
if (!cd) return -1;
memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
cd->coord_cb_id = cb_id;
- if (!fr_atomic_queue_push(coord->coord_send_aq[worker_id], cd)) {
+ if (!fr_atomic_queue_push(coord->coord_send_aq[thread_id], cd)) {
fr_message_done((fr_message_t *)cd);
return -1;
}
- return fr_control_message_send(coord->coord_send_control[worker_id], coord->coord_send_rb[worker_id],
+ return fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
FR_CONTROL_ID_COORD_DATA,
&cm, sizeof(fr_coord_msg_t));
}