} worker_args_t;
static int debug_lvl = 0;
-static fr_atomic_queue_t *aq;
+static fr_atomic_queue_t **aq;
static size_t max_messages = 10;
static size_t num_workers = 1;
static int aq_size = 16;
-static fr_control_t *control = NULL;
+static fr_control_t **control = NULL;
static fr_event_list_t *el = NULL;
+static bool single_aq = true;
+static size_t num_aq = 1;
/**********************************************************************/
fprintf(stderr, "usage: control_test [OPTS]\n");
fprintf(stderr, " -m <messages> Send number of messages.\n");
fprintf(stderr, " -w <workers> Number of workers.\n");
+ fprintf(stderr, " -q Use per-worker atomic queues.\n");
fprintf(stderr, " -x Debugging mode.\n");
fr_exit_now(EXIT_SUCCESS);
{
TALLOC_CTX *ctx;
master_ctx_t *master_ctx;
+ size_t i;
MEM(ctx = talloc_init_const("control_master"));
MPRINT1("Master started.\n");
- fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback);
+ for (i = 0; i < num_aq; i++) {
+ fr_control_callback_add(control[i], FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback);
+ }
while (master_ctx->num_messages < (max_messages * num_workers)) {
int num_events;
static void *control_worker(void *arg)
{
- size_t i;
+ size_t i, aq_num;
TALLOC_CTX *ctx;
worker_args_t *wa = (worker_args_t *) arg;
MEM(ctx = talloc_init_const("control_worker"));
- MPRINT1("\tWorker %ld started.\n", wa->id);
+ aq_num = single_aq ? 0 : wa->id;
+
+ MPRINT1("\tWorker %ld started using queue %ld.\n", wa->id, aq_num);
for (i = 0; i < max_messages; i++) {
my_message_t m;
m.worker = wa->id;
retry:
- if (fr_control_message_send(control, wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) {
+ if (fr_control_message_send(control[aq_num], wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) {
MPRINT1("\tWorker %ld retrying message %zu\n", wa->id, i);
usleep(10);
goto retry;
fr_time_start();
- while ((c = getopt(argc, argv, "hm:w:x")) != -1) switch (c) {
+ while ((c = getopt(argc, argv, "hm:qw:x")) != -1) switch (c) {
case 'x':
debug_lvl++;
break;
max_messages = atoi(optarg);
break;
+ case 'q':
+ single_aq = false;
+ break;
+
case 'w':
num_workers = atoi(optarg);
break;
main_loop_init();
el = main_loop_event_list();
- aq = fr_atomic_queue_alloc(autofree, aq_size);
- fr_assert(aq != NULL);
+ num_aq = single_aq ? 1 : num_workers;
+ aq = talloc_array(autofree, fr_atomic_queue_t *, num_aq);
+ for (i = 0; i < num_aq; i++) {
+ aq[i] = fr_atomic_queue_alloc(aq, aq_size);
+ fr_assert(aq[i] != NULL);
+ }
- control = fr_control_create(autofree, el, aq);
- if (!control) {
- fprintf(stderr, "control_test: Failed to create control plane\n");
- fr_exit_now(EXIT_FAILURE);
+ control = talloc_array(autofree, fr_control_t *, num_aq);
+ for (i = 0; i < num_aq; i++) {
+ control[i] = fr_control_create(control, el, aq[i]);
+ if (!control[i]) {
+ fprintf(stderr, "control_test: Failed to create control plane\n");
+ fr_exit_now(EXIT_FAILURE);
+ }
}
/*