From: Alan T. DeKok
Date: Sun, 9 Apr 2023 15:22:40 +0000 (-0400)
Subject: add "want_read" and "want_write" callbacks.
X-Git-Tag: release_3_2_3~74
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=311582bc8906285101eea3b6fafda7ca9b957d1c;p=thirdparty%2Ffreeradius-server.git

add "want_read" and "want_write" callbacks.
---

diff --git a/src/lib/event.c b/src/lib/event.c
index 0665c099f4..467bc0a1a1 100644
--- a/src/lib/event.c
+++ b/src/lib/event.c
@@ -41,6 +41,12 @@ RCSID("$Id$")
 
 typedef struct fr_event_fd_t {
 	int			fd;
+
+#ifdef HAVE_KQUEUE
+	bool			want_read;
+	bool			want_write;
+#endif
+
 	fr_event_fd_handler_t	handler;
 	fr_event_fd_handler_t	write_handler;
 	void			*ctx;
@@ -66,12 +72,23 @@ struct fr_event_list_t {
 	int		max_readers;
 	int		max_fd;
 
+	fd_set		want_read_fds;
+	fd_set		want_write_fds;
+
 	fd_set		read_fds;
 	fd_set		write_fds;
 #else
 	int		kq;
 	struct kevent	events[FR_EV_MAX_FDS];	/* so it doesn't go on the stack every time */
 #endif
+
+	/*
+	 *	Hacks for want_read / want_write
+	 */
+	bool		want_read;
+	bool		want_write;
+	pthread_mutex_t	mutex;	/* for want_read / want_write */
+
 	fr_event_fd_t	readers[FR_EV_MAX_FDS];
 };
@@ -116,6 +133,7 @@ static int _event_list_free(fr_event_list_t *list)
 #ifdef HAVE_KQUEUE
 	close(el->kq);
 #endif
+	pthread_mutex_destroy(&el->mutex);
 
 	return 0;
 }
@@ -146,6 +164,9 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status)
 	el->max_fd = 0;
 	FD_ZERO(&el->read_fds);
 	FD_ZERO(&el->write_fds);
+
+	FD_ZERO(&el->want_read_fds);
+	FD_ZERO(&el->want_write_fds);
 #else
 	el->kq = kqueue();
 	if (el->kq < 0) {
@@ -154,6 +175,8 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status)
 	}
 #endif
 
+	pthread_mutex_init(&el->mutex, NULL);
+
 	el->status = status;
 
 	return el;
@@ -457,6 +480,48 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd,
 	return 1;
 }
 
+void fr_event_fd_want_read(fr_event_list_t *el, int fd)
+{
+	pthread_mutex_lock(&el->mutex);
+	el->want_read = true;
+
+#ifndef HAVE_KQUEUE
+	FD_SET(fd, &el->want_read_fds);
+#else
+	for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+		int j;
+
+		j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+		if (el->readers[j].fd != fd) continue;
+		el->readers[j].want_read = true;
+		break;
+	}
+#endif
+	pthread_mutex_unlock(&el->mutex);
+}
+
+void fr_event_fd_want_write(fr_event_list_t *el, int fd)
+{
+	pthread_mutex_lock(&el->mutex);
+	el->want_write = true;
+#ifndef HAVE_KQUEUE
+
+	FD_SET(fd, &el->want_write_fds);
+#else
+	for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+		int j;
+
+		j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+		if (el->readers[j].fd != fd) continue;
+		el->readers[j].want_write = true;
+		break;
+	}
+#endif
+	pthread_mutex_unlock(&el->mutex);
+}
+
 int fr_event_fd_write_handler(fr_event_list_t *el, int type, int fd,
 			      fr_event_fd_handler_t write_handler, void *ctx)
 {
@@ -568,6 +633,9 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd)
 		FD_CLR(fd, &el->read_fds);
 		FD_CLR(fd, &el->write_fds);
 
+		FD_CLR(fd, &el->want_read_fds);
+		FD_CLR(fd, &el->want_write_fds);
+
 		/*
 		 *	@todo - update el->max_fd, too.
 		 */
@@ -663,15 +731,45 @@ int fr_event_loop(fr_event_list_t *el)
 			return -1;
 		}
 
+		/*
+		 *	Merge the flags together for FDs where we need
+		 *	to call the read or write function.
+		 */
+		pthread_mutex_lock(&el->mutex);
+		if (el->want_read || el->want_write) {
+			when.tv_sec = 0;
+			when.tv_usec = 0;
+			wake = &when;
+
+			for (i = 0; i <= el->max_fd; i++) {
+				if (FD_ISSET(i, &el->want_read_fds)) FD_SET(i, &read_fds);
+				if (FD_ISSET(i, &el->want_write_fds)) FD_SET(i, &write_fds);
+			}
+
+			el->want_read = el->want_write = 0;
+
+			FD_ZERO(&el->want_read_fds);
+			FD_ZERO(&el->want_write_fds);
+		}
+		pthread_mutex_unlock(&el->mutex);
+
 #else	/* HAVE_KQUEUE */
-		if (wake) {
+		pthread_mutex_lock(&el->mutex);
+		if (el->want_read || el->want_write) {
+			ts_wake = &ts_when;
+			ts_when.tv_sec = 0;
+			ts_when.tv_nsec = 0;
+
+		} else if (wake) {
 			ts_wake = &ts_when;
 			ts_when.tv_sec = when.tv_sec;
 			ts_when.tv_nsec = when.tv_usec * 1000;
+
 		} else {
 			ts_wake = NULL;
 		}
+		pthread_mutex_unlock(&el->mutex);
 
 		rcode = kevent(el->kq, NULL, 0, el->events, FR_EV_MAX_FDS, ts_wake);
 #endif	/* HAVE_KQUEUE */
@@ -725,11 +823,13 @@ int fr_event_loop(fr_event_list_t *el)
 				 *	Call the handler, which SHOULD
 				 *	delete the connection.
 				 */
+				ef->want_read = false;
 				ef->handler(el, ef->fd, ef->ctx);
 				continue;
 			}
 
 			if (el->events[i].filter == EVFILT_WRITE) {
+				ef->want_write = false;
 				ef->write_handler(el, ef->fd, ef->ctx);
 				continue;
 			}
@@ -739,8 +839,44 @@ int fr_event_loop(fr_event_list_t *el)
 			 *	EVFILT_READ, so it must be a read
 			 *	event.
 			 */
+			ef->want_read = false;
 			ef->handler(el, ef->fd, ef->ctx);
 		}
+
+		/*
+		 *	Loop over all of the sockets (again) if we have want_read or want_write
+		 */
+		pthread_mutex_lock(&el->mutex);
+		if (el->want_read || el->want_write) {
+			for (i = 0; i < el->num_readers; i++) {
+				fr_event_fd_t *ef = &el->readers[i];
+
+				if (ef->fd < 0) continue;
+
+				/*
+				 *	Check if the socket is available for writing.
+				 */
+				if (ef->want_write) {
+					ef->want_write = false;
+
+					pthread_mutex_unlock(&el->mutex);
+					ef->write_handler(el, ef->fd, ef->ctx);
+					pthread_mutex_lock(&el->mutex);
+				}
+
+				if (ef->want_read) {
+					ef->want_read = false;
+
+					pthread_mutex_unlock(&el->mutex);
+					ef->handler(el, ef->fd, ef->ctx);
+					pthread_mutex_lock(&el->mutex);
+				}
+
+			}
+
+			el->want_read = el->want_write = false;
+		}
+		pthread_mutex_unlock(&el->mutex);
 #endif	/* HAVE_KQUEUE */
 	}
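
The sketch below illustrates how a caller might use the two new functions; it is not part of the commit.  The handler signature (el, fd, ctx) and the prototype of fr_event_fd_write_handler() are taken from the diff above, but the header paths, the trailing arguments of fr_event_fd_insert(), the "type" value of 0, and the helper names (socket_read, socket_write, reply_queued, register_socket) are assumptions made only for the example.

#include <freeradius-devel/libradius.h>
#include <freeradius-devel/event.h>

/* Read handler registered with the event loop. */
static void socket_read(fr_event_list_t *el, int fd, void *ctx)
{
	/* drain whatever arrived on "fd" */
}

/* Write handler, invoked after a write has been requested for "fd". */
static void socket_write(fr_event_list_t *el, int fd, void *ctx)
{
	/* flush the data that was queued for "fd" */
}

static void register_socket(fr_event_list_t *el, int fd, void *ctx)
{
	/* Error handling omitted; the trailing arguments are assumed. */
	fr_event_fd_insert(el, 0, fd, socket_read, ctx);

	/* Matches the prototype visible in the diff above. */
	fr_event_fd_write_handler(el, 0, fd, socket_write, ctx);
}

/*
 *	Presumably called from another thread once a reply has been
 *	queued for "fd".  The flag is set under el->mutex; on its next
 *	pass the event loop polls with a zero timeout and then calls the
 *	fd's write handler (on the select() build, once the fd is
 *	actually writable).
 */
static void reply_queued(fr_event_list_t *el, int fd)
{
	fr_event_fd_want_write(el, fd);
}

Note that the diff adds no explicit wake-up for a loop that is already blocked in select()/kevent(): a flag raised at that point is only serviced once that call returns, which is presumably why the new struct fields are commented as "Hacks for want_read / want_write".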