From f946937549d897de1e9b2db2e12372266a1e6b84 Mon Sep 17 00:00:00 2001 From: Nick Porter Date: Fri, 5 Sep 2025 08:48:55 +0100 Subject: [PATCH] Update control_test to use current APIs and use variable number of workers --- src/tests/util/control_test.c | 142 ++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 68 deletions(-) diff --git a/src/tests/util/control_test.c b/src/tests/util/control_test.c index 56090248a9..807d057ed4 100644 --- a/src/tests/util/control_test.c +++ b/src/tests/util/control_test.c @@ -22,6 +22,7 @@ RCSID("$Id$") +#include #include #include #include @@ -42,27 +43,18 @@ RCSID("$Id$") #define MPRINT1 if (debug_lvl) printf #define CONTROL_MAGIC 0xabcd6809 +typedef struct { + size_t id; + fr_ring_buffer_t *rb; +} worker_args_t; + static int debug_lvl = 0; -static int kq = -1; static fr_atomic_queue_t *aq; static size_t max_messages = 10; +static size_t num_workers = 1; static int aq_size = 16; static fr_control_t *control = NULL; -static fr_ring_buffer_t *rb = NULL; - -/**********************************************************************/ -typedef struct request_s request_t; -request_t *request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args); -void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request); - -request_t *request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args) -{ - return NULL; -} - -void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request) -{ -} +static fr_event_list_t *el = NULL; /**********************************************************************/ @@ -70,6 +62,7 @@ static NEVER_RETURNS void usage(void) { fprintf(stderr, "usage: control_test [OPTS]\n"); fprintf(stderr, " -m Send number of messages.\n"); + fprintf(stderr, " -w Number of workers.\n"); fprintf(stderr, " -x Debugging mode.\n"); fr_exit_now(EXIT_SUCCESS); @@ -78,59 +71,53 @@ static NEVER_RETURNS void usage(void) typedef struct { uint32_t header; size_t counter; + size_t worker; } my_message_t; +typedef struct { + size_t num_messages; +} master_ctx_t; + +static void recv_control_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now) +{ + my_message_t const *m = data; + master_ctx_t *master_ctx = ctx; + + fr_assert(m->header == CONTROL_MAGIC); + fr_assert(data_size == sizeof(*m)); + + MPRINT1("Master got worker %ld message %zu, size %ld.\n", m->worker, m->counter, data_size); + master_ctx->num_messages++; +} + static void *control_master(UNUSED void *arg) { TALLOC_CTX *ctx; + master_ctx_t *master_ctx; MEM(ctx = talloc_init_const("control_master")); + master_ctx = talloc_zero(ctx, master_ctx_t); + MPRINT1("Master started.\n"); - /* - * Busy loop. We're stupid. - */ - while (true) { + fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback); + + while (master_ctx->num_messages < (max_messages * num_workers)) { int num_events; - ssize_t data_size; - my_message_t m; - struct kevent kev; - wait_for_events: - MPRINT1("Master waiting for events.\n"); + MPRINT1("Master waiting for events (seen %ld).\n", master_ctx->num_messages); - num_events = kevent(kq, NULL, 0, &kev, 1, NULL); + num_events = fr_event_corral(el, fr_time(), true); if (num_events < 0) { fprintf(stderr, "Failed reading kevent: %s\n", fr_syserror(errno)); fr_exit_now(EXIT_FAILURE); } - - MPRINT1("Master draining the control plane.\n"); - - while (true) { - uint32_t id; - - data_size = fr_control_message_pop(aq, &id, &m, sizeof(m)); - if (data_size == 0) goto wait_for_events; - - if (data_size < 0) { - fprintf(stderr, "Failed reading control message\n"); - fr_exit_now(EXIT_FAILURE); - } - - fr_assert(data_size == sizeof(m)); - fr_assert(id == FR_CONTROL_ID_CHANNEL); - - MPRINT1("Master got message %zu.\n", m.counter); - - fr_assert(m.header == CONTROL_MAGIC); - - if (m.counter == (max_messages - 1)) goto do_exit; + if (num_events > 0) { + fr_event_service(el); } } -do_exit: MPRINT1("Master exiting.\n"); talloc_free(ctx); @@ -138,32 +125,34 @@ do_exit: return NULL; } -static void *control_worker(UNUSED void *arg) +static void *control_worker(void *arg) { size_t i; TALLOC_CTX *ctx; + worker_args_t *wa = (worker_args_t *) arg; MEM(ctx = talloc_init_const("control_worker")); - MPRINT1("\tWorker started.\n"); + MPRINT1("\tWorker %ld started.\n", wa->id); for (i = 0; i < max_messages; i++) { my_message_t m; m.header = CONTROL_MAGIC; m.counter = i; + m.worker = wa->id; retry: - if (fr_control_message_send(control, rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) { - MPRINT1("\tWorker retrying message %zu\n", i); + if (fr_control_message_send(control, wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) { + MPRINT1("\tWorker %ld retrying message %zu\n", wa->id, i); usleep(10); goto retry; } - MPRINT1("\tWorker sent message %zu\n", i); + MPRINT1("\tWorker %ld sent message %zu\n", wa->id, i); } - MPRINT1("\tWorker exiting.\n"); + MPRINT1("\tWorker %ld exiting.\n", wa->id); talloc_free(ctx); @@ -177,11 +166,15 @@ int main(int argc, char *argv[]) int c; TALLOC_CTX *autofree = talloc_autofree_context(); pthread_attr_t attr; - pthread_t master_id, worker_id; + pthread_t master_id, *worker_id; + size_t i; + worker_args_t *worker_args; + + fr_atexit_global_setup(); fr_time_start(); - while ((c = getopt(argc, argv, "hm:o:tx")) != -1) switch (c) { + while ((c = getopt(argc, argv, "hm:w:x")) != -1) switch (c) { case 'x': debug_lvl++; break; @@ -190,6 +183,10 @@ int main(int argc, char *argv[]) max_messages = atoi(optarg); break; + case 'w': + num_workers = atoi(optarg); + break; + case 'h': default: usage(); @@ -200,34 +197,43 @@ int main(int argc, char *argv[]) argv += (optind - 1); #endif - kq = kqueue(); - fr_assert(kq >= 0); + main_loop_init(); + el = main_loop_event_list(); aq = fr_atomic_queue_alloc(autofree, aq_size); fr_assert(aq != NULL); - control = fr_control_create(autofree, kq, aq, 1024); + control = fr_control_create(autofree, el, aq); if (!control) { fprintf(stderr, "control_test: Failed to create control plane\n"); fr_exit_now(EXIT_FAILURE); } - rb = fr_ring_buffer_create(autofree, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE); - if (!rb) fr_exit_now(EXIT_FAILURE); - /* - * Start the two threads, with the channel. + * Start the threads, with the channel. */ (void) pthread_attr_init(&attr); (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - (void) pthread_create(&worker_id, &attr, control_worker, NULL); (void) pthread_create(&master_id, &attr, control_master, NULL); + worker_id = talloc_array(autofree, pthread_t, num_workers); + worker_args = talloc_array(autofree, worker_args_t, num_workers); + for (i = 0; i < num_workers; i++) { + worker_args[i].id = i; + worker_args[i].rb = fr_ring_buffer_create(worker_args, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE); + (void) pthread_create(&worker_id[i], &attr, control_worker, &worker_args[i]); + } (void) pthread_join(master_id, NULL); - (void) pthread_join(worker_id, NULL); + for (i = 0; i < num_workers; i++) { + (void) pthread_join(worker_id[i], NULL); + } - close(kq); + talloc_free(control); - fr_exit_now(EXIT_SUCCESS); + main_loop_free(); + + fr_atexit_global_trigger_all(); + + return 0; } -- 2.47.3