]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
tvhpoll: update all code to use new tvhpoll wrapper rather than epoll
authorBernhard Froehlich <decke@bluelife.at>
Sat, 4 May 2013 14:45:51 +0000 (16:45 +0200)
committerAdam Sutton <dev@adamsutton.me.uk>
Fri, 31 May 2013 13:22:29 +0000 (14:22 +0100)
src/dvb/dvb.h
src/dvb/dvb_adapter.c
src/dvb/dvb_input_filtered.c
src/iptv_input.c
src/tcp.c
src/timeshift/timeshift_reader.c

index 127d676ff887e26db2e672346f067b9eb8f001c9..eb6d2b79c3f81a1fe7c468421afa0ec4a548e941 100644 (file)
@@ -24,6 +24,7 @@
 #include <pthread.h>
 #include "htsmsg.h"
 #include "psi.h"
+#include "tvhpoll.h"
 
 struct service;
 struct th_dvb_table;
@@ -204,7 +205,7 @@ typedef struct th_dvb_adapter {
 
   th_dvb_mux_instance_t *tda_mux_epg;
 
-  int tda_table_epollfd;
+  tvhpoll_t *tda_table_pd;
 
   uint32_t tda_enabled;
 
index 00a537047314a78adaa03aee2471e0c3634b9761..93f06cdbd2130cb96f482df3a53c104f8c0f8cad 100644 (file)
@@ -23,7 +23,6 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/ioctl.h>
-#include <sys/epoll.h>
 #include <sys/types.h>
 #include <dirent.h>
 #include <fcntl.h>
@@ -48,6 +47,7 @@
 #include "epggrab.h"
 #include "diseqc.h"
 #include "atomic.h"
+#include "tvhpoll.h"
 
 struct th_dvb_adapter_queue dvb_adapters;
 struct th_dvb_mux_instance_tree dvb_muxes;
@@ -1020,11 +1020,11 @@ static void *
 dvb_adapter_input_dvr(void *aux)
 {
   th_dvb_adapter_t *tda = aux;
-  int fd = -1, i, r, c, efd, nfds, dmx = -1;
+  int fd = -1, i, r, c, nfds, dmx = -1;
   uint8_t tsb[188 * 10];
   service_t *t;
-  struct epoll_event ev;
-  int delay = 10;
+  tvhpoll_t *pd;
+  tvhpoll_event_t ev[2];
 
   /* Install RAW demux */
   if (tda->tda_rawmode) {
@@ -1040,26 +1040,25 @@ dvb_adapter_input_dvr(void *aux)
     return NULL;
   }
 
-  /* Create poll */
-  efd = epoll_create(2);
-  memset(&ev, 0, sizeof(ev));
-  ev.events  = EPOLLIN;
-  ev.data.fd = tda->tda_dvr_pipe.rd;
-  epoll_ctl(efd, EPOLL_CTL_ADD, tda->tda_dvr_pipe.rd, &ev);
-  ev.data.fd = fd;
-  epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
+  pd = tvhpoll_create(2);
+  memset(ev, 0, sizeof(ev));
+  ev[0].data.fd = ev[0].fd = tda->tda_dvr_pipe.rd;
+  ev[0].events  = TVHPOLL_IN;
+  ev[1].data.fd = ev[1].fd = fd;
+  ev[1].events  = TVHPOLL_IN;
+  tvhpoll_add(pd, ev, 2);
 
   r = i = 0;
   while(1) {
 
     /* Wait for input */
-    nfds = epoll_wait(efd, &ev, 1, delay);
+    nfds = tvhpoll_wait(pd, ev, 1, -1);
 
     /* No data */
     if (nfds < 1) continue;
 
     /* Exit */
-    if (ev.data.fd != fd) break;
+    if (ev[0].data.fd != fd) break;
 
     /* Read data */
     c = read(fd, tsb+r, sizeof(tsb)-r);
@@ -1140,7 +1139,7 @@ dvb_adapter_input_dvr(void *aux)
 
   if(dmx != -1)
     close(dmx);
-  close(efd);
+  tvhpoll_destroy(pd);
   close(fd);
   return NULL;
 }
index a4dddf264a0932abec5ef0ea15d08db5d2b76aba..0de91d072d53df3fd64046be490a3bda644e691e 100644 (file)
 #include <fcntl.h>
 #include <linux/dvb/frontend.h>
 #include <linux/dvb/dmx.h>
-#include <sys/epoll.h>
 
 #include "tvheadend.h"
 #include "dvb.h"
 #include "service.h"
+#include "tvhpoll.h"
 
 /**
  * Install filters for a service
@@ -58,9 +58,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s)
     if(fd == -1) {
       st->es_demuxer_fd = -1;
       tvhlog(LOG_ERR, "dvb",
-            "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
-            s->s_identifier, tda->tda_demux_path, 
-            st->es_pid, strerror(errno));
+       "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
+       s->s_identifier, tda->tda_demux_path, 
+       st->es_pid, strerror(errno));
       continue;
     }
 
@@ -73,9 +73,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s)
 
     if(ioctl(fd, DMX_SET_PES_FILTER, &dmx_param)) {
       tvhlog(LOG_ERR, "dvb",
-            "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
-            s->s_identifier, tda->tda_demux_path, 
-            st->es_pid, strerror(errno));
+       "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
+       s->s_identifier, tda->tda_demux_path, 
+       st->es_pid, strerror(errno));
       close(fd);
       fd = -1;
     }
@@ -113,7 +113,7 @@ static void
 open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
 {
   th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
-  struct epoll_event e;
+  tvhpoll_event_t ev;
   static int tdt_id_tally;
 
   tdt->tdt_fd = tvh_open(tda->tda_demux_path, O_RDWR, 0);
@@ -122,10 +122,11 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
 
     tdt->tdt_id = ++tdt_id_tally;
 
-    e.events = EPOLLIN;
-    e.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
+    ev.fd       = tdt->tdt_fd;
+    ev.events   = TVHPOLL_IN;
+    ev.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
 
-    if(epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_ADD, tdt->tdt_fd, &e)) {
+    if(tvhpoll_add(tda->tda_table_pd, &ev, 1) != 0) {
       close(tdt->tdt_fd);
       tdt->tdt_fd = -1;
     } else {
@@ -136,13 +137,13 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
       fp.filter.mask[0]   = tdt->tdt_mask;
 
       if(tdt->tdt_flags & TDT_CRC)
-       fp.flags |= DMX_CHECK_CRC;
+  fp.flags |= DMX_CHECK_CRC;
       fp.flags |= DMX_IMMEDIATE_START;
       fp.pid = tdt->tdt_pid;
 
       if(ioctl(tdt->tdt_fd, DMX_SET_FILTER, &fp)) {
-       close(tdt->tdt_fd);
-       tdt->tdt_fd = -1;
+  close(tdt->tdt_fd);
+  tdt->tdt_fd = -1;
       }
     }
   }
@@ -159,10 +160,12 @@ static void
 tdt_close_fd(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
 {
   th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+  tvhpoll_event_t ev;
 
   assert(tdt->tdt_fd != -1);
 
-  epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
+  ev.fd = tdt->tdt_fd;
+  tvhpoll_rem(tda->tda_table_pd, &ev, 1);
   close(tdt->tdt_fd);
 
   tdt->tdt_fd = -1;
@@ -178,51 +181,48 @@ static void *
 dvb_table_input(void *aux)
 {
   th_dvb_adapter_t *tda = aux;
-  int r, i, tid, fd, x;
-  struct epoll_event ev[1];
+  int r, tid, fd, x;
   uint8_t sec[4096];
   th_dvb_mux_instance_t *tdmi;
   th_dvb_table_t *tdt;
   int64_t cycle_barrier = 0; 
+  tvhpoll_event_t ev;
 
   while(1) {
-    x = epoll_wait(tda->tda_table_epollfd, ev, sizeof(ev) / sizeof(ev[0]), -1);
-
-    for(i = 0; i < x; i++) {
-    
-      tid = ev[i].data.u64 & 0xffffffff;
-      fd  = ev[i].data.u64 >> 32; 
-
-      if(!(ev[i].events & EPOLLIN))
-       continue;
-
-      if((r = read(fd, sec, sizeof(sec))) < 3)
-       continue;
-
-      pthread_mutex_lock(&global_lock);
-      if((tdmi = tda->tda_mux_current) != NULL) {
-       LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
-         if(tdt->tdt_id == tid)
-           break;
-
-       if(tdt != NULL) {
-         dvb_table_dispatch(sec, r, tdt);
-
-         /* Any tables pending (that wants a filter/fd), close this one */
-         if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
-            cycle_barrier < getmonoclock()) {
-           tdt_close_fd(tdmi, tdt);
-           cycle_barrier = getmonoclock() + 100000;
-           tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
-           assert(tdt != NULL);
-           TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
-
-           open_table(tdmi, tdt);
-         }
-       }
+    x = tvhpoll_wait(tda->tda_table_pd, &ev, 1, -1);
+    if (x != 1) continue;
+
+    tid = ev.data.u64 & 0xffffffff;
+    fd  = ev.data.u64 >> 32; 
+
+    if(!(ev.events & TVHPOLL_IN))
+      continue;
+
+    if((r = read(fd, sec, sizeof(sec))) < 3)
+      continue;
+
+    pthread_mutex_lock(&global_lock);
+    if((tdmi = tda->tda_mux_current) != NULL) {
+      LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
+        if(tdt->tdt_id == tid)
+          break;
+
+      if(tdt != NULL) {
+        dvb_table_dispatch(sec, r, tdt);
+
+        /* Any tables pending (that wants a filter/fd), close this one */
+        if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
+           cycle_barrier < getmonoclock()) {
+          tdt_close_fd(tdmi, tdt);
+          cycle_barrier = getmonoclock() + 100000;
+          tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
+          assert(tdt != NULL);
+          TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
+          open_table(tdmi, tdt);
+        }
       }
-      pthread_mutex_unlock(&global_lock);
     }
+    pthread_mutex_unlock(&global_lock);
   }
   return NULL;
 }
@@ -232,11 +232,13 @@ static void
 close_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
 {
   th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+  tvhpoll_event_t ev;
 
   if(tdt->tdt_fd == -1) {
     TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
   } else {
-    epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
+    ev.fd = tdt->tdt_fd;
+    tvhpoll_rem(tda->tda_table_pd, &ev, 1);
     close(tdt->tdt_fd);
   }
 }
@@ -253,7 +255,7 @@ dvb_input_filtered_setup(th_dvb_adapter_t *tda)
   tda->tda_close_table   = close_table;
 
   pthread_t ptid;
-  tda->tda_table_epollfd = epoll_create(50);
+  tda->tda_table_pd = tvhpoll_create(50);
   pthread_create(&ptid, NULL, dvb_table_input, tda);
 }
 
index 7ca96f6d1f761216a37079077a5387de5aa68a19..1b232e81fc040531099fbee6d33eb46c00897a81 100644 (file)
@@ -21,7 +21,6 @@
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <sys/ioctl.h>
-#include <sys/epoll.h>
 #include <fcntl.h>
 #include <assert.h>
 
@@ -40,6 +39,7 @@
 #include "tsdemux.h"
 #include "psi.h"
 #include "settings.h"
+#include "tvhpoll.h"
 
 #if defined(PLATFORM_LINUX)
 #include <linux/netdevice.h>
@@ -52,8 +52,8 @@
 #  endif
 #endif
 
-static int iptv_thread_running;
-static int iptv_epollfd;
+static int             iptv_thread_running;
+static tvhpoll_t      *iptv_poll;
 static pthread_mutex_t iptv_recvmutex;
 
 struct service_list iptv_all_services; /* All IPTV services */
@@ -137,11 +137,11 @@ iptv_thread(void *aux)
 {
   int nfds, fd, r, j, hlen;
   uint8_t tsb[65536], *buf;
-  struct epoll_event ev;
+  tvhpoll_event_t ev;
   service_t *t;
 
   while(1) {
-    nfds = epoll_wait(iptv_epollfd, &ev, 1, -1);
+    nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1);
     if(nfds == -1) {
       tvhlog(LOG_ERR, "IPTV", "epoll() error -- %s, sleeping 1 second",
             strerror(errno));
@@ -220,13 +220,13 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start)
   struct sockaddr_in sin;
   struct sockaddr_in6 sin6;
   struct ifreq ifr;
-  struct epoll_event ev;
+  tvhpoll_event_t ev;
 
   assert(t->s_iptv_fd == -1);
 
   if(iptv_thread_running == 0) {
     iptv_thread_running = 1;
-    iptv_epollfd = epoll_create(10);
+    iptv_poll = tvhpoll_create(10);
     pthread_create(&tid, NULL, iptv_thread, NULL);
   }
 
@@ -345,9 +345,10 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start)
           resize, strerror(errno));
 
   memset(&ev, 0, sizeof(ev));
-  ev.events = EPOLLIN;
+  ev.events  = TVHPOLL_IN;
+  ev.fd      = fd;
   ev.data.fd = fd;
-  if(epoll_ctl(iptv_epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
+  if(tvhpoll_add(iptv_poll, &ev, 1) == -1) {
     tvhlog(LOG_ERR, "IPTV", "\"%s\" cannot add to epoll set -- %s", 
           t->s_identifier, strerror(errno));
     close(fd);
@@ -451,6 +452,7 @@ iptv_service_stop(service_t *t)
 #endif
   }
   close(t->s_iptv_fd); // Automatically removes fd from epoll set
+  // TODO: this is an issue
 
   t->s_iptv_fd = -1;
 }
index fc134e4f38966508f6643a01c15fe7a448bdbb4a..a176bdacda3f24779061a1b5c76f6c7b4e44679d 100644 (file)
--- a/src/tcp.c
+++ b/src/tcp.c
@@ -18,7 +18,6 @@
 
 #include <pthread.h>
 #include <netdb.h>
-#include <sys/epoll.h>
 #include <poll.h>
 #include <assert.h>
 #include <stdio.h>
@@ -34,6 +33,7 @@
 
 #include "tcp.h"
 #include "tvheadend.h"
+#include "tvhpoll.h"
 
 int tcp_preferred_address_family = AF_INET;
 
@@ -144,16 +144,16 @@ tcp_connect(const char *hostname, int port, char *errbuf, size_t errbufsize,
 
       r = poll(&pfd, 1, timeout * 1000);
       if(r == 0) {
-       /* Timeout */
-       snprintf(errbuf, errbufsize, "Connection attempt timed out");
-       close(fd);
-       return -1;
+             /* Timeout */
+       snprintf(errbuf, errbufsize, "Connection attempt timed out");
+       close(fd);
+       return -1;
       }
       
       if(r == -1) {
-       snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
-       close(fd);
-       return -1;
+       snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
+       close(fd);
+       return -1;
       }
 
       getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
@@ -331,7 +331,7 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout)
     x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT);
     if(x == -1) {
       if(errno == EAGAIN)
-       continue;
+             continue;
       return errno;
     }
 
@@ -373,7 +373,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
 /**
  *
  */
-static int tcp_server_epoll_fd;
+static tvhpoll_t *tcp_server_poll;
 
 typedef struct tcp_server {
   tcp_server_callback_t *start;
@@ -438,8 +438,8 @@ tcp_server_start(void *aux)
 static void *
 tcp_server_loop(void *aux)
 {
-  int r, i;
-  struct epoll_event ev[1];
+  int r;
+  tvhpoll_event_t ev;
   tcp_server_t *ts;
   tcp_server_launch_t *tsl;
   pthread_attr_t attr;
@@ -450,46 +450,45 @@ tcp_server_loop(void *aux)
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 
   while(1) {
-    r = epoll_wait(tcp_server_epoll_fd, ev, sizeof(ev) / sizeof(ev[0]), -1);
+    r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1);
     if(r == -1) {
-      perror("tcp_server: epoll_wait");
+      perror("tcp_server: tchpoll_wait");
       continue;
     }
 
-    for(i = 0; i < r; i++) {
-      ts = ev[i].data.ptr;
+    if (r == 0) continue;
 
-      if(ev[i].events & EPOLLHUP) {
-       close(ts->serverfd);
-       free(ts);
-       continue;
-      }
+    ts = ev.data.ptr;
 
-      if(ev[i].events & EPOLLIN) {
-       tsl = malloc(sizeof(tcp_server_launch_t));
-       tsl->start  = ts->start;
-       tsl->opaque = ts->opaque;
-       slen = sizeof(struct sockaddr_storage);
-
-       tsl->fd = accept(ts->serverfd, 
-                        (struct sockaddr *)&tsl->peer, &slen);
-       if(tsl->fd == -1) {
-         perror("accept");
-         free(tsl);
-         sleep(1);
-         continue;
-       }
-
-
-       slen = sizeof(struct sockaddr_storage);
-       if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
-           close(tsl->fd);
-           free(tsl);
-           continue;
-       }
-
-       pthread_create(&tid, &attr, tcp_server_start, tsl);
-      }
+    if(ev.events & TVHPOLL_HUP) {
+           close(ts->serverfd);
+       free(ts);
+      continue;
+    } 
+
+    if(ev.events & TVHPOLL_IN) {
+           tsl = malloc(sizeof(tcp_server_launch_t));
+      tsl->start  = ts->start;
+      tsl->opaque = ts->opaque;
+      slen = sizeof(struct sockaddr_storage);
+
+      tsl->fd = accept(ts->serverfd, 
+                                        (struct sockaddr *)&tsl->peer, &slen);
+       if(tsl->fd == -1) {
+         perror("accept");
+         free(tsl);
+         sleep(1);
+         continue;
+       }
+
+       slen = sizeof(struct sockaddr_storage);
+           if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
+        close(tsl->fd);
+        free(tsl);
+        continue;
+       }
+
+       pthread_create(&tid, &attr, tcp_server_start, tsl);
     }
   }
   return NULL;
@@ -502,14 +501,14 @@ void *
 tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque)
 {
   int fd, x;
-  struct epoll_event e;
+  tvhpoll_event_t ev;
   tcp_server_t *ts;
   struct addrinfo hints, *res, *ressave, *use = NULL;
   char *portBuf = (char*)malloc(6);
   int one = 1;
   int zero = 0;
 
-  memset(&e, 0, sizeof(e));
+  memset(&ev, 0, sizeof(ev));
 
   snprintf(portBuf, 6, "%d", port);
 
@@ -570,9 +569,10 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start,
   ts->start = start;
   ts->opaque = opaque;
 
-  e.events = EPOLLIN;
-  e.data.ptr = ts;
-  epoll_ctl(tcp_server_epoll_fd, EPOLL_CTL_ADD, fd, &e);
+  ev.fd       = fd;
+  ev.events   = TVHPOLL_IN;
+  ev.data.ptr = ts;
+  tvhpoll_add(tcp_server_poll, &ev, 1);
 
   return ts;
 }
@@ -588,7 +588,7 @@ tcp_server_init(int opt_ipv6)
   if(opt_ipv6)
     tcp_preferred_address_family = AF_INET6;
 
-  tcp_server_epoll_fd = epoll_create(10);
+  tcp_server_poll = tvhpoll_create(10);
   pthread_create(&tid, NULL, tcp_server_loop, NULL);
 }
 
index e9b3ca5391a1bfefc0cae88e23357abb38116bc6..3eac00a6f25e7f507297b35ef3f578f0b9c46a9c 100644 (file)
@@ -21,6 +21,7 @@
 #include "timeshift.h"
 #include "timeshift/private.h"
 #include "atomic.h"
+#include "tvhpoll.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -399,7 +400,7 @@ static int _timeshift_flush_to_live
 void *timeshift_reader ( void *p )
 {
   timeshift_t *ts = p;
-  int efd, nfds, end, fd = -1, run = 1, wait = -1;
+  int nfds, end, fd = -1, run = 1, wait = -1;
   timeshift_file_t *cur_file = NULL;
   off_t cur_off = 0;
   int cur_speed = 100, keyframe_mode = 0;
@@ -409,13 +410,13 @@ void *timeshift_reader ( void *p )
   timeshift_index_iframe_t *tsi = NULL;
   streaming_skip_t *skip = NULL;
   time_t last_status = 0;
+  tvhpoll_t *pd;
+  tvhpoll_event_t ev = { 0 };
 
-  /* Poll */
-  struct epoll_event ev = { 0 };
-  efd        = epoll_create(1);
-  ev.events  = EPOLLIN;
-  ev.data.fd = ts->rd_pipe.rd;
-  epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
+  pd = tvhpoll_create(1);
+  ev.fd     = ts->rd_pipe.rd;
+  ev.events = TVHPOLL_IN;
+  tvhpoll_add(pd, &ev, 1);
 
   /* Output */
   while (run) {
@@ -427,7 +428,7 @@ void *timeshift_reader ( void *p )
 
     /* Wait for data */
     if(wait)
-      nfds = epoll_wait(efd, &ev, 1, wait);
+      nfds = tvhpoll_wait(pd, &ev, 1, wait);
     else
       nfds = 0;
     wait      = -1;
@@ -438,7 +439,7 @@ void *timeshift_reader ( void *p )
     /* Control */
     pthread_mutex_lock(&ts->state_mutex);
     if (nfds == 1) {
-      if (_read_msg(ev.data.fd, &ctrl) > 0) {
+      if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) {
 
         /* Exit */
         if (ctrl->sm_type == SMT_EXIT) {
@@ -802,6 +803,7 @@ void *timeshift_reader ( void *p )
   }
 
   /* Cleanup */
+  tvhpoll_destroy(pd);
   if (fd != -1) close(fd);
   if (sm)       streaming_msg_free(sm);
   if (ctrl)     streaming_msg_free(ctrl);