From 78e4ddf86e3a7e75ef9f4218653993aa639b291c Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Tue, 24 Feb 2015 15:27:51 +0100 Subject: [PATCH] SAT>IP server: more work, add subscriptions and add RTP and RTCP threads --- Makefile | 3 +- src/satip/rtp.c | 526 +++++++++++++++++++++++++++++++++++++++++++++ src/satip/rtsp.c | 161 +++++++++++--- src/satip/server.c | 12 -- src/satip/server.h | 20 ++ src/udp.c | 10 +- src/udp.h | 2 + 7 files changed, 692 insertions(+), 42 deletions(-) create mode 100644 src/satip/rtp.c diff --git a/Makefile b/Makefile index 55dbf63f6..3e9e6a52a 100644 --- a/Makefile +++ b/Makefile @@ -162,7 +162,8 @@ SRCS-${CONFIG_UPNP} += \ # SATIP Server SRCS-${CONFIG_SATIP_SERVER} += \ src/satip/server.c \ - src/satip/rtsp.c + src/satip/rtsp.c \ + src/satip/rtp.c SRCS += \ src/api.c \ diff --git a/src/satip/rtp.c b/src/satip/rtp.c new file mode 100644 index 000000000..7e7767135 --- /dev/null +++ b/src/satip/rtp.c @@ -0,0 +1,526 @@ +/* + * Tvheadend - SAT-IP server - RTP part + * + * Copyright (C) 2015 Jaroslav Kysela + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include "tvheadend.h" +#include "input.h" +#include "streaming.h" +#include "satip/server.h" + +#define RTP_PACKETS 128 +#define RTP_PAYLOAD (1356-20-8) +#define RTCP_PAYLOAD (1472-20-8) + +typedef struct satip_rtp_session { + TAILQ_ENTRY(satip_rtp_session) link; + pthread_t tid; + void *id; + struct sockaddr_storage peer; + struct sockaddr_storage peer2; + int port; + th_subscription_t *subs; + streaming_queue_t *sq; + int fd_rtp; + int fd_rtcp; + int frontend; + int source; + dvb_mux_conf_t dmc; + int16_t pids[RTSP_PIDS]; + udp_multisend_t um; + struct iovec *um_iovec; + int um_packet; + uint16_t seq; + signal_status_t sig; +} satip_rtp_session_t; + +static pthread_mutex_t satip_rtp_lock; +static pthread_t satip_rtcp_tid; +static int satip_rtcp_run; +static TAILQ_HEAD(, satip_rtp_session) satip_rtp_sessions; + +static void +satip_rtp_header(satip_rtp_session_t *rtp) +{ + struct iovec *v = rtp->um_iovec + rtp->um_packet; + uint8_t *data = v->iov_base; + uint32_t tstamp = dispatch_clock + rtp->seq; + + rtp->seq++; + + v->iov_len = 12; + data[0] = 0x80; + data[1] = 33; + data[2] = (rtp->seq >> 8) & 0xff; + data[3] = rtp->seq & 0xff; + data[4] = (tstamp >> 24) & 0xff; + data[5] = (tstamp >> 16) & 0xff; + data[6] = (tstamp >> 8) & 0xff; + data[7] = tstamp & 0xff; + memset(data + 8, 0xa5, 8); +} + +static int +satip_rtp_send(satip_rtp_session_t *rtp) +{ + struct iovec *v = rtp->um_iovec, *v2; + int packets, copy, len, r; + if (v->iov_len == RTP_PAYLOAD) { + packets = rtp->um_packet; + v2 = v + packets; + if (v2->iov_len == RTP_PAYLOAD) { + packets++; + copy = 0; + } else + copy = 1; + r = udp_multisend_send(&rtp->um, rtp->fd_rtp, packets); + if (r < 0) + return r; + if (copy) + memcpy(v->iov_base, v2->iov_base, len = v2->iov_len); + else + len = 0; + rtp->um_packet = 0; + udp_multisend_clean(&rtp->um); + v->iov_len = len; + } + if (v->iov_len == 0) + satip_rtp_header(rtp); + return 0; +} + +static int +satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len) +{ + int i, pid, last_pid = -1, r; + int16_t *pids = rtp->pids; + struct iovec *v = rtp->um_iovec + rtp->um_packet; + + assert((len % 188) == 0); + for ( ; len >= 188 ; data += 188, len -= 188) { + pid = ((data[1] & 0x1f) << 8) | data[2]; + if (pid != last_pid) { + for (i = 0; i < RTSP_PIDS && pids[i] >= 0; i++) + if (pids[i] == pid) + break; + if (i >= RTSP_PIDS) continue; /* skip PID */ + last_pid = pid; + } + memcpy(v->iov_base + v->iov_len, data, 188); + v->iov_len += 188; + if (v->iov_len >= RTP_PAYLOAD) { + if ((rtp->um_packet + 1) == RTP_PACKETS) { + r = satip_rtp_send(rtp); + if (r < 0) + return r; + } else + rtp->um_packet++; + } + } + return 0; +} + +static void +satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig) +{ + rtp->sig = *sig; +} + +static void * +satip_rtp_thread(void *aux) +{ + satip_rtp_session_t *rtp = aux; + streaming_queue_t *sq = rtp->sq; + streaming_message_t *sm; + th_subscription_t *subs = rtp->subs; + pktbuf_t *pb; + char peername[50]; + int alive = 1, fatal = 0, r; + + tcp_get_ip_str((struct sockaddr *)&rtp->peer, peername, sizeof(peername)); + tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port); + + pthread_mutex_lock(&sq->sq_mutex); + while (rtp->sq && !fatal) { + sm = TAILQ_FIRST(&sq->sq_queue); + if (sm == NULL) { + r = satip_rtp_send(rtp); + if (r) { + fatal = 1; + continue; + } + pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); + continue; + } + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); + pthread_mutex_unlock(&sq->sq_mutex); + + switch (sm->sm_type) { + case SMT_MPEGTS: + pb = sm->sm_data; + atomic_add(&subs->ths_bytes_out, pktbuf_len(pb)); + r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb)); + if (r) fatal = 1; + break; + case SMT_SIGNAL_STATUS: + satip_rtp_signal_status(rtp, sm->sm_data); + break; + case SMT_NOSTART: + case SMT_EXIT: + alive = 0; + break; + + case SMT_START: + case SMT_STOP: + case SMT_PACKET: + case SMT_GRACE: + case SMT_SKIP: + case SMT_SPEED: + case SMT_SERVICE_STATUS: + case SMT_TIMESHIFT_STATUS: + break; + } + + streaming_msg_free(sm); + pthread_mutex_lock(&sq->sq_mutex); + } + pthread_mutex_unlock(&sq->sq_mutex); + + tvhdebug("satips", "RTP streaming to %s:%d closed (%s request)", + peername, rtp->port, alive ? "remote" : "streaming"); + + return NULL; +} + +/* + * + */ +static satip_rtp_session_t * +satip_rtp_find(void *id) +{ + satip_rtp_session_t *rtp; + + pthread_mutex_lock(&satip_rtp_lock); + TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) + if (rtp == id) + break; + pthread_mutex_unlock(&satip_rtp_lock); + return rtp; +} + +/* + * + */ +void satip_rtp_queue(void *id, th_subscription_t *subs, + streaming_queue_t *sq, + struct sockaddr_storage *peer, int port, + int fd_rtp, int fd_rtcp, + int frontend, int source, dvb_mux_conf_t *dmc, + int16_t *pids) +{ + satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp)); + + if (rtp == NULL) + return; + + rtp->id = id; + rtp->peer = *peer; + rtp->peer2 = *peer; + IP_PORT_SET(rtp->peer2, port + 1); + rtp->port = port; + rtp->fd_rtp = fd_rtp; + rtp->fd_rtcp = fd_rtcp; + rtp->subs = subs; + rtp->sq = sq; + memcpy(rtp->pids, pids, sizeof(*pids)*RTSP_PIDS); + udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec); + satip_rtp_header(rtp); + rtp->frontend = frontend; + rtp->dmc = *dmc; + rtp->source = source; + + pthread_mutex_lock(&satip_rtp_lock); + TAILQ_INSERT_TAIL(&satip_rtp_sessions, rtp, link); + tvhthread_create(&rtp->tid, NULL, satip_rtp_thread, rtp); + pthread_mutex_unlock(&satip_rtp_lock); +} + +void satip_rtp_close(void *id) +{ + satip_rtp_session_t *rtp; + streaming_queue_t *sq; + + pthread_mutex_lock(&satip_rtp_lock); + rtp = satip_rtp_find(id); + if (rtp) { + sq = rtp->sq; + pthread_mutex_lock(&sq->sq_mutex); + rtp->sq = NULL; + pthread_cond_signal(&sq->sq_cond); + pthread_mutex_unlock(&sq->sq_mutex); + pthread_join(rtp->tid, NULL); + pthread_mutex_lock(&satip_rtp_lock); + udp_multisend_free(&rtp->um); + free(rtp); + } + pthread_mutex_unlock(&satip_rtp_lock); +} + +/* + * + */ +static const char * +satip_rtcp_fec(int fec) +{ + static char buf[16]; + const char *s = dvb_fec2str(fec); + char *p = buf; + if (s == NULL) + return ""; + strncpy(buf, s, sizeof(buf)); + buf[sizeof(buf)-1] = '\0'; + p = strchr(buf, '/'); + while (*p) { + *p = *(p+1); + p++; + } + return s; +} + +/* + * + */ +static int +satip_rtcp_build(satip_rtp_session_t *rtp, uint8_t *msg) +{ + char buf[1500], pids[1400]; + const char *delsys, *msys, *pilot, *rolloff; + const char *bw, *tmode, *gi, *plp, *t2id, *sm, *c2tft, *ds, *specinv; + int i, len, len2, level = 0, lock = 0, quality = 0; + + pids[0] = 0; + for (i = len = 00; i < RTSP_PIDS && rtp->pids[i] >= 0; i++) + len += snprintf(pids + len, sizeof(pids) - len, "%d,", rtp->pids[i]); + if (len && pids[len-1] == ',') + pids[len-1] = '\0'; + + switch (rtp->dmc.dmc_fe_delsys) { + case DVB_SYS_DVBS: + case DVB_SYS_DVBS2: + delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBS ? "dvbs" : "dvbs2"; + switch (rtp->dmc.dmc_fe_modulation) { + case DVB_MOD_QPSK: msys = "qpsk"; break; + case DVB_MOD_PSK_8: msys = "8psk"; break; + default: msys = ""; break; + } + switch (rtp->dmc.dmc_fe_pilot) { + case DVB_PILOT_ON: pilot = "on"; break; + case DVB_PILOT_OFF: pilot = "off"; break; + default: pilot = ""; break; + } + switch (rtp->dmc.dmc_fe_rolloff) { + case DVB_ROLLOFF_20: rolloff = "20"; break; + case DVB_ROLLOFF_25: rolloff = "25"; break; + case DVB_ROLLOFF_35: rolloff = "35"; break; + default: rolloff = ""; break; + } + /* ver=.;src=;tuner=,,,,,,\ + * ,,,,,;pids=,..., + */ + snprintf(buf, sizeof(buf), + "vers=1.0;src=%d;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%.f,%s;pids=%s", + rtp->source, rtp->frontend, level, lock, quality, + (float)rtp->dmc.dmc_fe_freq / 1000.0, + dvb_pol2str(rtp->dmc.u.dmc_fe_qpsk.polarisation), + delsys, msys, pilot, rolloff, + (float)rtp->dmc.u.dmc_fe_qpsk.symbol_rate / 1000.0, + satip_rtcp_fec(rtp->dmc.u.dmc_fe_qpsk.fec_inner), + pids); + break; + case DVB_SYS_DVBT: + case DVB_SYS_DVBT2: + delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBT ? "dvbt" : "dvbt2"; + switch (rtp->dmc.u.dmc_fe_ofdm.bandwidth) { + case DVB_BANDWIDTH_1_712_MHZ: bw = "1.712"; break; + case DVB_BANDWIDTH_5_MHZ: bw = "5"; break; + case DVB_BANDWIDTH_6_MHZ: bw = "6"; break; + case DVB_BANDWIDTH_7_MHZ: bw = "7"; break; + case DVB_BANDWIDTH_8_MHZ: bw = "8"; break; + case DVB_BANDWIDTH_10_MHZ: bw = "10"; break; + default: bw = ""; break; + } + switch (rtp->dmc.u.dmc_fe_ofdm.transmission_mode) { + case DVB_TRANSMISSION_MODE_1K: tmode = "1k"; break; + case DVB_TRANSMISSION_MODE_2K: tmode = "2k"; break; + case DVB_TRANSMISSION_MODE_4K: tmode = "4k"; break; + case DVB_TRANSMISSION_MODE_8K: tmode = "8k"; break; + case DVB_TRANSMISSION_MODE_16K: tmode = "16k"; break; + case DVB_TRANSMISSION_MODE_32K: tmode = "32k"; break; + default: tmode = ""; break; + } + switch (rtp->dmc.dmc_fe_modulation) { + case DVB_MOD_QAM_16: msys = "qam16"; break; + case DVB_MOD_QAM_32: msys = "qam32"; break; + case DVB_MOD_QAM_64: msys = "qam64"; break; + case DVB_MOD_QAM_128: msys = "qam128"; break; + default: msys = ""; break; + } + switch (rtp->dmc.u.dmc_fe_ofdm.guard_interval) { + case DVB_GUARD_INTERVAL_1_4: gi = "14"; break; + case DVB_GUARD_INTERVAL_1_8: gi = "18"; break; + case DVB_GUARD_INTERVAL_1_16: gi = "116"; break; + case DVB_GUARD_INTERVAL_1_32: gi = "132"; break; + case DVB_GUARD_INTERVAL_1_128: gi = "1128"; break; + case DVB_GUARD_INTERVAL_19_128: gi = "19128"; break; + case DVB_GUARD_INTERVAL_19_256: gi = "19256"; break; + default: gi = ""; break; + } + plp = ""; + t2id = ""; + sm = ""; + /* ver=1.1;tuner=,,,,,,,,,,\ + * ,,,;pids=,..., + */ + snprintf(buf, sizeof(buf), + "vers=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%s,%s,%s,%s;pids=%s", + rtp->frontend, level, lock, quality, + (float)rtp->dmc.dmc_fe_freq / 1000.0, + bw, delsys, tmode, msys, gi, + satip_rtcp_fec(rtp->dmc.u.dmc_fe_ofdm.code_rate_HP), + plp, t2id, sm, pids); + break; + case DVB_SYS_DVBC_ANNEX_A: + case DVB_SYS_DVBC_ANNEX_C: + delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBC_ANNEX_A ? "dvbc" : "dvbc2"; + bw = ""; + switch (rtp->dmc.dmc_fe_modulation) { + case DVB_MOD_QAM_16: msys = "qam16"; break; + case DVB_MOD_QAM_32: msys = "qam32"; break; + case DVB_MOD_QAM_64: msys = "qam64"; break; + case DVB_MOD_QAM_128: msys = "qam128"; break; + default: msys = ""; break; + } + c2tft = ""; + ds = ""; + plp = ""; + specinv = ""; + /* ver=1.2;tuner=,,,,,,,,,,,, + * ;pids=,..., + */ + snprintf(buf, sizeof(buf), + "vers=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%.f,%s,%s,%s,%s;pids=%s", + rtp->frontend, level, lock, quality, + (float)rtp->dmc.dmc_fe_freq / 1000.0, + bw, delsys, msys, + (float)rtp->dmc.u.dmc_fe_qam.symbol_rate / 1000.0, + c2tft, ds, plp, specinv, pids); + break; + default: + return 0; + } + + len = len2 = strlen(buf); + while ((len % 4) != 0) + buf[len++] = 0; + memcpy(msg + 16, buf, len); + + len += 16; + msg[0] = 0x80; + msg[1] = 204; + msg[2] = (len >> 8) & 0xff; + msg[3] = len & 0xff; + msg[4] = 0; + msg[5] = 0; + msg[6] = 0; + msg[7] = 0; + msg[8] = 'S'; + msg[9] = 'E'; + msg[10] = 'S'; + msg[11] = '1'; + msg[12] = 0; + msg[13] = 0; + msg[14] = (len2 >> 8) & 0xff; + msg[15] = len2 & 0xff; + + return len2; +} + +/* + * + */ +static void * +satip_rtcp_thread(void *aux) +{ + satip_rtp_session_t *rtp; + struct timespec ts; + uint8_t msg[RTCP_PAYLOAD]; + char addrbuf[50]; + int r, len, err; + + while (satip_rtcp_run) { + ts.tv_sec = 0; + ts.tv_nsec = 150000000; + while (1) { + nanosleep(&ts, &ts); + if (satip_rtcp_run) + goto end; + } while (ts.tv_nsec); + pthread_mutex_lock(&satip_rtp_lock); + TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) { + if (rtp->sq == NULL) continue; + len = satip_rtcp_build(rtp, msg); + if (len <= 0) continue; + r = sendto(rtp->fd_rtcp, msg, len, 0, + (struct sockaddr*)&rtp->peer2, + rtp->peer2.ss_family == AF_INET6 ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + if (r) { + err = errno; + tcp_get_ip_str((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf)); + tvhwarn("satips", "RTCP send to error %s:%d : %s", + addrbuf, IP_PORT(rtp->peer2), strerror(err)); + } + } + pthread_mutex_unlock(&satip_rtp_lock); + } +end: + return NULL; +} + +/* + * + */ +void satip_rtp_init(void) +{ + TAILQ_INIT(&satip_rtp_sessions); + pthread_mutex_init(&satip_rtp_lock, NULL); + + satip_rtcp_run = 1; + tvhthread_create(&satip_rtcp_tid, NULL, satip_rtcp_thread, NULL); +} + +/* + * + */ +void satip_rtp_done(void) +{ + assert(TAILQ_EMPTY(&satip_rtp_sessions)); + satip_rtcp_run = 0; + pthread_kill(satip_rtcp_tid, SIGTERM); + pthread_join(satip_rtcp_tid, NULL); +} diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 229d75693..531ddc180 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -18,36 +18,34 @@ */ #include "tvheadend.h" -#include "input.h" #include "htsbuf.h" -#include "htsmsg_xml.h" -#include "upnp.h" -#include "http.h" -#include "settings.h" #include "config.h" +#include "profile.h" #include "satip/server.h" #include -#include -#include - -#if defined(PLATFORM_FREEBSD) || ENABLE_ANDROID -#include -#include -#endif #define RTSP_TIMEOUT 30 -#define RTSP_PIDS 128 +#define RTP_BUFSIZE (256*1024) +#define RTCP_BUFSIZE (16*1024) typedef struct session { TAILQ_ENTRY(session) link; int delsys; int stream; int frontend; + int findex; + uint32_t nsession; char session[9]; dvb_mux_conf_t dmc; int16_t pids[RTSP_PIDS]; gtimer_t timer; + dvb_mux_t *mux; + int mux_created; + profile_chain_t prch; + th_subscription_t *subs; + udp_connection_t *udp_rtp; + udp_connection_t *udp_rtcp; } session_t; static uint32_t session_number; @@ -65,26 +63,36 @@ static void rtsp_free_session(session_t *rs); * */ static int -rtsp_delsys(int fe) +rtsp_delsys(int fe, int *findex) { - int i; + int res, i; if (fe < 1) return DVB_SYS_NONE; pthread_mutex_lock(&global_lock); i = config_get_int("satip_dvbt", 0); - if (fe <= i) - return DVB_SYS_DVBT; + if (fe <= i) { + res = DVB_SYS_DVBT; + goto result; + } fe -= i; i = config_get_int("satip_dvbs", 0); - if (fe <= i) - return DVB_SYS_DVBS; + if (fe <= i) { + res = DVB_SYS_DVBS; + goto result; + } fe -= i; i = config_get_int("satip_dvbc", 0); - if (fe <= i) - return DVB_SYS_DVBC_ANNEX_A; + if (fe <= i) { + res = DVB_SYS_DVBC_ANNEX_A; + goto result; + } pthread_mutex_unlock(&global_lock); return DVB_SYS_NONE; +result: + pthread_mutex_unlock(&global_lock); + *findex = i; + return res; } /* @@ -97,8 +105,10 @@ rtsp_new_session(int delsys) if (rs == NULL) return NULL; rs->delsys = delsys; + rs->nsession = session_number; snprintf(rs->session, sizeof(rs->session), "%08X", session_number); session_number += 9876; + TAILQ_INSERT_TAIL(&rtsp_sessions, rs, link); return rs; } @@ -238,13 +248,94 @@ rtsp_delpids(session_t *rs, int16_t *pids) return 0; } +/* + * + */ +static void +rtsp_clean(session_t *rs) +{ + if (rs->subs) { + subscription_unsubscribe(rs->subs); + rs->subs = NULL; + } + if (rs->prch.prch_pro) + profile_chain_close(&rs->prch); + if (rs->mux && rs->mux_created) { + rs->mux->mm_delete((mpegts_mux_t *)rs->mux, 1); + rs->mux = NULL; + rs->mux_created = 0; + } +} + /* * */ static int -rtsp_start(session_t *rs) +rtsp_start(http_connection_t *hc, session_t *rs) { + mpegts_network_t *mn; + dvb_network_t *ln; + char buf[256], addrbuf[50]; + int res = HTTP_STATUS_SERVICE, qsize = 3000000; + + if (rs->mux) + return 0; + rs->mux_created = 0; + pthread_mutex_lock(&global_lock); + LIST_FOREACH(mn, &mpegts_network_all, mn_global_link) { + ln = (dvb_network_t *)mn; + if (ln->ln_type == rs->dmc.dmc_fe_type && + mn->mn_satip_source == rs->findex) + break; + } + if (mn) { + rs->mux = dvb_network_find_mux((dvb_network_t *)mn, &rs->dmc, + MPEGTS_ONID_NONE, MPEGTS_TSID_NONE); + if (rs->mux == NULL) { + rs->mux = (dvb_mux_t *) + mn->mn_create_mux(mn, (void *)(intptr_t)rs->nsession, + MPEGTS_ONID_NONE, MPEGTS_TSID_NONE, + &rs->dmc, 0); + if (rs->mux) + rs->mux_created = 1; + } + } + if (rs->mux == NULL) { + dvb_mux_conf_str(&rs->dmc, buf, sizeof(buf)); + tvhwarn("satips", "%i: unable to create mux %s", rs->frontend, buf); + goto end; + } + if (profile_chain_raw_open(&rs->prch, (mpegts_mux_t *)rs->mux, qsize)) + goto endclean; + tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, sizeof(addrbuf)); + rs->subs = subscription_create_from_mux(&rs->prch, NULL, + config_get_int("satip_weight", 100), + "SAT>IP", + SUBSCRIPTION_FULLMUX | SUBSCRIPTION_STREAMING, + addrbuf, hc->hc_username, + http_arg_get(&hc->hc_args, "User-Agent"), NULL); + if (!rs->subs) + goto endclean; + memset(&rs->udp_rtp, 0, sizeof(rs->udp_rtp)); + memset(&rs->udp_rtcp, 0, sizeof(rs->udp_rtcp)); + if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp, + "satips", "rtsp", "rtcp", + addrbuf, 0, NULL, + 4*1024, 4*1024, + RTP_BUFSIZE, RTCP_BUFSIZE)) + goto endclean; + satip_rtp_queue((void *)(intptr_t)rs->nsession, + rs->subs, &rs->prch.prch_sq, + hc->hc_peer, ntohs(IP_PORT(rs->udp_rtp->ip)), + rs->udp_rtp->fd, rs->udp_rtcp->fd, + rs->frontend, rs->findex, &rs->mux->lm_tuning, rs->pids); return 0; + +endclean: + rtsp_clean(rs); +end: + pthread_mutex_unlock(&global_lock); + return res; } @@ -473,13 +564,14 @@ static int rtsp_process_play(http_connection_t *hc, int setup) { session_t *rs; - int errcode = HTTP_STATUS_BAD_REQUEST; + int errcode = HTTP_STATUS_BAD_REQUEST, r, findex = 0; int stream, delsys = DVB_SYS_NONE, msys, fe, src, freq, pol, sr; int fec, ro, plts, bw, tmode, mtype, gi, plp, t2id, sm, c2tft, ds, specinv; char *u, *s; char *pids, *addpids, *delpids; int16_t _pids[RTSP_PIDS+1], _addpids[RTSP_PIDS+1], _delpids[RTSP_PIDS+1]; dvb_mux_conf_t *dmc; + char buf[256]; u = tvh_strdupa(hc->hc_url); if ((u = rtsp_check_urlbase(u)) == NULL || @@ -507,7 +599,7 @@ rtsp_process_play(http_connection_t *hc, int setup) rs = rtsp_find_session(hc); if (fe > 0) { - delsys = rtsp_delsys(fe); + delsys = rtsp_delsys(fe, &findex); if (delsys == DVB_SYS_NONE) goto error; } @@ -533,6 +625,7 @@ rtsp_process_play(http_connection_t *hc, int setup) dmc = &rs->dmc; dvb_mux_conf_init(dmc, msys); rs->frontend = fe; + rs->findex = findex; pids = http_arg_get_remove(&hc->hc_req_args, "pids"); if (parse_pids(pids, _pids)) goto error; @@ -615,8 +708,8 @@ rtsp_process_play(http_connection_t *hc, int setup) if (!TAILQ_EMPTY(&hc->hc_req_args)) goto error; - dmc->u.dmc_fe_qpsk.symbol_rate = sr; - dmc->u.dmc_fe_qpsk.fec_inner = DVB_FEC_NONE; + dmc->u.dmc_fe_qam.symbol_rate = sr; + dmc->u.dmc_fe_qam.fec_inner = DVB_FEC_NONE; dmc->dmc_fe_inversion = specinv; dmc->dmc_fe_stream_id = plp; dmc->dmc_fe_pls_code = ds; /* check */ @@ -627,6 +720,9 @@ rtsp_process_play(http_connection_t *hc, int setup) } + dvb_mux_conf_str(dmc, buf, sizeof(buf)); + tvhdebug("satips", "%i: setup %s", rs->frontend, buf); + dmc->dmc_fe_freq = freq; dmc->dmc_fe_modulation = mtype; @@ -641,10 +737,11 @@ play: rtsp_delpids(rs, _delpids); if (addpids) rtsp_addpids(rs, _addpids); - if (rtsp_start(rs) < 0) { - errcode = HTTP_STATUS_SERVICE;; + if ((r = rtsp_start(hc, rs)) < 0) { + errcode = r; goto error; } + tvhdebug("satips", "%i: play", rs->frontend); end: pthread_mutex_unlock(&rtsp_lock); @@ -742,6 +839,12 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer, static void rtsp_close_session(session_t *rs) { + satip_rtp_close((void *)(intptr_t)rs->nsession); + udp_close(rs->udp_rtp); + udp_close(rs->udp_rtcp); + pthread_mutex_lock(&global_lock); + rtsp_clean(rs); + pthread_mutex_unlock(&global_lock); gtimer_disarm(&rs->timer); } @@ -786,6 +889,7 @@ void satip_server_rtsp_init(const char *bindaddr, int port) session_number = *(uint32_t *)rnd; TAILQ_INIT(&rtsp_sessions); pthread_mutex_init(&rtsp_lock, NULL); + satip_rtp_init(); } if (rtsp_port != port && rtsp_server) { pthread_mutex_lock(&rtsp_lock); @@ -817,5 +921,6 @@ void satip_server_rtsp_done(void) rtsp_port = -1; free(rtsp_ip); rtsp_ip = NULL; + satip_rtp_done(); pthread_mutex_unlock(&global_lock); } diff --git a/src/satip/server.c b/src/satip/server.c index aed63d11d..2ac8e8279 100644 --- a/src/satip/server.c +++ b/src/satip/server.c @@ -18,23 +18,11 @@ */ #include "tvheadend.h" -#include "input.h" -#include "htsbuf.h" -#include "htsmsg_xml.h" #include "upnp.h" -#include "http.h" #include "settings.h" #include "config.h" #include "satip/server.h" -#include -#include - -#if defined(PLATFORM_FREEBSD) || ENABLE_ANDROID -#include -#include -#endif - #define UPNP_MAX_AGE 1800 static char *http_server_ip; diff --git a/src/satip/server.h b/src/satip/server.h index 41cb18571..f1a439a81 100644 --- a/src/satip/server.h +++ b/src/satip/server.h @@ -29,6 +29,26 @@ #include "udp.h" #include "http.h" +#define RTSP_PIDS 128 + +void satip_rtp_queue(void *id, th_subscription_t *subs, + streaming_queue_t *sq, + struct sockaddr_storage *peer, int port, + int fd_rtp, int fd_rtcp, + int frontend, int source, + dvb_mux_conf_t *dmc, + int16_t *pids); +void satip_rtp_update(void *id, th_subscription_t *subs, + streaming_queue_t *sq, + int frontend, int source, + dvb_mux_conf_t *dmc, + int16_t *pids); +void satip_rtp_update_pids(void *id, int16_t *pids); +void satip_rtp_close(void *id); + +void satip_rtp_init(void); +void satip_rtp_done(void); + void satip_server_rtsp_init(const char *bindaddr, int port); void satip_server_rtsp_register(void); void satip_server_rtsp_done(void); diff --git a/src/udp.c b/src/udp.c index 58f41b540..ff5ef2324 100644 --- a/src/udp.c +++ b/src/udp.c @@ -679,7 +679,7 @@ udp_multisend_init( udp_multisend_t *um, int packets, int psize, ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov = &um->um_iovec[i]; ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1; um->um_iovec[i].iov_base = um->um_data + i * psize; - um->um_iovec[i].iov_len = psize; + um->um_iovec[i].iov_len = 0; } *iovec = um->um_iovec; } @@ -696,6 +696,14 @@ udp_multisend_free( udp_multisend_t *um ) um->um_packets = 0; } +void +udp_multisend_clean( udp_multisend_t *um ) +{ + int i; + for (i = 0; i < um->um_packets; i++) + um->um_iovec[i].iov_len = 0; +} + int udp_multisend_send( udp_multisend_t *um, int fd, int packets ) { diff --git a/src/udp.h b/src/udp.h index 995bfa650..5bc142014 100644 --- a/src/udp.h +++ b/src/udp.h @@ -89,6 +89,8 @@ void udp_multisend_init( udp_multisend_t *um, int packets, int psize, struct iovec **iovec ); void +udp_multisend_clean( udp_multisend_t *um ); +void udp_multisend_free( udp_multisend_t *um ); int udp_multisend_send( udp_multisend_t *um, int fd, int packets ); -- 2.47.3