]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
tvhpoll: add events cache, add set function
authorJaroslav Kysela <perex@perex.cz>
Tue, 2 Jan 2018 16:27:08 +0000 (17:27 +0100)
committerJaroslav Kysela <perex@perex.cz>
Thu, 4 Jan 2018 14:03:22 +0000 (15:03 +0100)
src/tvhpoll.c
src/tvhpoll.h

index 8803e0bd1d9d15b881c4a3b256b6333c90cfd331..08c1b6a1d512df55656ed33757e4114a986798a3 100644 (file)
 
 struct tvhpoll
 {
+  pthread_mutex_t lock;
+  uint8_t *events;
+  uint32_t events_off;
+  uint32_t nevents;
 #if ENABLE_EPOLL
   int fd;
   struct epoll_event *ev;
-  int nev;
+  uint32_t nev;
 #elif ENABLE_KQUEUE
   int fd;
   struct kevent *ev;
-  int nev;
+  uint32_t nev;
 #else
 #endif
 };
 
 static void
-tvhpoll_alloc ( tvhpoll_t *tp, int n )
+tvhpoll_alloc_events ( tvhpoll_t *tp, int fd )
+{
+  tp->events = calloc(1, tp->nevents = 8);
+  tp->events_off = fd;
+}
+
+static void
+tvhpoll_realloc_events1 ( tvhpoll_t *tp, int fd )
+{
+  uint32_t diff = tp->events_off - fd;
+  uint8_t *evs = malloc(tp->nevents + diff);
+  memset(evs, 0, diff);
+  memcpy(evs + diff, tp->events, tp->nevents);
+  free(tp->events);
+  tp->events = evs;
+  tp->events_off = fd;
+  tp->nevents += diff;
+}
+
+static void
+tvhpoll_realloc_events2 ( tvhpoll_t *tp, int fd )
+{
+  uint32_t size = (fd - tp->events_off) + 4;
+  tp->events = realloc(tp->events, size);
+  memset(tp->events + tp->nevents, 0, size - tp->nevents);
+  tp->nevents = size;
+}
+
+static inline void
+tvhpoll_set_events ( tvhpoll_t *tp, int fd, uint32_t events )
+{
+  if (tp->nevents == 0) {
+    tvhpoll_alloc_events(tp, fd);
+  } else if (fd < tp->events_off) {
+    tvhpoll_realloc_events1(tp, fd);
+  } else if (fd - tp->events_off >= tp->nevents) {
+    tvhpoll_realloc_events2(tp, fd);
+  }
+  assert((events & 0xffffff00) == 0);
+  tp->events[fd - tp->events_off] = events;
+}
+
+static inline uint32_t
+tvhpoll_get_events( tvhpoll_t *tp, int fd )
+{
+  const uint32_t off = tp->events_off;
+  if (fd < off)
+    return 0;
+  fd -= off;
+  if (fd >= tp->nevents)
+    return 0;
+  return tp->events[fd];
+}
+
+static void
+tvhpoll_alloc ( tvhpoll_t *tp, uint32_t n )
 {
 #if ENABLE_EPOLL || ENABLE_KQUEUE
   if (n > tp->nev) {
@@ -83,6 +142,7 @@ tvhpoll_create ( size_t n )
   fd = -1;
 #endif
   tvhpoll_t *tp = calloc(1, sizeof(tvhpoll_t));
+  pthread_mutex_init(&tp->lock, NULL);
   tp->fd = fd;
   tvhpoll_alloc(tp, n);
   return tp;
@@ -96,64 +156,87 @@ void tvhpoll_destroy ( tvhpoll_t *tp )
   free(tp->ev);
   close(tp->fd);
 #endif
+  free(tp->events);
+  pthread_mutex_destroy(&tp->lock);
   free(tp);
 }
 
-int tvhpoll_add
+static int tvhpoll_add0
   ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
 {
-  int i, rc;
 #if ENABLE_EPOLL
+  int i;
   for (i = 0; i < num; i++) {
     struct epoll_event ev = { 0 };
+    const int fd = evs[i].fd;
+    const uint32_t events = evs[i].events;
+    const uint32_t oevents = tvhpoll_get_events(tp, fd);
+    if (oevents == events) continue;
     ev.data.ptr = evs[i].ptr;
-    if (evs[i].events & TVHPOLL_IN)  ev.events |= EPOLLIN;
-    if (evs[i].events & TVHPOLL_OUT) ev.events |= EPOLLOUT;
-    if (evs[i].events & TVHPOLL_PRI) ev.events |= EPOLLPRI;
-    if (evs[i].events & TVHPOLL_ERR) ev.events |= EPOLLERR;
-    if (evs[i].events & TVHPOLL_HUP) ev.events |= EPOLLHUP;
-    rc = epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev);
-    if (rc && errno == EEXIST) {
+    if (events & TVHPOLL_IN)  ev.events |= EPOLLIN;
+    if (events & TVHPOLL_OUT) ev.events |= EPOLLOUT;
+    if (events & TVHPOLL_PRI) ev.events |= EPOLLPRI;
+    if (events & TVHPOLL_ERR) ev.events |= EPOLLERR;
+    if (events & TVHPOLL_HUP) ev.events |= EPOLLHUP;
+    if (oevents) {
       if (epoll_ctl(tp->fd, EPOLL_CTL_MOD, evs[i].fd, &ev))
-        return -1;
+        break;
+    } else {
+      if (epoll_ctl(tp->fd, EPOLL_CTL_ADD, evs[i].fd, &ev))
+        break;
     }
+    tvhpoll_set_events(tp, fd, events);
   }
-  return 0;
+  return i >= num ? 0 : -1;
 #elif ENABLE_KQUEUE
-  tvhpoll_alloc(tp, num);
-  for (i = 0; i < num; i++) {
-    if (evs[i].events & TVHPOLL_OUT){
-      EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE, EV_ADD, 0, 0, evs[i].ptr);
-      rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
-      if (rc == -1) {
-        tvherror(LS_TVHPOLL, "failed to add kqueue WRITE filter [%d|%d]",
-           evs[i].fd, rc);
-        return -1;
-      }
-    } else {
-      EV_SET(tp->ev+i, evs[i].fd, EVFILT_WRITE,  EV_DELETE, 0, 0, NULL);
-      kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
+  int i, j, r = 0;
+  struct kevent ev = alloca(EV_SIZE * num * 2);
+  for (i = j = 0; i < num; i++) {
+    const int fd = evs[i].fd;
+    const void *ptr = evs[i].ptr;
+    const uint32_t events = evs[i].events;
+    const uint32_t oevents = tvhpoll_get_events(tp, fd);
+    if (events == ovents) continue;
+    tvhpoll_set_evbits(tp, fd, events);
+    if (events & (TVHPOLL_OUT|TVHPOLL_IN)) == (TVHPOLL_OUT|TVHPOLL_IN)) {
+      EV_SET(ev+j, fd, EVFILT_READ|EVFILT_WRITE, EV_ADD, 0, 0, ptr);
+      j++;
+      continue;
     }
-    if (evs[i].events & TVHPOLL_IN){
-      EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_ADD, 0, 0, evs[i].ptr);
-      rc = kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
-      if (rc == -1) {
-        tvherror(LS_TVHPOLL, "failed to add kqueue READ filter [%d|%d]", evs[i].fd, rc);
-        return -1;
-      }
-    } else {
-      EV_SET(tp->ev+i, evs[i].fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-      kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL);
+    if (events & TVHPOLL_OUT) {
+      EV_SET(ev+j, fd, EVFILT_WRITE, EV_ADD, 0, 0, ptr);
+      j++;
+    } else if (oevents & TVHPOLL_OUT) {
+      EV_SET(ev+j, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+      j++;
+    }
+    if (events & TVHPOLL_IN) {
+      EV_SET(ev+j, fd, EVFILT_READ, EV_ADD, 0, 0, ptr);
+      j++;
+    } else if (oevents & TVHPOLL_IN) {
+      EV_SET(ev+j, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+      j++;
     }
   }
-  return 0;
+  return kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0 ? 0 : -1;
 #else
   return -1;
 #endif
 }
 
+int tvhpoll_add
+  ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+  int r;
+
+  pthread_mutex_lock(&tp->lock);
+  r = tvhpoll_add0(tp, evs, num);
+  pthread_mutex_unlock(&tp->lock);
+  return r;
+}
+
 int tvhpoll_add1
-  ( tvhpoll_t *tp, int fd, int events, void *ptr )
+  ( tvhpoll_t *tp, int fd, uint32_t events, void *ptr )
 {
   tvhpoll_event_t ev = { 0 };
   ev.fd = fd;
@@ -162,23 +245,51 @@ int tvhpoll_add1
   return tvhpoll_add(tp, &ev, 1);
 }
 
-int tvhpoll_rem
+static int tvhpoll_rem0
   ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
 {
+  int r = -1;
 #if ENABLE_EPOLL
-  int i;
-  for (i = 0; i < num; i++)
-    epoll_ctl(tp->fd, EPOLL_CTL_DEL, evs[i].fd, NULL);
-#elif ENABLE_KQUEUE
   int i;
   for (i = 0; i < num; i++) {
-    EV_SET(tp->ev+i, evs[i].fd, 0, EV_DELETE, 0, 0, NULL);
-    if (kevent(tp->fd, tp->ev+i, 1, NULL, 0, NULL) == -1)
-      return -1;
+    const int fd = evs[i].fd;
+    if (tvhpoll_get_events(tp, fd)) {
+      if (epoll_ctl(tp->fd, EPOLL_CTL_DEL, fd, NULL))
+        break;
+      tvhpoll_set_events(tp, fd, 0);
+    }
+  }
+  if (i >= num)
+    r = 0;
+#elif ENABLE_KQUEUE
+  int i, j;
+  struct kevent *ev = alloca(EV_SIZE * num);
+  for (i = j = 0; i < num; i++) {
+    const int fd = evs[i].fd;
+    if (tvhpoll_get_events(tp, fd)) {
+      EV_SET(ev+j, fd, 0, EV_DELETE, 0, 0, NULL);
+      j++;
+    }
+  }
+  if (kevent(tp->fd, ev, j, NULL, 0, NULL) >= 0) {
+    r = 0;
+    for (i = 0; i < j; i++)
+      tvhpoll_set_events(tp, ev[i].ident, 0);
   }
 #else
 #endif
-  return 0;
+  return r;
+}
+
+int tvhpoll_rem
+  ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+  int r;
+
+  pthread_mutex_lock(&tp->lock);
+  r = tvhpoll_rem0(tp, evs, num);
+  pthread_mutex_unlock(&tp->lock);
+  return r;
 }
 
 int tvhpoll_rem1
@@ -189,6 +300,33 @@ int tvhpoll_rem1
   return tvhpoll_rem(tp, &ev, 1);
 }
 
+int tvhpoll_set
+  ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num )
+{
+  tvhpoll_event_t *lev, *ev;
+  int i, j, k, r;
+  pthread_mutex_lock(&tp->lock);
+  lev = alloca(tp->nevents * sizeof(*lev));
+  for (i = k = 0; i < tp->nevents; i++)
+    if (tp->events[i]) {
+      for (j = 0; j < num; j++)
+        if (evs[j].fd == i + tp->events_off)
+          break;
+      if (j >= num) {
+        ev = lev + k;
+        k++;
+        ev->fd = i + tp->events_off;
+        ev->events = tp->events[i];
+        ev->ptr = 0;
+      }
+    }
+  r = tvhpoll_rem0(tp, lev, k);
+  if (r == 0)
+    r = tvhpoll_add0(tp, evs, num);
+  pthread_mutex_unlock(&tp->lock);
+  return r;
+}
+
 int tvhpoll_wait
   ( tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num, int ms )
 {
index 9e8ee26028970a1e730dcd20e8318757db024d4c..ead61c918d2f476432860d0dd1828fbe4f6012eb 100644 (file)
@@ -40,8 +40,9 @@ typedef struct tvhpoll_event
 
 tvhpoll_t *tvhpoll_create(size_t num);
 void tvhpoll_destroy(tvhpoll_t *tp);
+int tvhpoll_set(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num);
 int tvhpoll_add(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num);
-int tvhpoll_add1(tvhpoll_t *tp, int fd, int events, void *ptr);
+int tvhpoll_add1(tvhpoll_t *tp, int fd, uint32_t events, void *ptr);
 int tvhpoll_rem(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num);
 int tvhpoll_rem1(tvhpoll_t *tp, int fd);
 int tvhpoll_wait(tvhpoll_t *tp, tvhpoll_event_t *evs, size_t num, int ms);