]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
mpegts: reworking of the input threading
authorAdam Sutton <dev@adamsutton.me.uk>
Fri, 11 Apr 2014 22:43:12 +0000 (23:43 +0100)
committerAdam Sutton <dev@adamsutton.me.uk>
Mon, 14 Apr 2014 16:51:16 +0000 (17:51 +0100)
I have split the input threading in two. There is now a smaller/faster thread
responisble for reading data from the source device (file/socket/DVB/etc...)
and a potentially slower (though not too slow!) thread for processing.

This ensures that any minor delay in processing (potentially due to unexpected
effects during start/stop, or anything else!) do not unduly impact reading from
the source which could otherwise lead to loss of data.

src/input/mpegts.h
src/input/mpegts/mpegts_input.c
src/input/mpegts/mpegts_mux.c

index a1a70d1d3e2def6f91094be4b7053bab154d13f0..242861d5a66e4f520d8c7a5ae8752d2c7d4127ce 100644 (file)
@@ -45,6 +45,8 @@ typedef struct mpegts_mux_sub       mpegts_mux_sub_t;
 typedef struct mpegts_input         mpegts_input_t;
 typedef struct mpegts_table_feed    mpegts_table_feed_t;
 typedef struct mpegts_network_link  mpegts_network_link_t;
+typedef struct mpegts_packet        mpegts_packet_t;
+typedef struct mpegts_buffer        mpegts_buffer_t;
 
 /* Lists */
 typedef LIST_HEAD (,mpegts_network)             mpegts_network_list_t;
@@ -52,7 +54,8 @@ typedef LIST_HEAD (,mpegts_input)               mpegts_input_list_t;
 typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t;
 typedef LIST_HEAD (,mpegts_mux)                 mpegts_mux_list_t;
 typedef LIST_HEAD (,mpegts_network_link)        mpegts_network_link_list_t;
-TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed);
+typedef TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed)
+  mpegts_table_feed_queue_t;
 
 /* Classes */
 extern const idclass_t mpegts_network_class;
@@ -61,9 +64,17 @@ extern const idclass_t mpegts_service_class;
 extern const idclass_t mpegts_input_class;
 
 /* **************************************************************************
- * SI processing
+ * Data / SI processing
  * *************************************************************************/
 
+struct mpegts_packet
+{
+  TAILQ_ENTRY(mpegts_packet)  mp_link;
+  size_t                      mp_len;
+  mpegts_mux_t               *mp_mux;
+  uint8_t                     mp_data[0];
+};
+
 typedef int (*mpegts_table_callback_t)
   ( mpegts_table_t*, const uint8_t *buf, int len, int tableid );
 
@@ -437,10 +448,9 @@ struct mpegts_input
 
   mpegts_network_link_list_t mi_networks;
 
-  LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
-
   LIST_HEAD(,mpegts_mux_instance) mi_mux_instances;
 
+
   /*
    * Status
    */
@@ -450,20 +460,28 @@ struct mpegts_input
    * Input processing
    */
 
-  pthread_mutex_t mi_delivery_mutex;
-
-  LIST_HEAD(,service) mi_transports;
+  int mi_running;
 
+  /* Data input */
+  // Note: this section is protected by mi_input_lock
+  pthread_t                       mi_input_tid;
+  pthread_mutex_t                 mi_input_lock;
+  pthread_cond_t                  mi_input_cond;
+  TAILQ_HEAD(,mpegts_packet)      mi_input_queue;
 
-  struct mpegts_table_feed_queue mi_table_feed;
-  pthread_cond_t mi_table_feed_cond;  // Bound to mi_delivery_mutex
+  /* Data processing/output */
+  // Note: this lock (mi_output_lock) protects all the remaining
+  //       data fields (excluding the callback functions)
+  pthread_mutex_t                 mi_output_lock;
 
-
-  pthread_t mi_thread_id;
-  th_pipe_t mi_thread_pipe;
-
-  int mi_delivery_running;
-  pthread_t mi_thread_table_id;
+  /* Active sources */
+  LIST_HEAD(,mpegts_mux_instance) mi_mux_active;
+  LIST_HEAD(,service)             mi_transports;
+  
+  /* Table processing */
+  pthread_t                       mi_table_tid;
+  pthread_cond_t                  mi_table_cond;
+  mpegts_table_feed_queue_t       mi_table_queue;
 
   /*
    * Functions
@@ -520,6 +538,8 @@ mpegts_input_t *mpegts_input_create0
   mpegts_input_create0(calloc(1, sizeof(mpegts_input_t)),\
                        &mpegts_input_class, u, c)
 
+void mpegts_input_stop_all ( mpegts_input_t *mi );
+
 void mpegts_input_delete ( mpegts_input_t *mi, int delconf );
 
 #define mpegts_input_find(u) idnode_find(u, &mpegts_input_class);
@@ -637,13 +657,9 @@ mpegts_mux_find_pid(mpegts_mux_t *mm, int pid, int create)
     return mm->mm_last_mp;
 }
 
-size_t mpegts_input_recv_packets
-  (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len,
-   int64_t *pcr, uint16_t *pcr_pid, const char *name);
-
-void mpegts_input_table_thread_start( mpegts_input_t *mi );
-
-void mpegts_input_table_thread_stop( mpegts_input_t *mi );
+void mpegts_input_recv_packets
+  (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
+   int64_t *pcr, uint16_t *pcr_pid);
 
 int mpegts_input_is_free ( mpegts_input_t *mi );
 
index dabc97793e138d6a06af5019a0eb1e9514901a71..12a9be099f375b181f74dc03f3f2b973b824dc09 100644 (file)
@@ -186,13 +186,13 @@ mpegts_input_get_weight ( mpegts_input_t *mi )
   }
 
   /* Service subs */
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
     LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
       w = MAX(w, ths->ths_weight);
     }
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
   return w;
 }
 
@@ -288,13 +288,12 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
   elementary_stream_t *st;
 
   /* Add to list */
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   if (!s->s_dvb_active_input) {
     LIST_INSERT_HEAD(&mi->mi_transports, ((service_t*)s), s_active_link);
     s->s_dvb_active_input = mi;
   }
 
-
   /* Register PIDs */
   pthread_mutex_lock(&s->s_stream_mutex);
   mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_STREAM, s);
@@ -303,7 +302,7 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init )
     mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
 
   pthread_mutex_unlock(&s->s_stream_mutex);
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
 }
 
 void
@@ -312,7 +311,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
   elementary_stream_t *st;
 
   /* Remove from list */
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   if (s->s_dvb_active_input != NULL) {
     LIST_REMOVE(((service_t*)s), s_active_link);
     s->s_dvb_active_input = NULL;
@@ -326,7 +325,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s )
     mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_STREAM, s);
 
   pthread_mutex_unlock(&s->s_stream_mutex);
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
 
   /* Stop mux? */
   s->s_dvb_mux->mm_stop(s->s_dvb_mux, 0);
@@ -388,14 +387,14 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
 {
   int ret = 0;
   service_t *t;
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(t, &mi->mi_transports, s_active_link) {
     if (((mpegts_service_t*)t)->s_dvb_mux == mm) {
       ret = 1;
       break;
     }
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
   return ret;
 }
 
@@ -403,112 +402,160 @@ mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm )
  * Data processing
  * *************************************************************************/
 
-size_t
+static int inline
+ts_sync_count ( const uint8_t *tsb, int len )
+{
+  int i = 0;
+  while (len >= 188 && *tsb == 0x47) {
+    ++i;
+    len -= 188;
+    tsb += 188;
+  }
+  return i;
+}
+
+void
 mpegts_input_recv_packets
-  ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi,
-    uint8_t *tsb, size_t l, int64_t *pcr, uint16_t *pcr_pid,
-    const char *name )
+  ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, size_t off,
+    int64_t *pcr, uint16_t *pcr_pid )
+{
+  int i, p = 0;
+  mpegts_packet_t *mp;
+  uint8_t *tsb = sb->sb_data + off;
+  int     len  = sb->sb_size - off;
+#define MIN_TS_PKT 10
+
+  /* Check for sync */
+  while ( (len >= (MIN_TS_PKT * 188)) &&
+          ((p = ts_sync_count(tsb, len)) < MIN_TS_PKT) ) {
+    --len;
+    ++tsb;
+    ++off;
+  }
+
+  // Note: we check for sync here so that the buffer can always be
+  //       processed in its entirety inside the processing thread
+  //       without the potential need to buffer data (since that would
+  //       require per mmi buffers, where this is generally not required)
+
+  /* Extract PCR */
+  // Note: this is only used by tsfile for timing the play out of packets
+  //       maybe we should move it?
+  if (pcr && pcr_pid) {
+    uint8_t *tmp = tsb;
+    for (i = 0; i < p; i++) {
+      if (*pcr_pid == (((tmp[1] & 0x1f) << 8) | tmp[2]))
+        ts_recv_packet1(NULL, tmp, pcr, 0);
+      tmp += 188;
+    }
+  }
+
+  /* Pass */
+  if (p >= 10) {
+    size_t sz = sizeof(mpegts_packet_t) + (p * 188);
+    
+    mp = calloc(1, sz);
+    mp->mp_mux  = mmi->mmi_mux;
+    mp->mp_len  = p * 188;
+    memcpy(mp->mp_data, tsb, mp->mp_len);
+
+    pthread_mutex_lock(&mi->mi_input_lock);
+    if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
+      pthread_cond_signal(&mi->mi_input_cond);
+    TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
+    pthread_mutex_unlock(&mi->mi_input_lock);
+
+    len -= mp->mp_len;
+    off += mp->mp_len;
+  }
+
+  /* Adjust buffer */
+  if (len)
+    sbuf_cut(sb, off); // cut off the bottom
+  else
+    sb->sb_ptr = 0;    // clear
+}
+
+static void
+mpegts_input_process
+  ( mpegts_input_t *mi, mpegts_packet_t *mp )
 {
-  int len = l;
+  int len = mp->mp_len;
   int i = 0, table_wakeup = 0;
-  int stream = 0;
-  int table = 0;
-  mpegts_mux_t *mm = mmi->mmi_mux;
+  int table, stream;
+  uint8_t *tsb = mp->mp_data;
+  mpegts_mux_t          *mm  = mp->mp_mux;
+  mpegts_mux_instance_t *mmi = mm->mm_active;
   mpegts_pid_t *last_mp = NULL;
-  assert(mm != NULL);
-  assert(name != NULL);
-
-  // TODO: get the input name
-  tvhtrace("tsdemux", "%s - recv pkts tsb=%p len=%d pcr=%p pcr_pid=%p mmi=%p",
-           name, tsb, (int)len, pcr, pcr_pid, mmi);
-  
-  /* Not enough data */
-  if (len < 188) return len;
-  
-  /* Streaming - lock mutex */
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
 
   /* Process */
   while ( len >= 188 ) {
-
-    /* Sync */
-    if ( tsb[i] == 0x47 ) {
-      mpegts_pid_t *mp;
-      mpegts_pid_sub_t *mps;
-      service_t *s;
-      int     pid   = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
-      int64_t *ppcr = (pcr_pid && *pcr_pid == pid) ? pcr : NULL;
-      tvhtrace("tsdemux", "%s - recv pkt for pid %04X (%d) on mmi %p",
-               name, pid, pid, mmi);
-
-      /* Find PID */
-      if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
-
-       if (mp != last_mp) {
-         last_mp = mp;
-         stream = 0;
-          table = 0;
-
-          /* Stream takes pref. */
+    mpegts_pid_t *mp;
+    mpegts_pid_sub_t *mps;
+    service_t *s;
+    int pid = ((tsb[i+1] & 0x1f) << 8) | tsb[i+2];
+
+    /* Find PID */
+    if ((mp = mpegts_mux_find_pid(mm, pid, 0))) {
+      // Note: there is a minor danger this caching will get things
+      //       wrong for a brief period of time if the registrations on
+      //       the PID change
+      if (mp != last_mp) {
+        if (pid == 0)
+          stream = table = 1;
+        else {
+          stream = table = 0;
+
+          /* Determine PID type */
           RB_FOREACH(mps, &mp->mp_subs, mps_link) {
             if (mps->mps_type & MPS_STREAM)
               stream = 1;
             if (mps->mps_type & MPS_TABLE)
               table  = 1;
+            if (table && stream) break;
           }
 
           /* Special case streams */
-          if (pid == 0) table = stream = 1;
           LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
             if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
                  if (pid == s->s_pmt_pid) stream = 1;
             else if (pid == s->s_pcr_pid) stream = 1;
           }
         }
+      }
     
-        /* Stream data */
-        if (stream) {
-          LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
-            int f;
-            if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
-            f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
-            ts_recv_packet1((mpegts_service_t*)s, tsb+i, ppcr, f);
-          }
+      /* Stream data */
+      if (stream) {
+        LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
+          int f;
+          if (((mpegts_service_t*)s)->s_dvb_mux != mmi->mmi_mux) continue;
+          f = table || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid);
+          ts_recv_packet1((mpegts_service_t*)s, tsb+i, NULL, f);
         }
+      }
 
-        /* Table data */
-        if (table) {
-          if (!(tsb[i+1] & 0x80)) {
-            mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
-            memcpy(mtf->mtf_tsb, tsb+i, 188);
-            mtf->mtf_mux = mm;
-            TAILQ_INSERT_TAIL(&mi->mi_table_feed, mtf, mtf_link);
-            table_wakeup = 1;
-          } else {
-            tvhdebug("tsdemux", "%s - SI packet had errors", name);
-          }
+      /* Table data */
+      if (table) {
+        if (!(tsb[i+1] & 0x80)) {
+          // TODO: might be able to optimise this a bit by having slightly
+          //       larger buffering and trying to aggregate data (if we get
+          //       same PID multiple times in the loop)
+          mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
+          memcpy(mtf->mtf_tsb, tsb+i, 188);
+          mtf->mtf_mux = mm;
+          TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+          table_wakeup = 1;
+        } else {
+          //tvhdebug("tsdemux", "%s - SI packet had errors", name);
         }
-
-      /* Force PCR extraction for tsfile */
-      } else {
-        if (ppcr && *ppcr == PTS_UNSET)
-          ts_recv_packet1(NULL, tsb+i, ppcr, 0);
       }
-
-      i   += 188;
-      len -= 188;
-    
-    /* Re-sync */
-    } else {
-      tvhdebug("tsdemux", "%s - ts sync lost", name);
-      if (ts_resync(tsb, &len, &i)) break;
-      tvhdebug("tsdemux", "%s - ts sync found", name);
     }
 
+    i   += 188;
+    len -= 188;
   }
 
   /* Raw stream */
-  // Note: this will include unsynced data if that's what is received
   if (i > 0 && LIST_FIRST(&mmi->mmi_streaming_pad.sp_targets) != NULL) {
     streaming_message_t sm;
     pktbuf_t *pb = pktbuf_alloc(tsb, i);
@@ -521,17 +568,48 @@ mpegts_input_recv_packets
 
   /* Wake table */
   if (table_wakeup)
-    pthread_cond_signal(&mi->mi_table_feed_cond);
-
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+    pthread_cond_signal(&mi->mi_table_cond);
 
   /* Bandwidth monitoring */
   atomic_add(&mmi->mmi_stats.bps, i);
-  
-  /* Reset buffer */
-  if (len) memmove(tsb, tsb+i, len);
+}
+
+static void *
+mpegts_input_thread ( void * p )
+{
+  mpegts_packet_t *mp;
+  mpegts_input_t  *mi = p;
+
+  pthread_mutex_lock(&mi->mi_input_lock);
+  while (mi->mi_running) {
 
-  return len;
+    /* Wait for a packet */
+    if (!(mp = TAILQ_FIRST(&mi->mi_input_queue))) {
+      pthread_cond_wait(&mi->mi_input_cond, &mi->mi_input_lock);
+      continue;
+    }
+    TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
+    pthread_mutex_unlock(&mi->mi_input_lock);
+      
+    /* Process */
+    pthread_mutex_lock(&mi->mi_output_lock);
+    if (mp->mp_mux && mp->mp_mux->mm_active)
+      mpegts_input_process(mi, mp);
+    pthread_mutex_unlock(&mi->mi_output_lock);
+
+    /* Cleanup */
+    free(mp);
+    pthread_mutex_lock(&mi->mi_input_lock);
+  }
+
+  /* Flush */
+  while ((mp = TAILQ_FIRST(&mi->mi_input_queue))) {
+    TAILQ_REMOVE(&mi->mi_input_queue, mp, mp_link);
+    free(mp);
+  }
+  pthread_mutex_unlock(&mi->mi_input_lock);
+
+  return NULL;
 }
 
 static void
@@ -570,71 +648,65 @@ mpegts_input_table_thread ( void *aux )
   mpegts_table_feed_t   *mtf;
   mpegts_input_t        *mi = aux;
 
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
-  while (mi->mi_delivery_running) {
+  pthread_mutex_lock(&mi->mi_output_lock);
+  while (mi->mi_running) {
 
     /* Wait for data */
-    while(!(mtf = TAILQ_FIRST(&mi->mi_table_feed))) {
-      if (!mi->mi_delivery_running)
-        break;
-      pthread_cond_wait(&mi->mi_table_feed_cond, &mi->mi_delivery_mutex);
+    if (!(mtf = TAILQ_FIRST(&mi->mi_table_queue))) {
+      pthread_cond_wait(&mi->mi_table_cond, &mi->mi_output_lock);
+      continue;
     }
-    if (mtf)
-      TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
+    TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
+    pthread_mutex_unlock(&mi->mi_output_lock);
     
     /* Process */
-    if (mtf) {
-      pthread_mutex_unlock(&mi->mi_delivery_mutex);
+    if (mtf->mtf_mux) {
       pthread_mutex_lock(&global_lock);
       mpegts_input_table_dispatch(mtf->mtf_mux, mtf);
       pthread_mutex_unlock(&global_lock);
-      free(mtf);
-      pthread_mutex_lock(&mi->mi_delivery_mutex);
     }
-  }
-  while ((mtf = TAILQ_FIRST(&mi->mi_table_feed)) != NULL) {
-    TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
+
+    /* Cleanup */
     free(mtf);
+    pthread_mutex_lock(&mi->mi_output_lock);
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
-  return NULL;
-}
 
-void
-mpegts_input_table_thread_start( mpegts_input_t *mi )
-{
-  mi->mi_delivery_running = 1;
-  tvhthread_create(&mi->mi_thread_table_id, NULL,  
-                   mpegts_input_table_thread, mi, 0);
-}
+  /* Flush */
+  while ((mtf = TAILQ_FIRST(&mi->mi_table_queue)) != NULL) {
+    TAILQ_REMOVE(&mi->mi_table_queue, mtf, mtf_link);
+    free(mtf);
+  }
+  pthread_mutex_unlock(&mi->mi_output_lock);
 
-void
-mpegts_input_table_thread_stop( mpegts_input_t *mi )
-{
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
-  mi->mi_delivery_running = 0;
-  pthread_cond_signal(&mi->mi_table_feed_cond);
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
-  pthread_join(mi->mi_thread_table_id, NULL);
+  return NULL;
 }
 
 void
 mpegts_input_flush_mux
   ( mpegts_input_t *mi, mpegts_mux_t *mm )
 {
-  mpegts_table_feed_t *mtf, *next;
+  mpegts_table_feed_t *mtf;
+  mpegts_packet_t *mp;
 
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
-  mtf = TAILQ_FIRST(&mi->mi_table_feed);
-  while (mtf) {
-    next = TAILQ_NEXT(mtf, mtf_link);
-    if (mtf->mtf_mux == mm) {
-      TAILQ_REMOVE(&mi->mi_table_feed, mtf, mtf_link);
-      free(mtf);
-    }
-    mtf  = next;
+  // Note: to avoid long delays in here, rather than actually
+  //       remove things from the Q, we simply invalidate by clearing
+  //       the mux pointer and allow the threads to deal with the deletion
+
+  /* Flush input Q */
+  pthread_mutex_lock(&mi->mi_input_lock);
+  TAILQ_FOREACH(mp, &mi->mi_input_queue, mp_link) {
+    if (mp->mp_mux == mm)
+      mp->mp_mux = NULL;
+  }
+  pthread_mutex_unlock(&mi->mi_input_lock);
+
+  /* Flush table Q */
+  pthread_mutex_lock(&mi->mi_output_lock);
+  TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
+    if (mtf->mtf_mux == mm)
+      mtf->mtf_mux = NULL;
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
 }
 
 static void
@@ -685,13 +757,44 @@ mpegts_input_get_streams
   mpegts_input_t *mi = (mpegts_input_t*)i;
   mpegts_mux_instance_t *mmi;
 
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
     st = calloc(1, sizeof(tvh_input_stream_t));
     mpegts_input_stream_status(mmi, st);
     LIST_INSERT_HEAD(isl, st, link);
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
+}
+
+static void
+mpegts_input_thread_start ( mpegts_input_t *mi )
+{
+  mi->mi_running = 1;
+  
+  tvhthread_create(&mi->mi_table_tid, NULL,
+                   mpegts_input_table_thread, mi, 0);
+  tvhthread_create(&mi->mi_input_tid, NULL,
+                   mpegts_input_thread, mi, 0);
+}
+
+static void
+mpegts_input_thread_stop ( mpegts_input_t *mi )
+{
+  mi->mi_running = 0;
+
+  /* Stop input thread */
+  pthread_mutex_lock(&mi->mi_input_lock);
+  pthread_cond_signal(&mi->mi_input_cond);
+  pthread_mutex_unlock(&mi->mi_input_lock);
+
+  /* Stop table thread */
+  pthread_mutex_lock(&mi->mi_output_lock);
+  pthread_cond_signal(&mi->mi_table_cond);
+  pthread_mutex_unlock(&mi->mi_output_lock);
+
+  /* Join threads */
+  pthread_join(mi->mi_input_tid, NULL);
+  pthread_join(mi->mi_table_tid, NULL);
 }
 
 /* **************************************************************************
@@ -705,7 +808,7 @@ mpegts_input_status_timer ( void *p )
   mpegts_input_t *mi = p;
   mpegts_mux_instance_t *mmi;
   htsmsg_t *e;
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
     memset(&st, 0, sizeof(st));
     mpegts_input_stream_status(mmi, &st);
@@ -714,7 +817,7 @@ mpegts_input_status_timer ( void *p )
     notify_by_msg("input_status", e);
     tvh_input_stream_destroy(&st);
   }
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
   gtimer_arm(&mi->mi_status_timer, mpegts_input_status_timer, mi, 1);
 }
 
@@ -754,15 +857,14 @@ mpegts_input_create0
   /* Index */
   mi->mi_instance             = ++mpegts_input_idx;
 
-  /* Init mutex */
-  pthread_mutex_init(&mi->mi_delivery_mutex, NULL);
-  
-  /* Table input */
-  TAILQ_INIT(&mi->mi_table_feed);
-  pthread_cond_init(&mi->mi_table_feed_cond, NULL);
+  /* Init input/output structures */
+  pthread_mutex_init(&mi->mi_input_lock, NULL);
+  pthread_cond_init(&mi->mi_input_cond, NULL);
+  TAILQ_INIT(&mi->mi_input_queue);
 
-  /* Init input thread control */
-  mi->mi_thread_pipe.rd = mi->mi_thread_pipe.wr = -1;
+  pthread_mutex_init(&mi->mi_output_lock, NULL);
+  pthread_cond_init(&mi->mi_table_cond, NULL);
+  TAILQ_INIT(&mi->mi_table_queue);
 
   /* Add to global list */
   LIST_INSERT_HEAD(&mpegts_input_all, mi, mi_global_link);
@@ -771,19 +873,30 @@ mpegts_input_create0
   if (c)
     idnode_load(&mi->ti_id, c);
 
+  /* Start threads */
+  mpegts_input_thread_start(mi);
+
   return mi;
 }
 
+void
+mpegts_input_stop_all ( mpegts_input_t *mi )
+{
+  mpegts_mux_instance_t *mmi;
+  while ((mmi = LIST_FIRST(&mi->mi_mux_active)))
+    mmi->mmi_mux->mm_stop(mmi->mmi_mux, 1);
+}
+
 void
 mpegts_input_delete ( mpegts_input_t *mi, int delconf )
 {
   mpegts_network_link_t *mnl;
+  mpegts_input_thread_stop(mi);
   while ((mnl = LIST_FIRST(&mi->mi_networks)))
     mpegts_input_del_network(mnl);
   idnode_unlink(&mi->ti_id);
-  pthread_mutex_destroy(&mi->mi_delivery_mutex);
-  pthread_cond_destroy(&mi->mi_table_feed_cond);
-  tvh_pipe_close(&mi->mi_thread_pipe);
+  pthread_mutex_destroy(&mi->mi_output_lock);
+  pthread_cond_destroy(&mi->mi_table_cond);
   LIST_REMOVE(mi, ti_link);
   LIST_REMOVE(mi, mi_global_link);
   free(mi->mi_name);
index 0ccdcaf45ad267f12b72eb4f66d0166730860c81..72b86226eb7c228d9b76ede8c1f01c586e3d576d 100644 (file)
@@ -147,7 +147,7 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi )
   const service_t *s;
   const th_subscription_t *ths;
   mpegts_input_t *mi = mmi->mmi_input;
-  lock_assert(&mi->mi_delivery_mutex);
+  lock_assert(&mi->mi_output_lock);
 
   /* Direct subs */
   LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
@@ -578,9 +578,9 @@ mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
   mpegts_input_t *mi;
   if (!mm->mm_active || !mm->mm_active->mmi_input) return;
   mi = mm->mm_active->mmi_input;
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   mi->mi_open_pid(mi, mm, mt->mt_pid, type, mt);
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
 }
 
 void
@@ -591,9 +591,9 @@ mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt )
   if (mt->mt_flags & MT_RECORD) type |= MPS_STREAM;
   if (!mm->mm_active || !mm->mm_active->mmi_input) return;
   mi = mm->mm_active->mmi_input;
-  pthread_mutex_lock(&mi->mi_delivery_mutex);
+  pthread_mutex_lock(&mi->mi_output_lock);
   mi->mi_close_pid(mi, mm, mt->mt_pid, type, mt);
-  pthread_mutex_unlock(&mi->mi_delivery_mutex);
+  pthread_mutex_unlock(&mi->mi_output_lock);
 }
 
 /* **************************************************************************