]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
cclient: remove writer thread, use poll
authorJaroslav Kysela <perex@perex.cz>
Fri, 29 Dec 2017 19:06:25 +0000 (20:06 +0100)
committerJaroslav Kysela <perex@perex.cz>
Thu, 4 Jan 2018 14:03:22 +0000 (15:03 +0100)
src/descrambler/cccam.c
src/descrambler/cclient.c
src/descrambler/cclient.h
src/descrambler/cwc.c
src/tvheadend.h
src/wrappers.c

index 1a87bdd9f0b847a9d002ecbb978046b8d88a532e..e17c287f3b2dd91ff0e60e5079577a7b1c9f286f 100644 (file)
@@ -363,34 +363,23 @@ cccam_running_reply(cccam_t *cccam, uint8_t *buf, int len)
  *
  */
 static int
-cccam_read_message(cccam_t *cccam, const char *state, uint8_t *buf, int len, int timeout)
+cccam_read_message0(cccam_t *cccam, const char *state, sbuf_t *rbuf, int timeout)
 {
-  int32_t ret;
   uint16_t msglen;
+  uint8_t hdr[4];
+  struct cccam_crypt_block block;
 
-  pthread_mutex_unlock(&cccam->cc_mutex);
-  ret = tcp_read_timeout(cccam->cc_fd, buf, 4, timeout);
-  pthread_mutex_lock(&cccam->cc_mutex);
-  if (ret) {
-    tvhdebug(cccam->cc_subsys, "%s: recv error %d or timeout",
-             cccam->cc_name, ret);
-    return -1;
-  }
-  cccam_decrypt(&cccam->recvblock, buf, 4);
-  msglen = (buf[2] << 8) | buf[3];
-  if (msglen > 0) {
-    if (msglen > len - 2) {
-      tvhdebug(cccam->cc_subsys, "%s: received message too large", cccam->cc_name);
-      return -1;
-    }
-    pthread_mutex_unlock(&cccam->cc_mutex);
-    ret = tcp_read_timeout(cccam->cc_fd, buf + 4, msglen, 5000);
-    pthread_mutex_lock(&cccam->cc_mutex);
-    if (ret) {
-      tvhdebug(cccam->cc_subsys, "%s: timeout reading message", cccam->cc_name);
-      return -1;
-    }
-    cccam_decrypt(&cccam->recvblock, buf + 4, msglen);
+  if (rbuf->sb_ptr < 4)
+    return 0;
+  block = cccam->recvblock;
+  memcpy(hdr, rbuf->sb_data, 4);
+  cccam_decrypt(&cccam->recvblock, hdr, 4);
+  msglen = (hdr[2] << 8) | hdr[3];
+  if (rbuf->sb_ptr >= msglen + 4) {
+    memcpy(rbuf->sb_data, hdr, 4);
+    cccam_decrypt(&cccam->recvblock, rbuf->sb_data + 4, msglen);
+  } else {
+    cccam->recvblock = block;
   }
   return msglen + 4;
 }
@@ -674,15 +663,21 @@ cccam_send_emm(void *cc, cc_service_t *ct, cc_card_data_t *pcard,
  *
  */
 static int
-cccam_read(void *cc)
+cccam_read(void *cc, sbuf_t *rbuf)
 {
   cccam_t *cccam = cc;
-  uint8_t buf[CCCAM_NETMSGSIZE];
   const int ka_interval = cccam->cc_keepalive_interval * 2 * 1000;
-  int r = cccam_read_message(cccam, "Decoderloop", buf, sizeof(buf), ka_interval);
+  int r = cccam_read_message0(cccam, "Decoderloop", rbuf, ka_interval);
   if (r < 0)
     return -1;
-  return cccam_running_reply(cccam, buf, r);
+  if (r > 0) {
+    int ret = cccam_running_reply(cccam, rbuf->sb_data, r);
+    if (ret > 0)
+      sbuf_cut(rbuf, r);
+    if (ret < 0)
+      return -1;
+  }
+  return 0;
 }
 
 /**
index 21822aeac061bc90e37626e56f01f83d572f6b7a..3ce2c2bdd4b936fdde298f14de22a3e0dae84ab1 100644 (file)
  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include <fcntl.h>
 #include <signal.h>
 #include <pthread.h>
 #include "tvheadend.h"
 #include "tcp.h"
 #include "cclient.h"
+#include "tvhpoll.h"
 
 static void cc_service_pid_free(cc_service_t *ct);
 
@@ -331,7 +333,7 @@ cc_read(cclient_t *cc, void *buf, size_t len, int timeout)
     tvhwarn(cc->cc_subsys, "%s: read error %d (%s)",
             cc->cc_name, r, strerror(r));
 
-  if(cc_must_break(cc))
+  if (cc_must_break(cc))
     return ECONNABORTED;
 
   return r;
@@ -348,10 +350,14 @@ cc_write_message(cclient_t *cc, cc_message_t *msg, int enq)
   tvhlog_hexdump(cc->cc_subsys, msg->cm_data, msg->cm_len);
 
   if (enq) {
-    pthread_mutex_lock(&cc->cc_writer_mutex);
-    TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link);
-    tvh_cond_signal(&cc->cc_writer_cond, 0);
-    pthread_mutex_unlock(&cc->cc_writer_mutex);
+    pthread_mutex_lock(&cc->cc_mutex);
+    if (cc->cc_write_running) {
+      TAILQ_INSERT_TAIL(&cc->cc_writeq, msg, cm_link);
+      tvh_nonblock_write(cc->cc_pipe.wr, "w", 1);
+    } else {
+      free(msg);
+    }
+    pthread_mutex_unlock(&cc->cc_mutex);
   } else {
     if (tvh_write(cc->cc_fd, msg->cm_data, msg->cm_len))
       tvhinfo(cc->cc_subsys, "%s: write error %s",
@@ -363,60 +369,18 @@ cc_write_message(cclient_t *cc, cc_message_t *msg, int enq)
 /**
  *
  */
-static void *
-cc_writer_thread(void *aux)
+static void
+cc_session(cclient_t *cc)
 {
-  cclient_t *cc = aux;
+  tvhpoll_t *poll;
+  tvhpoll_event_t ev;
+  char buf[16];
+  sbuf_t rbuf;
   cc_message_t *cm;
+  ssize_t len;
   int64_t mono;
   int r;
 
-  pthread_mutex_lock(&cc->cc_writer_mutex);
-
-  while(cc->cc_writer_running) {
-
-    if((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) {
-      TAILQ_REMOVE(&cc->cc_writeq, cm, cm_link);
-      pthread_mutex_unlock(&cc->cc_writer_mutex);
-      //      int64_t ts = getfastmonoclock();
-      if (tvh_write(cc->cc_fd, cm->cm_data, cm->cm_len))
-        tvhinfo(cc->cc_subsys, "%s: write error %s",
-                cc->cc_name, strerror(errno));
-      //      printf("Write took %lld usec\n", getfastmonoclock() - ts);
-      free(cm);
-      pthread_mutex_lock(&cc->cc_writer_mutex);
-      continue;
-    }
-
-
-    /* If nothing is to be sent in keepalive interval seconds we
-       need to send a keepalive */
-    mono = mclk() + sec2mono(cc->cc_keepalive_interval);
-    do {
-      r = tvh_cond_timedwait(&cc->cc_writer_cond, &cc->cc_writer_mutex, mono);
-      if(r == ETIMEDOUT) {
-        if (cc->cc_keepalive)
-          cc->cc_keepalive(cc);
-        break;
-      }
-    } while (ERRNO_AGAIN(r));
-  }
-
-  pthread_mutex_unlock(&cc->cc_writer_mutex);
-  return NULL;
-}
-
-
-
-/**
- *
- */
-static void
-cc_session(cclient_t *cc)
-{
-  char buf[32];
-  pthread_t writer_thread_id;
-
   if (cc->cc_init_session(cc))
     return;
 
@@ -429,30 +393,46 @@ cc_session(cclient_t *cc)
   /**
    * We do all requests from now on in a separate thread
    */
-  cc->cc_writer_running = 1;
-  tvh_cond_init(&cc->cc_writer_cond);
-  pthread_mutex_init(&cc->cc_writer_mutex, NULL);
   TAILQ_INIT(&cc->cc_writeq);
-  snprintf(buf, sizeof(buf), "%s-writer", cc->cc_id);
-  tvhthread_create(&writer_thread_id, NULL, cc_writer_thread, cc, buf);
+  cc->cc_write_running = 1;
+  sbuf_init(&rbuf);
 
   /**
    * Mainloop
    */
-  while(!cc_must_break(cc)) {
-    if (cc->cc_read(cc))
-      break;
+  poll = tvhpoll_create(1);
+  tvhpoll_add1(poll, cc->cc_pipe.rd, TVHPOLL_IN, &cc->cc_pipe);
+  tvhpoll_add1(poll, cc->cc_fd, TVHPOLL_IN, &cc->cc_fd);
+  mono = mclk() + sec2mono(cc->cc_keepalive_interval);
+  while (!cc_must_break(cc)) {
+    pthread_mutex_unlock(&cc->cc_mutex);
+    r = tvhpoll_wait(poll, &ev, 1, 1000);
+    pthread_mutex_lock(&cc->cc_mutex);
+    if (r < 0 && ERRNO_AGAIN(errno))
+      continue;
+    if (ev.data.ptr == &cc->cc_pipe)
+      read(cc->cc_pipe.rd, buf, sizeof(buf));
+    else if (ev.data.ptr == &cc->cc_fd) {
+      len = sbuf_read(&rbuf, cc->cc_fd);
+      if (len > 0 && cc->cc_read(cc, &rbuf))
+        break;
+    } else {
+      abort();
+    }
+    if ((cm = TAILQ_FIRST(&cc->cc_writeq)) != NULL) {
+      if (tvh_nonblock_write(cc->cc_fd, cm->cm_data, cm->cm_len))
+        break;
+    }
+    if (mono < mclk()) {
+      mono = mclk();
+      if (cc->cc_keepalive)
+        cc->cc_keepalive(cc);
+    }
   }
   tvhdebug(cc->cc_subsys, "%s: session exiting", cc->cc_name);
-
-  /**
-   * Collect the writer thread
-   */
+  tvhpoll_destroy(poll);
+  sbuf_free(&rbuf);
   shutdown(cc->cc_fd, SHUT_RDWR);
-  cc->cc_writer_running = 0;
-  tvh_cond_signal(&cc->cc_writer_cond, 0);
-  pthread_join(writer_thread_id, NULL);
-  tvhdebug(cc->cc_subsys, "%s: Write thread joined", cc->cc_name);
 }
 
 /**
@@ -622,7 +602,7 @@ cc_emm(void *opaque, int pid, const uint8_t *data, int len, int emm)
   cc = pcard->cs_client;
   pthread_mutex_lock(&cc->cc_mutex);
   mux = pcard->cs_mux;
-  if (pcard->cs_running && cc->cc_forward_emm && cc->cc_writer_running) {
+  if (pcard->cs_running && cc->cc_forward_emm && cc->cc_write_running) {
     if (cc->cc_emmex) {
       if (cc->cc_emm_mux && cc->cc_emm_mux != mux) {
         if (cc->cc_emm_update_time + sec2mono(25) > mclk())
@@ -1041,6 +1021,7 @@ cc_conf_changed(caclient_t *cac)
     }
     if (!cc->cc_running) {
       cc->cc_running = 1;
+      tvh_pipe(O_NONBLOCK, &cc->cc_pipe);
       tvhthread_create(&cc->cc_tid, NULL, cc_thread, cc, "cc");
       return;
     }
@@ -1060,8 +1041,10 @@ cc_conf_changed(caclient_t *cac)
     if (cc->cc_fd >= 0)
       shutdown(cc->cc_fd, SHUT_RDWR);
     pthread_mutex_unlock(&cc->cc_mutex);
+    tvh_write(cc->cc_pipe.wr, "q", 1);
     pthread_kill(tid, SIGHUP);
     pthread_join(tid, NULL);
+    tvh_pipe_close(&cc->cc_pipe);
     caclient_set_status(cac, CACLIENT_STATUS_NONE);
     pthread_mutex_lock(&cc->cc_mutex);
     free(cc->cc_name);
index 89f23724875bc0db6bd0cd13f4e0a20d9959602f..b7144c86a344deafac37c8cf48781ada271de962 100644 (file)
@@ -17,6 +17,7 @@
  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include "tvheadend.h"
 #include "caclient.h"
 #include "descrambler.h"
 #include "emm_reass.h"
@@ -142,7 +143,7 @@ typedef struct cclient {
   /* Callbacks */
   void *(*cc_alloc_service)(void *cc);
   int (*cc_init_session)(void *cc);
-  int (*cc_read)(void *cc);
+  int (*cc_read)(void *cc, sbuf_t *sbuf);
   void (*cc_keepalive)(void *cc);
   uint32_t (*cc_send_ecm)(void *cc, cc_service_t *ct, cc_ecm_section_t *es,
                           cc_card_data_t *pcard, const uint8_t *data, int len);
@@ -158,6 +159,7 @@ typedef struct cclient {
   pthread_t cc_tid;
   tvh_cond_t cc_cond;
   pthread_mutex_t cc_mutex;
+  th_pipe_t cc_pipe;
 
   /* Database */
   LIST_HEAD(, cc_service) cc_services;
@@ -166,9 +168,7 @@ typedef struct cclient {
   /* Writer */
   int cc_seq;
   TAILQ_HEAD(, cc_message) cc_writeq;
-  pthread_mutex_t cc_writer_mutex;
-  tvh_cond_t cc_writer_cond;
-  uint8_t cc_writer_running;
+  uint8_t cc_write_running;
 
   /* Emm forwarding */
   int cc_forward_emm;
index b1d3dbbd4e94a5a07bdc1349ba0a430a468be886..7ffecba3c559ca72e36cda02e5381fa4c19f118a 100644 (file)
@@ -493,6 +493,31 @@ cwc_running_reply(cwc_t *cwc, uint8_t msgtype, uint8_t *msg, int len)
   return 0;
 }
 
+/**
+ *
+ */
+static int
+cwc_read_message0
+  (cwc_t *cwc, const char *state, sbuf_t *rbuf, int timeout)
+{
+  int msglen;
+
+  if (rbuf->sb_ptr < 2)
+    return 0;
+
+  msglen = (rbuf->sb_data[0] << 8) | rbuf->sb_data[1];
+  if(rbuf->sb_ptr < 2 + msglen)
+    return 0;
+
+  if((msglen = des_decrypt(rbuf->sb_data, msglen + 2, cwc)) < 15) {
+    tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed",
+            cwc->cc_name, state);
+    return -1;
+  }
+
+  return msglen;
+}
+
 /**
  *
  */
@@ -502,7 +527,7 @@ cwc_read_message
 {
   int msglen, r;
 
-  if((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) {
+  if ((r = cc_read((cclient_t *)cwc, buf, 2, timeout))) {
     if (tvheadend_is_running())
       tvhinfo(cwc->cc_subsys, "%s: %s: Read error (header): %s",
               cwc->cc_name, state, strerror(r));
@@ -510,7 +535,7 @@ cwc_read_message
   }
 
   msglen = (buf[0] << 8) | buf[1];
-  if(msglen > len) {
+  if (msglen > len) {
     if (tvheadend_is_running())
       tvhinfo(cwc->cc_subsys, "%s: %s: Invalid message size: %d",
               cwc->cc_name, state, msglen);
@@ -520,14 +545,14 @@ cwc_read_message
   /* We expect the rest of the message to arrive fairly quick,
      so just wait 1 second here */
 
-  if((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) {
+  if ((r = cc_read((cclient_t *)cwc, buf + 2, msglen, 1000))) {
     if (tvheadend_is_running())
       tvhinfo(cwc->cc_subsys, "%s: %s: Read error: %s",
               cwc->cc_name, state, strerror(r));
     return -1;
   }
 
-  if((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) {
+  if ((msglen = des_decrypt(buf, msglen + 2, cwc)) < 15) {
     tvhinfo(cwc->cc_subsys, "%s: %s: Decrypt failed",
             cwc->cc_name, state);
     return -1;
@@ -563,10 +588,10 @@ cwc_init_session(void *cc)
   if (cwc_send_login(cwc))
     return -1;
   
-  if(cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0)
+  if (cwc_read_message(cwc, "Wait login response", buf, sizeof(buf), 5000) < 0)
     return -1;
 
-  if(buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) {
+  if (buf[12] != MSG_CLIENT_2_SERVER_LOGIN_ACK) {
     tvhinfo(cwc->cc_subsys, "%s: Login failed", cwc->cc_name);
     return -1;
   }
@@ -577,7 +602,7 @@ cwc_init_session(void *cc)
    * Request card data
    */
   cwc_send_data_req(cwc);
-  if((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0)
+  if ((r = cwc_read_message(cwc, "Request card data", buf, sizeof(buf), 5000)) < 0)
     return -1;
 
   if (buf[12] != MSG_CARD_DATA) {
@@ -594,17 +619,24 @@ cwc_init_session(void *cc)
  *
  */
 static int
-cwc_read(void *cc)
+cwc_read(void *cc, sbuf_t *rbuf)
 {
   cwc_t *cwc = cc;
-  uint8_t buf[CWS_NETMSGSIZE];
   const int ka_interval = cwc->cc_keepalive_interval * 2 * 1000;
-  int r = cwc_read_message(cwc, "Decoderloop", buf, sizeof(buf), ka_interval);
+  int r = cwc_read_message0(cwc, "DecoderLoop", rbuf, ka_interval);
   if (r < 0)
     return -1;
-  if (r > 12)
-    return cwc_running_reply(cwc, buf[12], buf, r);
-  return -1;
+  if (r > 12) {
+    int ret = cwc_running_reply(cwc, rbuf->sb_data[12], rbuf->sb_data, r);
+    if (ret > 0)
+      sbuf_cut(rbuf, r);
+    if (ret < 0)
+      return -1;
+    return 0;
+  }
+  if (r > 0)
+    return -1;
+  return 0;
 }
 
 /**
index 3f1f6909727e796ade691f3a41c978d2623801d4..8bd94ef382bfd0ef280c26516c15aa371f000d31 100644 (file)
@@ -733,6 +733,8 @@ void tvh_pipe_close(th_pipe_t *pipe);
 
 int tvh_write(int fd, const void *buf, size_t len);
 
+int tvh_nonblock_write(int fd, const void *buf, size_t len);
+
 FILE *tvh_fopen(const char *filename, const char *mode);
 
 void hexdump(const char *pfx, const uint8_t *data, int len);
index b20bac523af95317a49f7f383a028c1b8988c6d2..83a69e135d4a50f41118b5a32dfe0d4cb93a5c2c 100644 (file)
@@ -102,6 +102,25 @@ tvh_write(int fd, const void *buf, size_t len)
   return len ? 1 : 0;
 }
 
+int
+tvh_nonblock_write(int fd, const void *buf, size_t len)
+{
+  ssize_t c;
+
+  while (len) {
+    c = write(fd, buf, len);
+    if (c < 0) {
+      if (errno == EINTR)
+        continue;
+      break;
+    }
+    len -= c;
+    buf += c;
+  }
+
+  return len ? 1 : 0;
+}
+
 FILE *
 tvh_fopen(const char *filename, const char *mode)
 {