]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
mpegts input: cleanups in locking and queue shutdown
authorJaroslav Kysela <perex@perex.cz>
Sun, 26 Oct 2014 19:56:11 +0000 (20:56 +0100)
committerJaroslav Kysela <perex@perex.cz>
Sun, 26 Oct 2014 20:43:05 +0000 (21:43 +0100)
src/input/mpegts/mpegts_input.c
src/input/mpegts/mpegts_mux.c

index bba6539134d945af29bdfbe5350bc3499cf5fe50..56e423a4a7936cf4a23a4593a4fbfd2d79760cd2 100644 (file)
@@ -478,6 +478,9 @@ static void
 mpegts_input_stopping_mux
   ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
 {
+  pthread_mutex_lock(&mi->mi_input_lock);
+  mi->mi_stop = 1;
+  pthread_mutex_unlock(&mi->mi_input_lock);
   pthread_mutex_lock(&mi->mi_output_lock);
   mi->mi_stop = 1;
   pthread_mutex_unlock(&mi->mi_output_lock);
@@ -599,9 +602,10 @@ mpegts_input_recv_packets
     off += len2;
 
     pthread_mutex_lock(&mi->mi_input_lock);
-    if (TAILQ_FIRST(&mi->mi_input_queue) == NULL)
+    if (!mi->mi_stop) {
+      TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
       pthread_cond_signal(&mi->mi_input_cond);
-    TAILQ_INSERT_TAIL(&mi->mi_input_queue, mp, mp_link);
+    }
     pthread_mutex_unlock(&mi->mi_input_lock);
   }
 
@@ -790,11 +794,13 @@ mpegts_input_process
             // TODO: might be able to optimise this a bit by having slightly
             //       larger buffering and trying to aggregate data (if we get
             //       same PID multiple times in the loop)
-            mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
-            memcpy(mtf->mtf_tsb, tsb, 188);
-            mtf->mtf_mux   = mm;
-            TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
-            table_wakeup = 1;
+            if (!mi->mi_stop) {
+              mpegts_table_feed_t *mtf = malloc(sizeof(mpegts_table_feed_t));
+              memcpy(mtf->mtf_tsb, tsb, 188);
+              mtf->mtf_mux   = mm;
+              TAILQ_INSERT_TAIL(&mi->mi_table_queue, mtf, mtf_link);
+              table_wakeup = 1;
+            }
           }
         } else {
           //tvhdebug("tsdemux", "%s - SI packet had errors", name);
@@ -884,12 +890,10 @@ mpegts_input_table_thread ( void *aux )
     pthread_mutex_unlock(&mi->mi_output_lock);
     
     /* Process */
-    if (mtf->mtf_mux) {
-      pthread_mutex_lock(&global_lock);
-      if (!mi->mi_stop)
-        mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
-      pthread_mutex_unlock(&global_lock);
-    }
+    pthread_mutex_lock(&global_lock);
+    if (!mi->mi_stop && mtf->mtf_mux)
+      mpegts_input_table_dispatch(mtf->mtf_mux, mtf->mtf_tsb);
+    pthread_mutex_unlock(&global_lock);
 
     /* Cleanup */
     free(mtf);
@@ -927,11 +931,13 @@ mpegts_input_flush_mux
   }
   pthread_mutex_unlock(&mi->mi_input_lock);
 
-  /* Flush table Q - the global lock is already held */
+  /* Flush table Q */
+  pthread_mutex_lock(&mi->mi_output_lock);
   TAILQ_FOREACH(mtf, &mi->mi_table_queue, mtf_link) {
     if (mtf->mtf_mux == mm)
       mtf->mtf_mux = NULL;
   }
+  pthread_mutex_unlock(&mi->mi_output_lock);
   /* stop flag must be set here */
   /* otherwise the picked mtf might be processed after mux deactivation */
   assert(mi->mi_stop);
index 9fed7f08f19908b1723c4745c3ac4f7ead06012b..3c43f0bfd677064c47c9b95524147dec08c42320 100644 (file)
@@ -691,19 +691,20 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
 
   /* Stop possible recursion */
   if (!mmi) return;
+
+  /* Clear */
   mm->mm_active = NULL;
 
   mpegts_mux_nice_name(mm, buf, sizeof(buf));
   tvhdebug("mpegts", "%s - stopping mux", buf);
 
-  if (mmi) {
-    mi = mmi->mmi_input;
-    mi->mi_stopping_mux(mi, mmi);
-    LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
-      subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
-    mi->mi_stop_mux(mi, mmi);
-    mi->mi_stopped_mux(mi, mmi);
-  }
+  mi = mmi->mmi_input;
+  assert(mi);
+  mi->mi_stopping_mux(mi, mmi);
+  LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
+    subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
+  mi->mi_stop_mux(mi, mmi);
+  mi->mi_stopped_mux(mi, mmi);
 
   /* Flush all tables */
   tvhtrace("mpegts", "%s - flush tables", buf);
@@ -711,31 +712,26 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
 
   tvhtrace("mpegts", "%s - mi=%p", buf, (void *)mi);
   /* Flush table data queue */
-  if (mi)
-    mpegts_input_flush_mux(mi, mm);
+  mpegts_input_flush_mux(mi, mm);
 
   /* Ensure PIDs are cleared */
-  if (mi) {
-    pthread_mutex_lock(&mi->mi_output_lock);
-    mm->mm_last_pid = -1;
-    mm->mm_last_mp = NULL;
-    while ((mp = RB_FIRST(&mm->mm_pids))) {
-      assert(mi);
-      while ((mps = RB_FIRST(&mp->mp_subs))) {
-        RB_REMOVE(&mp->mp_subs, mps, mps_link);
-        free(mps);
-      }
-      RB_REMOVE(&mm->mm_pids, mp, mp_link);
-      if (mp->mp_fd != -1) {
-        tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
-        close(mp->mp_fd);
-      }
-      free(mp);
+  pthread_mutex_lock(&mi->mi_output_lock);
+  mm->mm_last_pid = -1;
+  mm->mm_last_mp = NULL;
+  while ((mp = RB_FIRST(&mm->mm_pids))) {
+    assert(mi);
+    while ((mps = RB_FIRST(&mp->mp_subs))) {
+      RB_REMOVE(&mp->mp_subs, mps, mps_link);
+      free(mps);
     }
-    pthread_mutex_unlock(&mi->mi_output_lock);
-  } else {
-    assert(RB_FIRST(&mm->mm_pids) == NULL);
+    RB_REMOVE(&mm->mm_pids, mp, mp_link);
+    if (mp->mp_fd != -1) {
+      tvhdebug("mpegts", "%s - close PID %04X (%d)", buf, mp->mp_pid, mp->mp_pid);
+      close(mp->mp_fd);
+    }
+    free(mp);
   }
+  pthread_mutex_unlock(&mi->mi_output_lock);
 
   /* Scanning */
   mpegts_network_scan_mux_cancel(mm, 1);
@@ -745,9 +741,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force )
 
   /* Events */
   mpegts_fire_event(mm, ml_mux_stop);
-
-  /* Clear */
-  mm->mm_active = NULL;
 }
 
 void