]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
dvb: Add support for grabbing entire mux directly via HTTP
authorAndreas Öman <andreas@lonelycoder.com>
Mon, 22 Oct 2012 14:19:26 +0000 (16:19 +0200)
committerAndreas Öman <andreas@lonelycoder.com>
Thu, 25 Oct 2012 11:06:05 +0000 (13:06 +0200)
src/dvb/dvb.c
src/dvb/dvb.h
src/dvb/dvb_adapter.c
src/dvb/dvb_multiplex.c
src/dvb/dvb_service.c
src/main.c
src/subscriptions.c
src/subscriptions.h
src/webui/webui.c

index 4c764eba1e4c452146102e48017980f38c2d52bc..9e3775b3156683d3db32895224cb93602912aa21 100644 (file)
@@ -23,8 +23,8 @@
 #include "dvb_charset.h"
 
 void
-dvb_init(uint32_t adapter_mask)
+dvb_init(uint32_t adapter_mask, const char *rawfile)
 {
   dvb_charset_init();
-  dvb_adapter_init(adapter_mask);
+  dvb_adapter_init(adapter_mask, rawfile);
 }
index 8d7929a9af84d22e59c04875ce160f9f10e4d917..2b0460d5ea78cd8faf4a8d308fec573df3b6ab6a 100644 (file)
@@ -150,6 +150,8 @@ typedef struct th_dvb_mux_instance {
 
   TAILQ_HEAD(, epggrab_ota_mux) tdmi_epg_grab;
 
+  struct th_subscription_list tdmi_subscriptions;
+
 } th_dvb_mux_instance_t;
 
 
@@ -255,6 +257,10 @@ typedef struct th_dvb_adapter {
 
   int tda_rawmode;
 
+  // Full mux streaming, protected via the delivery mutex
+
+  streaming_pad_t tda_streaming_pad;
+
 
   struct dvb_table_feed_queue tda_table_feed;
   pthread_cond_t tda_table_feed_cond;  // Bound to tda_delivery_mutex
@@ -317,12 +323,12 @@ typedef struct th_dvb_table {
 extern struct th_dvb_adapter_queue dvb_adapters;
 extern struct th_dvb_mux_instance_tree dvb_muxes;
 
-void dvb_init(uint32_t adapter_mask);
+void dvb_init(uint32_t adapter_mask, const char *rawfile);
 
 /**
  * DVB Adapter
  */
-void dvb_adapter_init(uint32_t adapter_mask);
+void dvb_adapter_init(uint32_t adapter_mask, const char *rawfile);
 
 void dvb_adapter_mux_scanner(void *aux);
 
@@ -524,4 +530,14 @@ dvb_satconf_t *dvb_satconf_entry_find(th_dvb_adapter_t *tda,
 void dvb_lnb_get_frequencies(const char *id, 
                             int *f_low, int *f_hi, int *f_switch);
 
+
+/**
+ * Raw demux
+ */
+struct th_subscription;
+struct th_subscription *dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi,
+                                                         const char *name,
+                                                         streaming_target_t *st);
+
 #endif /* DVB_H_ */
+
index c3ad82b76bc770367fc6ce256c8b146ea8da5616..b52df831918df1cf512bc5581f89a22b4ef9eb36 100644 (file)
@@ -64,6 +64,7 @@ tda_alloc(void)
     TAILQ_INIT(&tda->tda_scan_queues[i]);
   TAILQ_INIT(&tda->tda_initial_scan_queue);
   TAILQ_INIT(&tda->tda_satconfs);
+  streaming_pad_init(&tda->tda_streaming_pad);
   return tda;
 }
 
@@ -459,6 +460,53 @@ tda_add(int adapter_num)
   gtimer_arm(&tda->tda_mux_scanner_timer, dvb_adapter_mux_scanner, tda, 1);
 }
 
+
+/**
+ *
+ */
+static void
+tda_add_from_file(const char *filename)
+{
+  int i, r;
+  th_dvb_adapter_t *tda;
+  char buf[400];
+
+  tda = tda_alloc();
+
+  tda->tda_adapter_num = -1;
+  tda->tda_fe_fd       = -1;
+  tda->tda_dvr_pipe[0] = -1;
+
+  tda->tda_type = -1;
+
+  snprintf(buf, sizeof(buf), "%s", filename);
+
+  r = strlen(buf);
+  for(i = 0; i < r; i++)
+    if(!isalnum((int)buf[i]))
+      buf[i] = '_';
+
+  tda->tda_identifier = strdup(buf);
+  
+  tda->tda_autodiscovery = 0;
+  tda->tda_idlescan = 0;
+
+  tda->tda_sat = 0;
+
+  /* Come up with an initial displayname, user can change it and it will
+     be overridden by any stored settings later on */
+
+  tda->tda_displayname = strdup(filename);
+
+  TAILQ_INSERT_TAIL(&dvb_adapters, tda, tda_global_link);
+
+  dvb_input_raw_setup(tda);
+}
+
+
+/**
+ *
+ */
 void
 dvb_adapter_start ( th_dvb_adapter_t *tda )
 {
@@ -515,7 +563,7 @@ dvb_adapter_stop ( th_dvb_adapter_t *tda )
  *
  */
 void
-dvb_adapter_init(uint32_t adapter_mask)
+dvb_adapter_init(uint32_t adapter_mask, const char *rawfile)
 {
   htsmsg_t *l, *c;
   htsmsg_field_t *f;
@@ -529,6 +577,10 @@ dvb_adapter_init(uint32_t adapter_mask)
     if ((1 << i) & adapter_mask) 
       tda_add(i);
 
+  if(rawfile)
+    tda_add_from_file(rawfile);
+
+
   l = hts_settings_load("dvbadapters");
   if(l != NULL) {
     HTSMSG_FOREACH(f, l) {
@@ -605,6 +657,10 @@ dvb_adapter_mux_scanner(void *aux)
   if(service_compute_weight(&tda->tda_transports) > 0)
     return;
 
+  if(tda->tda_mux_current != NULL &&
+     LIST_FIRST(&tda->tda_mux_current->tdmi_subscriptions) != NULL)
+    return; // Someone is doing full mux dump
+
   /* Check if we have muxes pending for quickscan, if so, choose them */
   if((tdmi = TAILQ_FIRST(&tda->tda_initial_scan_queue)) != NULL) {
     dvb_fe_tune(tdmi, "Initial autoscan");
@@ -804,7 +860,18 @@ dvb_adapter_input_dvr(void *aux)
   
       /* sync */
       if (tsb[i] == 0x47) {
-       
+
+       if(LIST_FIRST(&tda->tda_streaming_pad.sp_targets) != NULL) {
+         streaming_message_t sm;
+         pktbuf_t *pb = pktbuf_alloc(tsb, 188);
+         memset(&sm, 0, sizeof(sm));
+         sm.sm_type = SMT_MPEGTS;
+         sm.sm_data = pb;
+         streaming_pad_deliver(&tda->tda_streaming_pad, &sm);
+         pktbuf_ref_dec(pb);
+       }
+
+
        if(!(tsb[i+1] & 0x80)) { // Only dispatch to table parser if not error
          int pid = (tsb[i+1] & 0x1f) << 8 | tsb[i+2];
          if(tda->tda_table_filter[pid]) {
index 46ea57efbb33bd11b2c46bd4080c00f3a66a0eea..1ac5caf1624b0b9c83688c275268523b08639423 100644 (file)
@@ -1285,3 +1285,32 @@ th_dvb_mux_instance_t *dvb_mux_find
   }
   return NULL;
 }
+
+
+/**
+ *
+ */
+th_subscription_t *
+dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi,
+                                 const char *name,
+                                 streaming_target_t *st)
+{
+  th_subscription_t *s;
+  th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
+
+  s = subscription_create(INT32_MAX, name, st, SUBSCRIPTION_RAW_MPEGTS, 
+                         NULL, NULL, NULL, NULL);
+  
+
+  s->ths_tdmi = tdmi;
+  LIST_INSERT_HEAD(&tdmi->tdmi_subscriptions, s, ths_tdmi_link);
+
+  dvb_fe_tune(tdmi, "Full mux subscription");
+
+  pthread_mutex_lock(&tda->tda_delivery_mutex);
+  streaming_target_connect(&tda->tda_streaming_pad, &s->ths_input);
+  pthread_mutex_unlock(&tda->tda_delivery_mutex);
+
+  notify_reload("subscriptions");
+  return s;
+}
index 32877f8e91433642a8244f23fb764998e2831654..82621227125d11f6719a2e7d4ab5acebd6fa211e 100644 (file)
@@ -79,7 +79,10 @@ dvb_service_start(service_t *t, unsigned int weight, int force_start)
     if(w && w >= weight && !force_start)
       /* We are outranked by weight, cant use it */
       return SM_CODE_NOT_FREE;
-    
+
+    if(LIST_FIRST(&tdmi->tdmi_subscriptions) != NULL)
+      return SM_CODE_NOT_FREE;
+
     dvb_adapter_clean(tda);
   }
 
index 9cca79fdff63412a0bfbb79bffc652358d997cee..263bb6ce7f3e9b68cbc8fd159a0a6bb0f37e23f1 100644 (file)
@@ -262,6 +262,7 @@ main(int argc, char **argv)
   sigset_t set;
   const char *homedir;
   const char *rawts_input = NULL;
+  const char *dvb_rawts_input = NULL;
   const char *join_transport = NULL;
   const char *confpath = NULL;
   char *p, *endp;
@@ -279,7 +280,7 @@ main(int argc, char **argv)
   // make sure the timezone is set
   tzset();
 
-  while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:")) != -1) {
+  while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:R:")) != -1) {
     switch(c) {
     case 'a':
       adapter_mask = 0x0;
@@ -340,6 +341,9 @@ main(int argc, char **argv)
     case 'r':
       rawts_input = optarg;
       break;
+    case 'R':
+      dvb_rawts_input = optarg;
+      break;
     case 'j':
       join_transport = optarg;
       break;
@@ -421,7 +425,7 @@ main(int argc, char **argv)
 
   tcp_server_init();
 #if ENABLE_LINUXDVB
-  dvb_init(adapter_mask);
+  dvb_init(adapter_mask, dvb_rawts_input);
 #endif
   iptv_input_init();
 #if ENABLE_V4L
index 1bd429dbcd52b425a3365d3a8528e6ac558e81c8..1cae0a4f8389eba07c0bda0a1da1280073ebd6fa 100644 (file)
@@ -39,6 +39,7 @@
 #include "htsmsg.h"
 #include "notify.h"
 #include "atomic.h"
+#include "dvb/dvb.h"
 
 struct th_subscription_list subscriptions;
 static gtimer_t subscription_reschedule_timer;
@@ -215,6 +216,14 @@ subscription_unsubscribe(th_subscription_t *s)
   if(t != NULL)
     service_remove_subscriber(t, s, SM_CODE_OK);
 
+  if(s->ths_tdmi != NULL) {
+    LIST_REMOVE(s, ths_tdmi_link);
+    th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter;
+    pthread_mutex_lock(&tda->tda_delivery_mutex);
+    streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input);
+    pthread_mutex_unlock(&tda->tda_delivery_mutex);
+  }
+
   if(s->ths_start_message != NULL) 
     streaming_msg_free(s->ths_start_message);
  
@@ -302,9 +311,9 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm)
 /**
  *
  */
-static th_subscription_t *
+th_subscription_t *
 subscription_create(int weight, const char *name, streaming_target_t *st,
-                   int flags, int direct, const char *hostname,
+                   int flags, st_callback_t *cb, const char *hostname,
                    const char *username, const char *client)
 {
   th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
@@ -316,9 +325,8 @@ subscription_create(int weight, const char *name, streaming_target_t *st,
   else
     reject |= SMT_TO_MASK(SMT_MPEGTS);  // Reject raw mpegts
 
-
-  streaming_target_init(&s->ths_input, direct ? subscription_input_direct : 
-                       subscription_input, s, reject);
+  streaming_target_init(&s->ths_input, 
+                       cb ?: subscription_input_direct, s, reject);
 
   s->ths_weight            = weight;
   s->ths_title             = strdup(name);
@@ -348,8 +356,10 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
                                 int flags, const char *hostname,
                                 const char *username, const char *client)
 {
-  th_subscription_t *s = subscription_create(weight, name, st, flags, 0,
-                                            hostname, username, client);
+  th_subscription_t *s;
+
+  s = subscription_create(weight, name, st, flags, subscription_input,
+                         hostname, username, client);
 
   s->ths_channel = ch;
   LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
@@ -391,13 +401,16 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
  */
 th_subscription_t *
 subscription_create_from_service(service_t *t, const char *name,
-                                  streaming_target_t *st, int flags)
+                                streaming_target_t *st, int flags)
 {
-  th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags, 1,
-                                            NULL, NULL, NULL);
+  th_subscription_t *s;
   source_info_t si;
   int r;
 
+  s = subscription_create(INT32_MAX, name, st, flags, 
+                         subscription_input_direct,
+                         NULL, NULL, NULL);
+
   if(t->s_status != SERVICE_RUNNING) {
     if((r = service_start(t, INT32_MAX, 1)) != 0) {
       subscription_unsubscribe(s);
index ff5d1d1d929874af045ea88d7f3478f9c7334076..0a88e5f51a4e06e3d359b0b8b1720c2a91948b6b 100644 (file)
@@ -65,6 +65,10 @@ typedef struct th_subscription {
   char *ths_username;
   char *ths_client;
 
+  // Ugly ugly ugly to refer DVB code here
+
+  LIST_ENTRY(th_subscription) ths_tdmi_link;
+  struct th_dvb_mux_instance *ths_tdmi;
 
 } th_subscription_t;
 
@@ -95,9 +99,14 @@ th_subscription_t *subscription_create_from_service(struct service *t,
                                                    streaming_target_t *st,
                                                    int flags);
 
-void subscription_change_weight(th_subscription_t *s, int weight);
+th_subscription_t *subscription_create(int weight, const char *name,
+                                      streaming_target_t *st,
+                                      int flags, st_callback_t *cb,
+                                      const char *hostname,
+                                      const char *username,
+                                      const char *client);
 
-void subscription_stop(th_subscription_t *s);
+void subscription_change_weight(th_subscription_t *s, int weight);
 
 void subscription_unlink_service(th_subscription_t *s, int reason);
 
index 26ac8c636c911835e681d7d8fb49f1ea4f6c1c38..05e85df97db2d866802f675f89323a3ce5bac526 100644 (file)
@@ -42,6 +42,8 @@
 #include "plumbing/globalheaders.h"
 #include "epg.h"
 #include "muxer.h"
+#include "dvb/dvb.h"
+#include "dvb/dvb_support.h"
 
 /**
  *
@@ -264,6 +266,76 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
 }
 
 
+
+/**
+ * HTTP stream loop
+ */
+static void
+http_stream_run2(http_connection_t *hc, streaming_queue_t *sq)
+{
+  streaming_message_t *sm;
+  int run = 1;
+  int timeouts = 0;
+  struct timespec ts;
+  struct timeval  tp;
+  int err = 0;
+  socklen_t errlen = sizeof(err);
+
+  /* reduce timeout on write() for streaming */
+  tp.tv_sec  = 5;
+  tp.tv_usec = 0;
+  setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp));
+  http_output_content(hc, "application/octet-stream");
+
+  while(run) {
+    pthread_mutex_lock(&sq->sq_mutex);
+    sm = TAILQ_FIRST(&sq->sq_queue);
+    if(sm == NULL) {      
+      gettimeofday(&tp, NULL);
+      ts.tv_sec  = tp.tv_sec + 1;
+      ts.tv_nsec = tp.tv_usec * 1000;
+
+      if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
+          timeouts++;
+
+          //Check socket status
+          getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);  
+          if(err) {
+           tvhlog(LOG_DEBUG, "webui",  "Stop streaming %s, client hung up", hc->hc_url_orig);
+           run = 0;
+          }else if(timeouts >= 20) {
+           tvhlog(LOG_WARNING, "webui",  "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
+           run = 0;
+          }
+      }
+      pthread_mutex_unlock(&sq->sq_mutex);
+      continue;
+    }
+
+    timeouts = 0; //Reset timeout counter
+    TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
+    pthread_mutex_unlock(&sq->sq_mutex);
+
+    pktbuf_t *pb;
+
+    switch(sm->sm_type) {
+    case SMT_MPEGTS:
+      pb = sm->sm_data;
+      if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) {
+       tvhlog(LOG_DEBUG, "webui",  "Write error %s, stopping", hc->hc_url_orig);
+       run = 0;
+      }
+      break;
+    default:
+      break;
+    }
+
+    streaming_msg_free(sm);
+  }
+}
+
+
+
 /**
  * Output a playlist containing a single channel
  */
@@ -596,6 +668,34 @@ http_stream_service(http_connection_t *hc, service_t *service)
 }
 
 
+/**
+ * Subscribes to a service and starts the streaming loop
+ */
+static int
+http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi)
+{
+  th_subscription_t *s;
+  streaming_queue_t sq;
+
+  streaming_queue_init(&sq, SMT_PACKET);
+
+  pthread_mutex_lock(&global_lock);
+  s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st);
+  pthread_mutex_unlock(&global_lock);
+
+  http_stream_run2(hc, &sq);
+
+
+  pthread_mutex_lock(&global_lock);
+  subscription_unsubscribe(s);
+  pthread_mutex_unlock(&global_lock);
+
+  streaming_queue_deinit(&sq);
+
+  return 0;
+}
+
+
 /**
  * Subscribes to a channel and starts the streaming loop
  */
@@ -668,6 +768,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
  * Handle the http request. http://tvheadend/stream/channelid/<chid>
  *                          http://tvheadend/stream/channel/<chname>
  *                          http://tvheadend/stream/service/<servicename>
+ *                          http://tvheadend/stream/mux/<muxid>
  */
 static int
 http_stream(http_connection_t *hc, const char *remain, void *opaque)
@@ -675,6 +776,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
   char *components[2];
   channel_t *ch = NULL;
   service_t *service = NULL;
+  th_dvb_mux_instance_t *tdmi = NULL;
 
   hc->hc_keep_alive = 0;
 
@@ -698,14 +800,19 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
     ch = channel_find_by_name(components[1], 0, 0);
   } else if(!strcmp(components[0], "service")) {
     service = service_find_by_identifier(components[1]);
+  } else if(!strcmp(components[0], "mux")) {
+    tdmi = dvb_mux_find_by_identifier(components[1]);
   }
 
+  // bug here: We can't retain pointers to channels etc outside global_lock
   pthread_mutex_unlock(&global_lock);
 
   if(ch != NULL) {
     return http_stream_channel(hc, ch);
   } else if(service != NULL) {
     return http_stream_service(hc, service);
+  } else if(tdmi != NULL) {
+    return http_stream_tdmi(hc, tdmi);
   } else {
     http_error(hc, HTTP_STATUS_BAD_REQUEST);
     return HTTP_STATUS_BAD_REQUEST;