]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
capmt: recode - use polling and write queue
authorJaroslav Kysela <perex@perex.cz>
Thu, 5 Jun 2014 13:59:12 +0000 (15:59 +0200)
committerJaroslav Kysela <perex@perex.cz>
Tue, 10 Jun 2014 12:02:35 +0000 (14:02 +0200)
src/descrambler/capmt.c
src/tcp.c
src/tvheadend.h
src/utils.c

index 6a48ed91c9aaa94249e29332bb2ff0b3d3465850..e0ce57edefa07b6cbb2c8eb403823b76048264e1 100644 (file)
@@ -43,6 +43,7 @@
 #include "input.h"
 #include "service.h"
 #include "tcp.h"
+#include "tvhpoll.h"
 #include "capmt.h"
 
 #include "notify.h"
  * Linux compatible definitions
  */
 
-typedef struct ca_slot_info {
-  int num;               /* slot number */
-
-  int type;              /* CA interface this slot supports */
-#define CA_CI            1     /* CI high level interface */
-#define CA_CI_LINK       2     /* CI link layer level interface */
-#define CA_CI_PHYS       4     /* CI physical layer level interface */
-#define CA_DESCR         8     /* built-in descrambler */
-#define CA_SC          128     /* simple smart card interface */
-
-  unsigned int flags;
-#define CA_CI_MODULE_PRESENT 1 /* module (or card) inserted */
-#define CA_CI_MODULE_READY   2
-} ca_slot_info_t;
-
-
-/* descrambler types and info */
-
-typedef struct ca_descr_info {
-  unsigned int num;          /* number of available descramblers (keys) */
-  unsigned int type;         /* type of supported scrambling system */
-#define CA_ECD           1
-#define CA_NDS           2
-#define CA_DSS           4
-} ca_descr_info_t;
-
-typedef struct ca_caps {
-  unsigned int slot_num;     /* total number of CA card and module slots */
-  unsigned int slot_type;    /* OR of all supported types */
-  unsigned int descr_num;    /* total number of descrambler slots (keys) */
-  unsigned int descr_type;   /* OR of all supported types */
-} ca_caps_t;
-
-/* a message to/from a CI-CAM */
-typedef struct ca_msg {
-  unsigned int index;
-  unsigned int type;
-  unsigned int length;
-  unsigned char msg[256];
-} ca_msg_t;
-
-typedef struct ca_descr {
-  unsigned int index;
-  unsigned int parity; /* 0 == even, 1 == odd */
-  unsigned char cw[8];
-} ca_descr_t;
-
-typedef struct ca_pid {
-  unsigned int pid;
-  int index;           /* -1 == disable*/
-} ca_pid_t;
-
-#define CA_RESET          _IO('o', 128)
-#define CA_GET_CAP        _IOR('o', 129, ca_caps_t)
-#define CA_GET_SLOT_INFO  _IOR('o', 130, ca_slot_info_t)
-#define CA_GET_DESCR_INFO _IOR('o', 131, ca_descr_info_t)
-#define CA_GET_MSG        _IOR('o', 132, ca_msg_t)
-#define CA_SEND_MSG       _IOW('o', 133, ca_msg_t)
-#define CA_SET_DESCR      _IOW('o', 134, ca_descr_t)
-#define CA_SET_PID        _IOW('o', 135, ca_pid_t)
-
 #define DMX_FILTER_SIZE 16
 
-typedef struct dmx_filter
-{
+typedef struct dmx_filter {
   uint8_t filter[DMX_FILTER_SIZE];
   uint8_t mask[DMX_FILTER_SIZE];
   uint8_t mode[DMX_FILTER_SIZE];
 } dmx_filter_t;
 
-typedef struct dmx_sct_filter_params
-{
+typedef struct dmx_sct_filter_params {
   uint16_t       pid;
   dmx_filter_t   filter;
   uint32_t       timeout;
@@ -145,8 +83,14 @@ typedef struct dmx_sct_filter_params
 #define DMX_KERNEL_CLIENT   0x8000
 } dmx_filter_params_t;
 
-#define DMX_STOP          _IO('o', 42)
-#define DMX_SET_FILTER    _IOW('o', 43, struct dmx_sct_filter_params)
+#define CA_SET_DESCR      0x40106f86
+#define CA_SET_DESCR_X    0x866f1040
+#define CA_SET_PID        0x40086f87
+#define CA_SET_PID_X      0x876f0840
+#define DMX_STOP          0x00006f2a
+#define DMX_STOP_X        0x6f2a0000
+#define DMX_SET_FILTER    0x403c6f2b
+#define DMX_SET_FILTER_X  0x2b6f3c40
 
 // ca_pmt_list_management values:
 #define CAPMT_LIST_MORE   0x00    // append a 'MORE' CAPMT object the list and start receiving the next object
@@ -265,6 +209,12 @@ typedef struct capmt_demuxes {
   capmt_filters_t filters[MAX_INDEX];
 } capmt_demuxes_t;
 
+typedef struct capmt_message {
+  TAILQ_ENTRY(capmt_message) cm_link;
+  int cm_sid;
+  sbuf_t cm_sb;
+} capmt_message_t;
+
 /**
  *
  */
@@ -279,7 +229,6 @@ typedef struct capmt {
 
   /* from capmt configuration */
   char *capmt_sockfile;
-  char *capmt_hostname;
   int   capmt_port;
   char *capmt_comment;
   char *capmt_id;
@@ -294,6 +243,7 @@ typedef struct capmt {
   /* capmt sockets */
   int   sids[MAX_SOCKETS];
   int   capmt_sock[MAX_SOCKETS];
+  int   capmt_sock_reconnect[MAX_SOCKETS];
   int   capmt_sock_ca0[MAX_CA];
 
   /* thread flags */
@@ -306,9 +256,13 @@ typedef struct capmt {
   uint16_t capmt_seq;
 
   /* runtime status */
+  tvhpoll_t      *capmt_poll;
+  th_pipe_t       capmt_pipe;
   mpegts_input_t *capmt_tuners[MAX_CA];
   capmt_demuxes_t capmt_demuxes;
-  ca_info_t capmt_ca_info[MAX_CA][MAX_INDEX];
+  ca_info_t       capmt_ca_info[MAX_CA][MAX_INDEX];
+  TAILQ_HEAD(, capmt_message) capmt_writeq;
+  pthread_mutex_t capmt_mutex;
 } capmt_t;
 
 static void capmt_enumerate_services(capmt_t *capmt, int force);
@@ -345,6 +299,32 @@ capmt_oscam_new(capmt_t *capmt)
          oscam != CAPMT_OSCAM_OLD;
 }
 
+static void
+capmt_poll_add(capmt_t *capmt, int fd, uint32_t u32)
+{
+  tvhpoll_event_t ev;
+
+  if (capmt->capmt_poll == NULL)
+    return;
+  memset(&ev, 0, sizeof(ev));
+  ev.events   = TVHPOLL_IN;
+  ev.fd       = fd;
+  ev.data.u32 = u32;
+  tvhpoll_add(capmt->capmt_poll, &ev, 1);
+}
+
+static void
+capmt_poll_rem(capmt_t *capmt, int fd)
+{
+  tvhpoll_event_t ev;
+
+  if (capmt->capmt_poll == NULL)
+    return;
+  memset(&ev, 0, sizeof(ev));
+  ev.fd       = fd;
+  tvhpoll_rem(capmt->capmt_poll, &ev, 1);
+}
+
 /**
  *
  */
@@ -361,12 +341,14 @@ capmt_set_connected(capmt_t *capmt, int c)
  *
  */
 static int
-capmt_connect(capmt_t *capmt)
+capmt_connect(capmt_t *capmt, int i)
 {
   int fd;
 
+  capmt->capmt_sock[i] = -1;
+
   if (!capmt->capmt_running)
-    return 0;
+    return -1;
 
   if (capmt->capmt_oscam == CAPMT_OSCAM_TCP) {
 
@@ -378,7 +360,7 @@ capmt_connect(capmt_t *capmt)
       tvhlog(LOG_ERR, "capmt",
              "Cannot connect to %s:%i (%s); Do you have OSCam running?",
              capmt->capmt_sockfile, capmt->capmt_port, errbuf);
-      fd = 0;
+      fd = -1;
     }
 
   } else {
@@ -398,26 +380,45 @@ capmt_connect(capmt_t *capmt)
              "Cannot connect to %s (%s); Do you have OSCam running?",
              capmt->capmt_sockfile, strerror(errno));
       close(fd);
-      fd = 0;
+      fd = -1;
     }
 
   }
 
-  if (fd)
-    tvhlog(LOG_DEBUG, "capmt", "created socket with socket_fd=%d", fd);
+  if (fd) {
+    tvhlog(LOG_DEBUG, "capmt", "Created socket %d", fd);
+    capmt->capmt_sock[i] = fd;
+    capmt->capmt_sock_reconnect[i]++;
+    capmt_poll_add(capmt, fd, i + 1);
+  }
 
   return fd;
 }
 
+/**
+ *
+ */
+static void
+capmt_socket_close(capmt_t *capmt, int i)
+{
+  int fd = capmt->capmt_sock[i];
+  if (fd < 0)
+    return;
+  capmt_poll_rem(capmt, fd);
+  close(fd);
+  capmt->capmt_sock[i] = -1;
+}
+
 /**
  *
  */
 static int
-capmt_send_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
+capmt_write_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
 {
+  int i = 0, found = 0, fd;
+  ssize_t res;
+
   if (capmt->capmt_oscam != CAPMT_OSCAM_SO_WRAPPER) {
-    int i = 0;
-    int found = 0;
     if (capmt->capmt_oscam == CAPMT_OSCAM_OLD) {
       // dumping current SID table
       for (i = 0; i < MAX_SOCKETS; i++)
@@ -455,40 +456,97 @@ capmt_send_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
     }
 
     // check if the socket is still alive by writing 0 bytes
-    if (capmt->capmt_sock[i] > 0) {
-      if (write(capmt->capmt_sock[i], NULL, 0) < 0)
-        capmt->capmt_sock[i] = 0;
+    if (capmt->capmt_sock[i] >= 0) {
+      if (send(capmt->capmt_sock[i], NULL, 0, MSG_DONTWAIT) < 0)
+        capmt->capmt_sock[i] = -1;
       else if ((capmt->capmt_oscam == CAPMT_OSCAM_SO_WRAPPER ||
                 capmt->capmt_oscam == CAPMT_OSCAM_OLD) && found)
         return 0;
     }
 
     // opening socket and sending
-    if (capmt->capmt_sock[i] == 0) {
-      capmt->capmt_sock[i] = capmt_connect(capmt);
-      capmt_set_connected(capmt, capmt->capmt_sock[i] ? 2 : 0);
+    if (capmt->capmt_sock[i] < 0) {
+      fd = capmt_connect(capmt, i);
+      capmt_set_connected(capmt, fd ? 2 : 0);
     }
+  } else {  // standard old capmt mode
+    i = 0;
+  }
 
-    if (capmt->capmt_sock[i] > 0) {
-      if (tvh_write(capmt->capmt_sock[i], buf, len)) {
-        tvhlog(LOG_DEBUG, "capmt", "socket_fd=%d send failed", capmt->capmt_sock[i]);
-        close(capmt->capmt_sock[i]);
-        capmt->capmt_sock[i] = 0;
-        return -1;
-      }
+  fd = capmt->capmt_sock[i];
+
+  if (fd <= 0) {
+    tvhtrace("capmt", "Unable to send message for sid %i", sid);
+    return -1;
+  }
+
+  tvhtrace("capmt", "Sending message to socket %i (sid %i)", fd, sid);
+  tvhlog_hexdump("capmt", buf, len);
+
+  res = send(fd, buf, len, MSG_DONTWAIT);
+  if (res < len) {
+    tvhlog(LOG_DEBUG, "capmt", "Message send failed to socket %i (%zi)", fd, res);
+    if (capmt->capmt_oscam != CAPMT_OSCAM_SO_WRAPPER) {
+      capmt_socket_close(capmt, i);
+      return -1;
     }
   }
-  else  // standard old capmt mode
-    tvh_write(capmt->capmt_sock[0], buf, len);
+
   return 0;
 }
 
+/**
+ *
+ */
+static void
+capmt_queue_msg(capmt_t *capmt, int sid, const uint8_t *buf, size_t len)
+{
+  capmt_message_t *msg = malloc(sizeof(*msg));
+
+  sbuf_init_fixed(&msg->cm_sb, len);
+  sbuf_append(&msg->cm_sb, buf, len);
+  msg->cm_sid = sid;
+  pthread_mutex_lock(&capmt->capmt_mutex);
+  TAILQ_INSERT_TAIL(&capmt->capmt_writeq, msg, cm_link);
+  pthread_mutex_unlock(&capmt->capmt_mutex);
+  tvh_write(capmt->capmt_pipe.wr, "c", 1);
+}
+
+/**
+ *
+ */
+static void
+capmt_flush_queue(capmt_t *capmt, int del_only)
+{
+  capmt_message_t *msg;
+
+  while (1) {
+    pthread_mutex_lock(&capmt->capmt_mutex);
+    msg = TAILQ_FIRST(&capmt->capmt_writeq);
+    if (msg)
+      TAILQ_REMOVE(&capmt->capmt_writeq, msg, cm_link);
+    pthread_mutex_unlock(&capmt->capmt_mutex);
+    if (msg == NULL)
+      break;
+
+    if (!del_only)
+      capmt_write_msg(capmt, msg->cm_sid,
+                      msg->cm_sb.sb_data, msg->cm_sb.sb_ptr);
+    sbuf_free(&msg->cm_sb);
+    free(msg);
+  }
+}
+
+/**
+ *
+ */
 static void 
 capmt_send_stop(capmt_service_t *t)
 {
   mpegts_service_t *s = (mpegts_service_t *)t->td_service;
+  int oscam = t->ct_capmt->capmt_oscam;
 
-  if (t->ct_capmt->capmt_oscam != CAPMT_OSCAM_SO_WRAPPER) {
+  if (oscam == CAPMT_OSCAM_OLD) {
     int i;
     // searching for socket to close
     for (i = 0; i < MAX_SOCKETS; i++)
@@ -503,10 +561,8 @@ capmt_send_stop(capmt_service_t *t)
     // closing socket (oscam handle this as event and stop decrypting)
     tvhlog(LOG_DEBUG, "capmt", "%s: closing socket i=%d, socket_fd=%d", __FUNCTION__, i, t->ct_capmt->capmt_sock[i]);
     t->ct_capmt->sids[i] = 0;
-    if (t->ct_capmt->capmt_sock[i] > 0)
-      close(t->ct_capmt->capmt_sock[i]);
-    t->ct_capmt->capmt_sock[i] = 0;
-  } else {  // standard old capmt mode
+    capmt_socket_close(t->ct_capmt, i);
+  } else if (oscam == CAPMT_OSCAM_SO_WRAPPER) {  // standard old capmt mode
     /* buffer for capmt */
     int pos = 0;
     uint8_t buf[4094];
@@ -535,7 +591,7 @@ capmt_send_stop(capmt_service_t *t)
     buf[10] = ((pos - 5 - 12) & 0xF00) >> 8;
     buf[11] = ((pos - 5 - 12) & 0xFF);
   
-    capmt_send_msg(t->ct_capmt, s->s_dvb_service_id, buf, pos);
+    capmt_queue_msg(t->ct_capmt, s->s_dvb_service_id, buf, pos);
   }
 }
 
@@ -586,24 +642,28 @@ static void
 capmt_filter_data(capmt_t *capmt, int sid, uint8_t adapter, uint8_t demux_index,
                   uint8_t filter_index, const uint8_t *data, int len)
 {
-  uint8_t buf[4096 + 8];
+  uint8_t *buf = alloca(len + 6);
 
-  buf[0] = buf[1] = buf[3] = 0xff;
+  buf[0] = buf[1] = buf[2] = buf[3] = 0xff;
   buf[4] = demux_index;
   buf[5] = filter_index;
   memcpy(buf + 6, data, len);
   if (len - 3 == ((((uint16_t)buf[7] << 8) | buf[8]) & 0xfff))
-    capmt_send_msg(capmt, sid, buf, len + 6);
+    capmt_queue_msg(capmt, sid, buf, len + 6);
 }
 
 static void
-capmt_set_filter(capmt_t *capmt, uint8_t adapter, uint8_t *buf)
+capmt_set_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
 {
-  uint8_t demux_index = buf[4];
-  uint8_t filter_index = buf[5];
-  dmx_filter_params_t *filter, *params = (dmx_filter_params_t *)(buf + 6);
+  uint8_t demux_index  = sbuf_peek_u8 (sb, offset + 4);
+  uint8_t filter_index = sbuf_peek_u8 (sb, offset + 5);
+  int     pid          = sbuf_peek_s32(sb, offset + 6);
+  dmx_filter_params_t *filter;
+  dmx_filter_params_t *params = (dmx_filter_params_t *)sbuf_peek(sb, offset + 6);
   capmt_filters_t *cf;
 
+  tvhtrace("capmt", "setting filter: adapter=%d, demux=%d, filter=%d, pid=%d",
+           adapter, demux_index, filter_index, pid);
   if (adapter >= MAX_CA ||
       demux_index >= MAX_INDEX ||
       filter_index >= MAX_FILTER)
@@ -613,7 +673,9 @@ capmt_set_filter(capmt_t *capmt, uint8_t adapter, uint8_t *buf)
     return;
   cf->adapter = adapter;
   filter = &cf->dmx[filter_index];
-  *filter = *params;
+  filter->pid = pid;
+  memcpy(&filter->filter, &params->filter, sizeof(params->filter));
+  filter->timeout = filter->flags = 0;
   if (capmt->capmt_demuxes.max <= demux_index)
     capmt->capmt_demuxes.max = demux_index + 1;
   if (cf->max <= filter_index)
@@ -626,14 +688,16 @@ capmt_set_filter(capmt_t *capmt, uint8_t adapter, uint8_t *buf)
 }
 
 static void
-capmt_stop_filter(capmt_t *capmt, uint8_t adapter, uint8_t *buf)
+capmt_stop_filter(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
 {
-  uint8_t demux_index = buf[4];
-  uint8_t filter_index = buf[5];
-  int16_t pid = ((int16_t)buf[6] << 8) | buf[7];
+  uint8_t demux_index  = sbuf_peek_u8   (sb, offset + 4);
+  uint8_t filter_index = sbuf_peek_u8   (sb, offset + 5);
+  int16_t pid          = sbuf_peek_s16le(sb, offset + 6);
   dmx_filter_params_t *filter;
   capmt_filters_t *cf;
 
+  tvhtrace("capmt", "stopping filter: adapter=%d, demux=%d, filter=%d, pid=%d",
+           adapter, demux_index, filter_index, pid);
   if (adapter >= MAX_CA ||
       demux_index >= MAX_INDEX ||
       filter_index >= MAX_FILTER)
@@ -669,220 +733,396 @@ capmt_notify_server(capmt_t *capmt, capmt_service_t *ct)
   }
 }
 
-static void 
-handle_ca0(capmt_t* capmt) {
-  capmt_service_t *ct;
+static void
+capmt_process_key(capmt_t *capmt, uint8_t adapter, uint16_t seq,
+                  const uint8_t *even, const uint8_t *odd,
+                  int ok)
+{
   mpegts_service_t *t;
-  int ret, bufsize;
-  unsigned int *request = NULL;
-  ca_descr_t *ca;
-  ca_pid_t *cpd;
-  int process_key, process_next, cai = 0;
-  int i, j;
-  int recvsock = 0;
-
-  if (capmt->capmt_oscam != CAPMT_OSCAM_SO_WRAPPER)
-    bufsize = sizeof(int) + sizeof(ca_descr_t);
-  else
-    bufsize = 18;
+  capmt_service_t *ct;
+  unsigned int i;
+
+  pthread_mutex_lock(&global_lock);
+  LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
+    t = (mpegts_service_t *)ct->td_service;
+
+    if (!ok) {
+      if (ct->td_keystate != DS_FORBIDDEN) {
+        tvhlog(LOG_ERR, "capmt",
+               "Can not descramble service \"%s\", access denied",
+               t->s_dvb_svcname);
+        ct->td_keystate = DS_FORBIDDEN;
+      }
+      continue;
+    }
 
-  uint8_t buffer[bufsize], *even, *odd;
-  uint16_t seq;
+    if (seq != ct->ct_seq)
+      continue;
+    if (adapter != ct->ct_adapter)
+      continue;
 
-  tvhlog(LOG_INFO, "capmt", "got connection from client ...");
+    for (i = 0; i < 8; i++)
+      if (even[i]) {
+        tvhcsa_set_key_even(&ct->ct_csa, even);
+        break;
+      }
+    for (i = 0; i < 8; i++)
+      if (odd[i]) {
+        tvhcsa_set_key_odd(&ct->ct_csa, odd);
+        break;
+      }
+
+    if (ct->td_keystate != DS_RESOLVED)
+      tvhlog(LOG_DEBUG, "capmt", "Obtained key for service \"%s\"",
+                                 t->s_dvb_svcname);
+
+    ct->td_keystate = DS_RESOLVED;
+  }
+  pthread_mutex_unlock(&global_lock);
+}
+
+static int
+capmt_msg_size(capmt_t *capmt, sbuf_t *sb, int offset)
+{
+  uint32_t cmd;
+
+  if (sb->sb_ptr - offset < 4)
+    return 0;
+  cmd = sbuf_peek_u32(sb, offset);
+  if (!sb->sb_bswap && !sb->sb_err) {
+    if (cmd == CA_SET_PID_X ||
+        cmd == CA_SET_DESCR_X ||
+        cmd == DMX_SET_FILTER_X ||
+        cmd == DMX_STOP_X) {
+      sb->sb_bswap = 1;
+      cmd = sbuf_peek_u32(sb, offset);
+    }
+  }
+  sb->sb_err = 1; /* "first seen" flag for the moment */
+  if (cmd == CA_SET_PID)
+    return 4 + 8;
+  else if (cmd == CA_SET_DESCR)
+    return 4 + 16;
+  else if (cmd == DMX_SET_FILTER)
+    return 4 + 2 + sizeof(dmx_filter_params_t);
+  else if (cmd == DMX_STOP)
+    return 4 + 4;
+  else {
+    sb->sb_err = 0;
+    return -1; /* fatal */
+  }
+}
+
+static void
+capmt_analyze_cmd(capmt_t *capmt, int adapter, sbuf_t *sb, int offset)
+{
+  uint32_t cmd;
+
+  cmd = sbuf_peek_u32(sb, offset);
+
+  if (cmd == CA_SET_PID) {
+
+    uint32_t pid   = sbuf_peek_u32(sb, offset + 4);
+    int32_t  index = sbuf_peek_s32(sb, offset + 8);
+    tvhlog(LOG_DEBUG, "capmt", "CA_SET_PID adapter %d index %d pid 0x%04x", adapter, index, pid);
+    if (adapter < MAX_CA && index >= 0 && index < MAX_INDEX) {
+      capmt->capmt_ca_info[adapter][index].pid = pid;
+    } else if (index < 0) {
+      memset(&capmt->capmt_ca_info[adapter], 0, sizeof(capmt->capmt_ca_info[adapter]));
+    } else
+      tvhlog(LOG_ERR, "capmt", "Invalid index %d in CA_SET_PID (%d) for adapter %d", index, MAX_INDEX, adapter);
+
+  } else if (cmd == CA_SET_DESCR) {
+
+    int32_t index  = sbuf_peek_s32(sb, offset + 4);
+    int32_t parity = sbuf_peek_s32(sb, offset + 8);
+    uint8_t *cw    = sbuf_peek    (sb, offset + 12);
+    ca_info_t *cai;
+
+    tvhlog(LOG_DEBUG, "capmt", "CA_SET_DESCR adapter %d par %d idx %d %02x%02x%02x%02x%02x%02x%02x%02x", adapter, parity, index, cw[0], cw[1], cw[2], cw[3], cw[4], cw[5], cw[6], cw[7]);
+    if (index == -1)   // skipping removal request
+      return;
+    if (adapter >= MAX_CA || index >= MAX_INDEX)
+      return;
+    cai = &capmt->capmt_ca_info[adapter][index];
+    if (parity == 0) {
+      memcpy(cai->even, cw, 8); // even key
+      capmt_process_key(capmt, adapter, cai->pid, cai->even, cai->odd, 1);
+    } else if (parity == 1) {
+      memcpy(cai->odd,  cw, 8); // odd  key
+      capmt_process_key(capmt, adapter, cai->pid, cai->even, cai->odd, 1);
+    } else
+      tvhlog(LOG_ERR, "capmt", "Invalid parity %d in CA_SET_DESCR for adapter%d", parity, adapter);
+
+  } else if (cmd == DMX_SET_FILTER) {
+
+    capmt_set_filter(capmt, adapter, sb, offset);
+
+  } else if (cmd == DMX_STOP) {
+
+    capmt_stop_filter(capmt, adapter, sb, offset);
+
+  }
+}
+
+static void
+show_connection(capmt_t *capmt, const char *what)
+{
+  if (capmt->capmt_oscam == CAPMT_OSCAM_TCP) {
+    tvhlog(LOG_INFO, "capmt",
+           "mode %i connected to %s:%i (%s)",
+           capmt->capmt_oscam,
+           capmt->capmt_sockfile, capmt->capmt_port,
+           what);
+  } else if (capmt->capmt_oscam == CAPMT_OSCAM_UNIX_SOCKET) {
+    tvhlog(LOG_INFO, "capmt",
+           "mode %i sockfile %s got connection from client (%s)",
+           capmt->capmt_oscam,
+           capmt->capmt_sockfile,
+           what);
+  } else {
+    tvhlog(LOG_INFO, "capmt",
+           "mode %i sockfile %s port %i got connection from client (%s)",
+           capmt->capmt_oscam,
+           capmt->capmt_sockfile, capmt->capmt_port,
+           what);
+  }
+}
+
+static void 
+handle_ca0(capmt_t *capmt) {
+  int i, ret, recvsock, adapter, nfds, cmd_size;
+  uint8_t buf[256];
+  sbuf_t buffer[MAX_CA];
+  sbuf_t *pbuf;
+  tvhpoll_event_t ev[MAX_CA + 1];
+
+  show_connection(capmt, "ca0");
+
+  for (i = 0; i < MAX_CA; i++)
+    sbuf_init(&buffer[i]);
 
   pthread_mutex_lock(&global_lock);
   capmt_notify_server(capmt, NULL);
   pthread_mutex_unlock(&global_lock);
 
+  capmt->capmt_poll = tvhpoll_create(MAX_CA + 1);
+  capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
+  for (i = 0; i < MAX_CA; i++)
+    if (capmt->capmt_sock_ca0[i])
+      capmt_poll_add(capmt, capmt->capmt_sock_ca0[i], i + 1);
+
   i = 0;
+  adapter = -1;
+
   while (capmt->capmt_running) {
-    process_key = 0;
 
-    // receiving data from UDP socket
-    if (capmt->capmt_oscam == CAPMT_OSCAM_SO_WRAPPER) {
-      ret = recv(capmt->capmt_sock_ca0[0], buffer, bufsize, MSG_WAITALL);
+    nfds = tvhpoll_wait(capmt->capmt_poll, ev, MAX_CA + 1, 500);
 
-      if (ret < 0)
-        tvhlog(LOG_ERR, "capmt", "error receiving over socket");
-      else if (ret == 0) {
-        // normal socket shutdown
+    if (nfds <= 0)
+      continue;
+
+    for (i = 0; i < MAX_CA + 1; i++) {
+
+      if (ev[i].data.u32 == 0) {
+        ret = read(capmt->capmt_pipe.rd, buf, 1);
+        if (ret == 1 && buf[0] == 'c') {
+          capmt_flush_queue(capmt, 0);
+          continue;
+        }
+
+        tvhtrace("capmt", "thread received shutdown");
+        capmt->capmt_running = 0;
+        continue;
+      }
+
+      adapter = ev[i].data.u32;
+
+      if (adapter >= MAX_CA)
+        continue;
+
+      recvsock = capmt->capmt_sock_ca0[adapter];
+
+      if (recvsock <= 0)
+        continue;
+
+      ret = recv(recvsock, buf, sizeof(buf), MSG_DONTWAIT);
+
+      if (ret == 0) {
         tvhlog(LOG_INFO, "capmt", "normal socket shutdown");
-        break;
+
+        close(recvsock);
+        capmt_poll_rem(capmt, recvsock);
+        capmt->capmt_sock_ca0[adapter] = -1;
+        continue;
       }
-    } else {
-      process_next = 1;
-      if (capmt_oscam_new(capmt))
-        recvsock = capmt->capmt_sock[0];
-      else
-        recvsock = capmt->capmt_sock_ca0[i];
-      if (recvsock > 0) {
-        if (capmt_oscam_new(capmt))
-        {
-          // adapter index is in first byte
-          uint8_t adapter_index;
-          ret = recv(recvsock, &adapter_index, 1, MSG_DONTWAIT);
-          if (ret < 0)
-          {
-            usleep(10 * 1000);
-            continue;
-          }
-          cai = adapter_index;
-        }
-        request = NULL;
-        ret = recv(recvsock, buffer, capmt_oscam_new(capmt) ? sizeof(int) : bufsize, MSG_DONTWAIT);
-        if (ret > 0) {
-          request = (unsigned int *) &buffer;
-          if (!capmt_oscam_new(capmt))
-            process_next = 0;
-          else {
-            int ret = 0;
-            if (*request == CA_SET_PID) {            //receive CA_SET_PID
-              ret = recv(recvsock, buffer+sizeof(int), sizeof(ca_pid_t), MSG_DONTWAIT);
-              if (ret != sizeof(ca_pid_t))
-                *request = 0;
-            } else if (*request == CA_SET_DESCR) {   //receive CA_SET_DESCR
-              ret = recv(recvsock, buffer+sizeof(int), sizeof(ca_descr_t), MSG_DONTWAIT);
-              if (ret != sizeof(ca_descr_t))
-                *request = 0;
-            } else if (*request == DMX_SET_FILTER) { //receive DMX_SET_FILTER
-              ret = recv(recvsock, buffer+sizeof(int), 2 + sizeof(dmx_filter_params_t), MSG_DONTWAIT);
-              if (ret != 2 + sizeof(dmx_filter_params_t))
-                *request = 0;
-            } else if (*request == DMX_STOP) {       //receive DMX_STOP
-              ret = recv(recvsock, buffer+sizeof(int), 4, MSG_DONTWAIT);
-              if (ret != 4)
-                *request = 0;
-            }
-            if (ret > 0)
-              process_next = 0;
-          }
-        }
-        else if (ret == 0) {
-          // normal socket shutdown
-          tvhlog(LOG_INFO, "capmt", "normal socket shutdown");
-
-          // we are not connected any more - set services as unavailable
-          LIST_FOREACH(ct, &capmt->capmt_services, ct_link)
-            ct->td_keystate = DS_FORBIDDEN;
-
-          int still_left = 0;
-          if (!capmt_oscam_new(capmt)) {
-           close(capmt->capmt_sock_ca0[i]);
-           capmt->capmt_sock_ca0[i] = -1;
-
-            for (j = 0; j < MAX_CA; j++) {
-              if (capmt->capmt_sock_ca0[j] > 0) {
-                still_left = 1;
-                break;
-              }
-            }
-          }
-          if (!still_left) //all sockets closed
-            break;
-        }
-      } 
+      
+      if (ret < 0)
+        continue;
 
-      if (process_next) {
-        if (!capmt_oscam_new(capmt)) {
-          i++;
-          if (i >= MAX_CA)
-            i = 0;
+      tvhtrace("capmt", "Received message from socket %i", recvsock);
+      tvhlog_hexdump("capmt", buf, ret);
+
+      pbuf = &buffer[adapter];
+      sbuf_append(pbuf, buf, ret);
+
+      while (pbuf->sb_ptr > 0) {
+        cmd_size = 0;
+        while (pbuf->sb_ptr) {
+          cmd_size = capmt_msg_size(capmt, pbuf, 0);
+          if (cmd_size < 0)
+            sbuf_cut(pbuf, 1);
+        }
+        if (cmd_size <= pbuf->sb_ptr) {
+          capmt_analyze_cmd(capmt, adapter, pbuf, 0);
+          sbuf_cut(pbuf, cmd_size);
+        } else {
+          break;
         }
-        usleep(10 * 1000);
-        continue;
       }
+
     }
+  }
+
+  for (i = 0; i < MAX_CA; i++)
+    sbuf_free(&buffer[i]);
+  tvhpoll_destroy(capmt->capmt_poll);
+  capmt->capmt_poll = NULL;
+}
+
+static void
+handle_single(capmt_t *capmt)
+{
+  int ret, recvsock, adapter, nfds, cmd_size, reconnect;
+  uint8_t buf[256];
+  sbuf_t buffer;
+  tvhpoll_event_t ev;
+
+  show_connection(capmt, "single");
 
-    // parsing data
-    if (capmt->capmt_oscam) {
-      if (!request)
+  reconnect = capmt->capmt_sock_reconnect[0];
+  sbuf_init(&buffer);
+
+  pthread_mutex_lock(&global_lock);
+  capmt_notify_server(capmt, NULL);
+  pthread_mutex_unlock(&global_lock);
+
+  capmt->capmt_poll = tvhpoll_create(2);
+  capmt_poll_add(capmt, capmt->capmt_pipe.rd, 0);
+  capmt_poll_add(capmt, capmt->capmt_sock[0], 1);
+
+  while (capmt->capmt_running) {
+
+    tvhtrace("capmt", "poll");
+    nfds = tvhpoll_wait(capmt->capmt_poll, &ev, 1, 500);
+
+    if (nfds <= 0)
+      continue;
+
+    tvhtrace("capmt", "poll ok for %i", ev.data.u32);
+
+    if (ev.data.u32 == 0) {
+      ret = read(capmt->capmt_pipe.rd, buf, 1);
+      if (ret == 1 && buf[0] == 'c') {
+        capmt_flush_queue(capmt, 0);
         continue;
-      if (!capmt_oscam_new(capmt)) //in mode 2+ we read it directly from socket
-        cai = i;
-      if (*request == CA_SET_PID) {
-        cpd = (ca_pid_t *)&buffer[sizeof(int)];
-        tvhlog(LOG_DEBUG, "capmt", "CA_SET_PID cai %d req %d (%d %04x)", cai, *request, cpd->index, cpd->pid);
-
-        if (cai < MAX_CA && cpd->index >=0 && cpd->index < MAX_INDEX) {
-          capmt->capmt_ca_info[cai][cpd->index].pid = cpd->pid;
-        } else if (cpd->index == -1) {
-          memset(&capmt->capmt_ca_info[cai], 0, sizeof(capmt->capmt_ca_info[cai]));
-        } else
-          tvhlog(LOG_ERR, "capmt", "Invalid index %d in CA_SET_PID (%d) for ca id %d", cpd->index, MAX_INDEX, cai);
-      } else if (*request == CA_SET_DESCR) {
-        ca = (ca_descr_t *)&buffer[sizeof(int)];
-        tvhlog(LOG_DEBUG, "capmt", "CA_SET_DESCR cai %d req %d par %d idx %d %02x%02x%02x%02x%02x%02x%02x%02x", cai, *request, ca->parity, ca->index, ca->cw[0], ca->cw[1], ca->cw[2], ca->cw[3], ca->cw[4], ca->cw[5], ca->cw[6], ca->cw[7]);
-        if (ca->index == -1)   // skipping removal request
-          continue;
-        if (cai >= MAX_CA || ca->index >= MAX_INDEX)
-          continue;
-        if(ca->parity==0) {
-          memcpy(capmt->capmt_ca_info[cai][ca->index].even,ca->cw,8); // even key
-          process_key = 1;
-        } else if(ca->parity==1) {
-          memcpy(capmt->capmt_ca_info[cai][ca->index].odd,ca->cw,8);  // odd key
-          process_key = 1;
-        } else
-          tvhlog(LOG_ERR, "capmt", "Invalid parity %d in CA_SET_DESCR for ca id %d", ca->parity, cai);
-
-        seq  = capmt->capmt_ca_info[cai][ca->index].pid;
-        even = capmt->capmt_ca_info[cai][ca->index].even;
-        odd  = capmt->capmt_ca_info[cai][ca->index].odd;
-      } else if (*request == DMX_SET_FILTER) {
-        capmt_set_filter(capmt, cai, buffer);
-      } else if (*request == DMX_STOP) {
-        capmt_stop_filter(capmt, cai, buffer);
       }
-    } else {
-      /* get control words */
-      seq  = buffer[0] | ((uint16_t)buffer[1] << 8);
-      even = &buffer[2];
-      odd  = &buffer[10];
-      process_key = 1;
+      
+      tvhtrace("capmt", "thread received shutdown");
+      capmt->capmt_running = 0;
+      continue;
+    }
+    
+    if (reconnect != capmt->capmt_sock_reconnect[0]) {
+      buffer.sb_bswap = 0;
+      sbuf_reset(&buffer, 1024);
+      capmt_flush_queue(capmt, 1);
+      reconnect = capmt->capmt_sock_reconnect[0];
     }
 
-    // processing key
-    if (process_key) {
-      pthread_mutex_lock(&global_lock);
-      LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
-        t = (mpegts_service_t *)ct->td_service;
+    recvsock = capmt->capmt_sock[0];
+    ret = recv(recvsock, buf, sizeof(buf), MSG_DONTWAIT);
+
+    if (ret == 0) {
+      tvhlog(LOG_INFO, "capmt", "normal socket shutdown");
+      capmt_poll_rem(capmt, recvsock);
+      break;
+    }
+    
+    if (ret < 0)
+      continue;
 
-        if (capmt->capmt_oscam == CAPMT_OSCAM_SO_WRAPPER && ret < bufsize) {
-          if(ct->td_keystate != DS_FORBIDDEN) {
-            tvhlog(LOG_ERR, "capmt", "Can not descramble service \"%s\", access denied", t->s_dvb_svcname);
+    tvhtrace("capmt", "Received message from socket %i", recvsock);
+    tvhlog_hexdump("capmt", buf, ret);
 
-            ct->td_keystate = DS_FORBIDDEN;
-          }
+    sbuf_append(&buffer, buf, ret);
 
-          continue;
+    while (buffer.sb_ptr > 0) {
+      cmd_size = 0;
+      adapter = -1;
+      while (buffer.sb_ptr > 0) {
+        adapter = buffer.sb_data[0];
+        if (adapter < MAX_CA) {
+          cmd_size = capmt_msg_size(capmt, &buffer, 1);
+          if (cmd_size >= 0)
+            break;
         }
+        sbuf_cut(&buffer, 1);
+      }
+      if (cmd_size + 1 <= buffer.sb_ptr) {
+        capmt_analyze_cmd(capmt, adapter, &buffer, 1);
+        sbuf_cut(&buffer, cmd_size + 1);
+      } else {
+        break;
+      }
+    }
+  }
 
-        if(seq != ct->ct_seq)
-          continue;
+  sbuf_free(&buffer);
+  tvhpoll_destroy(capmt->capmt_poll);
+  capmt->capmt_poll = NULL;
+}
 
-        for (i = 0; i < 8; i++)
-          if (even[i]) {
-            tvhcsa_set_key_even(&ct->ct_csa, even);
-            break;
-          }
-        for (i = 0; i < 8; i++)
-          if (odd[i]) {
-            tvhcsa_set_key_odd(&ct->ct_csa, odd);
-            break;
-          }
+#if CONFIG_LINUXDVB
+static void 
+handle_ca0_wrapper(capmt_t *capmt)
+{
+  uint8_t buffer[18];
+  int ret;
 
-        if(ct->td_keystate != DS_RESOLVED)
-          tvhlog(LOG_DEBUG, "capmt", "Obtained key for service \"%s\"",t->s_dvb_svcname);
+  show_connection(capmt, ".so wrapper");
 
-        ct->td_keystate = DS_RESOLVED;
-      }
-      pthread_mutex_unlock(&global_lock);
+  pthread_mutex_lock(&global_lock);
+  capmt_notify_server(capmt, NULL);
+  pthread_mutex_unlock(&global_lock);
+
+  while (capmt->capmt_running) {
+
+    /* receiving data from UDP socket */
+    ret = recv(capmt->capmt_sock_ca0[0], buffer, 18, MSG_WAITALL);
+
+    if (ret < 0) {
+      tvhlog(LOG_ERR, "capmt", "error receiving over socket");
+      break;
+    } else if (ret == 0) {
+      /* normal socket shutdown */
+      tvhlog(LOG_INFO, "capmt", "normal socket shutdown");
+      break;
+    } else {
+
+      tvhtrace("capmt", "Received message from socket %i", capmt->capmt_sock_ca0[0]);
+      tvhlog_hexdump("capmt", buffer, ret);
+
+      capmt_process_key(capmt, 0,
+                        buffer[0] | ((uint16_t)buffer[1] << 8),
+                        buffer + 2, buffer + 10,
+                        ret == 18);
     }
   }
 
   tvhlog(LOG_INFO, "capmt", "connection from client closed ...");
 }
+#endif
 
 #if ENABLE_LINUXDVB
 static int
@@ -921,7 +1161,8 @@ capmt_thread(void *aux)
       capmt->capmt_sock_ca0[i] = -1;
     for (i = 0; i < MAX_SOCKETS; i++) {
       capmt->sids[i] = 0;
-      capmt->capmt_sock[i] = 0;
+      capmt->capmt_sock[i] = -1;
+      capmt->capmt_sock_reconnect[i] = 0;
     }
     memset(&capmt->capmt_demuxes, 0, sizeof(capmt->capmt_demuxes));
 
@@ -939,18 +1180,20 @@ capmt_thread(void *aux)
     pthread_mutex_unlock(&global_lock);
 
     /* open connection to camd.socket */
-    capmt->capmt_sock[0] = capmt_connect(capmt);
+    capmt_connect(capmt, 0);
 
-    if (capmt->capmt_sock[0]) {
+    if (capmt->capmt_sock[0] >= 0) {
       capmt_set_connected(capmt, 2);
 #if CONFIG_LINUXDVB
       if (capmt_oscam_new(capmt)) {
-        handle_ca0(capmt);
+        handle_single(capmt);
       } else {
         int bind_ok = 0;
         /* open connection to emulated ca0 device */
         if (capmt->capmt_oscam == CAPMT_OSCAM_SO_WRAPPER) {
           bind_ok = capmt_create_udp_socket(&capmt->capmt_sock_ca0[0], capmt->capmt_port);
+          if (bind_ok)
+            handle_ca0_wrapper(capmt);
         } else {
           int i, n;
           extern const idclass_t linuxdvb_adapter_class;
@@ -967,16 +1210,16 @@ capmt_thread(void *aux)
             tvhlog(LOG_INFO, "capmt", "created UDP socket %d", n);
             bind_ok = capmt_create_udp_socket(&capmt->capmt_sock_ca0[n], capmt->capmt_port + n);
           }
+          if (bind_ok)
+            handle_ca0(capmt);
         }
-        if (bind_ok)
-          handle_ca0(capmt);
-        else
+        if (!bind_ok)
           fatal = 1;
       }
 #else
      if (capmt->capmt_oscam == CAPMT_OSCAM_TCP ||
          capmt->capmt_oscam == CAPMT_OSCAM_UNIX_SOCKET) {
-       handle_ca0(capmt);
+       handle_single(capmt);
      } else {
        tvhlog(LOG_ERR, "capmt", "Only modes 3 and 4 are supported for non-linuxdvb devices");
        fatal = 1;
@@ -988,10 +1231,10 @@ capmt_thread(void *aux)
 
     /* close opened sockets */
     for (i = 0; i < MAX_SOCKETS; i++)
-      if (capmt->capmt_sock[i] > 0)
+      if (capmt->capmt_sock[i] >= 0)
         close(capmt->capmt_sock[i]);
     for (i = 0; i < MAX_CA; i++)
-      if (capmt->capmt_sock_ca0[i] > 0)
+      if (capmt->capmt_sock_ca0[i] >= 0)
         close(capmt->capmt_sock_ca0[i]);
 
     if (!capmt->capmt_running)
@@ -1014,6 +1257,7 @@ capmt_thread(void *aux)
     pthread_mutex_unlock(&global_lock);
   }
 
+  capmt_flush_queue(capmt, 1);
   free(capmt->capmt_id);
   free(capmt);
 
@@ -1055,10 +1299,11 @@ capmt_table_input(struct th_descrambler *td,
           if ((data[i] & f->mask[i]) != f->filter[i])
             break;
         }
-        if (i >= DMX_FILTER_SIZE && i <= len)
+        if (i >= DMX_FILTER_SIZE && i <= len) {
           capmt_filter_data(capmt, t->s_dvb_service_id,
                             ct->ct_adapter, demux_index,
                             filter_index, data, len);
+        }
       }
   }
 
@@ -1172,21 +1417,21 @@ capmt_send_request(capmt_service_t *ct, int lm)
       if (cce2->cce_caid >> 8 == 0x01) {
         cad.cad_length = 0x11;
         cad.cad_data[4] = cce2->cce_providerid >> 8;
-        cad.cad_data[5] = cce2->cce_providerid & 0xffffff;
+        cad.cad_data[5] = cce2->cce_providerid & 0xff;
       } else if (cce2->cce_caid >> 8 == 0x05) {
         cad.cad_length = 0x0f;
         cad.cad_data[10] = 0x14;
         cad.cad_data[11] = cce2->cce_providerid >> 24;
         cad.cad_data[12] = cce2->cce_providerid >> 16;
         cad.cad_data[13] = cce2->cce_providerid >> 8;
-        cad.cad_data[14] = cce2->cce_providerid & 0xffffff;
+        cad.cad_data[14] = cce2->cce_providerid & 0xff;
       } else if (cce2->cce_caid >> 8 == 0x18) {
         cad.cad_length = 0x07;
         cad.cad_data[5] = cce2->cce_providerid >> 8;
-        cad.cad_data[6] = cce2->cce_providerid & 0xffffff;
+        cad.cad_data[6] = cce2->cce_providerid & 0xff;
       } else if (cce2->cce_caid >> 8 == 0x4a) {
         cad.cad_length = 0x05;
-        cad.cad_data[4] = cce2->cce_providerid & 0xffffff;
+        cad.cad_data[4] = cce2->cce_providerid & 0xff;
       } else
         tvhlog(LOG_WARNING, "capmt", "Unknown CAID type, don't know where to put provider ID");
     }
@@ -1218,7 +1463,7 @@ capmt_send_request(capmt_service_t *ct, int lm)
   buf[9] = pmtversion;
   pmtversion = (pmtversion + 1) & 0x1F;
 
-  capmt_send_msg(capmt, sid, buf, pos);
+  capmt_queue_msg(capmt, sid, buf, pos);
 }
 
 static void
@@ -1239,11 +1484,9 @@ capmt_enumerate_services(capmt_t *capmt, int force)
   if (!all_srv_count && !res_srv_count) {
     // closing socket (oscam handle this as event and stop decrypting)
     tvhlog(LOG_DEBUG, "capmt", "%s: no subscribed services, closing socket, fd=%d", __FUNCTION__, capmt->capmt_sock[0]);
-    if (capmt->capmt_sock[0] > 0) {
-      close(capmt->capmt_sock[0]);
+    if (capmt->capmt_sock[0] >= 0)
       capmt_set_connected(capmt, 1);
-    }
-    capmt->capmt_sock[0] = 0;
+    capmt_socket_close(capmt, 0);
   }
   else if (force || (res_srv_count != all_srv_count)) {
     LIST_FOREACH(ct, &capmt->capmt_services, ct_link) {
@@ -1370,6 +1613,7 @@ capmt_service_start(service_t *s)
 
     /* wake-up idle thread */
     pthread_cond_signal(&capmt->capmt_cond);
+    pthread_cond_signal(&capmt_config_changed);
 
     if (change)
       capmt_notify_server(capmt, NULL);
@@ -1387,6 +1631,15 @@ capmt_destroy(capmt_t *capmt)
   TAILQ_REMOVE(&capmts, capmt, capmt_link);  
   capmt->capmt_running = 0;
   pthread_cond_signal(&capmt->capmt_cond);
+  pthread_join(capmt->capmt_tid, NULL);
+  free(capmt->capmt_sockfile);
+  capmt->capmt_sockfile = NULL;
+  free(capmt->capmt_comment);
+  capmt->capmt_comment = NULL;
+  tvhlog(LOG_INFO, "capmt", "mode %i %s %s port %i destroyed",
+         capmt->capmt_oscam,
+         capmt->capmt_oscam == CAPMT_OSCAM_TCP ? "IP address" : "sockfile",
+         capmt->capmt_sockfile, capmt->capmt_port);
 }
 
 /**
@@ -1420,8 +1673,11 @@ capmt_entry_find(const char *id, int create)
   capmt->capmt_id      = strdup(id); 
   capmt->capmt_running = 1; 
   capmt->capmt_seq     = 0;
+  TAILQ_INIT(&capmt->capmt_writeq);
+
+  TAILQ_INSERT_TAIL(&capmts, capmt, capmt_link);
 
-  TAILQ_INSERT_TAIL(&capmts, capmt, capmt_link);  
+  tvh_pipe(O_NONBLOCK, &capmt->capmt_pipe);
 
   tvhthread_create(&capmt->capmt_tid, NULL, capmt_thread, capmt, 1);
 
@@ -1565,15 +1821,13 @@ void
 capmt_done(void)
 {
   capmt_t *capmt, *n;
-  pthread_t tid;
 
   for (capmt = TAILQ_FIRST(&capmts); capmt != NULL; capmt = n) {
     n = TAILQ_NEXT(capmt, capmt_link);
     pthread_mutex_lock(&global_lock);
-    tid = capmt->capmt_tid;
+    tvh_write(capmt->capmt_pipe.wr, "", 1);
     capmt_destroy(capmt);
     pthread_mutex_unlock(&global_lock);
-    pthread_join(tid, NULL);
   }
   dtable_delete("capmt");
 }
index a9f7e1bffdf722f264974ab195afc34dc0e84bb1..fe65042fca310720ac6a3a6b97058531c470a216 100644 (file)
--- a/src/tcp.c
+++ b/src/tcp.c
@@ -54,7 +54,7 @@ tcp_connect(const char *hostname, int port, const char *bindaddr,
   int fd, r, res, err;
   struct addrinfo *ai;
   char portstr[6];
-  socklen_t errlen = sizeof(int);
+  socklen_t errlen = sizeof(err);
 
   snprintf(portstr, 6, "%u", port);
   res = getaddrinfo(hostname, portstr, NULL, &ai);
@@ -159,6 +159,12 @@ tcp_connect(const char *hostname, int port, const char *bindaddr,
   }
   
   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK);
+
+
+  /* Set the keep-alive active */
+  err = 1;
+  setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&err, errlen);
+
   return fd;
 }
 
index e5910f8ddec819e255da1c7b3560efc78f61717a..96525bd7eede8e39089c6f7d5d471af3803ba58d 100644 (file)
@@ -472,7 +472,8 @@ typedef struct sbuf {
   uint8_t *sb_data;
   int sb_ptr;
   int sb_size;
-  int sb_err;
+  unsigned int sb_err  : 1;
+  unsigned int sb_bswap: 1;
 } sbuf_t;
 
 
@@ -651,6 +652,22 @@ void sbuf_put_byte(sbuf_t *sb, uint8_t u8);
 
 ssize_t sbuf_read(sbuf_t *sb, int fd);
 
+static inline uint8_t sbuf_peek_u8(sbuf_t *sb, int off) { return sb->sb_data[off]; }
+static inline  int8_t sbuf_peek_s8(sbuf_t *sb, int off) { return sb->sb_data[off]; }
+uint16_t sbuf_peek_u16(sbuf_t *sb, int off);
+static inline int16_t sbuf_peek_s16(sbuf_t *sb, int off) { return sbuf_peek_u16(sb, off); }
+uint16_t sbuf_peek_u16le(sbuf_t *sb, int off);
+static inline int16_t sbuf_peek_s16le(sbuf_t *sb, int off) { return sbuf_peek_u16le(sb, off); }
+uint16_t sbuf_peek_u16be(sbuf_t *sb, int off);
+static inline int16_t sbuf_peek_s16be(sbuf_t *sb, int off) { return sbuf_peek_u16be(sb, off); }
+uint32_t sbuf_peek_u32(sbuf_t *sb, int off);
+static inline int32_t sbuf_peek_s32(sbuf_t *sb, int off) { return sbuf_peek_u32(sb, off); }
+uint32_t sbuf_peek_u32le(sbuf_t *sb, int off);
+static inline int32_t sbuf_peek_s32le(sbuf_t *sb, int off) { return sbuf_peek_u32le(sb, off); }
+uint32_t sbuf_peek_u32be(sbuf_t *sb, int off);
+static inline  int32_t sbuf_peek_s32be(sbuf_t *sb, int off) { return sbuf_peek_u32be(sb, off); }
+static inline uint8_t *sbuf_peek(sbuf_t *sb, int off) { return sb->sb_data + off; }
+
 char *md5sum ( const char *str );
 
 int makedirs ( const char *path, int mode );
index b14597572bf5f41c24745a09b794dd8a8711bd4c..c771c59f3b27d72aaae9f71deedd7cfbf936e29a 100644 (file)
 #include <sys/types.h>
 #include <dirent.h>
 #include <unistd.h>
+#include <endian.h>
 #include "tvheadend.h"
 
+#ifndef BYTE_ORDER
+#define BYTE_ORDER __BYTE_ORDER
+#endif
+#ifndef LITTLE_ENDIAN
+#define LITTLE_ENDIAN __LITTLE_ENDIAN
+#endif
+#ifndef BIG_ENDIAN
+#define BIG_ENDIAN __BIG_ENDIAN
+#endif
+#if BYTE_ORDER == LITTLE_ENDIAN
+#define ENDIAN_SWAP_COND(x) (!(x))
+#else
+#define ENDIAN_SWAP_COND(x) (x)
+#endif
+
 /**
  * CRC32 
  */
@@ -390,6 +406,51 @@ sbuf_put_byte(sbuf_t *sb, uint8_t u8)
   sbuf_append(sb, &u8, 1);
 }
 
+uint16_t sbuf_peek_u16(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  if (ENDIAN_SWAP_COND(sb->sb_bswap))
+    return p[0] | (((uint16_t)p[1]) << 8);
+  else
+    return (((uint16_t)p[0]) << 8) | p[1];
+}
+
+uint16_t sbuf_peek_u16le(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  return p[0] | (((uint16_t)p[1]) << 8);
+}
+
+uint16_t sbuf_peek_u16be(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  return (((uint16_t)p[0]) << 8) | p[1];
+}
+
+uint32_t sbuf_peek_u32(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  if (ENDIAN_SWAP_COND(sb->sb_bswap))
+    return p[0] | (((uint32_t)p[1]) << 8) |
+           (((uint32_t)p[2]) << 16) | (((uint32_t)p[3]) << 24);
+  else
+    return (((uint16_t)p[0]) << 24) | (((uint16_t)p[1]) << 16) |
+            (((uint16_t)p[2]) << 8) | p[3];
+}
+
+uint32_t sbuf_peek_u32le(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  return p[0] | (((uint32_t)p[1]) << 8) |
+         (((uint32_t)p[2]) << 16) | (((uint32_t)p[3]) << 24);
+}
+
+uint32_t sbuf_peek_u32be(sbuf_t *sb, int off)
+{
+  uint8_t *p = sb->sb_data + off;
+  return (((uint16_t)p[0]) << 24) | (((uint16_t)p[1]) << 16) |
+         (((uint16_t)p[2]) << 8) | p[3];
+}
 
 void 
 sbuf_cut(sbuf_t *sb, int off)