]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: ev_kqueue: make the poller support speculative events
authorWilly Tarreau <w@1wt.eu>
Sun, 11 Nov 2012 19:49:49 +0000 (20:49 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 11 Nov 2012 19:53:29 +0000 (20:53 +0100)
The poller was updated to support speculative events. We'll need this
to fully support SSL.

As a side effect, the code has become much simpler and much more
efficient, by taking advantage of the nice kqueue API which supports
batched updates. All references to fd_sets have disappeared, and only
the fdtab[].spec_e fields are used to decide about file descriptor
state.

src/ev_kqueue.c

index e771c33a3162d63c062ee69ecac36ab67e3c0475..4ede2ecbc553326aac24c486ee7365e88058b311 100644 (file)
 #include <proto/task.h>
 
 /* private data */
-static fd_set *fd_evts[2];
 static int kqueue_fd;
 static struct kevent *kev = NULL;
 
-/* speeds up conversion of DIR_RD/DIR_WR to EVFILT* */
-static const int dir2filt[2] = { EVFILT_READ, EVFILT_WRITE };
-
-/* completes a change list for deletion */
-REGPRM3 static int kqev_del(struct kevent *kev, const int fd, const int dir)
-{
-       if (FD_ISSET(fd, fd_evts[dir])) {
-               FD_CLR(fd, fd_evts[dir]);
-               EV_SET(kev, fd, dir2filt[dir], EV_DELETE, 0, 0, NULL);
-               return 1;
-       }
-       return 0;
-}
-
 /*
- * Returns non-zero if direction <dir> is already set for <fd>.
+ * kqueue() poller
  */
-REGPRM2 static int __fd_is_set(const int fd, int dir)
-{
-       return FD_ISSET(fd, fd_evts[dir]);
-}
-
-REGPRM2 static void __fd_set(const int fd, int dir)
+REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-       /* if the value was set, do nothing */
-       if (FD_ISSET(fd, fd_evts[dir]))
-               return;
+       int status;
+       int count, fd, delta_ms;
+       struct timespec timeout;
+       int updt_idx, en, eo;
+       int changes = 0;
 
-       FD_SET(fd, fd_evts[dir]);
-       EV_SET(kev, fd, dir2filt[dir], EV_ADD, 0, 0, NULL);
-       kevent(kqueue_fd, kev, 1, NULL, 0, NULL);
-}
+       /* first, scan the update list to find changes */
+       for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
+               fd = fd_updt[updt_idx];
+               en = fdtab[fd].spec_e & 15;  /* new events */
+               eo = fdtab[fd].spec_e >> 4;  /* previous events */
+
+               if (fdtab[fd].owner && (eo ^ en)) {
+                       if ((eo ^ en) & FD_EV_POLLED_R) {
+                               /* read poll status changed */
+                               if (en & FD_EV_POLLED_R) {
+                                       EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+                                       changes++;
+                               }
+                               else {
+                                       EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+                                       changes++;
+                               }
+                       }
 
-REGPRM2 static void __fd_clr(const int fd, int dir)
-{
-       if (!kqev_del(kev, fd, dir))
-               return;
-       kevent(kqueue_fd, kev, 1, NULL, 0, NULL);
-}
+                       if ((eo ^ en) & FD_EV_POLLED_W) {
+                               /* write poll status changed */
+                               if (en & FD_EV_POLLED_W) {
+                                       EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+                                       changes++;
+                               }
+                               else {
+                                       EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+                                       changes++;
+                               }
+                       }
 
-REGPRM1 static void __fd_rem(int fd)
-{
-       int changes = 0;
+                       fdtab[fd].spec_e = (en << 4) + en;  /* save new events */
 
-       changes += kqev_del(&kev[changes], fd, DIR_RD);
-       changes += kqev_del(&kev[changes], fd, DIR_WR);
+                       if (!(en & FD_EV_ACTIVE_RW)) {
+                               /* This fd doesn't use any active entry anymore, we can
+                                * kill its entry.
+                                */
+                               release_spec_entry(fd);
+                       }
+                       else if ((en & ~eo) & FD_EV_ACTIVE_RW) {
+                               /* we need a new spec entry now */
+                               alloc_spec_entry(fd);
+                       }
 
+               }
+               fdtab[fd].updated = 0;
+               fdtab[fd].new = 0;
+       }
        if (changes)
                kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
-}
-
-REGPRM1 static void __fd_clo(int fd)
-{
-       FD_CLR(fd, fd_evts[DIR_RD]);
-       FD_CLR(fd, fd_evts[DIR_WR]);
-}
-
-/*
- * kqueue() poller
- */
-REGPRM2 static void _do_poll(struct poller *p, int exp)
-{
-       int status;
-       int count, fd, delta_ms;
-       struct timespec timeout;
+       fd_nbupdt = 0;
 
        delta_ms        = 0;
        timeout.tv_sec  = 0;
        timeout.tv_nsec = 0;
 
-       if (!run_queue && !signal_queue_len) {
+       if (!fd_nbspec && !run_queue && !signal_queue_len) {
                if (!exp) {
                        delta_ms        = MAX_DELAY_MS;
                        timeout.tv_sec  = (MAX_DELAY_MS / 1000);
@@ -141,17 +137,29 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        continue;
 
                fdtab[fd].ev &= FD_POLL_STICKY;
+
                if (kev[count].filter ==  EVFILT_READ) {
-                       if (FD_ISSET(fd, fd_evts[DIR_RD])) {
+                       if ((fdtab[fd].spec_e & FD_EV_STATUS_R))
                                fdtab[fd].ev |= FD_POLL_IN;
-                       }
-               else if (kev[count].filter ==  EVFILT_WRITE) {
-                       if (FD_ISSET(fd, fd_evts[DIR_WR])) {
+               }
+               else if (kev[count].filter ==  EVFILT_WRITE) {
+                       if ((fdtab[fd].spec_e & FD_EV_STATUS_W))
                                fdtab[fd].ev |= FD_POLL_OUT;
-                       }
                }
-               if (fdtab[fd].iocb && fdtab[fd].ev)
+
+               if (fdtab[fd].iocb && fdtab[fd].ev) {
+                       /* Mark the events as speculative before processing
+                        * them so that if nothing can be done we don't need
+                        * to poll again.
+                        */
+                       if (fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP|FD_POLL_ERR))
+                               fd_ev_set(fd, DIR_RD);
+
+                       if (fdtab[fd].ev & (FD_POLL_OUT|FD_POLL_ERR))
+                               fd_ev_set(fd, DIR_WR);
+
                        fdtab[fd].iocb(fd);
+               }
        }
 }
 
@@ -162,33 +170,19 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
  */
 REGPRM1 static int _do_init(struct poller *p)
 {
-       __label__ fail_wevt, fail_revt, fail_fd;
-       int fd_set_bytes;
-
        p->private = NULL;
-       fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
 
        kqueue_fd = kqueue();
        if (kqueue_fd < 0)
                goto fail_fd;
 
-       kev = (struct kevent*)calloc(1, sizeof(struct kevent) * global.tune.maxpollevents);
-
+       /* we can have up to two events per fd (one read, one write) */
+       kev = (struct kevent*)calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
        if (kev == NULL)
                goto fail_kev;
                
-       if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
-               goto fail_revt;
-
-       if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
-               goto fail_wevt;
-
        return 1;
 
- fail_wevt:
-       free(fd_evts[DIR_RD]);
- fail_revt:
-       free(kev);
  fail_kev:
        close(kqueue_fd);
        kqueue_fd = -1;
@@ -203,8 +197,6 @@ REGPRM1 static int _do_init(struct poller *p)
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-       free(fd_evts[DIR_WR]);
-       free(fd_evts[DIR_RD]);
        free(kev);
 
        if (kqueue_fd >= 0) {
@@ -272,12 +264,12 @@ static void _do_register(void)
        p->poll = _do_poll;
        p->fork = _do_fork;
 
-       p->is_set  = __fd_is_set;
-       p->set = __fd_set;
-       p->wai = __fd_set;
-       p->clr = __fd_clr;
-       p->rem = __fd_rem;
-       p->clo = __fd_clo;
+       p->is_set = NULL;
+       p->set = NULL;
+       p->wai = NULL;
+       p->clr = NULL;
+       p->rem = NULL;
+       p->clo = NULL;
 }