struct tvhpoll
{
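+ /* lock serializes add/rem/set updates; events stores one byte of
+  * TVHPOLL_* bits per tracked fd, indexed by (fd - events_off) */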
+ pthread_mutex_t lock;
+ uint8_t *events;
+ uint32_t events_off;
+ uint32_t nevents;
#if ENABLE_EPOLL
int fd;
struct epoll_event *ev;
- int nev;
+ uint32_t nev;
#elif ENABLE_KQUEUE
int fd;
struct kevent *ev;
- int nev;
+ uint32_t nev;
#else
#endif
};
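+/* start the per-fd tracking array with a small window based at the
+ * first fd seen */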
static void
-tvhpoll_alloc ( tvhpoll_t *tp, int n )
+tvhpoll_alloc_events ( tvhpoll_t *tp, int fd )
+{
+ tp->events = calloc(1, tp->nevents = 8);
+ tp->events_off = fd;
+}
+
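+/* grow the tracking array downwards when a new fd lies below the
+ * current events_off base */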
+static void
+tvhpoll_realloc_events1 ( tvhpoll_t *tp, int fd )
+{
+ uint32_t diff = tp->events_off - fd;
+ uint8_t *evs = malloc(tp->nevents + diff);
+ memset(evs, 0, diff);
+ memcpy(evs + diff, tp->events, tp->nevents);
+ free(tp->events);
+ tp->events = evs;
+ tp->events_off = fd;
+ tp->nevents += diff;
+}
+
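+/* grow the tracking array upwards when a new fd lies past the end
+ * of the current window */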
+static void
+tvhpoll_realloc_events2 ( tvhpoll_t *tp, int fd )
+{
+ uint32_t size = (fd - tp->events_off) + 4;
+ tp->events = realloc(tp->events, size);
+ memset(tp->events + tp->nevents, 0, size - tp->nevents);
+ tp->nevents = size;
+}
+
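+/* record the TVHPOLL_* bits tracked for fd, resizing the window as
+ * needed; the bits must fit into a single byte */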
+static inline void
+tvhpoll_set_events ( tvhpoll_t *tp, int fd, uint32_t events )
+{
+ if (tp->nevents == 0) {
+ tvhpoll_alloc_events(tp, fd);
+ } else if (fd < tp->events_off) {
+ tvhpoll_realloc_events1(tp, fd);
+ } else if (fd - tp->events_off >= tp->nevents) {
+ tvhpoll_realloc_events2(tp, fd);
+ }
+ assert((events & 0xffffff00) == 0);
+ tp->events[fd - tp->events_off] = events;
+}
+
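+/* return the TVHPOLL_* bits currently tracked for fd, or 0 when the
+ * fd lies outside the tracked window */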
+static inline uint32_t
+tvhpoll_get_events( tvhpoll_t *tp, int fd )
+{
+ const uint32_t off = tp->events_off;
+ if (fd < off)
+ return 0;
+ fd -= off;
+ if (fd >= tp->nevents)
+ return 0;
+ return tp->events[fd];
+}
+
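+/* make sure the kernel event return buffer can hold at least n
+ * entries (epoll/kqueue only) */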
+static void
+tvhpoll_alloc ( tvhpoll_t *tp, uint32_t n )
{
#if ENABLE_EPOLL || ENABLE_KQUEUE
if (n > tp->nev) {
fd = -1;
#endif
tvhpoll_t *tp = calloc(1, sizeof(tvhpoll_t));
+ pthread_mutex_init(&tp->lock, NULL);
tp->fd = fd;
tvhpoll_alloc(tp, n);
return tp;
free(tp->ev);
close(tp->fd);
#endif
+ free(tp->events);
+ pthread_mutex_destroy(&tp->lock);
free(tp);
}
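+/* unlocked worker for tvhpoll_add(); the caller must hold tp->lock.
+ * fds whose tracked bits already match the request are skipped */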
-int tvhpoll_add
+static int tvhpoll_add0
( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
{
- int i, rc;
#if ENABLE_EPOLL
+ int i;
for (i = 0; i < num; i++) {
struct epoll_event ev = { 0 };
+ const int fd = evs[i].fd;
+ const uint32_t events = evs[i].events;
+ const uint32_t oevents = tvhpoll_get_events(tp, fd);
+ if (oevents == events) continue;
ev.data.ptr = evs[i].ptr;
- if (evs[i].events & TVHPOLL_IN) ev.events |= EPOLLIN;
- if (evs[i].events & TVHPOLL_OUT) ev.events |= EPOLLOUT;
- if (evs[i].events & TVHPOLL_PRI) ev.events |= EPOLLPRI;
- if (evs[i].events & TVHPOLL_ERR) ev.events |= EPOLLERR;
- if (evs[i].events & TVHPOLL_HUP) ev.events |= EPOLLHUP;
- rc = epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev);
- if (rc && errno == EEXIST) {
+ if (events & TVHPOLL_IN) ev.events |= EPOLLIN;
+ if (events & TVHPOLL_OUT) ev.events |= EPOLLOUT;
+ if (events & TVHPOLL_PRI) ev.events |= EPOLLPRI;
+ if (events & TVHPOLL_ERR) ev.events |= EPOLLERR;
+ if (events & TVHPOLL_HUP) ev.events |= EPOLLHUP;
+ if (oevents) {
if (epoll_ctl(tp->fd, EPOLL_CTL_MOD, evs[i].fd, &ev))
- return -1;
+ break;
+ } else {
+ if (epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev))
+ break;
}
+ tvhpoll_set_events(tp, fd, events);
}
- return 0;
+ return i >= num ? 0 : -1;
#elif ENABLE_KQUEUE
- tvhpoll_alloc(tp, num);
- for (i = 0; i < num; i++) {
- if (evs[i].events & TVHPOLL_OUT){
- EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE, EV_ADD, 0, 0, evs[i].ptr);
- rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
- if (rc == -1) {
- tvherror(LS_TVHPOLL, "failed to add kqueue WRITE filter [%d|%d]",
- evs[i].fd, rc);
- return -1;
- }
- } else {
- EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
+ int i, j;
+ struct kevent *ev = alloca(EV_SIZE * num * 2);
+ for (i = j = 0; i < num; i++) {
+ const int fd = evs[i].fd;
+ const void *ptr = evs[i].ptr;
+ const uint32_t events = evs[i].events;
+ const uint32_t oevents = tvhpoll_get_events(tp, fd);
+ if (events == oevents) continue;
+ tvhpoll_set_events(tp, fd, events);
+ if ((events & (TVHPOLL_OUT|TVHPOLL_IN)) == (TVHPOLL_OUT|TVHPOLL_IN)) {
+ /* kqueue filters are distinct values, not OR-able flags, so queue
+  * one change per filter when both read and write are requested */
+ EV_SET(ev+j, fd, EVFILT_READ, EV_ADD, 0, 0, ptr);
+ j++;
+ EV_SET(ev+j, fd, EVFILT_WRITE, EV_ADD, 0, 0, ptr);
+ j++;
+ continue;
}
- if (evs[i].events & TVHPOLL_IN){
- EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_ADD, 0, 0, evs[i].ptr);
- rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
- if (rc == -1) {
- tvherror(LS_TVHPOLL, "failed to add kqueue READ filter [%d|%d]", evs[i].fd, rc);
- return -1;
- }
- } else {
- EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
+ if (events & TVHPOLL_OUT) {
+ EV_SET(ev+j, fd, EVFILT_WRITE, EV_ADD, 0, 0, ptr);
+ j++;
+ } else if (oevents & TVHPOLL_OUT) {
+ EV_SET(ev+j, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ j++;
+ }
+ if (events & TVHPOLL_IN) {
+ EV_SET(ev+j, fd, EVFILT_READ, EV_ADD, 0, 0, ptr);
+ j++;
+ } else if (oevents & TVHPOLL_IN) {
+ EV_SET(ev+j, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ j++;
}
}
- return 0;
+ return kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0 ? 0 : -1;
#else
return -1;
#endif
}
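+/* public entry point: take tp->lock around the tracking update */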
+int tvhpoll_add
+ ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+ int r;
+
+ pthread_mutex_lock(&tp->lock);
+ r = tvhpoll_add0(tp, evs, num);
+ pthread_mutex_unlock(&tp->lock);
+ return r;
+}
+
int tvhpoll_add1
- ( tvhpoll_t *tp, int fd, int events, void *ptr )
+ ( tvhpoll_t *tp, int fd, uint32_t events, void *ptr )
{
tvhpoll_event_t ev = { 0 };
ev.fd = fd;
return tvhpoll_add(tp, &ev, 1);
}
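+/* unlocked worker for tvhpoll_rem(); the caller must hold tp->lock.
+ * only fds that are currently tracked are deregistered */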
-int tvhpoll_rem
+static int tvhpoll_rem0
( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
{
+ int r = -1;
#if ENABLE_EPOLL
- int i;
- for (i = 0; i < num; i++)
- epoll_ctl(tp->fd, EPOLL_CTL_DEL, evs[i].fd, NULL);
-#elif ENABLE_KQUEUE
int i;
for (i = 0; i < num; i++) {
- EV_SET(tp->ev+i, evs[i].fd, 0, EV_DELETE, 0, 0, NULL);
- if (kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL) == -1)
- return -1;
+ const int fd = evs[i].fd;
+ if (tvhpoll_get_events(tp, fd)) {
+ if (epoll_ctl(tp->fd, EPOLL_CTL_DEL, fd, NULL))
+ break;
+ tvhpoll_set_events(tp, fd, 0);
+ }
+ }
+ if (i >= num)
+ r = 0;
+#elif ENABLE_KQUEUE
+ int i, j;
+ struct kevent *ev = alloca(EV_SIZE * num * 2);
+ for (i = j = 0; i < num; i++) {
+ const int fd = evs[i].fd;
+ const uint32_t oevents = tvhpoll_get_events(tp, fd);
+ /* each registered kqueue filter has to be deleted individually */
+ if (oevents & TVHPOLL_OUT) {
+ EV_SET(ev+j, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ j++;
+ }
+ if (oevents & TVHPOLL_IN) {
+ EV_SET(ev+j, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ j++;
+ }
+ }
+ if (kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0) {
+ r = 0;
+ for (i = 0; i < num; i++)
+ tvhpoll_set_events(tp, evs[i].fd, 0);
}
#else
#endif
- return 0;
+ return r;
+}
+
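+/* public entry point: take tp->lock around the removal */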
+int tvhpoll_rem
+ ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+ int r;
+
+ pthread_mutex_lock(&tp->lock);
+ r = tvhpoll_rem0(tp, evs, num);
+ pthread_mutex_unlock(&tp->lock);
+ return r;
}
int tvhpoll_rem1
return tvhpoll_rem(tp, &ev, 1);
}
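+/* replace the whole monitored set: deregister tracked fds that are
+ * not present in evs[], then add or update everything in evs[] */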
+int tvhpoll_set
+ ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+ tvhpoll_event_t *lev, *ev;
+ int i, j, k, r;
+ pthread_mutex_lock(&tp->lock);
+ lev = alloca(tp->nevents * sizeof(*lev));
+ for (i = k = 0; i < tp->nevents; i++)
+ if (tp->events[i]) {
+ for (j = 0; j < num; j++)
+ if (evs[j].fd == i + tp->events_off)
+ break;
+ if (j >= num) {
+ ev = lev + k;
+ k++;
+ ev->fd = i + tp->events_off;
+ ev->events = tp->events[i];
+ ev->ptr = NULL;
+ }
+ }
+ r = tvhpoll_rem0(tp, lev, k);
+ if (r == 0)
+ r = tvhpoll_add0(tp, evs, num);
+ pthread_mutex_unlock(&tp->lock);
+ return r;
+}
+
int tvhpoll_wait
( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num, int ms )
{