/** Scheduler specific information for coordinator threads
*/
typedef struct {
- TALLOC_CTX *ctx; //!< Our allocation ctx
- fr_event_list_t *el; //!< Event list for this coordinator.
- pthread_t pthread_id; //!< the thread of this coordinator
+ fr_thread_t thread; //!< common thread information - must be first!
+
uint32_t max_workers; //!< Maximum number of workers which will connect to this coordinator.
fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
fr_coord_t *coord; //!< The coordinator data structure.
fr_sem_t *sem; //!< For inter-thread signaling.
- fr_dlist_t entry; //!< Entry in list of running coordinator threads.
} fr_schedule_coord_t;
/** Control plane message used for workers attaching / detaching to coordinators
fr_dlist_foreach(coord_threads, fr_schedule_coord_t, sc) {
if (sc->coord_reg == coord_reg) {
- if ((ret = pthread_join(sc->pthread_id, NULL)) != 0) {
+ if ((ret = pthread_join(sc->thread.pthread_id, NULL)) != 0) {
ERROR("Failed joining coordinator %s: %s", coord_reg->name, fr_syserror(ret));
} else {
DEBUG2("Coordinator %s joined (cleaned up)", coord_reg->name);
*/
static void *fr_coordinate_thread(void *arg)
{
- TALLOC_CTX *ctx;
fr_schedule_coord_t *sc = talloc_get_type_abort(arg, fr_schedule_coord_t);
fr_coord_reg_t *coord_reg = sc->coord_reg;
char coordinate_name[64];
snprintf(coordinate_name, sizeof(coordinate_name), "Coordinate %s", coord_reg->name);
- if (fr_thread_setup(&ctx, &sc->el, coordinate_name) < 0) goto fail;
- sc->ctx = ctx;
+ if (fr_thread_setup(&sc->thread, coordinate_name) < 0) goto fail;
INFO("%s - Starting", coordinate_name);
- sc->coord = fr_coord_create(ctx, sc->el, coord_reg, false, sc->max_workers);
+ sc->coord = fr_coord_create(sc->thread.ctx, sc->thread.el, coord_reg, false, sc->max_workers);
if (!sc->coord) {
PERROR("%s - Failed creating coordinator thread", coordinate_name);
goto fail;
/*
* Create all the thread specific data for the coordinator thread
*/
- if (fr_thread_instantiate(ctx, sc->el) < 0) goto fail;
+ if (fr_thread_instantiate(sc->thread.ctx, sc->thread.el) < 0) goto fail;
sem_post(sc->sem);
fr_thread_detach();
- talloc_free(ctx);
+ talloc_free(sc->thread.ctx);
return NULL;
}
if (!coord_regs) return 0;
MEM(coord_threads = talloc(NULL, fr_dlist_head_t));
- fr_dlist_init(coord_threads, fr_schedule_coord_t, entry);
+ fr_dlist_init(coord_threads, fr_schedule_coord_t, thread.entry);
fr_rb_inline_talloc_init(&coords, fr_coord_t, node, coord_cmp, NULL);
fr_dlist_foreach(coord_regs, fr_coord_reg_t, coord_reg) {
sc->max_workers = num_workers;
sc->sem = sem;
- if (fr_thread_create(&sc->pthread_id, fr_coordinate_thread, sc) < 0) {
+ if (fr_thread_create(&sc->thread.pthread_id, fr_coordinate_thread, sc) < 0) {
talloc_free(sc);
PERROR("Failed creating coordinator %s", coord_reg->name);
return -1;
/*
* Wait for all the coordinators to start.
*/
- fr_thread_wait(sem, fr_dlist_num_elements(coord_threads));
+ if (fr_thread_wait(sem, coord_threads) < 0) {
+ ERROR("Failed creating coordinator threads");
+ return -1;
+ }
/*
* Insert the coordinators in the tree
fr_rb_iter_delete_inorder(&coords, &iter);
talloc_free(coord);
}
+
+ TALLOC_FREE(coord_regs);
+ TALLOC_FREE(coord_threads);
}
/** Start coordinators in single threaded mode
#include <pthread.h>
-/**
- * Track the child thread status.
- */
-typedef enum fr_schedule_child_status_t {
- FR_CHILD_FREE = 0, //!< child is free
- FR_CHILD_INITIALIZING, //!< initialized, but not running
- FR_CHILD_RUNNING, //!< running, and in the running queue
- FR_CHILD_EXITED, //!< exited, and in the exited queue
- FR_CHILD_FAIL //!< failed, and in the exited queue
-} fr_schedule_child_status_t;
-
/** Scheduler specific information for worker threads
*
* Wraps a fr_worker_t, tracking additional information that
* the scheduler uses.
*/
typedef struct {
- TALLOC_CTX *ctx; //!< our allocation ctx
- fr_event_list_t *el; //!< our event list
- pthread_t pthread_id; //!< the thread of this worker
+ fr_thread_t thread; //!< common thread structure - must be first!
- unsigned int id; //!< a unique ID
int uses; //!< how many network threads are using it
fr_time_t cpu_time; //!< how much CPU time this worker has used
- fr_dlist_t entry; //!< our entry into the linked list of workers
-
fr_schedule_t *sc; //!< the scheduler we are running under
- fr_schedule_child_status_t status; //!< status of the worker
fr_worker_t *worker; //!< the worker data structure
} fr_schedule_worker_t;
* the scheduler uses.
*/
typedef struct {
- TALLOC_CTX *ctx; //!< our allocation ctx
- pthread_t pthread_id; //!< the thread of this network
-
- unsigned int id; //!< a unique ID
-
- fr_dlist_t entry; //!< our entry into the linked list of networks
+ fr_thread_t thread; //!< common thread structure - must be first!
fr_schedule_t *sc; //!< the scheduler we are running under
- fr_schedule_child_status_t status; //!< status of the worker
fr_network_t *nr; //!< the receive data structure
fr_timer_t *ev; //!< timer for stats_interval
*/
static void *fr_schedule_worker_thread(void *arg)
{
- TALLOC_CTX *ctx;
fr_schedule_worker_t *sw = talloc_get_type_abort(arg, fr_schedule_worker_t);
fr_schedule_t *sc = sw->sc;
- fr_schedule_child_status_t status = FR_CHILD_FAIL;
+ fr_thread_status_t status = FR_THREAD_FAIL;
fr_schedule_network_t *sn;
char worker_name[32];
- worker_id = sw->id; /* Store the current worker ID */
+ worker_id = sw->thread.id; /* Store the current worker ID */
- snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->id);
+ snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->thread.id);
- if (fr_thread_setup(&ctx, &sw->el, worker_name) < 0) goto fail;
- sw->ctx = ctx;
+ if (fr_thread_setup(&sw->thread, worker_name) < 0) goto fail;
INFO("%s - Starting", worker_name);
- sw->worker = fr_worker_alloc(ctx, sw->el, worker_name, sc->log, sc->lvl, &sc->config->worker);
+ sw->worker = fr_worker_alloc(sw->thread.ctx, sw->thread.el, worker_name, sc->log, sc->lvl, &sc->config->worker);
if (!sw->worker) {
PERROR("%s - Failed creating worker", worker_name);
goto fail;
CONF_SECTION *cs;
char section_name[32];
- snprintf(section_name, sizeof(section_name), "%u", sw->id);
+ snprintf(section_name, sizeof(section_name), "%u", sw->thread.id);
cs = cf_section_find(sc->cs, "worker", section_name);
if (!cs) cs = cf_section_find(sc->cs, "worker", NULL);
- if (sc->worker_thread_instantiate(sw->ctx, sw->el, cs) < 0) {
+ if (sc->worker_thread_instantiate(sw->thread.ctx, sw->thread.el, cs) < 0) {
PERROR("%s - Worker thread instantiation failed", worker_name);
goto fail;
}
}
- sw->status = FR_CHILD_RUNNING;
+ sw->thread.status = FR_THREAD_RUNNING;
/*
* Add this worker to all network threads.
sn != NULL;
sn = fr_dlist_next(&sc->networks, sn)) {
if (unlikely(fr_network_worker_add(sn->nr, sw->worker) < 0)) {
- PERROR("%s - Failed adding worker to network %u", worker_name, sn->id);
+ PERROR("%s - Failed adding worker to network %u", worker_name, sn->thread.id);
goto fail; /* FIXME - Should maybe try to undo partial adds? */
}
}
*/
fr_worker(sw->worker);
- status = FR_CHILD_EXITED;
+ status = FR_THREAD_EXITED;
fail:
- sw->status = status;
+ sw->thread.status = status;
if (sw->worker) {
fr_worker_destroy(sw->worker);
* insertions being done after the thread should have
* exited.
*/
- if (sw->el) fr_event_loop_exit(sw->el, 1);
+ if (sw->thread.el) fr_event_loop_exit(sw->thread.el, 1);
/*
* Tell the scheduler we're done.
*/
sem_post(sc->worker_sem);
- talloc_free(ctx);
+ talloc_free(sw->thread.ctx);
return NULL;
}
*/
static void *fr_schedule_network_thread(void *arg)
{
- TALLOC_CTX *ctx;
fr_schedule_network_t *sn = talloc_get_type_abort(arg, fr_schedule_network_t);
fr_schedule_t *sc = sn->sc;
- fr_schedule_child_status_t status = FR_CHILD_FAIL;
- fr_event_list_t *el;
+ fr_thread_status_t status = FR_THREAD_FAIL;
char network_name[32];
- snprintf(network_name, sizeof(network_name), "Network %d", sn->id);
+ snprintf(network_name, sizeof(network_name), "Network %d", sn->thread.id);
- if (fr_thread_setup(&ctx, &el, network_name) < 0) goto fail;
- sn->ctx = ctx;
+ if (fr_thread_setup(&sn->thread, network_name) < 0) goto fail;
INFO("%s - Starting", network_name);
- sn->nr = fr_network_create(ctx, el, network_name, sc->log, sc->lvl, &sc->config->network);
+ sn->nr = fr_network_create(sn->thread.ctx, sn->thread.el, network_name, sc->log, sc->lvl, &sc->config->network);
if (!sn->nr) {
PERROR("%s - Failed creating network", network_name);
goto fail;
}
- sn->status = FR_CHILD_RUNNING;
+ sn->thread.status = FR_THREAD_RUNNING;
/*
* Tell the originator that the thread has started.
* Print out statistics for this network IO handler.
*/
if (fr_time_delta_ispos(sc->config->stats_interval)) {
- (void) fr_timer_in(sn, el->tl, &sn->ev, sn->sc->config->stats_interval, false, stats_timer, sn);
+ (void) fr_timer_in(sn, sn->thread.el->tl, &sn->ev, sn->sc->config->stats_interval, false, stats_timer, sn);
}
/*
* Call the main event processing loop of the network
*/
fr_network(sn->nr);
- status = FR_CHILD_EXITED;
+ status = FR_THREAD_EXITED;
fail:
- sn->status = status;
+ sn->thread.status = status;
INFO("%s - Exiting", network_name);
*/
sem_post(sc->network_sem);
- talloc_free(ctx);
+ talloc_free(sn->thread.ctx);
return NULL;
}
/*
* Create the lists which hold the workers and networks.
*/
- fr_dlist_init(&sc->workers, fr_schedule_worker_t, entry);
- fr_dlist_init(&sc->networks, fr_schedule_network_t, entry);
+ fr_dlist_init(&sc->workers, fr_schedule_worker_t, thread.entry);
+ fr_dlist_init(&sc->networks, fr_schedule_network_t, thread.entry);
sc->network_sem = fr_sem_alloc();
if (!sc->network_sem) {
break;
}
- sn->id = i;
+ sn->thread.id = i;
sn->sc = sc;
- sn->status = FR_CHILD_INITIALIZING;
+ sn->thread.status = FR_THREAD_INITIALIZING;
- if (fr_thread_create(&sn->pthread_id, fr_schedule_network_thread, sn) < 0) {
+ if (fr_thread_create(&sn->thread.pthread_id, fr_schedule_network_thread, sn) < 0) {
talloc_free(sn);
PERROR("Failed creating network %u", i);
break;
* they've started, OR there's been a problem and they
* can't start.
*/
- fr_thread_wait(sc->network_sem, fr_dlist_num_elements(&sc->networks));
-
- /*
- * See if all of the networks have started.
- */
- for (sn = fr_dlist_head(&sc->networks);
- sn != NULL;
- sn = next_sn) {
- next_sn = fr_dlist_next(&sc->networks, sn);
-
- if (sn->status != FR_CHILD_RUNNING) {
- fr_dlist_remove(&sc->networks, sn);
- continue;
- }
- }
-
- /*
- * Failed to start some networks, refuse to do anything!
- */
- if ((unsigned int)fr_dlist_num_elements(&sc->networks) < sc->config->max_networks) {
+ if (fr_thread_wait(sc->network_sem, &sc->networks) < 0) {
fr_schedule_destroy(&sc);
return NULL;
}
break;
}
- sw->id = i;
+ sw->thread.id = i;
sw->sc = sc;
- sw->status = FR_CHILD_INITIALIZING;
+ sw->thread.status = FR_THREAD_INITIALIZING;
- if (fr_thread_create(&sw->pthread_id, fr_schedule_worker_thread, sw) < 0) {
+ if (fr_thread_create(&sw->thread.pthread_id, fr_schedule_worker_thread, sw) < 0) {
talloc_free(sw);
PERROR("Failed creating worker %u", i);
break;
* they've started, OR there's been a problem and they
* can't start.
*/
- fr_thread_wait(sc->worker_sem, fr_dlist_num_elements(&sc->workers));
-
- /*
- * See if all of the workers have started.
- */
- for (sw = fr_dlist_head(&sc->workers);
- sw != NULL;
- sw = next_sw) {
-
- next_sw = fr_dlist_next(&sc->workers, sw);
-
- if (sw->status != FR_CHILD_RUNNING) {
- fr_dlist_remove(&sc->workers, sw);
- continue;
- }
- }
-
- /*
- * Failed to start some workers, refuse to do anything!
- */
- if ((unsigned int)fr_dlist_num_elements(&sc->workers) < sc->config->max_workers) {
+ if (fr_thread_wait(sc->worker_sem, &sc->workers) < 0) {
fr_schedule_destroy(&sc);
return NULL;
}
*/
fr_dlist_foreach(&sc->networks, fr_schedule_network_t, sne) {
if (fr_network_exit(sne->nr) < 0) {
- PERROR("Failed signaling network %i to exit", sne->id);
+ PERROR("Failed signaling network %i to exit", sne->thread.id);
}
}
* exited before the main thread cleans up the
* module instances.
*/
- if ((ret = pthread_join(sn->pthread_id, NULL)) != 0) {
- ERROR("Failed joining network %i: %s", sn->id, fr_syserror(ret));
+ if ((ret = pthread_join(sn->thread.pthread_id, NULL)) != 0) {
+ ERROR("Failed joining network %i: %s", sn->thread.id, fr_syserror(ret));
} else {
- DEBUG2("Network %i joined (cleaned up)", sn->id);
+ DEBUG2("Network %i joined (cleaned up)", sn->thread.id);
}
}
* exited before the main thread cleans up the
* module instances.
*/
- if ((ret = pthread_join(sw->pthread_id, NULL)) != 0) {
- ERROR("Failed joining worker %i: %s", sw->id, fr_syserror(ret));
+ if ((ret = pthread_join(sw->thread.pthread_id, NULL)) != 0) {
+ ERROR("Failed joining worker %i: %s", sw->thread.id, fr_syserror(ret));
} else {
- DEBUG2("Worker %i joined (cleaned up)", sw->id);
+ DEBUG2("Worker %i joined (cleaned up)", sw->thread.id);
}
}