From: innes-labs Date: Thu, 5 Sep 2019 14:19:00 +0000 (+0200) Subject: output: UDP streaming X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5f9404117f59ad1f5aa7ca542ce39d9e064e8209;p=thirdparty%2Ftvheadend.git output: UDP streaming --- diff --git a/Makefile b/Makefile index 4e5a947a3..1f1c2e36d 100644 --- a/Makefile +++ b/Makefile @@ -226,6 +226,7 @@ SRCS-1 = \ src/access.c \ src/tcp.c \ src/udp.c \ + src/udp_stream.c \ src/url.c \ src/http.c \ src/notify.c \ diff --git a/src/muxer.h b/src/muxer.h index d20af8bdd..e9f2bc22f 100644 --- a/src/muxer.h +++ b/src/muxer.h @@ -64,6 +64,7 @@ typedef struct muxer_config { */ int m_file_permissions; int m_directory_permissions; + int m_output_chunk; /* > 0 if muxer output needs writing in chunks */ /* * type specific section diff --git a/src/muxer/muxer_pass.c b/src/muxer/muxer_pass.c index aa9a0eea6..95ba86e52 100644 --- a/src/muxer/muxer_pass.c +++ b/src/muxer/muxer_pass.c @@ -583,10 +583,20 @@ static void pass_muxer_write(muxer_t *m, const void *data, size_t size) { pass_muxer_t *pm = (pass_muxer_t*)m; + int ret; if(pm->pm_error) { pm->m_errors++; - } else if(tvh_write(pm->pm_fd, data, size)) { + return; + } + + if (pm->m_config.m_output_chunk > 0) { + ret = tvh_write_in_chunks(pm->pm_fd, data, size, pm->m_config.m_output_chunk); + } else { + ret = tvh_write(pm->pm_fd, data, size); + } + + if(ret) { pm->pm_error = errno; if (!MC_IS_EOS_ERROR(errno)) tvherror(LS_PASS, "%s: Write failed -- %s", pm->pm_filename, diff --git a/src/tvheadend.h b/src/tvheadend.h index 072391d3e..8a7965235 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -245,6 +245,8 @@ void tvh_pipe_close(th_pipe_t *pipe); int tvh_write(int fd, const void *buf, size_t len); +int tvh_write_in_chunks(int fd, const void *buf, size_t len, size_t chunkSize); + int tvh_nonblock_write(int fd, const void *buf, size_t len); FILE *tvh_fopen(const char *filename, const char *mode); diff --git a/src/tvhlog.c b/src/tvhlog.c index 5e025cec4..ec0eba874 100644 --- a/src/tvhlog.c +++ b/src/tvhlog.c @@ -184,6 +184,7 @@ tvhlog_subsys_t tvhlog_subsystems[] = { #if ENABLE_DDCI [LS_DDCI] = { "ddci", N_("DD-CI") }, #endif + [LS_UDP] = { "udp", N_("UDP Streamer") }, }; diff --git a/src/tvhlog.h b/src/tvhlog.h index c891b826b..3b7a76fb1 100644 --- a/src/tvhlog.h +++ b/src/tvhlog.h @@ -198,6 +198,7 @@ enum { #if ENABLE_DDCI LS_DDCI, #endif + LS_UDP, LS_LAST /* keep this last */ }; diff --git a/src/udp.c b/src/udp.c index 192877069..b88734011 100644 --- a/src/udp.c +++ b/src/udp.c @@ -168,7 +168,8 @@ udp_bind ( int subsystem, const char *name, uc->ifname = ifname ? strdup(ifname) : NULL; uc->subsystem = subsystem; uc->name = name ? strdup(name) : NULL; - uc->rxtxsize = rxsize; + uc->rxsize = rxsize; + uc->txsize = txsize; if (udp_resolve(uc, &uc->ip, uc->host, port, &uc->multicast, 1)) { udp_close(uc); @@ -203,13 +204,15 @@ udp_bind ( int subsystem, const char *name, /* IPv4 */ if (uc->ip.ss_family == AF_INET) { - /* Bind */ + /* Bind useful for receiver subsystem (not for udp streamer) */ + if (subsystem != LS_UDP) { if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in))) { inet_ntop(AF_INET, &IP_AS_V4(uc->ip, addr), buf, sizeof(buf)); tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]", name, buf, ntohs(IP_AS_V4(uc->ip, port)), strerror(errno)); goto error; } + } if (uc->multicast) { /* Join multicast group */ @@ -381,7 +384,7 @@ udp_sendinit ( int subsystem, const char *name, uc->ifname = ifname ? strdup(ifname) : NULL; uc->subsystem = subsystem; uc->name = name ? strdup(name) : NULL; - uc->rxtxsize = txsize; + uc->txsize = txsize; /* Open socket */ if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) { diff --git a/src/udp.h b/src/udp.h index 7e810727c..55b6dfad3 100644 --- a/src/udp.h +++ b/src/udp.h @@ -38,7 +38,8 @@ typedef struct udp_connection { int fd; int subsystem; char *name; - int rxtxsize; + int rxsize; + int txsize; } udp_connection_t; udp_connection_t * diff --git a/src/udp_stream.c b/src/udp_stream.c new file mode 100755 index 000000000..48842a992 --- /dev/null +++ b/src/udp_stream.c @@ -0,0 +1,266 @@ +/* + * TVHeadend - UDP stream common routines + * + * Copyright (C) 2019 Stephane Duperron + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#define _GNU_SOURCE +#include "tvheadend.h" +#include "config.h" +#include "udp_stream.h" + +static tvh_mutex_t udp_streams_mutex = TVH_THREAD_MUTEX_INITIALIZER; +static struct udp_stream_list udp_streams; + +udp_stream_t * +create_udp_stream ( udp_connection_t *uc, const char *hint ) +{ + udp_stream_t *ustream; + char port_buf[6]; + size_t unlen; + + ustream = calloc(1,sizeof(udp_stream_t)); + if (!ustream) { + return NULL; + } + + ustream->us_uc = uc; + ustream->us_hint = strdup(hint); + snprintf(port_buf, 6, "%d", uc->port); + unlen = strlen(uc->host) + strlen(port_buf) + 8; + ustream->us_udp_url = malloc(unlen); + snprintf(ustream->us_udp_url, unlen, "udp://%s:%s", uc->host, port_buf); + + tvh_mutex_lock(&udp_streams_mutex); + LIST_INSERT_HEAD(&udp_streams, ustream, us_link); + tvh_mutex_unlock(&udp_streams_mutex); + return ustream; +} + +void +delete_udp_stream (udp_stream_t * ustream) +{ + tvh_mutex_lock(&udp_streams_mutex); + LIST_REMOVE(ustream, us_link); + tvh_mutex_unlock(&udp_streams_mutex); + free(ustream->us_hint); + free(ustream->us_udp_url); + free(ustream); +} + +udp_stream_t * +find_udp_stream_by_hint ( const char *hint ) +{ + udp_stream_t *us; + int found = 0; + + tvh_mutex_lock(&udp_streams_mutex); + LIST_FOREACH(us, &udp_streams, us_link) + if(strcmp(us->us_hint, hint) == 0) { + found = 1; + break; + } + tvh_mutex_unlock(&udp_streams_mutex); + + if (found) { + return us; + } else { + return NULL; + } +} + +udp_stream_t * +find_udp_stream_by_url ( const char *url ) +{ + udp_stream_t *us; + int found = 0; + + tvh_mutex_lock(&udp_streams_mutex); + LIST_FOREACH(us, &udp_streams, us_link) + if(strcmp(us->us_udp_url, url) == 0) { + found = 1; + break; + } + tvh_mutex_unlock(&udp_streams_mutex); + + if (found) { + return us; + } else { + return NULL; + } +} + +static void * +udp_stream_thread ( void *aux ) +{ + udp_stream_t *us = (udp_stream_t *)aux; + profile_chain_t *prch = &(us->us_prch); + streaming_message_t *sm; + int run = 1, started = 0; + streaming_queue_t *sq = &prch->prch_sq; + muxer_t *mux = prch->prch_muxer; + int ptimeout, grace = 20, r; + streaming_start_t *ss_copy; + int64_t lastpkt, mono; + int fd = us->us_uc->fd; + + if(muxer_open_stream(mux, fd)) + run = 0; + mux->m_config.m_output_chunk = us->us_uc->txsize; + + if (config.dscp >= 0) + socket_set_dscp(fd, config.dscp, NULL, 0); + + lastpkt = mclk(); + ptimeout = prch->prch_pro ? prch->prch_pro->pro_timeout : 5; + + while(atomic_get(&us->us_running) && run && tvheadend_is_running()) { + tvh_mutex_lock(&sq->sq_mutex); + sm = TAILQ_FIRST(&sq->sq_queue); + if(sm == NULL) { + mono = mclk() + sec2mono(1); + do { + r = tvh_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, mono); + if (r == ETIMEDOUT) { + break; + } + } while (ERRNO_AGAIN(r)); + tvh_mutex_unlock(&sq->sq_mutex); + continue; + } + + streaming_queue_remove(sq, sm); + tvh_mutex_unlock(&sq->sq_mutex); + + switch(sm->sm_type) { + case SMT_MPEGTS: + case SMT_PACKET: + if(started) { + pktbuf_t *pb; + int len; + if (sm->sm_type == SMT_PACKET) + pb = ((th_pkt_t*)sm->sm_data)->pkt_payload; + else + pb = sm->sm_data; + subscription_add_bytes_out(us->us_subscript, len = pktbuf_len(pb)); + if (len > 0) + lastpkt = mclk(); + muxer_write_pkt(mux, sm->sm_type, sm->sm_data); + sm->sm_data = NULL; + } + break; + + case SMT_GRACE: + grace = sm->sm_code < 5 ? 5 : grace; + break; + + case SMT_START: + grace = 10; + if(!started) { + tvhdebug(LS_UDP, "Start streaming %s", us->us_udp_url); + ss_copy = streaming_start_copy((streaming_start_t *)sm->sm_data); + if(muxer_init(mux, ss_copy, us->us_content_name) < 0) + run = 0; + streaming_start_unref(ss_copy); + + started = 1; + } else if(muxer_reconfigure(mux, sm->sm_data) < 0) { + tvhwarn(LS_UDP, "Unable to reconfigure stream %s", us->us_udp_url); + } + break; + + case SMT_STOP: + if((mux->m_caps & MC_CAP_ANOTHER_SERVICE) != 0) /* give a chance to use another svc */ + break; + if(sm->sm_code != SM_CODE_SOURCE_RECONFIGURED) { + tvhwarn(LS_UDP, "Stop streaming %s, %s", us->us_udp_url, + streaming_code2txt(sm->sm_code)); + run = 0; + } + break; + + case SMT_SERVICE_STATUS: + case SMT_SIGNAL_STATUS: + case SMT_DESCRAMBLE_INFO: + if((!started && mclk() - lastpkt > sec2mono(grace)) || + (started && ptimeout > 0 && mclk() - lastpkt > sec2mono(ptimeout))) { + tvhwarn(LS_UDP, "Stop streaming %s, timeout waiting for packets", us->us_udp_url); + run = 0; + } + break; + + case SMT_NOSTART_WARN: + case SMT_SKIP: + case SMT_SPEED: + case SMT_TIMESHIFT_STATUS: + break; + + case SMT_NOSTART: + tvhwarn(LS_UDP, "Couldn't start streaming %s, %s", + us->us_udp_url, streaming_code2txt(sm->sm_code)); + run = 0; + break; + + case SMT_EXIT: + tvhwarn(LS_UDP, "Stop streaming %s, %s", us->us_udp_url, + streaming_code2txt(sm->sm_code)); + run = 0; + break; + } + + streaming_msg_free(sm); + + if(mux->m_errors) { + if (!mux->m_eos) + tvhwarn(LS_UDP, "Stop streaming %s, muxer reported errors", us->us_udp_url); + run = 0; + } + } + + if(started) + muxer_close(mux); + atomic_set(&us->us_running, 0); + tvh_mutex_lock(us->us_global_lock); + subscription_unsubscribe(us->us_subscript, UNSUBSCRIBE_FINAL); + profile_chain_close(&us->us_prch); + tvh_mutex_unlock(us->us_global_lock); + udp_close(us->us_uc); + free(us->us_content_name); + delete_udp_stream(us); + return NULL; +} + +int +udp_stream_run (udp_stream_t *us) +{ + if (atomic_get(&us->us_running)) { + tvhwarn(LS_UDP, "UDP stream %s is already running", us->us_udp_url); + return -1; + } + atomic_set(&us->us_running, 1); + return tvh_thread_create(&us->us_tid, NULL, udp_stream_thread, (void *)us, us->us_udp_url); +} + +int +udp_stream_shutdown (udp_stream_t *us) +{ + if (!atomic_get(&us->us_running)) { + tvhwarn(LS_UDP, "UDP stream %s is not currently running", us->us_udp_url); + return -1; + } + atomic_set(&us->us_running, 0); + return pthread_join(us->us_tid, NULL); +} \ No newline at end of file diff --git a/src/udp_stream.h b/src/udp_stream.h new file mode 100755 index 000000000..64bbea0e2 --- /dev/null +++ b/src/udp_stream.h @@ -0,0 +1,59 @@ +/* + * tvheadend, UDP stream interface + * Copyright (C) 2019 Stephane Duperron + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef UDP_STREAM_H_ +#define UDP_STREAM_H_ + +#include "udp.h" +#include "subscriptions.h" +#include "profile.h" + +typedef struct udp_stream { + LIST_ENTRY(udp_stream) us_link; + char *us_hint; + char *us_udp_url; + udp_connection_t *us_uc; + profile_chain_t us_prch; + th_subscription_t *us_subscript; + char *us_content_name; + pthread_t us_tid; + int us_running; + tvh_mutex_t* us_global_lock; +} udp_stream_t; + +LIST_HEAD(udp_stream_list, udp_stream); + +udp_stream_t * +create_udp_stream ( udp_connection_t *uc, const char *hint ); + +void +delete_udp_stream (udp_stream_t * ustream); + +udp_stream_t * +find_udp_stream_by_hint ( const char *hint ); + +udp_stream_t * +find_udp_stream_by_url ( const char *url ); + +int +udp_stream_run (udp_stream_t *us); + +int +udp_stream_shutdown (udp_stream_t *us); + +#endif /* UDP_STREAM_H_ */ diff --git a/src/webui/webui.c b/src/webui/webui.c old mode 100644 new mode 100755 index 3e63aeacc..8f5010a41 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -24,6 +24,7 @@ #include "config.h" #include "http.h" #include "tcp.h" +#include "udp_stream.h" #include "webui.h" #include "dvr/dvr.h" #include "filebundle.h" @@ -1182,6 +1183,117 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight) return res; } +static int +udp_stream_service(http_connection_t *hc, service_t *service, int weight) +{ + th_subscription_t *s; + udp_connection_t *uc; + profile_t *pro; + muxer_hints_t *hints; + const char *str; + size_t qsize; + const char *address; + int port; + int res = HTTP_STATUS_SERVICE; + int flags, eflags = 0; + udp_stream_t *ustream; + size_t unlen; + char *stop_url; + int pos; + + if ((str = http_arg_get(&hc->hc_req_args, "port"))) { + port = atol(str); + } else { + tvhwarn(LS_WEBUI, "No port supplied in udp stream request"); + return res; + } + if (!(address = http_arg_get(&hc->hc_req_args, "address"))) { + tvhwarn(LS_WEBUI, "No address supplied in udp stream request"); + return res; + } + + unlen = strlen(str) + strlen(address) + 2; + hc->hc_username = malloc(unlen); + snprintf(hc->hc_username, unlen, "%s:%s", address, str); + + if (!(uc = udp_bind(LS_UDP, "udp_streamer", + address, port, NULL, + NULL, 1024, 188*7))) { + tvhwarn(LS_WEBUI, "Could not create and bind udp socket"); + return res; + } + + if (udp_connect (uc, "udp_streamer", address, port)) { + tvhwarn(LS_WEBUI, "Could not connect udp socket"); + return res; + } + + if ((str = http_arg_get(&hc->hc_req_args, "descramble"))) + if (strcmp(str, "0") == 0) + eflags |= SUBSCRIPTION_NODESCR; + + if ((str = http_arg_get(&hc->hc_req_args, "emm"))) + if (strcmp(str, "1") == 0) + eflags |= SUBSCRIPTION_EMM; + + flags = SUBSCRIPTION_MPEGTS | eflags; + if ((eflags & SUBSCRIPTION_NODESCR) == 0) + flags |= SUBSCRIPTION_PACKET; + if(!(pro = profile_find_by_list(hc->hc_access->aa_profiles, + http_arg_get(&hc->hc_req_args, "profile"), + "service", flags))) { + udp_close(uc); + return HTTP_STATUS_NOT_ALLOWED; + } + + stop_url = strdup(hc->hc_url_orig); + unlen = strlen(hc->hc_url_orig) - 1; + str = strstr(hc->hc_url_orig, "start"); + pos = str - hc->hc_url_orig; + if (str && (pos > 0)) + snprintf(&stop_url[pos], unlen-pos+1, "stop%s", &str[5]); + + ustream = create_udp_stream(uc, stop_url); + free(stop_url); + if (!ustream) { + udp_close(uc); + return HTTP_STATUS_NOT_ALLOWED; + } + + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + + hints = muxer_hints_create(http_arg_get(&hc->hc_args, "User-Agent")); + + profile_chain_init(&ustream->us_prch, pro, service, 1); + if (!profile_chain_open(&ustream->us_prch, NULL, hints, 0, qsize)) { + + s = subscription_create_from_service(&ustream->us_prch, NULL, weight ?: 100, "UDP", + ustream->us_prch.prch_flags | SUBSCRIPTION_STREAMING | + eflags, + address, + http_username(hc), + http_arg_get(&hc->hc_args, "User-Agent"), + NULL); + if(s) { + ustream->us_content_name = strdup(service->s_nicename); + ustream->us_subscript = s; + ustream->us_global_lock = &global_lock; + udp_stream_run(ustream); + http_output_html(hc); + close(hc->hc_fd); + return 0; + } + } + + profile_chain_close(&ustream->us_prch); + udp_close(uc); + delete_udp_stream(ustream); + return res; +} + /** * Subscribe to a mux for grabbing a raw dump * @@ -1325,17 +1437,107 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight) return res; } +static int +udp_stream_channel(http_connection_t *hc, channel_t *ch, int weight) +{ + th_subscription_t *s; + udp_connection_t *uc; + profile_t *pro; + muxer_hints_t *hints; + const char *str; + size_t qsize; + const char *address; + int port; + int res = HTTP_STATUS_SERVICE; + udp_stream_t *ustream; + size_t unlen; + char *stop_url; + int pos; + + if ((str = http_arg_get(&hc->hc_req_args, "port"))) { + port = atol(str); + } else { + tvhwarn(LS_WEBUI, "No port supplied in udp stream request"); + return res; + } + if (!(address = http_arg_get(&hc->hc_req_args, "address"))) { + tvhwarn(LS_WEBUI, "No address supplied in udp stream request"); + return res; + } + + unlen = strlen(str) + strlen(address) + 2; + hc->hc_username = malloc(unlen); + snprintf(hc->hc_username, unlen, "%s:%s", address, str); + + if (!(uc = udp_bind(LS_UDP, "udp_streamer", + address, port, NULL, + NULL, 1024, 188*7))) { + tvhwarn(LS_WEBUI, "Could not create and bind udp socket"); + return res; + } + + if (udp_connect (uc, "udp_streamer", address, port)) { + tvhwarn(LS_WEBUI, "Could not connect udp socket"); + return res; + } + + if(!(pro = profile_find_by_list(hc->hc_access->aa_profiles, + http_arg_get(&hc->hc_req_args, "profile"), + "channel", + SUBSCRIPTION_PACKET | SUBSCRIPTION_MPEGTS))) { + udp_close(uc); + return HTTP_STATUS_NOT_ALLOWED; + } + + stop_url = strdup(hc->hc_url_orig); + unlen = strlen(hc->hc_url_orig) - 1; + str = strstr(hc->hc_url_orig, "start"); + pos = str - hc->hc_url_orig; + if (str && (pos > 0)) + snprintf(&stop_url[pos], unlen-pos+1, "stop%s", &str[5]); + + ustream = create_udp_stream(uc, stop_url); + free(stop_url); + if (!ustream) { + udp_close(uc); + return HTTP_STATUS_NOT_ALLOWED; + } + + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + + hints = muxer_hints_create(http_arg_get(&hc->hc_args, "User-Agent")); + + profile_chain_init(&ustream->us_prch, pro, ch, 1); + if (!profile_chain_open(&ustream->us_prch, NULL, hints, 0, qsize)) { + + s = subscription_create_from_channel(&ustream->us_prch, NULL, weight ?: 100, "UDP", + ustream->us_prch.prch_flags | SUBSCRIPTION_STREAMING, + address, + http_username(hc), + http_arg_get(&hc->hc_args, "User-Agent"), + NULL); + if(s) { + ustream->us_content_name = strdup(channel_get_name(ch, channel_blank_name)); + ustream->us_subscript = s; + ustream->us_global_lock = &global_lock; + udp_stream_run(ustream); + http_output_html(hc); + close(hc->hc_fd); + return 0; + } + } + + profile_chain_close(&ustream->us_prch); + udp_close(uc); + delete_udp_stream(ustream); + return res; +} -/** - * Handle the http request. http://tvheadend/stream/channelid/ - * http://tvheadend/stream/channel/ - * http://tvheadend/stream/channelnumber/ - * http://tvheadend/stream/channelname/ - * http://tvheadend/stream/service/ - * http://tvheadend/stream/mux/ - */ static int -http_stream(http_connection_t *hc, const char *remain, void *opaque) +do_stream(http_connection_t *hc, const char *remain, void *opaque, int isUdp) { char *components[2]; channel_t *ch = NULL; @@ -1379,9 +1581,9 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) } if(ch != NULL) { - r = http_stream_channel(hc, ch, weight); + r = isUdp ? udp_stream_channel(hc, ch, weight) : http_stream_channel(hc, ch, weight); } else if(service != NULL) { - r = http_stream_service(hc, service, weight); + r = isUdp ? udp_stream_service(hc, service, weight) : http_stream_service(hc, service, weight); #if ENABLE_MPEGTS } else if(mm != NULL) { r = http_stream_mux(hc, mm, weight); @@ -1394,6 +1596,66 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) return r; } +/** + * Handle the http request. http://tvheadend/stream/channelid/ + * http://tvheadend/stream/channel/ + * http://tvheadend/stream/channelnumber/ + * http://tvheadend/stream/channelname/ + * http://tvheadend/stream/service/ + * http://tvheadend/stream/mux/ + */ +static int +http_stream(http_connection_t *hc, const char *remain, void *opaque) { + return do_stream(hc, remain, opaque, 0); +} + +/** + * Handle the http request. http://tvheadend/udpstream/start/channelid/?address=&port= + * http://tvheadend/udpstream/start/channel/?address=&port= + * http://tvheadend/udpstream/start/channelnumber/?address=&port= + * http://tvheadend/udpstream/start/channelname/?address=&port= + * http://tvheadend/udpstream/start/service/?address=&port= + */ +static int +start_udp_stream(http_connection_t *hc, const char *remain, void *opaque) { + return do_stream(hc, remain, opaque, 1); +} + +/** + * Handle the http request. http://tvheadend/udpstream/stop/channelid/?address=&port= + * http://tvheadend/udpstream/stop/channel/?address=&port= + * http://tvheadend/udpstream/stop/channelnumber/?address=&port= + * http://tvheadend/udpstream/stop/channelname/?address=&port= + * http://tvheadend/udpstream/stop/service/?address=&port= + */ +static int +stop_udp_stream(http_connection_t *hc, const char *remain, void *opaque) { + char *components[2]; + udp_stream_t *us; + + hc->hc_keep_alive = 0; + + if(remain == NULL) + return HTTP_STATUS_BAD_REQUEST; + + if(http_tokenize((char *)remain, components, 2, '/') != 2) + return HTTP_STATUS_BAD_REQUEST; + + http_deescape(components[1]); + + us = find_udp_stream_by_hint(hc->hc_url_orig); + if (us) { + tvhdebug(LS_WEBUI, "Stop UDP stream %s", us->us_udp_url); + udp_stream_shutdown(us); + } else { + tvhwarn(LS_WEBUI, "UDP stream not found (stop request %s)", hc->hc_url_orig); + } + + http_output_html(hc); + close(hc->hc_fd); + return 0; +} + /** * Generate a xspf playlist * http://en.wikipedia.org/wiki/XML_Shareable_Playlist_Format @@ -2149,6 +2411,8 @@ webui_init(int xspf) http_path_add("/state", NULL, page_statedump, ACCESS_ADMIN); http_path_add("/stream", NULL, http_stream, ACCESS_ANONYMOUS); + http_path_add("/udpstream/start", NULL, start_udp_stream, ACCESS_ANONYMOUS); + http_path_add("/udpstream/stop", NULL, stop_udp_stream, ACCESS_ANONYMOUS); http_path_add("/imagecache", NULL, page_imagecache, ACCESS_ANONYMOUS); diff --git a/src/wrappers.c b/src/wrappers.c index 09ba146a8..1f93b22c5 100644 --- a/src/wrappers.c +++ b/src/wrappers.c @@ -87,6 +87,32 @@ tvh_write(int fd, const void *buf, size_t len) return len ? 1 : 0; } +int +tvh_write_in_chunks(int fd, const void *buf, size_t len, size_t chunkSize) +{ + int64_t limit = mclk() + sec2mono(25); + ssize_t c; + ssize_t towrite; + + while (len) { + towrite = (len > chunkSize) ? chunkSize : len; + c = write(fd, buf, towrite); + if (c < 0) { + if (ERRNO_AGAIN(errno)) { + if (mclk() > limit) + break; + tvh_safe_usleep(100); + continue; + } + break; + } + len -= c; + buf += c; + } + + return len ? 1 : 0; +} + int tvh_nonblock_write(int fd, const void *buf, size_t len) {