From: Nick Porter Date: Fri, 5 Sep 2025 08:24:58 +0000 (+0100) Subject: Update control_test to allow one atomic queue per worker X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eaaf57400bb3760b6dec392963a113d82c0889d8;p=thirdparty%2Ffreeradius-server.git Update control_test to allow one atomic queue per worker --- diff --git a/src/tests/util/control_test.c b/src/tests/util/control_test.c index 807d057ed4d..0ba5956d0b7 100644 --- a/src/tests/util/control_test.c +++ b/src/tests/util/control_test.c @@ -49,12 +49,14 @@ typedef struct { } worker_args_t; static int debug_lvl = 0; -static fr_atomic_queue_t *aq; +static fr_atomic_queue_t **aq; static size_t max_messages = 10; static size_t num_workers = 1; static int aq_size = 16; -static fr_control_t *control = NULL; +static fr_control_t **control = NULL; static fr_event_list_t *el = NULL; +static bool single_aq = true; +static size_t num_aq = 1; /**********************************************************************/ @@ -63,6 +65,7 @@ static NEVER_RETURNS void usage(void) fprintf(stderr, "usage: control_test [OPTS]\n"); fprintf(stderr, " -m Send number of messages.\n"); fprintf(stderr, " -w Number of workers.\n"); + fprintf(stderr, " -q Use per-worker atomic queues.\n"); fprintf(stderr, " -x Debugging mode.\n"); fr_exit_now(EXIT_SUCCESS); @@ -94,6 +97,7 @@ static void *control_master(UNUSED void *arg) { TALLOC_CTX *ctx; master_ctx_t *master_ctx; + size_t i; MEM(ctx = talloc_init_const("control_master")); @@ -101,7 +105,9 @@ static void *control_master(UNUSED void *arg) MPRINT1("Master started.\n"); - fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback); + for (i = 0; i < num_aq; i++) { + fr_control_callback_add(control[i], FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback); + } while (master_ctx->num_messages < (max_messages * num_workers)) { int num_events; @@ -127,13 +133,15 @@ static void *control_master(UNUSED void *arg) static void *control_worker(void *arg) { - size_t i; + size_t i, aq_num; TALLOC_CTX *ctx; worker_args_t *wa = (worker_args_t *) arg; MEM(ctx = talloc_init_const("control_worker")); - MPRINT1("\tWorker %ld started.\n", wa->id); + aq_num = single_aq ? 0 : wa->id; + + MPRINT1("\tWorker %ld started using queue %ld.\n", wa->id, aq_num); for (i = 0; i < max_messages; i++) { my_message_t m; @@ -143,7 +151,7 @@ static void *control_worker(void *arg) m.worker = wa->id; retry: - if (fr_control_message_send(control, wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) { + if (fr_control_message_send(control[aq_num], wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) { MPRINT1("\tWorker %ld retrying message %zu\n", wa->id, i); usleep(10); goto retry; @@ -174,7 +182,7 @@ int main(int argc, char *argv[]) fr_time_start(); - while ((c = getopt(argc, argv, "hm:w:x")) != -1) switch (c) { + while ((c = getopt(argc, argv, "hm:qw:x")) != -1) switch (c) { case 'x': debug_lvl++; break; @@ -183,6 +191,10 @@ int main(int argc, char *argv[]) max_messages = atoi(optarg); break; + case 'q': + single_aq = false; + break; + case 'w': num_workers = atoi(optarg); break; @@ -200,13 +212,20 @@ int main(int argc, char *argv[]) main_loop_init(); el = main_loop_event_list(); - aq = fr_atomic_queue_alloc(autofree, aq_size); - fr_assert(aq != NULL); + num_aq = single_aq ? 1 : num_workers; + aq = talloc_array(autofree, fr_atomic_queue_t *, num_aq); + for (i = 0; i < num_aq; i++) { + aq[i] = fr_atomic_queue_alloc(aq, aq_size); + fr_assert(aq[i] != NULL); + } - control = fr_control_create(autofree, el, aq); - if (!control) { - fprintf(stderr, "control_test: Failed to create control plane\n"); - fr_exit_now(EXIT_FAILURE); + control = talloc_array(autofree, fr_control_t *, num_aq); + for (i = 0; i < num_aq; i++) { + control[i] = fr_control_create(control, el, aq[i]); + if (!control[i]) { + fprintf(stderr, "control_test: Failed to create control plane\n"); + fr_exit_now(EXIT_FAILURE); + } } /*