From: Jaroslav Kysela Date: Mon, 30 Nov 2015 16:06:56 +0000 (+0100) Subject: SAT>IP server: implement embedded RTSP TCP data transfer mode X-Git-Tag: v4.2.1~1443 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=640e522c6beb70036724c00a19d50d12fb993f07;p=thirdparty%2Ftvheadend.git SAT>IP server: implement embedded RTSP TCP data transfer mode --- diff --git a/src/http.c b/src/http.c index 3122bb7e8..1d91ed20c 100644 --- a/src/http.c +++ b/src/http.c @@ -217,7 +217,7 @@ static const char *cachemonths[12] = { * Transmit a HTTP reply */ void -http_send_header(http_connection_t *hc, int rc, const char *content, +http_send_header(http_connection_t *hc, int rc, const char *content, int64_t contentlen, const char *encoding, const char *location, int maxage, const char *range, @@ -230,6 +230,8 @@ http_send_header(http_connection_t *hc, int rc, const char *content, time_t t; int sess = 0; + lock_assert(&hc->hc_fd_lock); + htsbuf_queue_init(&hdrs, 0); htsbuf_qprintf(&hdrs, "%s %d %s\r\n", @@ -378,6 +380,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, } #endif + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, rc, content, size, encoding, location, maxage, 0, NULL, NULL); @@ -387,6 +390,7 @@ http_send_reply(http_connection_t *hc, int rc, const char *content, else tvh_write(hc->hc_fd, data, size); } + pthread_mutex_unlock(&hc->hc_fd_lock); free(data); } @@ -1095,6 +1099,7 @@ http_serve_requests(http_connection_t *hc) char *argv[3], *c, *cmdline = NULL, *hdrline = NULL; int n, r; + pthread_mutex_init(&hc->hc_fd_lock, NULL); http_arg_init(&hc->hc_args); http_arg_init(&hc->hc_req_args); htsbuf_queue_init(&spill, 0); diff --git a/src/http.h b/src/http.h index 7e17dc195..c0b9b52d3 100644 --- a/src/http.h +++ b/src/http.h @@ -121,6 +121,7 @@ typedef enum http_ver { } http_ver_t; typedef struct http_connection { + pthread_mutex_t hc_fd_lock; int hc_fd; struct sockaddr_storage *hc_peer; char *hc_peer_ipstr; diff --git a/src/satip/rtp.c b/src/satip/rtp.c index 19b71a012..51e60fb14 100644 --- a/src/satip/rtp.c +++ b/src/satip/rtp.c @@ -31,6 +31,7 @@ #define RTP_PACKETS 128 #define RTP_PAYLOAD (7*188+12) +#define RTP_TCP_PAYLOAD (87*188+12+4) /* cca 16kB */ #define RTCP_PAYLOAD (1420) typedef struct satip_rtp_table { @@ -63,6 +64,7 @@ typedef struct satip_rtp_session { signal_status_t sig; int sig_lock; pthread_mutex_t lock; + pthread_mutex_t *tcp_lock; uint8_t *table_data; int table_data_len; } satip_rtp_session_t; @@ -134,9 +136,8 @@ satip_rtp_pmt_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len) } static void -satip_rtp_header(satip_rtp_session_t *rtp) +satip_rtp_header(satip_rtp_session_t *rtp, struct iovec *v) { - struct iovec *v = rtp->um_iovec + rtp->um_packet; uint8_t *data = v->iov_base; uint32_t tstamp = dispatch_clock + rtp->seq; @@ -179,7 +180,7 @@ satip_rtp_send(satip_rtp_session_t *rtp) v->iov_len = len; } if (v->iov_len == 0) - satip_rtp_header(rtp); + satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet); return 0; } @@ -198,7 +199,7 @@ satip_rtp_append_data(satip_rtp_session_t *rtp, struct iovec **_v, uint8_t *data return r; } else { rtp->um_packet++; - satip_rtp_header(rtp); + satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet); } *_v = rtp->um_iovec + rtp->um_packet; } else { @@ -233,7 +234,7 @@ found: dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb); if (rtp->table_data_len) { for (i = 0; i < rtp->table_data_len; i += 188) { - r = satip_rtp_append_data(rtp, &v, data); + r = satip_rtp_append_data(rtp, &v, rtp->table_data + i); if (r < 0) return r; } @@ -253,6 +254,90 @@ found: return 0; } +static void +satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream, uint8_t *data, size_t data_len) +{ + assert(data_len <= 0xffff); + data[0] = '$'; + data[1] = stream; + data[2] = (data_len - 4) >> 8; + data[3] = data_len & 0xff; + pthread_mutex_lock(rtp->tcp_lock); + tvh_write(rtp->fd_rtp, data, data_len); + pthread_mutex_unlock(rtp->tcp_lock); +} + +static inline void +satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp, struct iovec *v) +{ + if (v->iov_len) + satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len); + free(v->iov_base); + v->iov_base = NULL; + v->iov_len = 0; +} + +static inline int +satip_rtp_append_tcp_data(satip_rtp_session_t *rtp, struct iovec *v, uint8_t *data, size_t len) +{ + if (v->iov_base == NULL) { + v->iov_base = malloc(RTP_TCP_PAYLOAD); + v->iov_len = 4; /* keep room for RTSP embedded data header */ + satip_rtp_header(rtp, v); + } + assert(v->iov_len + len <= RTP_TCP_PAYLOAD); + memcpy(v->iov_base + v->iov_len, data, len); + v->iov_len += len; + if (v->iov_len == RTP_TCP_PAYLOAD) + satip_rtp_flush_tcp_data(rtp, v); + return 0; +} + + +static int +satip_rtp_tcp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len) +{ + int i, j, pid, last_pid = -1, r; + mpegts_apid_t *pids = rtp->pids.pids; + struct iovec v = {NULL, 0}; + satip_rtp_table_t *tbl; + + assert((len % 188) == 0); + if (len > 0) + rtp->sig_lock = 1; + for ( ; len >= 188 ; data += 188, len -= 188) { + pid = ((data[1] & 0x1f) << 8) | data[2]; + if (pid != last_pid && !rtp->pids.all) { + for (i = 0; i < rtp->pids.count; i++) { + j = pids[i].pid; + if (pid < j) break; + if (j == pid) goto found; + } + continue; +found: + TAILQ_FOREACH(tbl, &rtp->pmt_tables, link) + if (tbl->pid == pid) { + dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb); + if (rtp->table_data_len) { + satip_rtp_append_tcp_data(rtp, &v, rtp->table_data, rtp->table_data_len); + free(rtp->table_data); + rtp->table_data = NULL; + } + break; + } + if (tbl) + continue; + last_pid = pid; + } + r = satip_rtp_append_tcp_data(rtp, &v, data, 188); + if (r < 0) + return r; + } + if (v.iov_len) + satip_rtp_flush_tcp_data(rtp, &v); + return 0; +} + static void satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig) { @@ -271,9 +356,11 @@ satip_rtp_thread(void *aux) pktbuf_t *pb; char peername[50]; int alive = 1, fatal = 0, r; + int tcp = rtp->port == RTSP_TCP_DATA; tcp_get_str_from_ip((struct sockaddr *)&rtp->peer, peername, sizeof(peername)); - tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port); + tvhdebug("satips", "RTP streaming to %s:%d open", peername, + tcp ? IP_PORT(rtp->peer) : rtp->port); pthread_mutex_lock(&sq->sq_mutex); while (rtp->sq && !fatal) { @@ -295,7 +382,10 @@ satip_rtp_thread(void *aux) pb = sm->sm_data; subscription_add_bytes_out(subs, pktbuf_len(pb)); pthread_mutex_lock(&rtp->lock); - r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb)); + if (tcp) + r = satip_rtp_tcp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb)); + else + r = satip_rtp_loop(rtp, pktbuf_ptr(pb), pktbuf_len(pb)); pthread_mutex_unlock(&rtp->lock); if (r) fatal = 1; break; @@ -350,12 +440,14 @@ satip_rtp_find(void *id) */ void satip_rtp_queue(void *id, th_subscription_t *subs, streaming_queue_t *sq, + pthread_mutex_t *tcp_lock, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, dvb_mux_conf_t *dmc, mpegts_apids_t *pids, int perm_lock) { satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp)); + int dscp; if (rtp == NULL) return; @@ -363,29 +455,28 @@ void satip_rtp_queue(void *id, th_subscription_t *subs, rtp->id = id; rtp->peer = *peer; rtp->peer2 = *peer; - IP_PORT_SET(rtp->peer2, htons(port + 1)); + if (port != RTSP_TCP_DATA) + IP_PORT_SET(rtp->peer2, htons(port + 1)); rtp->port = port; rtp->fd_rtp = fd_rtp; rtp->fd_rtcp = fd_rtcp; rtp->subs = subs; rtp->sq = sq; + rtp->tcp_lock = tcp_lock; mpegts_pid_init(&rtp->pids); mpegts_pid_copy(&rtp->pids, pids); TAILQ_INIT(&rtp->pmt_tables); udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec); - satip_rtp_header(rtp); + satip_rtp_header(rtp, rtp->um_iovec); rtp->frontend = frontend; rtp->dmc = *dmc; rtp->source = source; pthread_mutex_init(&rtp->lock, NULL); - if (config.dscp >= 0) { - socket_set_dscp(rtp->fd_rtp, config.dscp, NULL, 0); - socket_set_dscp(rtp->fd_rtcp, config.dscp, NULL, 0); - } else { - socket_set_dscp(rtp->fd_rtp, IPTOS_DSCP_EF, NULL, 0); - socket_set_dscp(rtp->fd_rtcp, IPTOS_DSCP_EF, NULL, 0); - } + dscp = config.dscp >= 0 ? config.dscp : IPTOS_DSCP_EF; + socket_set_dscp(rtp->fd_rtp, dscp, NULL, 0); + if (rtp->fd_rtcp >= 0) + socket_set_dscp(rtp->fd_rtcp, dscp, NULL, 0); if (perm_lock) { rtp->sig.signal_scale = SIGNAL_STATUS_SCALE_RELATIVE; @@ -462,12 +553,15 @@ void satip_rtp_close(void *id) if (rtp) { TAILQ_REMOVE(&satip_rtp_sessions, rtp, link); sq = rtp->sq; + pthread_mutex_lock(rtp->tcp_lock); pthread_mutex_lock(&sq->sq_mutex); rtp->sq = NULL; pthread_cond_signal(&sq->sq_cond); pthread_mutex_unlock(&sq->sq_mutex); pthread_mutex_unlock(&satip_rtp_lock); + pthread_mutex_lock(rtp->tcp_lock); pthread_join(rtp->tid, NULL); + pthread_mutex_unlock(rtp->tcp_lock); udp_multisend_free(&rtp->um); mpegts_pid_done(&rtp->pids); while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) { @@ -776,10 +870,15 @@ satip_rtcp_thread(void *aux) tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf)); tvhtrace("satips", "RTCP send to %s:%d : %s", addrbuf, IP_PORT(rtp->peer2), msg + 16); } - r = sendto(rtp->fd_rtcp, msg, len, 0, - (struct sockaddr*)&rtp->peer2, - rtp->peer2.ss_family == AF_INET6 ? - sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + if (rtp->port == RTSP_TCP_DATA) { + satip_rtp_tcp_data(rtp, 1, msg, len); + r = 0; + } else { + r = sendto(rtp->fd_rtcp, msg, len, 0, + (struct sockaddr*)&rtp->peer2, + rtp->peer2.ss_family == AF_INET6 ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + } if (r < 0) { err = errno; tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf)); diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 58685cbf0..14ec8b68c 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -21,6 +21,7 @@ #include "htsbuf.h" #include "config.h" #include "profile.h" +#include "streaming.h" #include "satip/server.h" #include "input/mpegts/iptv/iptv_private.h" @@ -581,8 +582,9 @@ pids: goto endclean; satip_rtp_queue((void *)(intptr_t)rs->stream, rs->subs, &rs->prch.prch_sq, - hc->hc_peer, rs->rtp_peer_port, - rs->udp_rtp->fd, rs->udp_rtcp->fd, + &hc->hc_fd_lock, hc->hc_peer, rs->rtp_peer_port, + rs->udp_rtp ? rs->udp_rtp->fd : -1, + rs->udp_rtcp ? rs->udp_rtcp->fd : -1, rs->frontend, rs->findex, &rs->dmc_tuned, &rs->pids, rs->perm_lock); if (!rs->pids.all && rs->pids.count == 0) @@ -804,19 +806,24 @@ parse_transport(http_connection_t *hc) const char *s = http_arg_get(&hc->hc_args, "Transport"); const char *u; int a, b; - if (!s || strncmp(s, "RTP/AVP;unicast;client_port=", 28)) + if (!s) return -1; - for (s += 28, u = s; isdigit(*u); u++); - if (*u != '-') - return -1; - a = atoi(s); - for (s = ++u; isdigit(*s); s++); - if (*s != '\0' && *s != ';') - return -1; - b = atoi(u); - if (a + 1 != b) - return -1; - return a; + if (strncmp(s, "RTP/AVP;unicast;client_port=", 28) == 0) { + for (s += 28, u = s; isdigit(*u); u++); + if (*u != '-') + return -1; + a = atoi(s); + for (s = ++u; isdigit(*s); s++); + if (*s != '\0' && *s != ';') + return -1; + b = atoi(u); + if (a + 1 != b) + return -1; + return a; + } else if (strncmp(s, "RTP/AVP/TCP;interleaved=0-1", 27) == 0) { + return RTSP_TCP_DATA; + } + return -1; } /* @@ -1145,7 +1152,9 @@ rtsp_process_options(http_connection_t *hc) http_arg_set(&args, "Public", "OPTIONS,DESCRIBE,SETUP,PLAY,TEARDOWN"); if (hc->hc_session) http_arg_set(&args, "Session", hc->hc_session); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); + pthread_mutex_unlock(&hc->hc_fd_lock); http_arg_flush(&args); return 0; @@ -1283,9 +1292,11 @@ rtsp_process_describe(http_connection_t *hc) else snprintf(buf, sizeof(buf), "rtsp://%s", rtsp_ip); http_arg_set(&args, "Content-Base", buf); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, HTTP_STATUS_OK, "application/sdp", q.hq_size, NULL, NULL, 0, NULL, NULL, &args); tcp_write_queue(hc->hc_fd, &q); + pthread_mutex_unlock(&hc->hc_fd_lock); http_arg_flush(&args); htsbuf_queue_flush(&q); return 0; @@ -1321,7 +1332,7 @@ rtsp_process_play(http_connection_t *hc, int setup) if (errcode) goto error; - if (setup) { + if (setup && rs->rtp_peer_port != RTSP_TCP_DATA) { if (udp_bind_double(&rs->udp_rtp, &rs->udp_rtcp, "satips", "rtsp", "rtcp", rtsp_ip, 0, NULL, @@ -1358,7 +1369,9 @@ rtsp_process_play(http_connection_t *hc, int setup) pthread_mutex_unlock(&rtsp_lock); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, &args); + pthread_mutex_unlock(&hc->hc_fd_lock); goto end; @@ -1406,7 +1419,9 @@ rtsp_process_teardown(http_connection_t *hc) pthread_mutex_unlock(&rtsp_lock); http_arg_init(&args); http_arg_set(&args, "Session", session); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, HTTP_STATUS_OK, NULL, 0, NULL, NULL, 0, NULL, NULL, NULL); + pthread_mutex_unlock(&hc->hc_fd_lock); http_arg_flush(&args); } return 0; diff --git a/src/satip/server.c b/src/satip/server.c index 65c9ec011..1da69b3a0 100644 --- a/src/satip/server.c +++ b/src/satip/server.c @@ -204,10 +204,11 @@ satip_server_http_xml(http_connection_t *hc) snprintf(buf2, sizeof(buf2), "%d", srcs); http_arg_set(&args, "X-SATIP-Sources", buf2); } + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, "text/xml", strlen(buf), 0, NULL, 10, 0, NULL, &args); - http_arg_flush(&args); - tvh_write(hc->hc_fd, buf, strlen(buf)); + pthread_mutex_unlock(&hc->hc_fd_lock); + http_arg_flush(&args); return 0; #undef MSG diff --git a/src/satip/server.h b/src/satip/server.h index 38291ffcd..ea618d295 100644 --- a/src/satip/server.h +++ b/src/satip/server.h @@ -33,6 +33,8 @@ #define MUXCNF_KEEP 1 #define MUXCNF_REJECT 2 +#define RTSP_TCP_DATA 1000000 + struct satip_server_conf { idnode_t idnode; int satip_deviceid; @@ -59,6 +61,7 @@ extern const idclass_t satip_server_class; void satip_rtp_queue(void *id, th_subscription_t *subs, streaming_queue_t *sq, + pthread_mutex_t *tcp_lock, struct sockaddr_storage *peer, int port, int fd_rtp, int fd_rtcp, int frontend, int source, diff --git a/src/tcp.h b/src/tcp.h index 08b8cce5f..e449b9608 100644 --- a/src/tcp.h +++ b/src/tcp.h @@ -37,9 +37,11 @@ ((struct sockaddr_in6 *)&(storage))->sin6_port : \ ((struct sockaddr_in *)&(storage))->sin_port) #define IP_PORT_SET(storage, port) \ - if ((storage).ss_family == AF_INET6) \ + do { \ + if ((storage).ss_family == AF_INET6) \ ((struct sockaddr_in6 *)&(storage))->sin6_port = (port); else \ - ((struct sockaddr_in *)&(storage))->sin_port = (port); + ((struct sockaddr_in *)&(storage))->sin_port = (port); \ + } while (0) typedef struct tcp_server_ops { diff --git a/src/webui/webui.c b/src/webui/webui.c index 68a26946c..80411520c 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -252,6 +252,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) if (!gzip && fb_gzipped(fp)) gzip = "gzip"; + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, content, size, gzip, NULL, 10, 0, NULL, NULL); while (!fb_eof(fp)) { ssize_t c = fb_read(fp, buf, sizeof(buf)); @@ -264,6 +265,7 @@ page_static_file(http_connection_t *hc, const char *_remain, void *opaque) break; } } + pthread_mutex_unlock(&hc->hc_fd_lock); fb_close(fp); return ret; @@ -1355,8 +1357,10 @@ page_xspf(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, "application/xspf+xml", len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); + pthread_mutex_unlock(&hc->hc_fd_lock); free(hostpath); return 0; @@ -1396,8 +1400,10 @@ page_m3u(http_connection_t *hc, const char *remain, void *opaque) pthread_mutex_unlock(&global_lock); len = strlen(buf); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, MIME_M3U, len, 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, len); + pthread_mutex_unlock(&hc->hc_fd_lock); free(hostpath); return 0; @@ -1608,6 +1614,7 @@ page_dvrfile(http_connection_t *hc, const char *remain, void *opaque) return HTTP_STATUS_NOT_ALLOWED; } + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, range ? HTTP_STATUS_PARTIAL_CONTENT : HTTP_STATUS_OK, content, content_len, NULL, NULL, 10, range ? range_buf : NULL, disposition, NULL); @@ -1635,6 +1642,7 @@ page_dvrfile(http_connection_t *hc, const char *remain, void *opaque) } } } + pthread_mutex_unlock(&hc->hc_fd_lock); close(fd); pthread_mutex_lock(&global_lock); @@ -1689,6 +1697,7 @@ page_imagecache(http_connection_t *hc, const char *remain, void *opaque) return HTTP_STATUS_NOT_FOUND; } + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, NULL, st.st_size, 0, NULL, 10, 0, NULL, NULL); while (1) { @@ -1698,6 +1707,7 @@ page_imagecache(http_connection_t *hc, const char *remain, void *opaque) if (tvh_write(hc->hc_fd, buf, c)) break; } + pthread_mutex_unlock(&hc->hc_fd_lock); close(fd); return 0; @@ -1766,8 +1776,10 @@ http_redir(http_connection_t *hc, const char *remain, void *opaque) } } snprintf(buf, sizeof(buf), "tvh_locale={};tvh_locale_lang='';"); + pthread_mutex_lock(&hc->hc_fd_lock); http_send_header(hc, 200, "text/javascript; charset=UTF-8", strlen(buf), 0, NULL, 10, 0, NULL, NULL); tvh_write(hc->hc_fd, buf, strlen(buf)); + pthread_mutex_unlock(&hc->hc_fd_lock); return 0; } }