]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
http: Let full mux dump use the normal straeming code
authorAndreas Öman <andreas@lonelycoder.com>
Thu, 25 Oct 2012 07:53:12 +0000 (09:53 +0200)
committerAndreas Öman <andreas@lonelycoder.com>
Thu, 25 Oct 2012 11:06:55 +0000 (13:06 +0200)
Fix some locking issues in the http streaming code

src/muxer_pass.c
src/webui/webui.c

index 7305343838b7b6d6034ee5078dc71aa11f5d0399..20a986a75fe013be9681d466bf8a2d117ae5aa6d 100644 (file)
@@ -107,7 +107,8 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
   pass_muxer_t *pm = (pass_muxer_t*)m;
   const source_info_t *si = &ss->ss_si;
 
-  if(si->si_type == S_MPEG_TS) {
+  if(si->si_type == S_MPEG_TS && ss->ss_pmt_pid) {
+    pm->pm_pat = realloc(pm->pm_pat, 188);
     memset(pm->pm_pat, 0xff, 188);
     pm->pm_pat[0] = 0x47;
     pm->pm_pat[1] = 0x40;
@@ -120,6 +121,7 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
       return -1;
     }
 
+    pm->pm_pmt = realloc(pm->pm_pmt, 188);
     memset(pm->pm_pmt, 0xff, 188);
     pm->pm_pmt[0] = 0x47;
     pm->pm_pmt[1] = 0x40 | (ss->ss_pmt_pid >> 8);
@@ -216,14 +218,16 @@ pass_muxer_write_ts(muxer_t *m, pktbuf_t *pb)
   pass_muxer_t *pm = (pass_muxer_t*)m;
   int rem;
 
-  // Inject pmt and pat into the stream
-  rem = pm->pm_pc % TS_INJECTION_RATE;
-  if(!rem) {
-    pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
-    pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
-    pass_muxer_write(m, pm->pm_pmt, 188);
-    pass_muxer_write(m, pm->pm_pat, 188);
-    pm->pm_ic++;
+  if(pm->pm_pat != NULL) {
+    // Inject pmt and pat into the stream
+    rem = pm->pm_pc % TS_INJECTION_RATE;
+    if(!rem) {
+      pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
+      pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
+      pass_muxer_write(m, pm->pm_pmt, 188);
+      pass_muxer_write(m, pm->pm_pat, 188);
+      pm->pm_ic++;
+    }
   }
 
   pass_muxer_write(m, pb->pb_data, pb->pb_size);
@@ -331,9 +335,6 @@ pass_muxer_create(muxer_container_type_t mc)
   pm->m_close        = pass_muxer_close;
   pm->m_destroy      = pass_muxer_destroy;
 
-  pm->pm_pat = malloc(188);
-  pm->pm_pmt = malloc(188);
-
   return (muxer_t *)pm;
 }
 
index 05e85df97db2d866802f675f89323a3ce5bac526..7919f04015baf40de27cfff1a4aabf5a7b7602cc 100644 (file)
@@ -142,8 +142,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
  * HTTP stream loop
  */
 static void
-http_stream_run(http_connection_t *hc, streaming_queue_t *sq, 
-               th_subscription_t *s, muxer_container_type_t mc)
+http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
+               const char *name, muxer_container_type_t mc)
 {
   streaming_message_t *sm;
   int run = 1;
@@ -154,17 +154,11 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
   struct timeval  tp;
   int err = 0;
   socklen_t errlen = sizeof(err);
-  const char *name;
 
   mux = muxer_create(mc);
   if(muxer_open_stream(mux, hc->hc_fd))
     run = 0;
 
-  if(s->ths_channel)
-    name = s->ths_channel->ch_name;
-  else
-    name = "Live Stream";
-
   /* reduce timeout on write() for streaming */
   tp.tv_sec  = 5;
   tp.tv_usec = 0;
@@ -266,76 +260,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
 }
 
 
-
-/**
- * HTTP stream loop
- */
-static void
-http_stream_run2(http_connection_t *hc, streaming_queue_t *sq)
-{
-  streaming_message_t *sm;
-  int run = 1;
-  int timeouts = 0;
-  struct timespec ts;
-  struct timeval  tp;
-  int err = 0;
-  socklen_t errlen = sizeof(err);
-
-  /* reduce timeout on write() for streaming */
-  tp.tv_sec  = 5;
-  tp.tv_usec = 0;
-  setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp));
-  http_output_content(hc, "application/octet-stream");
-
-  while(run) {
-    pthread_mutex_lock(&sq->sq_mutex);
-    sm = TAILQ_FIRST(&sq->sq_queue);
-    if(sm == NULL) {      
-      gettimeofday(&tp, NULL);
-      ts.tv_sec  = tp.tv_sec + 1;
-      ts.tv_nsec = tp.tv_usec * 1000;
-
-      if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
-          timeouts++;
-
-          //Check socket status
-          getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);  
-          if(err) {
-           tvhlog(LOG_DEBUG, "webui",  "Stop streaming %s, client hung up", hc->hc_url_orig);
-           run = 0;
-          }else if(timeouts >= 20) {
-           tvhlog(LOG_WARNING, "webui",  "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
-           run = 0;
-          }
-      }
-      pthread_mutex_unlock(&sq->sq_mutex);
-      continue;
-    }
-
-    timeouts = 0; //Reset timeout counter
-    TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
-    pthread_mutex_unlock(&sq->sq_mutex);
-
-    pktbuf_t *pb;
-
-    switch(sm->sm_type) {
-    case SMT_MPEGTS:
-      pb = sm->sm_data;
-      if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) {
-       tvhlog(LOG_DEBUG, "webui",  "Write error %s, stopping", hc->hc_url_orig);
-       run = 0;
-      }
-      break;
-    default:
-      break;
-    }
-
-    streaming_msg_free(sm);
-  }
-}
-
-
-
 /**
  * Output a playlist containing a single channel
  */
@@ -618,7 +542,8 @@ http_stream_service(http_connection_t *hc, service_t *service)
   muxer_container_type_t mc;
   int flags;
   const char *str;
-  size_t qsize ;
+  size_t qsize;
+  const char *name;
 
   mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
   if(mc == MC_UNKNOWN) {
@@ -645,15 +570,14 @@ http_stream_service(http_connection_t *hc, service_t *service)
     flags = 0;
   }
 
-  pthread_mutex_lock(&global_lock);
   s = subscription_create_from_service(service, "HTTP", st, flags);
-  pthread_mutex_unlock(&global_lock);
-
   if(s) {
-    http_stream_run(hc, &sq, s, mc);
+    name = strdupa(service->s_ch ?
+                   service->s_ch->ch_name : service->s_nicename);
+    pthread_mutex_unlock(&global_lock);
+    http_stream_run(hc, &sq, name, mc);
     pthread_mutex_lock(&global_lock);
     subscription_unsubscribe(s);
-    pthread_mutex_unlock(&global_lock);
   }
 
   if(gh)
@@ -676,19 +600,15 @@ http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi)
 {
   th_subscription_t *s;
   streaming_queue_t sq;
-
+  const char *name;
   streaming_queue_init(&sq, SMT_PACKET);
 
-  pthread_mutex_lock(&global_lock);
   s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st);
+  name = strdupa(tdmi->tdmi_identifier);
   pthread_mutex_unlock(&global_lock);
-
-  http_stream_run2(hc, &sq);
-
-
+  http_stream_run(hc, &sq, name, MC_PASS);
   pthread_mutex_lock(&global_lock);
   subscription_unsubscribe(s);
-  pthread_mutex_unlock(&global_lock);
 
   streaming_queue_deinit(&sq);
 
@@ -713,6 +633,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
   muxer_container_type_t mc;
   char *str;
   size_t qsize;
+  const char *name;
 
   mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
   if(mc == MC_UNKNOWN) {
@@ -739,18 +660,17 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
     flags = 0;
   }
 
-  pthread_mutex_lock(&global_lock);
   s = subscription_create_from_channel(ch, priority, "HTTP", st, flags,
                                       inet_ntoa(hc->hc_peer->sin_addr),
                                       hc->hc_username,
                                       http_arg_get(&hc->hc_args, "User-Agent"));
-  pthread_mutex_unlock(&global_lock);
 
   if(s) {
-    http_stream_run(hc, &sq, s, mc);
+    name = strdupa(ch->ch_name);
+    pthread_mutex_unlock(&global_lock);
+    http_stream_run(hc, &sq, name, mc);
     pthread_mutex_lock(&global_lock);
     subscription_unsubscribe(s);
-    pthread_mutex_unlock(&global_lock);
   }
 
   if(gh)
@@ -792,7 +712,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
 
   http_deescape(components[1]);
 
-  pthread_mutex_lock(&global_lock);
+  scopedgloballock();
 
   if(!strcmp(components[0], "channelid")) {
     ch = channel_find_by_identifier(atoi(components[1]));
@@ -804,9 +724,6 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
     tdmi = dvb_mux_find_by_identifier(components[1]);
   }
 
-  // bug here: We can't retain pointers to channels etc outside global_lock
-  pthread_mutex_unlock(&global_lock);
-
   if(ch != NULL) {
     return http_stream_channel(hc, ch);
   } else if(service != NULL) {