]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
output: UDP streaming
authorinnes-labs <labs@innes.fr>
Thu, 5 Sep 2019 14:19:00 +0000 (16:19 +0200)
committerFlole998 <Flole998@users.noreply.github.com>
Fri, 7 Oct 2022 19:24:18 +0000 (21:24 +0200)
12 files changed:
Makefile
src/muxer.h
src/muxer/muxer_pass.c
src/tvheadend.h
src/tvhlog.c
src/tvhlog.h
src/udp.c
src/udp.h
src/udp_stream.c [new file with mode: 0755]
src/udp_stream.h [new file with mode: 0755]
src/webui/webui.c [changed mode: 0644->0755]
src/wrappers.c

index 4e5a947a375500419bf70c3ef227f940fa6aed88..1f1c2e36d729e946091883432f795e284047170a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -226,6 +226,7 @@ SRCS-1 = \
        src/access.c \
        src/tcp.c \
        src/udp.c \
+       src/udp_stream.c \
        src/url.c \
        src/http.c \
        src/notify.c \
index d20af8bdd383238d39034172816cdcc6dfe136b7..e9f2bc22fbf12966f7dc16bc0623b0d378209364 100644 (file)
@@ -64,6 +64,7 @@ typedef struct muxer_config {
    */
   int                  m_file_permissions;
   int                  m_directory_permissions; 
+  int                  m_output_chunk; /* > 0 if muxer output needs writing in chunks */   
 
   /*
    * type specific section
index aa9a0eea660b4a6766fee70affd0d9b140c031f8..95ba86e527b9fea5b6075006fd8471c77eef6afc 100644 (file)
@@ -583,10 +583,20 @@ static void
 pass_muxer_write(muxer_t *m, const void *data, size_t size)
 {
   pass_muxer_t *pm = (pass_muxer_t*)m;
+  int ret;
 
   if(pm->pm_error) {
     pm->m_errors++;
-  } else if(tvh_write(pm->pm_fd, data, size)) {
+    return;
+  } 
+  
+  if (pm->m_config.m_output_chunk > 0) {
+    ret = tvh_write_in_chunks(pm->pm_fd, data, size, pm->m_config.m_output_chunk);
+  } else {
+    ret = tvh_write(pm->pm_fd, data, size);
+  }
+
+  if(ret) {
     pm->pm_error = errno;
     if (!MC_IS_EOS_ERROR(errno))
       tvherror(LS_PASS, "%s: Write failed -- %s", pm->pm_filename,
index 072391d3e0db5942dbeab9e7f221d3b8d042fc59..8a7965235d5489dfd94689f647d2fad3f80a9f8a 100644 (file)
@@ -245,6 +245,8 @@ void tvh_pipe_close(th_pipe_t *pipe);
 
 int tvh_write(int fd, const void *buf, size_t len);
 
+int tvh_write_in_chunks(int fd, const void *buf, size_t len, size_t chunkSize);
+
 int tvh_nonblock_write(int fd, const void *buf, size_t len);
 
 FILE *tvh_fopen(const char *filename, const char *mode);
index 5e025cec4c4682a723e734134e1551e4a89bea08..ec0eba874f3448c3ecbcbe3c97a7b2a778d8f08b 100644 (file)
@@ -184,6 +184,7 @@ tvhlog_subsys_t tvhlog_subsystems[] = {
 #if ENABLE_DDCI
   [LS_DDCI]          = { "ddci",          N_("DD-CI") },
 #endif
+  [LS_UDP]           = { "udp",           N_("UDP Streamer") },  
 
 };
 
index c891b826bd0550b73d1dcf6d605cd8637aa6d99a..3b7a76fb185f59c8c51fa2f00d9c61fb1fbf16a8 100644 (file)
@@ -198,6 +198,7 @@ enum {
 #if ENABLE_DDCI
   LS_DDCI,
 #endif
+  LS_UDP,      
   LS_LAST     /* keep this last */
 };
 
index 192877069806a99bb798f8f43d1ff6f5329e6bc0..b88734011a4a1a6e1c299fa1856a174978b3dfd4 100644 (file)
--- a/src/udp.c
+++ b/src/udp.c
@@ -168,7 +168,8 @@ udp_bind ( int subsystem, const char *name,
   uc->ifname               = ifname ? strdup(ifname) : NULL;
   uc->subsystem            = subsystem;
   uc->name                 = name ? strdup(name) : NULL;
-  uc->rxtxsize             = rxsize;
+  uc->rxsize               = rxsize;
+  uc->txsize               = txsize;
 
   if (udp_resolve(uc, &uc->ip, uc->host, port, &uc->multicast, 1)) {
     udp_close(uc);
@@ -203,13 +204,15 @@ udp_bind ( int subsystem, const char *name,
 
   /* IPv4 */
   if (uc->ip.ss_family == AF_INET) {
-    /* Bind */
+    /* Bind useful for receiver subsystem (not for udp streamer) */
+    if (subsystem != LS_UDP) {
     if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in))) {
       inet_ntop(AF_INET, &IP_AS_V4(uc->ip, addr), buf, sizeof(buf));
       tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]",
                name, buf, ntohs(IP_AS_V4(uc->ip, port)), strerror(errno));
       goto error;
     }
+    }  
 
     if (uc->multicast) {
       /* Join multicast group */
@@ -381,7 +384,7 @@ udp_sendinit ( int subsystem, const char *name,
   uc->ifname               = ifname ? strdup(ifname) : NULL;
   uc->subsystem            = subsystem;
   uc->name                 = name ? strdup(name) : NULL;
-  uc->rxtxsize             = txsize;
+  uc->txsize               = txsize;
 
   /* Open socket */
   if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) {
index 7e810727cf52f84b7b9dc017b1dd1540f739e8ef..55b6dfad39b9c07a559b477a14bd7df59d73d9d1 100644 (file)
--- a/src/udp.h
+++ b/src/udp.h
@@ -38,7 +38,8 @@ typedef struct udp_connection {
   int fd;
   int subsystem;
   char *name;
-  int rxtxsize;
+  int rxsize;
+  int txsize;
 } udp_connection_t;
 
 udp_connection_t *
diff --git a/src/udp_stream.c b/src/udp_stream.c
new file mode 100755 (executable)
index 0000000..48842a9
--- /dev/null
@@ -0,0 +1,266 @@
+/*
+ *  TVHeadend - UDP stream common routines
+ *
+ *  Copyright (C) 2019 Stephane Duperron
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _GNU_SOURCE
+#include "tvheadend.h"
+#include "config.h"
+#include "udp_stream.h"
+
+static tvh_mutex_t udp_streams_mutex = TVH_THREAD_MUTEX_INITIALIZER;
+static struct udp_stream_list udp_streams;
+
+udp_stream_t *
+create_udp_stream ( udp_connection_t *uc, const char *hint )
+{
+  udp_stream_t *ustream;
+  char port_buf[6];  
+  size_t unlen;
+
+  ustream = calloc(1,sizeof(udp_stream_t));
+  if (!ustream) {
+    return NULL;
+  }
+
+  ustream->us_uc = uc;
+  ustream->us_hint = strdup(hint);
+  snprintf(port_buf, 6, "%d", uc->port);  
+  unlen = strlen(uc->host) + strlen(port_buf) + 8;
+  ustream->us_udp_url = malloc(unlen);
+  snprintf(ustream->us_udp_url, unlen, "udp://%s:%s", uc->host, port_buf);  
+
+  tvh_mutex_lock(&udp_streams_mutex);
+  LIST_INSERT_HEAD(&udp_streams, ustream, us_link);
+  tvh_mutex_unlock(&udp_streams_mutex);
+  return ustream;
+}
+
+void
+delete_udp_stream (udp_stream_t * ustream)
+{
+  tvh_mutex_lock(&udp_streams_mutex);
+  LIST_REMOVE(ustream, us_link);
+  tvh_mutex_unlock(&udp_streams_mutex);
+  free(ustream->us_hint);
+  free(ustream->us_udp_url);
+  free(ustream);  
+}
+
+udp_stream_t *
+find_udp_stream_by_hint ( const char *hint )
+{
+  udp_stream_t *us;
+  int found = 0;
+
+  tvh_mutex_lock(&udp_streams_mutex);
+  LIST_FOREACH(us, &udp_streams, us_link)
+  if(strcmp(us->us_hint, hint) == 0) {
+    found = 1;
+    break;
+  }
+  tvh_mutex_unlock(&udp_streams_mutex);
+
+  if (found) {
+    return us;
+  } else {
+    return NULL;
+  }
+}
+
+udp_stream_t *
+find_udp_stream_by_url ( const char *url )
+{
+  udp_stream_t *us;
+  int found = 0;
+
+  tvh_mutex_lock(&udp_streams_mutex);
+  LIST_FOREACH(us, &udp_streams, us_link)
+  if(strcmp(us->us_udp_url, url) == 0) {
+    found = 1;
+    break;
+  }
+  tvh_mutex_unlock(&udp_streams_mutex);
+
+  if (found) {
+    return us;
+  } else {
+    return NULL;
+  }
+}
+
+static void *
+udp_stream_thread ( void *aux )    
+{
+  udp_stream_t *us = (udp_stream_t *)aux;
+  profile_chain_t *prch = &(us->us_prch);
+  streaming_message_t *sm;
+  int run = 1, started = 0;
+  streaming_queue_t *sq = &prch->prch_sq;
+  muxer_t *mux = prch->prch_muxer;
+  int ptimeout, grace = 20, r;
+  streaming_start_t *ss_copy;
+  int64_t lastpkt, mono;
+  int fd = us->us_uc->fd;
+
+  if(muxer_open_stream(mux, fd))
+    run = 0;
+       mux->m_config.m_output_chunk = us->us_uc->txsize;
+  if (config.dscp >= 0)
+    socket_set_dscp(fd, config.dscp, NULL, 0);
+
+  lastpkt = mclk();
+  ptimeout = prch->prch_pro ? prch->prch_pro->pro_timeout : 5;
+
+  while(atomic_get(&us->us_running) && run && tvheadend_is_running()) {
+    tvh_mutex_lock(&sq->sq_mutex);
+    sm = TAILQ_FIRST(&sq->sq_queue);
+    if(sm == NULL) {
+      mono = mclk() + sec2mono(1);
+      do {
+        r = tvh_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, mono);
+        if (r == ETIMEDOUT) {
+          break;
+        }
+      } while (ERRNO_AGAIN(r));
+      tvh_mutex_unlock(&sq->sq_mutex);
+      continue;
+    }
+
+    streaming_queue_remove(sq, sm);
+    tvh_mutex_unlock(&sq->sq_mutex);
+
+    switch(sm->sm_type) {
+    case SMT_MPEGTS:
+    case SMT_PACKET:
+      if(started) {
+        pktbuf_t *pb;
+        int len;
+        if (sm->sm_type == SMT_PACKET)
+          pb = ((th_pkt_t*)sm->sm_data)->pkt_payload;
+        else
+          pb = sm->sm_data;
+        subscription_add_bytes_out(us->us_subscript, len = pktbuf_len(pb));
+        if (len > 0)
+          lastpkt = mclk();
+        muxer_write_pkt(mux, sm->sm_type, sm->sm_data);
+        sm->sm_data = NULL;
+      }
+      break;
+
+    case SMT_GRACE:
+      grace = sm->sm_code < 5 ? 5 : grace;
+      break;
+
+    case SMT_START:
+      grace = 10;
+      if(!started) {
+        tvhdebug(LS_UDP, "Start streaming %s", us->us_udp_url);
+        ss_copy = streaming_start_copy((streaming_start_t *)sm->sm_data);
+        if(muxer_init(mux, ss_copy, us->us_content_name) < 0)
+          run = 0;
+        streaming_start_unref(ss_copy);
+
+        started = 1;
+      } else if(muxer_reconfigure(mux, sm->sm_data) < 0) {
+        tvhwarn(LS_UDP,  "Unable to reconfigure stream %s", us->us_udp_url);
+      }
+      break;
+
+    case SMT_STOP:
+      if((mux->m_caps & MC_CAP_ANOTHER_SERVICE) != 0) /* give a chance to use another svc */
+        break;
+      if(sm->sm_code != SM_CODE_SOURCE_RECONFIGURED) {
+        tvhwarn(LS_UDP,  "Stop streaming %s, %s", us->us_udp_url, 
+                streaming_code2txt(sm->sm_code));
+        run = 0;
+      }
+      break;
+
+    case SMT_SERVICE_STATUS:
+    case SMT_SIGNAL_STATUS:
+    case SMT_DESCRAMBLE_INFO:
+      if((!started && mclk() - lastpkt > sec2mono(grace)) ||
+                 (started && ptimeout > 0 && mclk() - lastpkt > sec2mono(ptimeout))) {
+        tvhwarn(LS_UDP,  "Stop streaming %s, timeout waiting for packets", us->us_udp_url);
+        run = 0;
+      }
+      break;
+
+    case SMT_NOSTART_WARN:
+    case SMT_SKIP:
+    case SMT_SPEED:
+    case SMT_TIMESHIFT_STATUS:
+      break;
+
+    case SMT_NOSTART:
+      tvhwarn(LS_UDP,  "Couldn't start streaming %s, %s",
+              us->us_udp_url, streaming_code2txt(sm->sm_code));
+      run = 0;
+      break;
+
+    case SMT_EXIT:
+      tvhwarn(LS_UDP,  "Stop streaming %s, %s", us->us_udp_url,
+              streaming_code2txt(sm->sm_code));
+      run = 0;
+      break;
+    }
+
+    streaming_msg_free(sm);
+
+    if(mux->m_errors) {
+      if (!mux->m_eos)
+        tvhwarn(LS_UDP,  "Stop streaming %s, muxer reported errors", us->us_udp_url);
+      run = 0;
+    }
+  }
+
+  if(started)
+    muxer_close(mux);
+  atomic_set(&us->us_running, 0);
+  tvh_mutex_lock(us->us_global_lock);
+  subscription_unsubscribe(us->us_subscript, UNSUBSCRIBE_FINAL);
+  profile_chain_close(&us->us_prch);
+  tvh_mutex_unlock(us->us_global_lock);
+  udp_close(us->us_uc);
+  free(us->us_content_name);
+  delete_udp_stream(us); 
+  return NULL;  
+}
+
+int
+udp_stream_run (udp_stream_t *us)
+{
+  if (atomic_get(&us->us_running)) {
+    tvhwarn(LS_UDP,  "UDP stream %s is already running", us->us_udp_url);
+    return -1;
+  }
+  atomic_set(&us->us_running, 1);
+  return tvh_thread_create(&us->us_tid, NULL, udp_stream_thread, (void *)us, us->us_udp_url);
+}
+
+int
+udp_stream_shutdown (udp_stream_t *us)
+{
+  if (!atomic_get(&us->us_running)) {
+    tvhwarn(LS_UDP,  "UDP stream %s is not currently running", us->us_udp_url);
+    return -1;
+  }
+  atomic_set(&us->us_running, 0);
+  return pthread_join(us->us_tid, NULL);
+}
\ No newline at end of file
diff --git a/src/udp_stream.h b/src/udp_stream.h
new file mode 100755 (executable)
index 0000000..64bbea0
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ *  tvheadend, UDP stream interface
+ *  Copyright (C) 2019 Stephane Duperron
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef UDP_STREAM_H_
+#define UDP_STREAM_H_
+
+#include "udp.h"
+#include "subscriptions.h"
+#include "profile.h"
+
+typedef struct udp_stream {
+  LIST_ENTRY(udp_stream) us_link;
+  char *us_hint;
+  char *us_udp_url;
+  udp_connection_t *us_uc;
+  profile_chain_t us_prch;
+  th_subscription_t *us_subscript;
+  char *us_content_name;
+  pthread_t us_tid;
+  int us_running;
+  tvh_mutex_t* us_global_lock;
+} udp_stream_t;
+
+LIST_HEAD(udp_stream_list, udp_stream);
+
+udp_stream_t *
+create_udp_stream ( udp_connection_t *uc, const char *hint );
+
+void
+delete_udp_stream (udp_stream_t * ustream);
+
+udp_stream_t *
+find_udp_stream_by_hint ( const char *hint );
+
+udp_stream_t *
+find_udp_stream_by_url ( const char *url );
+
+int
+udp_stream_run (udp_stream_t *us);
+
+int
+udp_stream_shutdown (udp_stream_t *us);
+
+#endif /* UDP_STREAM_H_ */
old mode 100644 (file)
new mode 100755 (executable)
index 3e63aea..8f5010a
@@ -24,6 +24,7 @@
 #include "config.h"
 #include "http.h"
 #include "tcp.h"
+#include "udp_stream.h"
 #include "webui.h"
 #include "dvr/dvr.h"
 #include "filebundle.h"
@@ -1182,6 +1183,117 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
   return res;
 }
 
+static int
+udp_stream_service(http_connection_t *hc, service_t *service, int weight)
+{
+  th_subscription_t *s;
+  udp_connection_t *uc;
+  profile_t *pro;
+  muxer_hints_t *hints;
+  const char *str;
+  size_t qsize;
+  const char *address;
+  int port;
+  int res = HTTP_STATUS_SERVICE;
+  int flags, eflags = 0;
+  udp_stream_t *ustream;
+  size_t unlen;
+  char *stop_url;  
+  int pos;
+
+  if ((str = http_arg_get(&hc->hc_req_args, "port"))) {
+    port = atol(str);
+  } else {
+    tvhwarn(LS_WEBUI, "No port supplied in udp stream request");
+    return res;
+  }
+  if (!(address = http_arg_get(&hc->hc_req_args, "address"))) {
+    tvhwarn(LS_WEBUI, "No address supplied in udp stream request");
+    return res;
+  }
+
+  unlen = strlen(str) + strlen(address) + 2;
+  hc->hc_username = malloc(unlen);
+  snprintf(hc->hc_username, unlen, "%s:%s", address, str);
+
+  if (!(uc = udp_bind(LS_UDP, "udp_streamer",
+                       address, port, NULL,
+                       NULL, 1024, 188*7))) {
+    tvhwarn(LS_WEBUI, "Could not create and bind udp socket");
+    return res; 
+  }  
+
+  if (udp_connect (uc, "udp_streamer", address, port)) {
+    tvhwarn(LS_WEBUI, "Could not connect udp socket");
+    return res;
+  }  
+
+  if ((str = http_arg_get(&hc->hc_req_args, "descramble")))
+    if (strcmp(str, "0") == 0)
+      eflags |= SUBSCRIPTION_NODESCR;
+
+  if ((str = http_arg_get(&hc->hc_req_args, "emm")))
+    if (strcmp(str, "1") == 0)
+      eflags |= SUBSCRIPTION_EMM;
+
+  flags = SUBSCRIPTION_MPEGTS | eflags;
+  if ((eflags & SUBSCRIPTION_NODESCR) == 0)
+    flags |= SUBSCRIPTION_PACKET;
+  if(!(pro = profile_find_by_list(hc->hc_access->aa_profiles,
+                                  http_arg_get(&hc->hc_req_args, "profile"),
+                                  "service", flags))) {
+    udp_close(uc);
+    return HTTP_STATUS_NOT_ALLOWED;
+  }
+
+  stop_url = strdup(hc->hc_url_orig);
+  unlen = strlen(hc->hc_url_orig) - 1;
+  str = strstr(hc->hc_url_orig, "start");
+  pos = str - hc->hc_url_orig;
+  if (str && (pos > 0))
+    snprintf(&stop_url[pos], unlen-pos+1, "stop%s", &str[5]);
+
+  ustream = create_udp_stream(uc, stop_url);
+  free(stop_url);
+  if (!ustream) {
+    udp_close(uc);
+    return HTTP_STATUS_NOT_ALLOWED;    
+  }
+
+  if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+    qsize = atoll(str);
+  else
+    qsize = 1500000;
+
+  hints = muxer_hints_create(http_arg_get(&hc->hc_args, "User-Agent"));
+
+  profile_chain_init(&ustream->us_prch, pro, service, 1);
+  if (!profile_chain_open(&ustream->us_prch, NULL, hints, 0, qsize)) {
+
+    s = subscription_create_from_service(&ustream->us_prch, NULL, weight ?: 100, "UDP",
+                                         ustream->us_prch.prch_flags | SUBSCRIPTION_STREAMING |
+                                           eflags,
+                                         address,
+                                        http_username(hc),
+                                        http_arg_get(&hc->hc_args, "User-Agent"),
+                             NULL);
+    if(s) {
+      ustream->us_content_name = strdup(service->s_nicename);
+      ustream->us_subscript = s;
+      ustream->us_global_lock = &global_lock;
+      udp_stream_run(ustream);
+      http_output_html(hc);
+      close(hc->hc_fd);
+      return 0;
+    }
+  }
+
+  profile_chain_close(&ustream->us_prch);
+  udp_close(uc);
+  delete_udp_stream(ustream);
+  return res;
+}
+
 /**
  * Subscribe to a mux for grabbing a raw dump
  *
@@ -1325,17 +1437,107 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
   return res;
 }
 
+static int
+udp_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
+{
+  th_subscription_t *s;
+  udp_connection_t *uc;
+  profile_t *pro;
+   muxer_hints_t *hints;
+  const char *str;
+  size_t qsize;
+  const char *address;
+  int port;
+  int res = HTTP_STATUS_SERVICE;
+  udp_stream_t *ustream;
+  size_t unlen;
+  char *stop_url; 
+  int pos;
+
+  if ((str = http_arg_get(&hc->hc_req_args, "port"))) {
+    port = atol(str);
+  } else {
+    tvhwarn(LS_WEBUI, "No port supplied in udp stream request");
+    return res;
+  }
+  if (!(address = http_arg_get(&hc->hc_req_args, "address"))) {
+    tvhwarn(LS_WEBUI, "No address supplied in udp stream request");
+    return res;
+  }
+
+  unlen = strlen(str) + strlen(address) + 2;
+  hc->hc_username = malloc(unlen);
+  snprintf(hc->hc_username, unlen, "%s:%s", address, str);
+
+  if (!(uc = udp_bind(LS_UDP, "udp_streamer",
+                       address, port, NULL,
+                       NULL, 1024, 188*7))) {
+    tvhwarn(LS_WEBUI, "Could not create and bind udp socket");
+    return res; 
+  }  
+
+  if (udp_connect (uc, "udp_streamer", address, port)) {
+    tvhwarn(LS_WEBUI, "Could not connect udp socket");
+    return res;
+  }  
+
+  if(!(pro = profile_find_by_list(hc->hc_access->aa_profiles,
+                                  http_arg_get(&hc->hc_req_args, "profile"),
+                                  "channel", 
+                                  SUBSCRIPTION_PACKET | SUBSCRIPTION_MPEGTS))) {
+    udp_close(uc);
+    return HTTP_STATUS_NOT_ALLOWED;
+  }  
+
+  stop_url = strdup(hc->hc_url_orig);
+  unlen = strlen(hc->hc_url_orig) - 1;
+  str = strstr(hc->hc_url_orig, "start");
+  pos = str - hc->hc_url_orig;
+  if (str && (pos > 0))
+    snprintf(&stop_url[pos], unlen-pos+1, "stop%s", &str[5]);
+
+  ustream = create_udp_stream(uc, stop_url);
+  free(stop_url);
+  if (!ustream) {
+    udp_close(uc);
+    return HTTP_STATUS_NOT_ALLOWED;    
+  }
+
+  if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
+    qsize = atoll(str);
+  else
+    qsize = 1500000;
+
+ hints = muxer_hints_create(http_arg_get(&hc->hc_args, "User-Agent"));
+
+  profile_chain_init(&ustream->us_prch, pro, ch, 1);
+  if (!profile_chain_open(&ustream->us_prch, NULL, hints, 0, qsize)) {
+
+    s = subscription_create_from_channel(&ustream->us_prch, NULL, weight ?: 100, "UDP",
+                                         ustream->us_prch.prch_flags | SUBSCRIPTION_STREAMING,
+                                         address,
+                                        http_username(hc),
+                                        http_arg_get(&hc->hc_args, "User-Agent"),
+                             NULL);
+    if(s) {
+      ustream->us_content_name = strdup(channel_get_name(ch, channel_blank_name));
+      ustream->us_subscript = s;
+      ustream->us_global_lock = &global_lock;
+      udp_stream_run(ustream);
+      http_output_html(hc);
+      close(hc->hc_fd);
+      return 0;
+    }
+  }
+
+  profile_chain_close(&ustream->us_prch);
+  udp_close(uc);
+  delete_udp_stream(ustream);
+  return res;
+}
 
-/**
- * Handle the http request. http://tvheadend/stream/channelid/<chid>
- *                          http://tvheadend/stream/channel/<uuid>
- *                          http://tvheadend/stream/channelnumber/<channelnumber>
- *                          http://tvheadend/stream/channelname/<channelname>
- *                          http://tvheadend/stream/service/<servicename>
- *                          http://tvheadend/stream/mux/<muxid>
- */
 static int
-http_stream(http_connection_t *hc, const char *remain, void *opaque)
+do_stream(http_connection_t *hc, const char *remain, void *opaque, int isUdp)
 {
   char *components[2];
   channel_t *ch = NULL;
@@ -1379,9 +1581,9 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
   }
 
   if(ch != NULL) {
-    r = http_stream_channel(hc, ch, weight);
+    r = isUdp ? udp_stream_channel(hc, ch, weight) : http_stream_channel(hc, ch, weight);
   } else if(service != NULL) {
-    r = http_stream_service(hc, service, weight);
+    r = isUdp ? udp_stream_service(hc, service, weight) : http_stream_service(hc, service, weight);
 #if ENABLE_MPEGTS
   } else if(mm != NULL) {
     r = http_stream_mux(hc, mm, weight);
@@ -1394,6 +1596,66 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
   return r;
 }
 
+/**
+ * Handle the http request. http://tvheadend/stream/channelid/<chid>
+ *                          http://tvheadend/stream/channel/<uuid>
+ *                          http://tvheadend/stream/channelnumber/<channelnumber>
+ *                          http://tvheadend/stream/channelname/<channelname>
+ *                          http://tvheadend/stream/service/<servicename>
+ *                          http://tvheadend/stream/mux/<muxid>
+ */
+static int
+http_stream(http_connection_t *hc, const char *remain, void *opaque) {
+  return do_stream(hc, remain, opaque, 0);
+}
+
+/**
+ * Handle the http request. http://tvheadend/udpstream/start/channelid/<chid>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/start/channel/<uuid>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/start/channelnumber/<channelnumber>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/start/channelname/<channelname>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/start/service/<servicename>?address=<destaddr>&port=<udpport>
+ */
+static int
+start_udp_stream(http_connection_t *hc, const char *remain, void *opaque) {
+  return do_stream(hc, remain, opaque, 1);
+}
+
+/**
+ * Handle the http request. http://tvheadend/udpstream/stop/channelid/<chid>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/stop/channel/<uuid>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/stop/channelnumber/<channelnumber>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/stop/channelname/<channelname>?address=<destaddr>&port=<udpport>
+ *                          http://tvheadend/udpstream/stop/service/<servicename>?address=<destaddr>&port=<udpport>
+ */
+static int
+stop_udp_stream(http_connection_t *hc, const char *remain, void *opaque) {
+  char *components[2];
+  udp_stream_t *us;
+
+  hc->hc_keep_alive = 0;
+
+  if(remain == NULL)
+    return HTTP_STATUS_BAD_REQUEST;
+
+  if(http_tokenize((char *)remain, components, 2, '/') != 2)
+    return HTTP_STATUS_BAD_REQUEST;
+
+  http_deescape(components[1]);
+
+  us = find_udp_stream_by_hint(hc->hc_url_orig);
+  if (us) {
+    tvhdebug(LS_WEBUI, "Stop UDP stream %s", us->us_udp_url);
+    udp_stream_shutdown(us);
+  } else {
+    tvhwarn(LS_WEBUI,  "UDP stream not found (stop request %s)", hc->hc_url_orig);
+  }
+
+  http_output_html(hc);
+  close(hc->hc_fd);
+  return 0;
+}
+
 /**
  * Generate a xspf playlist
  * http://en.wikipedia.org/wiki/XML_Shareable_Playlist_Format
@@ -2149,6 +2411,8 @@ webui_init(int xspf)
   http_path_add("/state", NULL, page_statedump, ACCESS_ADMIN);
 
   http_path_add("/stream",  NULL, http_stream,  ACCESS_ANONYMOUS);
+  http_path_add("/udpstream/start",  NULL, start_udp_stream,  ACCESS_ANONYMOUS);
+  http_path_add("/udpstream/stop",  NULL, stop_udp_stream,  ACCESS_ANONYMOUS);
 
   http_path_add("/imagecache", NULL, page_imagecache, ACCESS_ANONYMOUS);
 
index 09ba146a883dd3234ea27ebded6fad9f32259930..1f93b22c5451e3ee368a0119a9db95932f72edaa 100644 (file)
@@ -87,6 +87,32 @@ tvh_write(int fd, const void *buf, size_t len)
   return len ? 1 : 0;
 }
 
+int
+tvh_write_in_chunks(int fd, const void *buf, size_t len, size_t chunkSize)
+{
+  int64_t limit = mclk() + sec2mono(25);
+  ssize_t c;
+  ssize_t towrite;
+
+  while (len) {
+    towrite = (len > chunkSize) ? chunkSize : len;
+    c = write(fd, buf, towrite);
+    if (c < 0) {
+      if (ERRNO_AGAIN(errno)) {
+        if (mclk() > limit)
+          break;
+        tvh_safe_usleep(100);
+        continue;
+      }
+      break;
+    }
+    len -= c;
+    buf += c;
+  }
+
+  return len ? 1 : 0;
+}
+
 int
 tvh_nonblock_write(int fd, const void *buf, size_t len)
 {