fr_rb_node_t node; //!< Entry in the tree of coordinators.
fr_coord_cb_reg_t *callbacks; //!< Array of callbacks for worker -> coordinator messages.
uint32_t num_callbacks; //!< Number of callbacks defined.
+ fr_coord_cb_inst_t **cb_inst; //!< Array of callback instance specific data.
uint32_t max_workers; //!< Maximum number of workers we expect.
uint32_t num_workers; //!< How many workers are attached.
}
fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
- coord->callbacks[cd->coord_cb_id].callback(coord, cm.worker, &dbuff, now, coord->callbacks[cd->coord_cb_id].uctx);
+ coord->callbacks[cd->coord_cb_id].callback(coord, cm.worker, &dbuff, now,
+ coord->cb_inst[cd->coord_cb_id] ?
+ coord->cb_inst[cd->coord_cb_id]->inst_data : NULL,
+ coord->callbacks[cd->coord_cb_id].uctx);
fr_message_done(&cd->m);
}
MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, coord->max_workers));
MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, coord->max_workers));
+ MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
+
+ for (i = 0; i < coord->num_callbacks; i++) {
+ if (!coord->callbacks[i].inst_create) continue;
+ coord->cb_inst[i] = coord->callbacks[i].inst_create(coord, coord, coord->el, coord->single_thread,
+ coord->callbacks[i].uctx);
+ if (!coord->cb_inst[i]) goto fail;
+ }
+
return coord;
}
*/
static void fr_coordinate(fr_coord_t *coord)
{
+ uint32_t i;
+ fr_coord_cb_inst_t *cb_inst;
+
/*
* Run until we're told to exit AND the number of
* workers has dropped to zero.
DEBUG4("Servicing event(s)");
fr_event_service(coord->el);
}
+
+ /*
+ * Run any registered instance specific event callbacks
+ */
+ for (i = 0; i < coord->num_callbacks; i++) {
+ cb_inst = coord->cb_inst[i];
+ if (cb_inst && cb_inst->event_cb) cb_inst->event_cb(coord->el, cb_inst->inst_data);
+ }
}
return;
return fr_control_message_send(cw->coord->coord_recv_control, cw->worker_send_rb,
FR_CONTROL_ID_COORD_DATA, &cm, sizeof(fr_coord_msg_t));
}
+
+/** Insert instance specific pre-event callbacks
+ */
+int fr_coord_pre_event_insert(fr_event_list_t *el)
+{
+ fr_coord_t *coord;
+ fr_rb_iter_inorder_t iter;
+ fr_coord_cb_inst_t *cb_inst;
+ uint32_t i;
+
+ if (!coord_regs) return 0;
+
+ for (coord = fr_rb_iter_init_inorder(&coords, &iter);
+ coord != NULL;
+ coord = fr_rb_iter_next_inorder(&coords, &iter)) {
+ for (i = 0; i < coord->num_callbacks; i++) {
+ cb_inst = coord->cb_inst[i];
+ if (cb_inst && cb_inst->event_pre_cb &&
+ fr_event_pre_insert(el, cb_inst->event_pre_cb, cb_inst->inst_data) < 0) {
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
+/** Insert instance specific post-event callbacks
+ */
+int fr_coord_post_event_insert(fr_event_list_t *el)
+{
+ fr_coord_t *coord;
+ fr_rb_iter_inorder_t iter;
+ fr_coord_cb_inst_t *cb_inst;
+ uint32_t i;
+
+ if (!coord_regs) return 0;
+
+ for (coord = fr_rb_iter_init_inorder(&coords, &iter);
+ coord != NULL;
+ coord = fr_rb_iter_next_inorder(&coords, &iter)) {
+ for (i = 0; i < coord->num_callbacks; i++) {
+ cb_inst = coord->cb_inst[i];
+ if (cb_inst && cb_inst->event_post_cb &&
+ fr_event_post_insert(el, cb_inst->event_post_cb, cb_inst->inst_data) < 0) {
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
typedef struct fr_coord_s fr_coord_t;
typedef struct fr_coord_worker_s fr_coord_worker_t;
+typedef struct fr_coord_cb_inst_s fr_coord_cb_inst_t;
-typedef void (*fr_coord_cb_t)(fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *uctx);
+typedef void (*fr_coord_cb_t)(fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *plugin_data, void *uctx);
typedef void (*fr_coord_worker_cb_t)(fr_coord_worker_t *cw, fr_dbuff_t *dbuff, fr_time_t now, void *uctx);
+typedef fr_coord_cb_inst_t *(*fr_coord_cb_inst_create_t)(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, bool single_thread, void *uctx);
typedef struct {
char const *name;
fr_coord_cb_t callback;
+ fr_coord_cb_inst_create_t inst_create;
void *uctx;
} fr_coord_cb_reg_t;
int fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff);
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff);
+
+int fr_coord_pre_event_insert(fr_event_list_t *el);
+
+int fr_coord_post_event_insert(fr_event_list_t *el);