RCSID("$Id$")
+#include <freeradius-devel/server/base.h>
#include <freeradius-devel/io/control.h>
#include <freeradius-devel/util/debug.h>
#include <freeradius-devel/util/syserror.h>
#define MPRINT1 if (debug_lvl) printf
#define CONTROL_MAGIC 0xabcd6809
+typedef struct {
+ size_t id;
+ fr_ring_buffer_t *rb;
+} worker_args_t;
+
static int debug_lvl = 0;
-static int kq = -1;
static fr_atomic_queue_t *aq;
static size_t max_messages = 10;
+static size_t num_workers = 1;
static int aq_size = 16;
static fr_control_t *control = NULL;
-static fr_ring_buffer_t *rb = NULL;
-
-/**********************************************************************/
-typedef struct request_s request_t;
-request_t *request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args);
-void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request);
-
-request_t *request_alloc(UNUSED TALLOC_CTX *ctx, UNUSED request_init_args_t const *args)
-{
- return NULL;
-}
-
-void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
-{
-}
+static fr_event_list_t *el = NULL;
/**********************************************************************/
{
fprintf(stderr, "usage: control_test [OPTS]\n");
fprintf(stderr, " -m <messages> Send number of messages.\n");
+ fprintf(stderr, " -w <workers> Number of workers.\n");
fprintf(stderr, " -x Debugging mode.\n");
fr_exit_now(EXIT_SUCCESS);
typedef struct {
uint32_t header;
size_t counter;
+ size_t worker;
} my_message_t;
+typedef struct {
+ size_t num_messages;
+} master_ctx_t;
+
+static void recv_control_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
+{
+ my_message_t const *m = data;
+ master_ctx_t *master_ctx = ctx;
+
+ fr_assert(m->header == CONTROL_MAGIC);
+ fr_assert(data_size == sizeof(*m));
+
+ MPRINT1("Master got worker %ld message %zu, size %ld.\n", m->worker, m->counter, data_size);
+ master_ctx->num_messages++;
+}
+
static void *control_master(UNUSED void *arg)
{
TALLOC_CTX *ctx;
+ master_ctx_t *master_ctx;
MEM(ctx = talloc_init_const("control_master"));
+ master_ctx = talloc_zero(ctx, master_ctx_t);
+
MPRINT1("Master started.\n");
- /*
- * Busy loop. We're stupid.
- */
- while (true) {
+ fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, master_ctx, recv_control_callback);
+
+ while (master_ctx->num_messages < (max_messages * num_workers)) {
int num_events;
- ssize_t data_size;
- my_message_t m;
- struct kevent kev;
- wait_for_events:
- MPRINT1("Master waiting for events.\n");
+ MPRINT1("Master waiting for events (seen %ld).\n", master_ctx->num_messages);
- num_events = kevent(kq, NULL, 0, &kev, 1, NULL);
+ num_events = fr_event_corral(el, fr_time(), true);
if (num_events < 0) {
fprintf(stderr, "Failed reading kevent: %s\n", fr_syserror(errno));
fr_exit_now(EXIT_FAILURE);
}
-
- MPRINT1("Master draining the control plane.\n");
-
- while (true) {
- uint32_t id;
-
- data_size = fr_control_message_pop(aq, &id, &m, sizeof(m));
- if (data_size == 0) goto wait_for_events;
-
- if (data_size < 0) {
- fprintf(stderr, "Failed reading control message\n");
- fr_exit_now(EXIT_FAILURE);
- }
-
- fr_assert(data_size == sizeof(m));
- fr_assert(id == FR_CONTROL_ID_CHANNEL);
-
- MPRINT1("Master got message %zu.\n", m.counter);
-
- fr_assert(m.header == CONTROL_MAGIC);
-
- if (m.counter == (max_messages - 1)) goto do_exit;
+ if (num_events > 0) {
+ fr_event_service(el);
}
}
-do_exit:
MPRINT1("Master exiting.\n");
talloc_free(ctx);
return NULL;
}
-static void *control_worker(UNUSED void *arg)
+static void *control_worker(void *arg)
{
size_t i;
TALLOC_CTX *ctx;
+ worker_args_t *wa = (worker_args_t *) arg;
MEM(ctx = talloc_init_const("control_worker"));
- MPRINT1("\tWorker started.\n");
+ MPRINT1("\tWorker %ld started.\n", wa->id);
for (i = 0; i < max_messages; i++) {
my_message_t m;
m.header = CONTROL_MAGIC;
m.counter = i;
+ m.worker = wa->id;
retry:
- if (fr_control_message_send(control, rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) {
- MPRINT1("\tWorker retrying message %zu\n", i);
+ if (fr_control_message_send(control, wa->rb, FR_CONTROL_ID_CHANNEL, &m, sizeof(m)) < 0) {
+ MPRINT1("\tWorker %ld retrying message %zu\n", wa->id, i);
usleep(10);
goto retry;
}
- MPRINT1("\tWorker sent message %zu\n", i);
+ MPRINT1("\tWorker %ld sent message %zu\n", wa->id, i);
}
- MPRINT1("\tWorker exiting.\n");
+ MPRINT1("\tWorker %ld exiting.\n", wa->id);
talloc_free(ctx);
int c;
TALLOC_CTX *autofree = talloc_autofree_context();
pthread_attr_t attr;
- pthread_t master_id, worker_id;
+ pthread_t master_id, *worker_id;
+ size_t i;
+ worker_args_t *worker_args;
+
+ fr_atexit_global_setup();
fr_time_start();
- while ((c = getopt(argc, argv, "hm:o:tx")) != -1) switch (c) {
+ while ((c = getopt(argc, argv, "hm:w:x")) != -1) switch (c) {
case 'x':
debug_lvl++;
break;
max_messages = atoi(optarg);
break;
+ case 'w':
+ num_workers = atoi(optarg);
+ break;
+
case 'h':
default:
usage();
argv += (optind - 1);
#endif
- kq = kqueue();
- fr_assert(kq >= 0);
+ main_loop_init();
+ el = main_loop_event_list();
aq = fr_atomic_queue_alloc(autofree, aq_size);
fr_assert(aq != NULL);
- control = fr_control_create(autofree, kq, aq, 1024);
+ control = fr_control_create(autofree, el, aq);
if (!control) {
fprintf(stderr, "control_test: Failed to create control plane\n");
fr_exit_now(EXIT_FAILURE);
}
- rb = fr_ring_buffer_create(autofree, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
- if (!rb) fr_exit_now(EXIT_FAILURE);
-
/*
- * Start the two threads, with the channel.
+ * Start the threads, with the channel.
*/
(void) pthread_attr_init(&attr);
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- (void) pthread_create(&worker_id, &attr, control_worker, NULL);
(void) pthread_create(&master_id, &attr, control_master, NULL);
+ worker_id = talloc_array(autofree, pthread_t, num_workers);
+ worker_args = talloc_array(autofree, worker_args_t, num_workers);
+ for (i = 0; i < num_workers; i++) {
+ worker_args[i].id = i;
+ worker_args[i].rb = fr_ring_buffer_create(worker_args, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
+ (void) pthread_create(&worker_id[i], &attr, control_worker, &worker_args[i]);
+ }
(void) pthread_join(master_id, NULL);
- (void) pthread_join(worker_id, NULL);
+ for (i = 0; i < num_workers; i++) {
+ (void) pthread_join(worker_id[i], NULL);
+ }
- close(kq);
+ talloc_free(control);
- fr_exit_now(EXIT_SUCCESS);
+ main_loop_free();
+
+ fr_atexit_global_trigger_all();
+
+ return 0;
}