]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
add "want_read" and "want_write" callbacks.
authorAlan T. DeKok <aland@freeradius.org>
Sun, 9 Apr 2023 15:22:40 +0000 (11:22 -0400)
committerAlan T. DeKok <aland@freeradius.org>
Sun, 9 Apr 2023 15:22:40 +0000 (11:22 -0400)
src/lib/event.c

index 0665c099f4d62a55b6542ccfa75dbc69504d5dc2..467bc0a1a15acdf73288a817422a668964b5591e 100644 (file)
@@ -41,6 +41,12 @@ RCSID("$Id$")
 
 typedef struct fr_event_fd_t {
        int                     fd;
+
+#ifdef HAVE_KQUEUE
+       bool                    want_read;
+       bool                    want_write;
+#endif
+
        fr_event_fd_handler_t   handler;
        fr_event_fd_handler_t   write_handler;
        void                    *ctx;
@@ -66,12 +72,23 @@ struct fr_event_list_t {
        int             max_readers;
        int             max_fd;
 
+       fd_set          want_read_fds;
+       fd_set          want_write_fds;
+
        fd_set          read_fds;
        fd_set          write_fds;
 #else
        int             kq;
        struct kevent   events[FR_EV_MAX_FDS]; /* so it doesn't go on the stack every time */
 #endif
+
+       /*
+        *      Hacks for want_read / want_write
+        */
+       bool            want_read;
+       bool            want_write;
+       pthread_mutex_t mutex;  /* for want_read / want_write */
+
        fr_event_fd_t   readers[FR_EV_MAX_FDS];
 };
 
@@ -116,6 +133,7 @@ static int _event_list_free(fr_event_list_t *list)
 #ifdef HAVE_KQUEUE
        close(el->kq);
 #endif
+       pthread_mutex_destroy(&el->mutex);
 
        return 0;
 }
@@ -146,6 +164,9 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status)
        el->max_fd = 0;
        FD_ZERO(&el->read_fds);
        FD_ZERO(&el->write_fds);
+
+       FD_ZERO(&el->want_read_fds);
+       FD_ZERO(&el->want_write_fds);
 #else
        el->kq = kqueue();
        if (el->kq < 0) {
@@ -154,6 +175,8 @@ fr_event_list_t *fr_event_list_create(TALLOC_CTX *ctx, fr_event_status_t status)
        }
 #endif
 
+        pthread_mutex_init(&el->mutex, NULL);
+
        el->status = status;
 
        return el;
@@ -457,6 +480,48 @@ int fr_event_fd_insert(fr_event_list_t *el, int type, int fd,
        return 1;
 }
 
+void fr_event_fd_want_read(fr_event_list_t *el, int fd)
+{
+       pthread_mutex_lock(&el->mutex);
+       el->want_read = true;
+
+#ifndef HAVE_KQUEUE
+       FD_SET(fd, &el->want_read_fds);
+#else
+       for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+               int j;
+
+               j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+               if (el->readers[j].fd != fd) continue;
+               el->readers[j].want_read = true;
+               break;
+       }
+#endif
+       pthread_mutex_unlock(&el->mutex);
+}
+
+void fr_event_fd_want_write(fr_event_list_t *el, int fd)
+{
+       pthread_mutex_lock(&el->mutex);
+       el->want_write = true;
+#ifndef HAVE_KQUEUE
+
+       FD_SET(fd, &el->want_write_fds);
+#else
+       for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+               int j;
+
+               j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+               if (el->readers[j].fd != fd) continue;
+               el->readers[j].want_write = true;
+               break;
+       }
+#endif
+       pthread_mutex_unlock(&el->mutex);
+}
+
 int fr_event_fd_write_handler(fr_event_list_t *el, int type, int fd,
                              fr_event_fd_handler_t write_handler, void *ctx)
 {
@@ -568,6 +633,9 @@ int fr_event_fd_delete(fr_event_list_t *el, int type, int fd)
                        FD_CLR(fd, &el->read_fds);
                        FD_CLR(fd, &el->write_fds);
 
+                       FD_CLR(fd, &el->want_read_fds);
+                       FD_CLR(fd, &el->want_write_fds);
+
                        /*
                         *      @todo - update el->max_fd, too.
                         */
@@ -663,15 +731,45 @@ int fr_event_loop(fr_event_list_t *el)
                        return -1;
                }
 
+               /*
+                *      Merge the flags together for FDs where we need
+                *      to call the read or write function.
+                */
+               pthread_mutex_lock(&el->mutex);
+               if (el->want_read || el->want_write) {
+                       when.tv_sec = 0;
+                       when.tv_usec = 0;
+                       wake = &when;
+
+                       for (i = 0; i <= el->max_fd; i++) {
+                               if (FD_ISSET(i, &el->want_read_fds)) FD_SET(i, &read_fds);
+                               if (FD_ISSET(i, &el->want_write_fds)) FD_SET(i, &write_fds);
+                       }
+
+                       el->want_read = el->want_write = 0;
+
+                       FD_ZERO(&el->want_read_fds);
+                       FD_ZERO(&el->want_write_fds);
+               }
+               pthread_mutex_unlock(&el->mutex);
+
 #else  /* HAVE_KQUEUE */
 
-               if (wake) {
+               pthread_mutex_lock(&el->mutex);
+               if (el->want_read || el->want_write) {
+                       ts_wake = &ts_when;
+                       ts_when.tv_sec = 0;
+                       ts_when.tv_nsec = 0;
+
+               } else if (wake) {
                        ts_wake = &ts_when;
                        ts_when.tv_sec = when.tv_sec;
                        ts_when.tv_nsec = when.tv_usec * 1000;
+
                } else {
                        ts_wake = NULL;
                }
+               pthread_mutex_unlock(&el->mutex);
 
                rcode = kevent(el->kq, NULL, 0, el->events, FR_EV_MAX_FDS, ts_wake);
 #endif /* HAVE_KQUEUE */
@@ -725,11 +823,13 @@ int fr_event_loop(fr_event_list_t *el)
                                 *      Call the handler, which SHOULD
                                 *      delete the connection.
                                 */
+                               ef->want_read = false;
                                ef->handler(el, ef->fd, ef->ctx);
                                continue;
                        }
 
                        if (el->events[i].filter == EVFILT_WRITE) {
+                               ef->want_write = false;
                                ef->write_handler(el, ef->fd, ef->ctx);
                                continue;
                        }
@@ -739,8 +839,44 @@ int fr_event_loop(fr_event_list_t *el)
                         *      EVFILT_READ, so it must be a read
                         *      event.
                         */
+                       ef->want_read = false;
                        ef->handler(el, ef->fd, ef->ctx);
                }
+
+               /*
+                *      Loop over all of the sockets (again) if we have want_read or want_write
+                */
+               pthread_mutex_lock(&el->mutex);
+               if (el->want_read || el->want_write) {
+                       for (i = 0; i < el->num_readers; i++) {
+                               fr_event_fd_t *ef = &el->readers[i];
+
+                               if (ef->fd < 0) continue;
+
+                               /*
+                                *      Check if the socket is available for writing.
+                                */
+                               if (ef->want_write) {
+                                       ef->want_write = false;
+
+                                       pthread_mutex_unlock(&el->mutex);
+                                       ef->write_handler(el, ef->fd, ef->ctx);
+                                       pthread_mutex_lock(&el->mutex);
+                               }
+
+                               if (ef->want_read) {
+                                       ef->want_read = false;
+
+                                       pthread_mutex_unlock(&el->mutex);
+                                       ef->handler(el, ef->fd, ef->ctx);
+                                       pthread_mutex_lock(&el->mutex);
+                               }
+
+                       }
+
+                       el->want_read = el->want_write = false;
+               }
+               pthread_mutex_unlock(&el->mutex);
 #endif /* HAVE_KQUEUE */
        }