From 34ad039ed0816bd7372aca95dc0d25327320154d Mon Sep 17 00:00:00 2001 From: Ray Strode Date: Sun, 15 Nov 2009 13:38:54 -0500 Subject: [PATCH] [event-loop] Sketch out cothreads implementation This is just an experiment. It would let us potentially avoid callback calling callback calling callback type code, and instead have each handler yield when it needs to wait, and resume where it left off when it's done waiting. This is an initial draft. It isn't finished being written, has only barely been tested, etc. --- src/libply/ply-event-loop.c | 352 +++++++++++++++++++++++++++++++++++- src/libply/ply-event-loop.h | 7 + 2 files changed, 355 insertions(+), 4 deletions(-) diff --git a/src/libply/ply-event-loop.c b/src/libply/ply-event-loop.c index 45c2dd59..9715e4f4 100644 --- a/src/libply/ply-event-loop.c +++ b/src/libply/ply-event-loop.c @@ -34,7 +34,9 @@ #include #include #include +#include #include +#include #include #include "ply-logger.h" @@ -49,6 +51,39 @@ #define PLY_EVENT_LOOP_NO_TIMED_WAKEUP 0.0 #endif +#ifndef PLY_EXECUTION_BATCH_STACK_SIZE +#define PLY_EXECUTION_BATCH_STACK_SIZE 8192 /* ~2 pages by default */ +#endif + +typedef struct _ply_execution_batch ply_execution_batch_t; +typedef void (* ply_execution_batch_function_t) (void *user_data, + ply_execution_batch_t *batch); + +struct _ply_execution_batch +{ + ply_event_loop_t *loop; + ucontext_t context; + void *stack; + size_t stack_size; + + ply_execution_batch_function_t function; + void *user_data; + + int reference_count; +}; + +typedef struct +{ + ply_execution_batch_t *batch; + ply_fd_watch_t *watch; +} ply_event_loop_fd_waiter_t; + +typedef struct +{ + ply_execution_batch_t *batch; + double timeout; +} ply_event_loop_timeout_waiter_t; + typedef struct { int fd; @@ -107,6 +142,9 @@ typedef struct struct _ply_event_loop { + ucontext_t context; + ply_execution_batch_t *running_batch; + int epoll_fd; int exit_code; double wakeup_time; @@ -115,6 +153,9 @@ struct _ply_event_loop ply_list_t *exit_closures; ply_list_t *timeout_watches; + ply_list_t *fd_waiters; + ply_list_t *timeout_waiters; + ply_signal_dispatcher_t *signal_dispatcher; uint32_t should_exit : 1; @@ -129,6 +170,174 @@ static ply_list_node_t * ply_signal_dispatcher_find_source_node (ply_signal_dispatcher_t *dispatcher, int signal_number); +static ply_execution_batch_t * +ply_execution_batch_new (ply_event_loop_t *loop) +{ + ply_execution_batch_t *batch; + + batch = calloc (1, sizeof (ply_execution_batch_t)); + + batch->loop = loop; + batch->stack_size = PLY_EXECUTION_BATCH_STACK_SIZE; + batch->stack = mmap (NULL, batch->stack_size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + + if (batch->stack == MAP_FAILED) + { + free (batch); + return NULL; + } + + batch->reference_count++; + + return batch; +} + +static ply_execution_batch_t * +ply_execution_batch_add_reference (ply_execution_batch_t *batch) +{ + batch->reference_count++; + return batch; +} + +static void +ply_execution_batch_remove_reference (ply_execution_batch_t *batch) +{ + batch->reference_count--; + + if (batch->reference_count > 0) + return; + + munmap (batch->stack, batch->stack_size); + free (batch); +} + +static void +ply_execution_batch_suspend (ply_execution_batch_t *batch) +{ + swapcontext (&batch->context, &batch->loop->context); +} + +static void +ply_execution_batch_resume (ply_execution_batch_t *batch) +{ + swapcontext (&batch->loop->context, &batch->context); +} + +static void +dispatch_batch_function (uint32_t bottom_32_bits, + uint32_t top_32_bits) +{ + uintptr_t user_data; + ply_execution_batch_t *batch; + + switch (sizeof (void *)) + { + case 4: + user_data = (uintptr_t) bottom_32_bits; + break; + + case 8: + user_data = ((((uintptr_t) top_32_bits) << 32) | bottom_32_bits); + break; + } + + batch = (ply_execution_batch_t *) user_data; + + batch->function (batch->user_data, batch); +} + +static void +ply_execution_batch_start (ply_execution_batch_t *batch, + ply_execution_batch_function_t function, + void *user_data) +{ + uint32_t bottom_32_bits; + uint32_t top_32_bits; + + batch->function = function; + batch->user_data = user_data; + + getcontext (&batch->context); + + batch->context.uc_stack.ss_sp = batch->stack; + batch->context.uc_stack.ss_size = batch->stack_size; + batch->context.uc_link = &batch->loop->context; + + /* This icky bit of code is because makecontext takes + * ints instead of longs + */ + + switch (sizeof (void *)) + { + case 4: + bottom_32_bits = (unsigned int) (uintptr_t) batch; + top_32_bits = 0x00000000; + break; + + case 8: + bottom_32_bits = (uint32_t) (((uintptr_t) batch) & 0x00000000ffffffff); + top_32_bits = (uint32_t) ((((uintptr_t) batch) & 0xffffffff00000000) >> 32); + break; + } + + makecontext (&batch->context, (void (*) ()) dispatch_batch_function, + 2, bottom_32_bits, top_32_bits); + + ply_execution_batch_resume (batch); +} + +static void +on_fd_wait_over (ply_event_loop_fd_waiter_t *waiter) +{ + ply_list_append_data (waiter->batch->loop->fd_waiters, + waiter); +} + +void +ply_event_loop_wait_for_fd (ply_event_loop_t *loop, + int fd, + ply_event_loop_fd_status_t status) +{ + ply_event_loop_fd_waiter_t *waiter; + + assert (loop->running_batch != NULL); + + waiter = calloc (1, sizeof (ply_event_loop_fd_waiter_t)); + waiter->watch = ply_event_loop_watch_fd (loop, fd, status, + (ply_event_handler_t) + on_fd_wait_over, NULL, waiter); + waiter->batch = ply_execution_batch_add_reference (loop->running_batch); + + ply_execution_batch_suspend (loop->running_batch); +} + +static void +on_timeout_wait_over (ply_event_loop_timeout_waiter_t *waiter) +{ + ply_list_append_data (waiter->batch->loop->timeout_waiters, + waiter); +} + +void +ply_event_loop_wait_for_timeout (ply_event_loop_t *loop, + double seconds) +{ + + ply_event_loop_timeout_waiter_t *waiter; + + assert (loop->running_batch != NULL); + + waiter = calloc (1, sizeof (ply_event_loop_timeout_waiter_t)); + waiter->timeout = seconds; + ply_event_loop_watch_for_timeout (loop, seconds, + (ply_event_loop_timeout_handler_t) + on_timeout_wait_over, waiter); + waiter->batch = ply_execution_batch_add_reference (loop->running_batch); + + ply_execution_batch_suspend (loop->running_batch); +} static ply_signal_source_t * ply_signal_source_new (int signal_number, @@ -476,6 +685,9 @@ ply_event_loop_new (void) loop->exit_closures = ply_list_new (); loop->timeout_watches = ply_list_new (); + loop->fd_waiters = ply_list_new (); + loop->timeout_waiters = ply_list_new (); + loop->signal_dispatcher = ply_signal_dispatcher_new (); if (loop->signal_dispatcher == NULL) @@ -561,6 +773,9 @@ ply_event_loop_free (ply_event_loop_t *loop) ply_list_free (loop->sources); ply_list_free (loop->timeout_watches); + ply_list_free (loop->fd_waiters); + ply_list_free (loop->timeout_waiters); + close (loop->epoll_fd); free (loop); } @@ -993,6 +1208,14 @@ ply_event_loop_source_has_met_status (ply_event_source_t *source, return false; } +static void +on_status_met_batch_start (ply_event_destination_t *destination, + ply_execution_batch_t *batch) +{ + destination->status_met_handler (destination->user_data, + destination->source->fd); +} + static void ply_event_loop_handle_met_status_for_source (ply_event_loop_t *loop, ply_event_source_t *source, @@ -1015,7 +1238,15 @@ ply_event_loop_handle_met_status_for_source (ply_event_loop_t *loop, if (((destination->status & status) != 0) && (destination->status_met_handler != NULL)) - destination->status_met_handler (destination->user_data, source->fd); + { + loop->running_batch = ply_execution_batch_new (loop); + ply_execution_batch_start (loop->running_batch, + (ply_execution_batch_function_t) + on_status_met_batch_start, + destination); + ply_execution_batch_remove_reference (loop->running_batch); + loop->running_batch = NULL; + } node = next_node; } @@ -1108,6 +1339,60 @@ ply_event_loop_free_timeout_watches (ply_event_loop_t *loop) loop->wakeup_time = PLY_EVENT_LOOP_NO_TIMED_WAKEUP; } +static void +ply_event_loop_free_fd_waiters (ply_event_loop_t *loop) +{ + ply_list_node_t *node; + + assert (loop != NULL); + + node = ply_list_get_first_node (loop->fd_waiters); + while (node != NULL) + { + ply_list_node_t *next_node; + ply_event_loop_fd_waiter_t *waiter; + + waiter = (ply_event_loop_fd_waiter_t *) ply_list_node_get_data (node); + next_node = ply_list_get_next_node (loop->fd_waiters, node); + + if (waiter->batch != NULL) + ply_execution_batch_remove_reference (waiter->batch); + + free (waiter); + ply_list_remove_node (loop->fd_waiters, node); + + node = next_node; + } +} + +static void +ply_event_loop_free_timeout_waiters (ply_event_loop_t *loop) +{ + ply_list_node_t *node; + double now; + + assert (loop != NULL); + + now = ply_get_timestamp (); + node = ply_list_get_first_node (loop->timeout_watches); + while (node != NULL) + { + ply_list_node_t *next_node; + ply_event_loop_timeout_watch_t *watch; + + watch = (ply_event_loop_timeout_watch_t *) ply_list_node_get_data (node); + next_node = ply_list_get_next_node (loop->timeout_watches, node); + + free (watch); + ply_list_remove_node (loop->timeout_watches, node); + + node = next_node; + } + + assert (ply_list_get_length (loop->timeout_watches) == 0); + loop->wakeup_time = PLY_EVENT_LOOP_NO_TIMED_WAKEUP; +} + static void ply_event_loop_free_destinations_for_source (ply_event_loop_t *loop, ply_event_source_t *source) @@ -1167,6 +1452,13 @@ ply_event_loop_disconnect_source (ply_event_loop_t *loop, ply_event_source_free (source); } +static void +on_timeout_watch_batch_start (ply_event_loop_timeout_watch_t *watch, + ply_execution_batch_t *batch) +{ + watch->handler (watch->user_data, watch->loop); +} + static void ply_event_loop_handle_timeouts (ply_event_loop_t *loop) { @@ -1214,7 +1506,14 @@ ply_event_loop_handle_timeouts (ply_event_loop_t *loop) watch = (ply_event_loop_timeout_watch_t *) ply_list_node_get_data (node); next_node = ply_list_get_next_node (loop->timeout_watches, node); - watch->handler (watch->user_data, loop); + loop->running_batch = ply_execution_batch_new (watch->loop); + ply_execution_batch_start (loop->running_batch, + (ply_execution_batch_function_t) + on_timeout_watch_batch_start, + watch); + ply_execution_batch_remove_reference (loop->running_batch); + loop->running_batch = NULL; + free (watch); node = next_node; @@ -1222,6 +1521,26 @@ ply_event_loop_handle_timeouts (ply_event_loop_t *loop) ply_list_free (watches_to_dispatch); + node = ply_list_get_first_node (loop->timeout_waiters); + while (node != NULL) + { + ply_list_node_t *next_node; + ply_event_loop_timeout_waiter_t *waiter; + + waiter = (ply_event_loop_timeout_waiter_t *) ply_list_node_get_data (node); + next_node = ply_list_get_next_node (loop->timeout_waiters, node); + ply_list_remove_node (loop->timeout_waiters, node); + + loop->running_batch = ply_execution_batch_add_reference (waiter->batch); + ply_execution_batch_resume (loop->running_batch); + ply_execution_batch_remove_reference (loop->running_batch); + loop->running_batch = NULL; + + free (waiter); + + node = next_node; + } + } void @@ -1229,6 +1548,7 @@ ply_event_loop_process_pending_events (ply_event_loop_t *loop) { int number_of_received_events, i; struct epoll_event *events = NULL; + ply_list_node_t *node; assert (loop != NULL); @@ -1296,7 +1616,27 @@ ply_event_loop_process_pending_events (ply_event_loop_t *loop) ply_event_loop_handle_met_status_for_source (loop, source, status); if (loop->should_exit) - break; + return; + } + + node = ply_list_get_first_node (loop->fd_waiters); + while (node != NULL) + { + ply_list_node_t *next_node; + ply_event_loop_fd_waiter_t *waiter; + + waiter = (ply_event_loop_fd_waiter_t *) ply_list_node_get_data (node); + next_node = ply_list_get_next_node (loop->timeout_waiters, node); + ply_list_remove_node (loop->fd_waiters, node); + + loop->running_batch = ply_execution_batch_add_reference (waiter->batch); + ply_execution_batch_resume (loop->running_batch); + ply_execution_batch_remove_reference (loop->running_batch); + loop->running_batch = NULL; + + free (waiter); + + node = next_node; } } @@ -1319,6 +1659,8 @@ ply_event_loop_run (ply_event_loop_t *loop) ply_event_loop_run_exit_closures (loop); ply_event_loop_free_sources (loop); ply_event_loop_free_timeout_watches (loop); + ply_event_loop_free_fd_waiters (loop); + ply_event_loop_free_timeout_waiters (loop); loop->should_exit = false; @@ -1332,7 +1674,9 @@ static ply_event_loop_t *loop; static void alrm_signal_handler (void) { - write (1, "times up!\n", sizeof ("times up!\n") - 1); + write (1, "times up! exiting in 5 seconds\n", sizeof ("times up! exiting in 5 seconds\n") - 1); + ply_event_loop_wait_for_timeout (loop, 5.0); + write (1, "5 seconds later\n", sizeof ("5 seconds later\n") - 1); ply_event_loop_exit (loop, 0); } diff --git a/src/libply/ply-event-loop.h b/src/libply/ply-event-loop.h index d7c17630..66f30c0c 100644 --- a/src/libply/ply-event-loop.h +++ b/src/libply/ply-event-loop.h @@ -79,6 +79,13 @@ void ply_event_loop_stop_watching_for_timeout (ply_event_loop_t *loop, ply_event_loop_timeout_handler_t timeout_handler, void *user_data); +void ply_event_loop_wait_for_fd (ply_event_loop_t *loop, + int fd, + ply_event_loop_fd_status_t status); + +void ply_event_loop_wait_for_timeout (ply_event_loop_t *loop, + double seconds); + int ply_event_loop_run (ply_event_loop_t *loop); void ply_event_loop_exit (ply_event_loop_t *loop, int exit_code); -- 2.47.2