From: Olivier Houchard Date: Thu, 25 Jul 2019 14:00:18 +0000 (+0000) Subject: MEDIUM: pollers: Remember the state for read and write for each threads. X-Git-Tag: v2.1-dev2~255 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=53055055c517d018a36677cbae0a0ad464700fd4;p=thirdparty%2Fhaproxy.git MEDIUM: pollers: Remember the state for read and write for each threads. In the poller code, instead of just remembering if we're currently polling a fd or not, remember if we're polling it for writing and/or for reading, that way, we can avoid to modify the polling if it's already polled as needed. --- diff --git a/include/proto/fd.h b/include/proto/fd.h index 673eaae0f5..4fbb9dc1af 100644 --- a/include/proto/fd.h +++ b/include/proto/fd.h @@ -37,7 +37,11 @@ extern volatile struct fdlist update_list; -extern unsigned long *polled_mask; + +extern struct polled_mask { + unsigned long poll_recv; + unsigned long poll_send; +} *polled_mask; extern THREAD_LOCAL int *fd_updt; // FD updates list extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list diff --git a/src/ev_epoll.c b/src/ev_epoll.c index bd2d616cd1..dd3a561b23 100644 --- a/src/ev_epoll.c +++ b/src/ev_epoll.c @@ -53,7 +53,7 @@ static THREAD_LOCAL struct epoll_event ev; REGPRM1 static void __fd_clo(int fd) { if (unlikely(fdtab[fd].cloned)) { - unsigned long m = polled_mask[fd]; + unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send; int i; for (i = global.nbthread - 1; i >= 0; i--) @@ -68,13 +68,35 @@ static void _update_fd(int fd) en = fdtab[fd].state; - if (polled_mask[fd] & tid_bit) { + if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) { if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) { /* fd removed from poll list */ opcode = EPOLL_CTL_DEL; - _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit); + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); } else { + if (((en & FD_EV_POLLED_R) != 0) == + ((polled_mask[fd].poll_recv & tid_bit) != 0) && + ((en & FD_EV_POLLED_W) != 0) == + ((polled_mask[fd].poll_send & tid_bit) != 0)) + return; + if (en & FD_EV_POLLED_R) { + if (!(polled_mask[fd].poll_recv & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + } else { + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + } + if (en & FD_EV_POLLED_W) { + if (!(polled_mask[fd].poll_send & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); + } else { + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + } /* fd status changed */ opcode = EPOLL_CTL_MOD; } @@ -82,7 +104,10 @@ static void _update_fd(int fd) else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) { /* new fd in the poll list */ opcode = EPOLL_CTL_ADD; - _HA_ATOMIC_OR(&polled_mask[fd], tid_bit); + if (en & FD_EV_POLLED_R) + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + if (en & FD_EV_POLLED_W) + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); } else { return; @@ -188,7 +213,8 @@ REGPRM3 static void _do_poll(struct poller *p, int exp, int wake) /* FD has been migrated */ activity[tid].poll_skip++; epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev); - _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit); + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); continue; } diff --git a/src/ev_evports.c b/src/ev_evports.c index 7842bf242b..d9d1637d41 100644 --- a/src/ev_evports.c +++ b/src/ev_evports.c @@ -74,18 +74,36 @@ static void _update_fd(int fd) en = fdtab[fd].state; if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) { - if (!(polled_mask[fd] & tid_bit)) { + if (!(polled_mask[fd].poll_recv & tid_bit) && + !(polled_mask[fd].poll_send & tid_bit)) { /* fd was not watched, it's still not */ return; } /* fd totally removed from poll list */ events = 0; - _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit); + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); } else { /* OK fd has to be monitored, it was either added or changed */ events = evports_state_to_events(en); - _HA_ATOMIC_OR(&polled_mask[fd], tid_bit); + if (en & FD_EV_POLLED_R) { + if (!(polled_mask[fd].poll_recv & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + } else { + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + } + if (en & FD_EV_POLLED_W) { + if (!(polled_mask[fd].poll_send & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); + } else { + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + } + } evports_resync_fd(fd, events); } diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c index 692437731b..d634b67280 100644 --- a/src/ev_kqueue.c +++ b/src/ev_kqueue.c @@ -44,29 +44,44 @@ static int _update_fd(int fd, int start) en = fdtab[fd].state; if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) { - if (!(polled_mask[fd] & tid_bit)) { + if (!(polled_mask[fd].poll_recv & tid_bit) && + !(polled_mask[fd].poll_send & tid_bit)) { /* fd was not watched, it's still not */ return changes; } /* fd totally removed from poll list */ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit); + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); } else { /* OK fd has to be monitored, it was either added or changed */ - if (en & FD_EV_POLLED_R) - EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); - else if (polled_mask[fd] & tid_bit) + if (en & FD_EV_POLLED_R) { + if (!(polled_mask[fd].poll_recv & tid_bit)) { + EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL); + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + } + } + else if (polled_mask[fd].poll_recv & tid_bit) { EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + } - if (en & FD_EV_POLLED_W) - EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); - else if (polled_mask[fd] & tid_bit) + if (en & FD_EV_POLLED_W) { + if (!(polled_mask[fd].poll_send & tid_bit)) { + EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); + } + } + else if (polled_mask[fd].poll_send & tid_bit) { EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + } - _HA_ATOMIC_OR(&polled_mask[fd], tid_bit); } return changes; } diff --git a/src/ev_poll.c b/src/ev_poll.c index b349c555f6..d4a1351a2b 100644 --- a/src/ev_poll.c +++ b/src/ev_poll.c @@ -58,28 +58,38 @@ static void _update_fd(int fd, int *max_add_fd) * takes it for every other one. */ if (!(en & FD_EV_POLLED_RW)) { - if (!polled_mask[fd]) { + if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) { /* fd was not watched, it's still not */ return; } /* fd totally removed from poll list */ hap_fd_clr(fd, fd_evts[DIR_RD]); hap_fd_clr(fd, fd_evts[DIR_WR]); - _HA_ATOMIC_AND(&polled_mask[fd], 0); + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0); + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, 0); } else { /* OK fd has to be monitored, it was either added or changed */ - if (!(en & FD_EV_POLLED_R)) + if (!(en & FD_EV_POLLED_R)) { hap_fd_clr(fd, fd_evts[DIR_RD]); - else + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + } else { hap_fd_set(fd, fd_evts[DIR_RD]); + if (!(polled_mask[fd].poll_recv & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + } - if (!(en & FD_EV_POLLED_W)) + if (!(en & FD_EV_POLLED_W)) { hap_fd_clr(fd, fd_evts[DIR_WR]); - else + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + }else { hap_fd_set(fd, fd_evts[DIR_WR]); + if (!(polled_mask[fd].poll_send & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); + } - _HA_ATOMIC_OR(&polled_mask[fd], tid_bit); if (fd > *max_add_fd) *max_add_fd = fd; } diff --git a/src/ev_select.c b/src/ev_select.c index be88bc2bf4..f2a2acfa05 100644 --- a/src/ev_select.c +++ b/src/ev_select.c @@ -49,28 +49,38 @@ static void _update_fd(int fd, int *max_add_fd) * takes it for every other one. */ if (!(en & FD_EV_POLLED_RW)) { - if (!polled_mask[fd]) { + if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) { /* fd was not watched, it's still not */ return; } /* fd totally removed from poll list */ hap_fd_clr(fd, fd_evts[DIR_RD]); hap_fd_clr(fd, fd_evts[DIR_WR]); - _HA_ATOMIC_AND(&polled_mask[fd], 0); + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0); + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0); } else { /* OK fd has to be monitored, it was either added or changed */ - if (!(en & FD_EV_POLLED_R)) + if (!(en & FD_EV_POLLED_R)) { hap_fd_clr(fd, fd_evts[DIR_RD]); - else + if (polled_mask[fd].poll_recv & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit); + } else { hap_fd_set(fd, fd_evts[DIR_RD]); + if (!(polled_mask[fd].poll_recv & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit); + } - if (!(en & FD_EV_POLLED_W)) + if (!(en & FD_EV_POLLED_W)) { hap_fd_clr(fd, fd_evts[DIR_WR]); - else + if (polled_mask[fd].poll_send & tid_bit) + _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit); + } else { hap_fd_set(fd, fd_evts[DIR_WR]); + if (!(polled_mask[fd].poll_send & tid_bit)) + _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit); + } - _HA_ATOMIC_OR(&polled_mask[fd], tid_bit); if (fd > *max_add_fd) *max_add_fd = fd; } diff --git a/src/fd.c b/src/fd.c index 71df46e05e..a1a457802e 100644 --- a/src/fd.c +++ b/src/fd.c @@ -122,7 +122,7 @@ #include struct fdtab *fdtab = NULL; /* array of all the file descriptors */ -unsigned long *polled_mask = NULL; /* Array for the polled_mask of each fd */ +struct polled_mask *polled_mask = NULL; /* Array for the polled_mask of each fd */ struct fdinfo *fdinfo = NULL; /* less-often used infos for file descriptors */ int totalconn; /* total # of terminated sessions */ int actconn; /* # of active sessions */ @@ -338,7 +338,7 @@ static void fd_dodelete(int fd, int do_close) fdtab[fd].owner = NULL; fdtab[fd].thread_mask = 0; if (do_close) { - polled_mask[fd] = 0; + polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0; close(fd); _HA_ATOMIC_SUB(&ha_used_fds, 1); } @@ -525,7 +525,7 @@ int init_pollers() if ((fdtab = calloc(global.maxsock, sizeof(struct fdtab))) == NULL) goto fail_tab; - if ((polled_mask = calloc(global.maxsock, sizeof(unsigned long))) == NULL) + if ((polled_mask = calloc(global.maxsock, sizeof(*polled_mask))) == NULL) goto fail_polledmask; if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)