From: Jaroslav Kysela Date: Tue, 2 Jan 2018 16:27:08 +0000 (+0100) Subject: tvhpoll: add events cache, add set function X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ff7770798c9d0e23bc6f779b364603cf703fa48b;p=thirdparty%2Ftvheadend.git tvhpoll: add events cache, add set function --- diff --git a/src/tvhpoll.c b/src/tvhpoll.c index 8803e0bd1..08c1b6a1d 100644 --- a/src/tvhpoll.c +++ b/src/tvhpoll.c @@ -41,20 +41,79 @@ struct tvhpoll { + pthread_mutex_t lock; + uint8_t *events; + uint32_t events_off; + uint32_t nevents; #if ENABLE_EPOLL int fd; struct epoll_event *ev; - int nev; + uint32_t nev; #elif ENABLE_KQUEUE int fd; struct kevent *ev; - int nev; + uint32_t nev; #else #endif }; static void -tvhpoll_alloc ( tvhpoll_t *tp, int n ) +tvhpoll_alloc_events ( tvhpoll_t *tp, int fd ) +{ + tp->events = calloc(1, tp->nevents = 8); + tp->events_off = fd; +} + +static void +tvhpoll_realloc_events1 ( tvhpoll_t *tp, int fd ) +{ + uint32_t diff = tp->events_off - fd; + uint8_t *evs = malloc(tp->nevents + diff); + memset(evs, 0, diff); + memcpy(evs + diff, tp->events, tp->nevents); + free(tp->events); + tp->events = evs; + tp->events_off = fd; + tp->nevents += diff; +} + +static void +tvhpoll_realloc_events2 ( tvhpoll_t *tp, int fd ) +{ + uint32_t size = (fd - tp->events_off) + 4; + tp->events = realloc(tp->events, size); + memset(tp->events + tp->nevents, 0, size - tp->nevents); + tp->nevents = size; +} + +static inline void +tvhpoll_set_events ( tvhpoll_t *tp, int fd, uint32_t events ) +{ + if (tp->nevents == 0) { + tvhpoll_alloc_events(tp, fd); + } else if (fd < tp->events_off) { + tvhpoll_realloc_events1(tp, fd); + } else if (fd - tp->events_off >= tp->nevents) { + tvhpoll_realloc_events2(tp, fd); + } + assert((events & 0xffffff00) == 0); + tp->events[fd - tp->events_off] = events; +} + +static inline uint32_t +tvhpoll_get_events( tvhpoll_t *tp, int fd ) +{ + const uint32_t off = tp->events_off; + if (fd < off) + return 0; + fd -= off; + if (fd >= tp->nevents) + return 0; + return tp->events[fd]; +} + +static void +tvhpoll_alloc ( tvhpoll_t *tp, uint32_t n ) { #if ENABLE_EPOLL || ENABLE_KQUEUE if (n > tp->nev) { @@ -83,6 +142,7 @@ tvhpoll_create ( size_t n ) fd = -1; #endif tvhpoll_t *tp = calloc(1, sizeof(tvhpoll_t)); + pthread_mutex_init(&tp->lock, NULL); tp->fd = fd; tvhpoll_alloc(tp, n); return tp; @@ -96,64 +156,87 @@ void tvhpoll_destroy ( tvhpoll_t *tp ) free(tp->ev); close(tp->fd); #endif + free(tp->events); + pthread_mutex_destroy(&tp->lock); free(tp); } -int tvhpoll_add +static int tvhpoll_add0 ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num ) { - int i, rc; #if ENABLE_EPOLL + int i; for (i = 0; i < num; i++) { struct epoll_event ev = { 0 }; + const int fd = evs[i].fd; + const uint32_t events = evs[i].events; + const uint32_t oevents = tvhpoll_get_events(tp, fd); + if (oevents == events) continue; ev.data.ptr = evs[i].ptr; - if (evs[i].events & TVHPOLL_IN) ev.events |= EPOLLIN; - if (evs[i].events & TVHPOLL_OUT) ev.events |= EPOLLOUT; - if (evs[i].events & TVHPOLL_PRI) ev.events |= EPOLLPRI; - if (evs[i].events & TVHPOLL_ERR) ev.events |= EPOLLERR; - if (evs[i].events & TVHPOLL_HUP) ev.events |= EPOLLHUP; - rc = epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev); - if (rc && errno == EEXIST) { + if (events & TVHPOLL_IN) ev.events |= EPOLLIN; + if (events & TVHPOLL_OUT) ev.events |= EPOLLOUT; + if (events & TVHPOLL_PRI) ev.events |= EPOLLPRI; + if (events & TVHPOLL_ERR) ev.events |= EPOLLERR; + if (events & TVHPOLL_HUP) ev.events |= EPOLLHUP; + if (oevents) { if (epoll_ctl(tp->fd, EPOLL_CTL_MOD, evs[i].fd, &ev)) - return -1; + break; + } else { + if (epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev)) + break; } + tvhpoll_set_events(tp, fd, events); } - return 0; + return i >= num ? 0 : -1; #elif ENABLE_KQUEUE - tvhpoll_alloc(tp, num); - for (i = 0; i < num; i++) { - if (evs[i].events & TVHPOLL_OUT){ - EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE, EV_ADD, 0, 0, evs[i].ptr); - rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL); - if (rc == -1) { - tvherror(LS_TVHPOLL, "failed to add kqueue WRITE filter [%d|%d]", - evs[i].fd, rc); - return -1; - } - } else { - EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); - kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL); + int i, j, r = 0; + struct kevent ev = alloca(EV_SIZE * num * 2); + for (i = j = 0; i < num; i++) { + const int fd = evs[i].fd; + const void *ptr = evs[i].ptr; + const uint32_t events = evs[i].events; + const uint32_t oevents = tvhpoll_get_events(tp, fd); + if (events == ovents) continue; + tvhpoll_set_evbits(tp, fd, events); + if (events & (TVHPOLL_OUT|TVHPOLL_IN)) == (TVHPOLL_OUT|TVHPOLL_IN)) { + EV_SET(ev+j, fd, EVFILT_READ|EVFILT_WRITE, EV_ADD, 0, 0, ptr); + j++; + continue; } - if (evs[i].events & TVHPOLL_IN){ - EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_ADD, 0, 0, evs[i].ptr); - rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL); - if (rc == -1) { - tvherror(LS_TVHPOLL, "failed to add kqueue READ filter [%d|%d]", evs[i].fd, rc); - return -1; - } - } else { - EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL); + if (events & TVHPOLL_OUT) { + EV_SET(ev+j, fd, EVFILT_WRITE, EV_ADD, 0, 0, ptr); + j++; + } else if (oevents & TVHPOLL_OUT) { + EV_SET(ev+j, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + j++; + } + if (events & TVHPOLL_IN) { + EV_SET(ev+j, fd, EVFILT_READ, EV_ADD, 0, 0, ptr); + j++; + } else if (oevents & TVHPOLL_IN) { + EV_SET(ev+j, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + j++; } } - return 0; + return kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0 ? 0 : -1; #else return -1; #endif } +int tvhpoll_add + ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num ) +{ + int r; + + pthread_mutex_lock(&tp->lock); + r = tvhpoll_add0(tp, evs, num); + pthread_mutex_unlock(&tp->lock); + return r; +} + int tvhpoll_add1 - ( tvhpoll_t *tp, int fd, int events, void *ptr ) + ( tvhpoll_t *tp, int fd, uint32_t events, void *ptr ) { tvhpoll_event_t ev = { 0 }; ev.fd = fd; @@ -162,23 +245,51 @@ int tvhpoll_add1 return tvhpoll_add(tp, &ev, 1); } -int tvhpoll_rem +static int tvhpoll_rem0 ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num ) { + int r = -1; #if ENABLE_EPOLL - int i; - for (i = 0; i < num; i++) - epoll_ctl(tp->fd, EPOLL_CTL_DEL, evs[i].fd, NULL); -#elif ENABLE_KQUEUE int i; for (i = 0; i < num; i++) { - EV_SET(tp->ev+i, evs[i].fd, 0, EV_DELETE, 0, 0, NULL); - if (kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL) == -1) - return -1; + const int fd = evs[i].fd; + if (tvhpoll_get_events(tp, fd)) { + if (epoll_ctl(tp->fd, EPOLL_CTL_DEL, fd, NULL)) + break; + tvhpoll_set_events(tp, fd, 0); + } + } + if (i >= num) + r = 0; +#elif ENABLE_KQUEUE + int i, j; + struct kevent *ev = alloca(EV_SIZE * num); + for (i = j = 0; i < num; i++) { + const int fd = evs[i].fd; + if (tvhpoll_get_events(tp, fd)) { + EV_SET(ev+j, fd, 0, EV_DELETE, 0, 0, NULL); + j++; + } + } + if (kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0) { + r = 0; + for (i = 0; i < j; i++) + tvhpoll_set_events(tp, ev[i].ident, 0); } #else #endif - return 0; + return r; +} + +int tvhpoll_rem + ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num ) +{ + int r; + + pthread_mutex_lock(&tp->lock); + r = tvhpoll_rem0(tp, evs, num); + pthread_mutex_unlock(&tp->lock); + return r; } int tvhpoll_rem1 @@ -189,6 +300,33 @@ int tvhpoll_rem1 return tvhpoll_rem(tp, &ev, 1); } +int tvhpoll_set + ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num ) +{ + tvhpoll_event_t *lev, *ev; + int i, j, k, r; + pthread_mutex_lock(&tp->lock); + lev = alloca(tp->nevents * sizeof(*lev)); + for (i = k = 0; i < tp->nevents; i++) + if (tp->events[i]) { + for (j = 0; j < num; j++) + if (evs[j].fd == i + tp->events_off) + break; + if (j >= num) { + ev = lev + k; + k++; + ev->fd = i + tp->events_off; + ev->events = tp->events[i]; + ev->ptr = 0; + } + } + r = tvhpoll_rem0(tp, lev, k); + if (r == 0) + r = tvhpoll_add0(tp, evs, num); + pthread_mutex_unlock(&tp->lock); + return r; +} + int tvhpoll_wait ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num, int ms ) { diff --git a/src/tvhpoll.h b/src/tvhpoll.h index 9e8ee2602..ead61c918 100644 --- a/src/tvhpoll.h +++ b/src/tvhpoll.h @@ -40,8 +40,9 @@ typedef struct tvhpoll_event tvhpoll_t *tvhpoll_create(size_t num); void tvhpoll_destroy(tvhpoll_t *tp); +int tvhpoll_set(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num); int tvhpoll_add(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num); -int tvhpoll_add1(tvhpoll_t *tp, int fd, int events, void *ptr); +int tvhpoll_add1(tvhpoll_t *tp, int fd, uint32_t events, void *ptr); int tvhpoll_rem(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num); int tvhpoll_rem1(tvhpoll_t *tp, int fd); int tvhpoll_wait(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num, int ms);