From: spdfrk Date: Sat, 13 Jun 2020 14:26:30 +0000 (+0200) Subject: iptv: new features for multicast, rtsp & rtcp X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d67fff914417955e4ab8e9fbc091576855425ae2;p=thirdparty%2Ftvheadend.git iptv: new features for multicast, rtsp & rtcp - Implement RTCP Negative Acknowledge (a.k.a. Retransmission) support for RTP streams. When packet loss is detected the client will send a RTCP Generic Feedback report to the server. The server can than resend these lost packets. Retransmitted packets are send through a second connection or as part of the main stream, both cases are supported. For Multicast manual setup of the RTCP server is required, for RTSP automatic setup (was already implemented for Receiver Reports) or manual override is possible. - General clean-up of unused RTCP code and restructure to allow for easy implementation of different types of RTCP messages. - Make RTCP Receiver Reports optional. - RTSP start session with DESCRIBE and parse response content. - RTSP DESCRIBE redirect support. - Parse DESCRIBE response for AVPF support (required for Retransmission). - Implement remote time shift support for RTSP streams. This option can be enabled for a channel to pass-through time shift commands to the RTSP server, the internal time shift buffer is then disabled. --- diff --git a/src/channels.c b/src/channels.c index 521a6e0f5..d06eacb23 100644 --- a/src/channels.c +++ b/src/channels.c @@ -542,6 +542,16 @@ const idclass_t channel_class = { .list = channel_class_epg_running_list, .opts = PO_EXPERT | PO_DOC_NLIST, }, +#if ENABLE_TIMESHIFT + { + .type = PT_BOOL, + .id = "remote_timeshift", + .name = N_("Remote timeshift"), + .desc = N_("Pass timeshift commands to a remote RTSP server"), + .off = offsetof(channel_t, ch_remote_timeshift), + .opts = PO_ADVANCED, + }, +#endif { .type = PT_STR, .islist = 1, diff --git a/src/channels.h b/src/channels.h index 7b377487d..268e2fa81 100644 --- a/src/channels.h +++ b/src/channels.h @@ -82,6 +82,7 @@ typedef struct channel int ch_dvr_extra_time_pre; int ch_dvr_extra_time_post; int ch_epg_running; + int ch_remote_timeshift; LIST_HEAD(, dvr_entry) ch_dvrs; LIST_HEAD(, dvr_autorec_entry) ch_autorecs; LIST_HEAD(, dvr_timerec_entry) ch_timerecs; diff --git a/src/htsp_server.c b/src/htsp_server.c index 6268cd61a..77cd8e71b 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -2520,10 +2520,14 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) profile_id = htsmsg_get_str(in, "profile"); #if ENABLE_TIMESHIFT - if (timeshift_conf.enabled) { - timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0); - if (!timeshift_conf.unlimited_period) - timeshiftPeriod = MIN(timeshiftPeriod, timeshift_conf.max_period * 60); + if(ch->ch_remote_timeshift) { + timeshiftPeriod = ~0; + } else { + if (timeshift_conf.enabled) { + timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0); + if (!timeshift_conf.unlimited_period) + timeshiftPeriod = MIN(timeshiftPeriod, timeshift_conf.max_period * 60); + } } #endif @@ -2541,18 +2545,24 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0); #if ENABLE_TIMESHIFT - if (timeshiftPeriod != 0) { - if (timeshiftPeriod == ~0) - tvhdebug(LS_HTSP, "using timeshift buffer (unlimited)"); - else - tvhdebug(LS_HTSP, "using timeshift buffer (%u mins)", timeshiftPeriod / 60); + if (ch->ch_remote_timeshift) { + tvhdebug(LS_HTSP, "using remote timeshift (RTSP)"); + } else { + if (timeshiftPeriod != 0) { + if (timeshiftPeriod == ~0) + tvhdebug(LS_HTSP, "using timeshift buffer (unlimited)"); + else + tvhdebug(LS_HTSP, "using timeshift buffer (%u mins)", + timeshiftPeriod / 60); + } } #endif pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id, "htsp", SUBSCRIPTION_PACKET | SUBSCRIPTION_HTSP); profile_chain_init(&hs->hs_prch, pro, ch, 1); - if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, 0)) { + if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, ch->ch_remote_timeshift ? + PROFILE_WORK_REMOTE_TS : PROFILE_WORK_NONE)) { tvherror(LS_HTSP, "unable to create profile chain '%s'", profile_get_name(pro)); profile_chain_close(&hs->hs_prch); free(hs); @@ -2576,8 +2586,12 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) htsmsg_add_u32(rep, "weight", hs->hs_s->ths_weight >= 0 ? hs->hs_s->ths_weight : 0); #if ENABLE_TIMESHIFT - if(timeshiftPeriod) + if (ch->ch_remote_timeshift) { htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod); + } else { + if (timeshiftPeriod) + htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod); + } #endif htsp_reply(htsp, in, rep); diff --git a/src/http.h b/src/http.h index 43fb7103f..36bf7bd26 100644 --- a/src/http.h +++ b/src/http.h @@ -426,11 +426,16 @@ struct http_client { int hc_rtcp_tcp; int hc_rtcp_server_port; int hc_rtp_multicast:1; + int hc_rtp_avpf:1; long hc_rtsp_stream_id; int hc_rtp_timeout; char *hc_rtsp_user; char *hc_rtsp_pass; char hc_rtsp_keep_alive_cmd; + time_t hc_rtsp_stream_start; + time_t hc_rtsp_range_start; + time_t hc_rtsp_range_end; + float hc_rtsp_scale; struct http_client_ssl *hc_ssl; /* ssl internals */ @@ -489,12 +494,15 @@ rtsp_send( http_client_t *hc, http_cmd_t cmd, const char *path, void rtsp_clear_session( http_client_t *hc ); -int rtsp_options_decode( http_client_t *hc ); static inline int rtsp_options( http_client_t *hc ) { return rtsp_send(hc, RTSP_CMD_OPTIONS, NULL, NULL, NULL); } -int rtsp_setup_decode( http_client_t *hc, int satip ); +static inline int +rtsp_describe( http_client_t *hc, const char *path, const char *query ) { + return rtsp_send(hc, RTSP_CMD_DESCRIBE, path, query, NULL); +} + int rtsp_setup( http_client_t *hc, const char *path, const char *query, const char *multicast_addr, int rtp_port, int rtcp_port ); @@ -515,10 +523,15 @@ rtsp_teardown( http_client_t *hc, const char *path, const char *query ) { int rtsp_get_parameter( http_client_t *hc, const char *parameter ); -int rtsp_describe_decode( http_client_t *hc ); -static inline int -rtsp_describe( http_client_t *hc, const char *path, const char *query ) { - return rtsp_send(hc, RTSP_CMD_DESCRIBE, path, query, NULL); -} +int rtsp_set_speed( http_client_t *hc, float speed ); + +int rtsp_set_position( http_client_t *hc, time_t position ); + +int rtsp_describe_decode( http_client_t *hc, const char *buf, size_t len ); + +int rtsp_setup_decode( http_client_t *hc, int satip ); + +int rtsp_options_decode( http_client_t *hc ); +int rtsp_play_decode( http_client_t *hc ); #endif /* HTTP_H_ */ diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 78341d159..c51a5c1ab 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -411,8 +411,8 @@ iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im ) { iptv_thread_pool_t *pool = mi->mi_tpool; - if (im->mm_iptv_fd > 0 || im->mm_iptv_fd2 > 0) - tvhtrace(LS_IPTV, "iptv_input_close_fds %d %d", im->mm_iptv_fd, im->mm_iptv_fd2); + if (im->mm_iptv_fd > 0 || im->im_rtcp_info.connection_fd > 0) + tvhtrace(LS_IPTV, "iptv_input_close_fds %d %d", im->mm_iptv_fd, im->im_rtcp_info.connection_fd); /* Close file */ if (im->mm_iptv_fd > 0) { @@ -423,11 +423,11 @@ iptv_input_close_fds ( iptv_input_t *mi, iptv_mux_t *im ) } /* Close file2 */ - if (im->mm_iptv_fd2 > 0) { - tvhpoll_rem1(pool->poll, im->mm_iptv_fd2); - udp_close(im->mm_iptv_connection2); - im->mm_iptv_connection2 = NULL; - im->mm_iptv_fd2 = -1; + if (im->im_rtcp_info.connection_fd > 0) { + tvhpoll_rem1(pool->poll, im->im_rtcp_info.connection_fd); + udp_close(im->im_rtcp_info.connection); + im->im_rtcp_info.connection = NULL; + im->im_rtcp_info.connection_fd = -1; } } @@ -676,12 +676,12 @@ iptv_input_fd_started ( iptv_input_t *mi, iptv_mux_t *im ) } /* Setup poll2 */ - if (im->mm_iptv_fd2 > 0) { + if (im->im_rtcp_info.connection_fd > 0) { /* Error? */ - if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) { + if (tvhpoll_add1(tpool->poll, im->im_rtcp_info.connection_fd, TVHPOLL_IN, im) < 0) { tvherror(LS_IPTV, "%s - failed to add to poll q (2)", im->mm_nicename); - close(im->mm_iptv_fd2); - im->mm_iptv_fd2 = -1; + close(im->im_rtcp_info.connection_fd); + im->im_rtcp_info.connection_fd = -1; return -1; } } diff --git a/src/input/mpegts/iptv/iptv_mux.c b/src/input/mpegts/iptv/iptv_mux.c index ca1d56f69..33b041b91 100644 --- a/src/input/mpegts/iptv/iptv_mux.c +++ b/src/input/mpegts/iptv/iptv_mux.c @@ -72,7 +72,7 @@ iptv_url_set ( char **url, char **sane_url, const char *str, int allow_file, int } return 0; -} +} static int iptv_mux_url_set ( void *p, const void *v ) @@ -81,6 +81,13 @@ iptv_mux_url_set ( void *p, const void *v ) return iptv_url_set(&im->mm_iptv_url, &im->mm_iptv_url_sane, v, 1, 1); } +static int +iptv_mux_ret_url_set ( void *p, const void *v ) +{ + iptv_mux_t *im = p; + return iptv_url_set(&im->mm_iptv_ret_url, &im->mm_iptv_ret_url_sane, v, 0, 0); +} + #if ENABLE_LIBAV static htsmsg_t * iptv_mux_libav_enum ( void *o, const char *lang ) @@ -124,6 +131,24 @@ const idclass_t iptv_mux_class = .set = iptv_mux_url_set, .opts = PO_MULTILINE }, + { + .type = PT_BOOL, + .id = "iptv_send_reports", + .name = N_("Send RTCP status reports"), + .off = offsetof(iptv_mux_t, mm_iptv_send_reports), + .opts = PO_ADVANCED + }, + { + .type = PT_STR, + .id = "iptv_ret_url", + .name = N_("Retransmission URL"), + .desc = N_("Manually setup a retransmission URL for Multicast streams." + " For RTSP streams this URL is automatically setup" + " if this value is not set."), + .off = offsetof(iptv_mux_t, mm_iptv_ret_url), + .set = iptv_mux_ret_url_set, + .opts = PO_ADVANCED + }, { .type = PT_STR, .id = "iptv_url_cmpid", @@ -305,6 +330,10 @@ iptv_mux_free ( mpegts_mux_t *mm ) free(im->mm_iptv_url_sane); free(im->mm_iptv_url_raw); free(im->mm_iptv_url_cmpid); + free(im->mm_iptv_ret_url); + free(im->mm_iptv_ret_url_sane); + free(im->mm_iptv_ret_url_raw); + free(im->mm_iptv_ret_url_cmpid); free(im->mm_iptv_muxname); free(im->mm_iptv_interface); free(im->mm_iptv_svcname); diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index a922ead11..b854bdece 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -25,8 +25,9 @@ #include "url.h" #include "udp.h" #include "tvhpoll.h" +#include "profile.h" -#define IPTV_BUF_SIZE (300*188) +#define IPTV_BUF_SIZE (2000*188) #define IPTV_PKTS 32 #define IPTV_PKT_PAYLOAD 1472 @@ -104,6 +105,30 @@ struct iptv_network iptv_network_t *iptv_network_create0 ( const char *uuid, htsmsg_t *conf, const idclass_t *idc ); +typedef struct { + /* Last transmitted packet timestamp */ + time_t last_ts; + /* Next scheduled packet sending timestamp */ + time_t next_ts; + + double average_packet_size; + + int members; + int senders; + + uint16_t last_received_sequence; + uint16_t ce_cnt; + uint16_t sequence_cycle; + uint16_t nak_req_limit; + + /* Connection to the RTCP remote */ + udp_connection_t *connection; + int connection_fd; + + uint32_t source_ssrc; + uint32_t my_ssrc; +} rtcp_t; + struct iptv_mux { mpegts_mux_t; @@ -112,14 +137,17 @@ struct iptv_mux int mm_iptv_streaming_priority; int mm_iptv_fd; udp_connection_t *mm_iptv_connection; - int mm_iptv_fd2; - udp_connection_t *mm_iptv_connection2; char *mm_iptv_url; char *mm_iptv_url_sane; char *mm_iptv_url_raw; char *mm_iptv_url_cmpid; + char *mm_iptv_ret_url; + char *mm_iptv_ret_url_sane; + char *mm_iptv_ret_url_raw; + char *mm_iptv_ret_url_cmpid; char *mm_iptv_interface; + int mm_iptv_send_reports; int mm_iptv_substitute; int mm_iptv_libav; int mm_iptv_atsc; @@ -160,6 +188,13 @@ struct iptv_mux int im_delete_flag; void *im_opaque; + + udp_multirecv_t im_um1; + udp_multirecv_t im_um2; + char im_use_retransmission; + sbuf_t im_temp_buffer; + char im_is_ce_detected; + rtcp_t im_rtcp_info; }; iptv_mux_t* iptv_mux_create0 @@ -198,11 +233,27 @@ void iptv_pipe_init ( void ); void iptv_file_init ( void ); void iptv_libav_init ( void ); -ssize_t iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, - void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len) ); +ssize_t iptv_rtp_read(iptv_mux_t *im, void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len)); void iptv_input_unpause ( void *aux ); +struct rtsp_st { + // Note: input MUST BE FIRST in struct + streaming_target_t input; ///< Input source + streaming_target_t *output; ///< Output dest + streaming_target_t *tsfix; + pthread_t st_thread; + volatile int run; + volatile int rtsp_input_start; + iptv_mux_t *im; +}; + +typedef struct rtsp_st rtsp_st_t; +#if ENABLE_TIMESHIFT +void *rtsp_status_thread(void *p) ; +streaming_target_t* rtsp_st_create(streaming_target_t *out, profile_chain_t *prch); +void rtsp_st_destroy(streaming_target_t *st); +#endif #endif /* __IPTV_PRIVATE_H__ */ /****************************************************************************** diff --git a/src/input/mpegts/iptv/iptv_rtcp.c b/src/input/mpegts/iptv/iptv_rtcp.c index 702b78b6d..2b37da005 100644 --- a/src/input/mpegts/iptv/iptv_rtcp.c +++ b/src/input/mpegts/iptv/iptv_rtcp.c @@ -193,31 +193,31 @@ rtcp_interval(int members, int senders, double rtcp_bw, int we_sent, double avg_ Version and padding are set to fixed values, i.e. 2 and 0; */ static void -rtcp_append_headers(sbuf_t *buffer, rtcp_t *packet) +rtcp_append_headers(sbuf_t *buffer, rtcp_header_t *packet) { - packet->common.version = 2; - packet->common.p = 0; - uint8_t byte = 0; - byte |= packet->common.version << 6; - byte |= packet->common.p << 5; - byte |= packet->common.count; + packet->version = 2; + packet->p = 0; + + byte |= packet->version << 6; + byte |= packet->p << 5; + byte |= packet->count; sbuf_put_byte(buffer, byte); - byte = packet->common.pt; + byte = packet->pt; sbuf_put_byte(buffer, byte); - sbuf_append(buffer, &packet->common.length, sizeof(packet->common.length)); + sbuf_append(buffer, &packet->length, sizeof(packet->length)); } /* Append RTCP receiver report data to the buffer. */ static void -rtcp_append_rr(sbuf_t *buffer, rtcp_t *packet) +rtcp_append_rr(sbuf_t *buffer, rtcp_rr_t *packet) { uint8_t byte = 0; - rtcp_rr_t report = packet->r.rr.rr[0]; + rtcp_rr_block_t report = packet->rr[0]; - sbuf_append(buffer, &packet->r.rr.ssrc, sizeof(packet->r.rr.ssrc)); + sbuf_append(buffer, &packet->ssrc, sizeof(packet->ssrc)); sbuf_append(buffer, &report.ssrc, sizeof(report.ssrc)); byte = report.fraction; sbuf_put_byte(buffer, byte); @@ -236,11 +236,23 @@ rtcp_append_rr(sbuf_t *buffer, rtcp_t *packet) sbuf_append(buffer, &report.dlsr, sizeof(report.dlsr)); } +/* + Append RTCP NAK data to the buffer. + */ +static void +rtcp_append_nak(sbuf_t *buffer, rtcp_gf_t *packet) +{ + sbuf_append(buffer, &packet->my_ssrc, sizeof(packet->my_ssrc)); + sbuf_append(buffer, &packet->ssrc, sizeof(packet->ssrc)); + sbuf_append(buffer, &packet->pid, sizeof(packet->pid)); + sbuf_append(buffer, &packet->blp, sizeof(packet->blp)); +} + /* Just send the buffer to the host in the rtcp_info. */ static void -rtcp_send(iptv_rtcp_info_t *info, sbuf_t *buffer) +rtcp_send(rtcp_t *info, sbuf_t *buffer) { tvhdebug(LS_IPTV, "RTCP: Sending receiver report"); // We don't care of the result right now @@ -252,10 +264,11 @@ rtcp_send(iptv_rtcp_info_t *info, sbuf_t *buffer) It uses the actual informations stored in rtcp_info. */ static void -rtcp_send_rr(iptv_rtcp_info_t *info) +rtcp_send_rr(rtcp_t *info) { - rtcp_rr_t report; - + rtcp_rr_block_t report; + rtcp_header_t header; + rtcp_rr_t packet; report.ssrc = htonl(info->source_ssrc); // Fill in the extended last sequence @@ -277,18 +290,17 @@ rtcp_send_rr(iptv_rtcp_info_t *info) report.jitter = htonl(12); // Build the full packet - rtcp_t packet; - packet.common.pt = RTCP_RR; - packet.common.count = 1; + header.pt = RTCP_RR; + header.count = 1; // TODO : set the real length - packet.common.length = htons(7); - packet.r.rr.ssrc = htonl(info->my_ssrc); - packet.r.rr.rr[0] = report; + header.length = htons(7); + packet.ssrc = htonl(info->my_ssrc); + packet.rr[0] = report; // Build the network packet sbuf_t network_buffer; sbuf_init(&network_buffer); - rtcp_append_headers(&network_buffer, &packet); + rtcp_append_headers(&network_buffer, &header); rtcp_append_rr(&network_buffer, &packet); // Send it @@ -300,12 +312,115 @@ rtcp_send_rr(iptv_rtcp_info_t *info) sbuf_free(&network_buffer); } -int -rtcp_init(iptv_rtcp_info_t * info) +ssize_t +rtcp_send_nak(rtcp_t *rtcp_info, uint32_t ssrc, uint16_t seqn, uint16_t len) +{ + rtcp_header_t rtcp_header; + rtcp_gf_t rtcp_data; + sbuf_t network_buffer; + uint32_t n; + uint16_t blp = 0; + udp_connection_t *uc = rtcp_info->connection; + + if (len > rtcp_info->nak_req_limit) { + seqn += len - rtcp_info->nak_req_limit; + len = rtcp_info->nak_req_limit; + } + tvhinfo(LS_IPTV, + "RTCP: Sending NAK report for SSRC 0x%x, missing: %d, following packets: %d", + ssrc, seqn, len - 1); + + rtcp_info->last_received_sequence = seqn; + rtcp_info->ce_cnt = len; + + // Build the full packet + rtcp_header.version = 2; + rtcp_header.p = 0; + rtcp_header.count = 1; // Generic NAK + rtcp_header.pt = RTCP_GF; + rtcp_header.length = htons(3); + rtcp_data.my_ssrc = 0; + rtcp_data.ssrc = htonl(ssrc); + + while (len > 0) { + len--; + rtcp_data.pid = htons(seqn); + if (len > 16) { + blp = 0xffff; + len -= 16; + seqn += 17; + } else { + blp = 0; + for (n = 0; n < len; n++) { + blp |= 1 << n; + } + len = 0; + } + rtcp_data.blp = htons(blp); + + // Build the network packet + sbuf_init(&network_buffer); + rtcp_append_headers(&network_buffer, &rtcp_header); + rtcp_append_nak(&network_buffer, &rtcp_data); + + // Send it + n = udp_write(uc, network_buffer.sb_data, network_buffer.sb_ptr, &uc->peer); + if (n) { + tvhwarn(LS_IPTV, + "RTCP: Sending NAK report for SSRC 0x%x failed, no data send %d %d", + ssrc, n, (uint32_t )sizeof(network_buffer)); + } + // Cleanup + sbuf_free(&network_buffer); + } + return 0; +} + +int +rtcp_connect(rtcp_t * info, char *url, char *host, int port, char *interface, char *nicename) { - uint32_t rnd; + udp_connection_t *rtcp_conn; + url_t rtcp_url; + + if (info->connection == NULL) { + rtcp_conn = udp_bind(LS_IPTV, nicename, NULL, 0, NULL, interface, + IPTV_BUF_SIZE, 1024); + if (rtcp_conn == NULL || rtcp_conn == UDP_FATAL_ERROR) { + tvhwarn(LS_IPTV, "%s - Unable to bind, RTCP won't be available", + nicename); + return -1; + } + info->connection_fd = rtcp_conn->fd; + info->connection = rtcp_conn; + } + urlinit(&rtcp_url); + if (host == NULL) { + if (urlparse(url ? : "", &rtcp_url)) { + tvhwarn(LS_IPTV, "%s - invalid RTCP URL, should be rtp://HOST:PORT [%s]", + nicename, url); + goto fail; + } + host = rtcp_url.host; + port = rtcp_url.port; + } + + if (udp_connect(info->connection, "rtcp", host, port)) { + tvhwarn(LS_IPTV, "%s - Unable to connect, RTCP won't be available", + nicename); + goto fail; + } + urlreset(&rtcp_url); + return 0; +fail: + urlreset(&rtcp_url); + return -1; +} + +int +rtcp_init(rtcp_t * info) +{ info->last_ts = 0; info->next_ts = 0; info->members = 2; @@ -314,17 +429,14 @@ rtcp_init(iptv_rtcp_info_t * info) info->sequence_cycle = 1; info->source_ssrc = 0; info->average_packet_size = 52; - - // Fill my SSRC - uuid_random((uint8_t *)&rnd, sizeof(random)); - info->my_ssrc = rnd; - srand48(rnd * 0x4232a9b9); + info->my_ssrc = 0; // Since we are not transmitting, set this to 0 + info->nak_req_limit = 128; // This appears to be a safe limit return 0; } int -rtcp_destroy(iptv_rtcp_info_t *info) +rtcp_destroy(rtcp_t *info) { return 0; } @@ -333,7 +445,7 @@ rtcp_destroy(iptv_rtcp_info_t *info) * Buffer is a raw RTP buffer */ int -rtcp_receiver_update(iptv_rtcp_info_t *info, uint8_t *buffer) +rtcp_receiver_update(rtcp_t *info, uint8_t *buffer) { union { uint8_t bytes[2]; diff --git a/src/input/mpegts/iptv/iptv_rtcp.h b/src/input/mpegts/iptv/iptv_rtcp.h index 8070518ef..7a166a76f 100644 --- a/src/input/mpegts/iptv/iptv_rtcp.h +++ b/src/input/mpegts/iptv/iptv_rtcp.h @@ -36,7 +36,8 @@ typedef enum RTCP_RR = 201, RTCP_SDES = 202, RTCP_BYE = 203, - RTCP_APP = 204 + RTCP_APP = 204, + RTCP_GF = 205 } rtcp_type_t; typedef enum @@ -62,7 +63,7 @@ typedef struct unsigned int count : 5; /* varies by packet type */ unsigned int pt : 8; /* RTCP packet type */ uint16_t length; /* pkt len in words, w/o this word */ -} rtcp_common_t; +} rtcp_header_t; /* @@ -84,7 +85,20 @@ typedef struct uint32_t jitter; /* interarrival jitter */ uint32_t lsr; /* last SR packet from this source */ uint32_t dlsr; /* delay since last SR packet */ -} rtcp_rr_t; +} rtcp_rr_block_t; + + + +/* + * Generic RTC Feedback block + */ +typedef struct +{ + uint32_t my_ssrc; /* not used */ + uint32_t ssrc; /* data source being reported */ + uint16_t pid; /* first missing frame id */ + uint16_t blp; /* missing frame count */ +} rtcp_gf_t; /* * SDES item @@ -103,83 +117,32 @@ typedef struct rtcp_sdes } rtcp_sdes_t; /* - * One RTCP packet + * reception report (RR) */ typedef struct { - rtcp_common_t common; /* common header */ - - union - { - - /* sender report (SR) */ - struct - { - uint32_t ssrc; /* sender generating this report */ - uint32_t ntp_sec; /* NTP timestamp */ - uint32_t ntp_frac; - uint32_t rtp_ts; /* RTP timestamp */ - uint32_t psent; /* packets sent */ - uint32_t osent; /* octets sent */ - rtcp_rr_t rr[1]; /* variable-length list */ - } sr; - - /* reception report (RR) */ - struct - { - uint32_t ssrc; /* receiver generating this report */ - rtcp_rr_t rr[1]; /* variable-length list */ - } rr; - - /* source description (SDES) */ - rtcp_sdes_t sdes; - - /* BYE */ - struct - { - uint32_t src[1]; /* list of sources */ - /* can't express trailing text for reason */ - } bye; - } r; -} rtcp_t; - -typedef struct iptv_rtcp_info { - /* Last transmitted packet timestamp */ - time_t last_ts; - /* Next scheduled packet sending timestamp */ - time_t next_ts; - - double average_packet_size; - - int members; - int senders; - - uint16_t last_received_sequence; - uint16_t sequence_cycle; - - /* Connection to the RTCP remote. Initialized by the RTSP code. */ - udp_connection_t *connection; - - uint32_t source_ssrc; - uint32_t my_ssrc; -} iptv_rtcp_info_t; + uint32_t ssrc; /* receiver generating this report */ + rtcp_rr_block_t rr[1]; /* variable-length list */ +} rtcp_rr_t; /* Init rtcp_info field of the rtsp_info. Return 0 if everything was OK. rtcp_destroy must be called when rtsp_info is destroyed. */ -int rtcp_init(iptv_rtcp_info_t *info); +int rtcp_init(rtcp_t *info); /* Destroy rtcp_info field of rtsp_info. */ -int rtcp_destroy(iptv_rtcp_info_t *info); +int rtcp_destroy(rtcp_t *info); /* Update RTCP informations. It can also send a RTCP RR packet if the timer has expired. */ -int rtcp_receiver_update(iptv_rtcp_info_t *info, uint8_t *rtp_packet); +int rtcp_receiver_update(rtcp_t *info, uint8_t *rtp_packet); +ssize_t rtcp_send_nak(rtcp_t *rtcp_info, uint32_t ssrc, uint16_t seqn, uint16_t len); +int rtcp_connect(rtcp_t * info, char *url, char *host, int port, char *interface, char *nicename); #endif /* IPTV_RTCP_H */ diff --git a/src/input/mpegts/iptv/iptv_rtsp.c b/src/input/mpegts/iptv/iptv_rtsp.c index 860e64f47..9d1235d8a 100644 --- a/src/input/mpegts/iptv/iptv_rtsp.c +++ b/src/input/mpegts/iptv/iptv_rtsp.c @@ -27,12 +27,14 @@ typedef struct { http_client_t *hc; - udp_multirecv_t um; char *path; char *query; mtimer_t alive_timer; int play; - iptv_rtcp_info_t * rtcp_info; + time_t start_position; + time_t range_start; + time_t range_end; + time_t position; } rtsp_priv_t; /* @@ -56,7 +58,10 @@ iptv_rtsp_alive_cb ( void *aux ) rtsp_get_parameter(rp->hc, "position"); else if(rp->hc->hc_rtsp_keep_alive_cmd == RTSP_CMD_OPTIONS) rtsp_send(rp->hc, RTSP_CMD_OPTIONS, rp->path, rp->query, NULL); - else + else if(rp->hc->hc_rtsp_keep_alive_cmd == RTSP_CMD_DESCRIBE) { + rtsp_send(rp->hc, RTSP_CMD_DESCRIBE, rp->path, rp->query, NULL); + rtsp_get_parameter(rp->hc, "position"); + } else return; mtimer_arm_rel(&rp->alive_timer, iptv_rtsp_alive_cb, im, sec2mono(MAX(1, (rp->hc->hc_rtp_timeout / 2) - 1))); @@ -70,7 +75,9 @@ iptv_rtsp_header ( http_client_t *hc ) { iptv_mux_t *im = hc->hc_aux; rtsp_priv_t *rp; + url_t url; int r; + char *p; if (im == NULL) { /* teardown (or teardown timeout) */ @@ -89,17 +96,67 @@ iptv_rtsp_header ( http_client_t *hc ) return 0; } - if (hc->hc_code != HTTP_STATUS_OK) { + if (hc->hc_cmd != RTSP_CMD_DESCRIBE && hc->hc_code != HTTP_STATUS_OK) { tvherror(LS_IPTV, "invalid error code %d for '%s'", hc->hc_code, im->mm_iptv_url_raw); return 0; } + if (hc->hc_cmd == RTSP_CMD_DESCRIBE && hc->hc_code != HTTP_STATUS_OK && hc->hc_code != HTTP_STATUS_SEE_OTHER) { + tvherror(LS_IPTV, "DESCRIBE request returned an invalid error code (%d) for '%s', " + "fall back to GET_PARAMETER in keep alive loop.", hc->hc_code, im->mm_iptv_url_raw); + hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_GET_PARAMETER; + return 0; + } + rp = im->im_data; + if(rp == NULL) + return 0; switch (hc->hc_cmd) { + case RTSP_CMD_DESCRIBE: + if(rp->play) { + // Already active, most probably a keep-alive response + break; + } + if (hc->hc_code == HTTP_STATUS_SEE_OTHER) { + if (!hc->hc_handle_location) { + tvherror(LS_IPTV, "received code 303 from RTSP server but redirects disabled '%s'", + im->mm_iptv_url_raw); + return -1; + } + // Redirect from RTSP server, parse new location and use that instead + p = http_arg_get(&hc->hc_args, "Location"); + if (p == NULL) { + tvherror(LS_IPTV, "received code 303 from RTSP server but no new location given for '%s'", + im->mm_iptv_url_raw); + return -1; + } + tvhinfo(LS_IPTV, "received new location from RTSP server '%s' was '%s'", p, + im->mm_iptv_url_raw); + urlinit(&url); + if (urlparse(p, &url) || strncmp(url.scheme, "rtsp", 4) != 0) { + tvherror(LS_IPTV, "%s - invalid URL [%s]", im->mm_nicename, p); + return -1; + } + if(rp->path) + free(rp->path); + if(rp->query) + free(rp->query); + rp->path = strdup(url.path ? : ""); + rp->query = strdup(url.query ? : ""); + urlreset(&url); + r = rtsp_describe(hc, rp->path, rp->query); + if (r < 0) { + tvherror(LS_IPTV, "rtsp: DESCRIBE failed"); + return -1; + } + } + break; case RTSP_CMD_SETUP: r = rtsp_setup_decode(hc, 0); if (r >= 0) { + if(hc->hc_rtp_timeout > 20) + hc->hc_rtp_timeout = 20; rtsp_play(hc, rp->path, rp->query); rp->play = 1; } @@ -107,8 +164,24 @@ iptv_rtsp_header ( http_client_t *hc ) case RTSP_CMD_PLAY: // Now let's set peer port for RTCP // Use the HTTP host for sending RTCP reports, NOT the hc_rtp_dest (which is where the stream is sent) - if (udp_connect(rp->rtcp_info->connection, "rtcp", hc->hc_host, hc->hc_rtcp_server_port)) { - tvhwarn(LS_RTSP, "Can't connect to remote, RTCP receiver reports won't be sent"); + if (im->mm_iptv_ret_url) { + if (rtcp_connect(&im->im_rtcp_info, im->mm_iptv_ret_url, NULL, 0, + im->mm_iptv_interface, im->mm_nicename) == 0) { + im->im_use_retransmission = 1; + } + } else if (rtcp_connect(&im->im_rtcp_info, NULL, hc->hc_host, + hc->hc_rtcp_server_port, im->mm_iptv_interface, im->mm_nicename) == 0) { + im->im_use_retransmission = 1; + } + if (rp->start_position == 0) { + if (rtsp_play_decode(hc) == 0) + rp->position = rp->start_position = hc->hc_rtsp_stream_start; + else + rp->position = rp->start_position = time(NULL); + } else if (rtsp_play_decode(hc) == 0) { + rp->position = hc->hc_rtsp_stream_start; + tvhdebug(LS_IPTV, "rtsp: position update: %" PRItime_t, + hc->hc_rtsp_stream_start); } hc->hc_cmd = HTTP_CMD_NONE; tvh_mutex_lock(&global_lock); @@ -132,14 +205,61 @@ static int iptv_rtsp_data ( http_client_t *hc, void *buf, size_t len ) { + rtsp_priv_t *rp; iptv_mux_t *im = hc->hc_aux; + int r; if (im == NULL) return 0; - if (len > 0) - tvherror(LS_IPTV, "unknown data %zd received for '%s'", len, im->mm_iptv_url_raw); + if (hc->hc_code != HTTP_STATUS_OK) + return 0; + rp = (rtsp_priv_t*) im->im_data; + + switch (hc->hc_cmd) { + case RTSP_CMD_DESCRIBE: + if (rp == NULL) + break; + if (rtsp_describe_decode(hc, buf, len) >= 0) { + if(rp->range_start == 0) + rp->range_start = hc->hc_rtsp_range_start; + rp->range_end = hc->hc_rtsp_range_end; + tvhdebug(LS_IPTV, "rtsp: buffer update, start: %" PRItime_t ", end: %" PRItime_t, + rp->range_start, rp->range_end); + } + if(rp->play) { + // Already active, most probably a keep-alive response + break; + } + r = rtsp_setup(hc, rp->path, rp->query, NULL, + ntohs(IP_PORT(im->mm_iptv_connection->ip)), + ntohs(IP_PORT(im->im_rtcp_info.connection->ip))); + if (r < 0) { + udp_close(im->im_rtcp_info.connection); + udp_close(im->mm_iptv_connection); + http_client_close(hc); + return -1; + } + udp_multirecv_init(&im->im_um1, IPTV_PKTS, IPTV_PKT_PAYLOAD); + udp_multirecv_init(&im->im_um2, IPTV_PKTS, IPTV_PKT_PAYLOAD); + sbuf_alloc_(&im->im_temp_buffer, IPTV_BUF_SIZE); + break; + case RTSP_CMD_GET_PARAMETER: + if (rp == NULL) + break; + // Generic position update + if (strncmp(buf, "position", 8) == 0) { + rp->position = strtoumax(buf + 10, NULL, 10); + tvhdebug(LS_IPTV, "rtsp: position update: %" PRItime_t, rp->position); + } + break; + default: + if (len > 0) { + tvherror(LS_IPTV, "unknown data %zd received for '%s':\n%s", len, + im->mm_iptv_url_raw, (char* )buf); + } + } return 0; } @@ -175,33 +295,26 @@ iptv_rtsp_start hc->hc_hdr_received = iptv_rtsp_header; hc->hc_data_received = iptv_rtsp_data; hc->hc_handle_location = 1; /* allow redirects */ - hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_GET_PARAMETER; /* start keep alive loop with GET_PARAMETER */ + hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_DESCRIBE; /* start keep alive loop with DESCRIBE */ http_client_register(hc); /* register to the HTTP thread */ - r = rtsp_setup(hc, u->path, u->query, NULL, - ntohs(IP_PORT(rtp->ip)), - ntohs(IP_PORT(rtcp->ip))); + r = rtsp_describe(hc, u->path, u->query); if (r < 0) { - udp_close(rtcp); - udp_close(rtp); - http_client_close(hc); + tvherror(LS_IPTV, "rtsp: DESCRIBE failed"); return SM_CODE_TUNING_FAILED; } - rp = calloc(1, sizeof(*rp)); - rp->rtcp_info = calloc(1, sizeof(iptv_rtcp_info_t)); - rtcp_init(rp->rtcp_info); - rp->rtcp_info->connection = rtcp; + rtcp_init(&im->im_rtcp_info); + im->im_rtcp_info.connection = rtcp; rp->hc = hc; - udp_multirecv_init(&rp->um, IPTV_PKTS, IPTV_PKT_PAYLOAD); rp->path = strdup(u->path ?: ""); rp->query = strdup(u->query ?: ""); im->im_data = rp; im->mm_iptv_fd = rtp->fd; im->mm_iptv_connection = rtp; - im->mm_iptv_fd2 = rtcp->fd; - im->mm_iptv_connection2 = rtcp; - + im->im_rtcp_info.connection_fd = rtcp->fd; + im->im_rtcp_info.connection = rtcp; + im->mm_iptv_rtp_seq = -1; return 0; } @@ -227,13 +340,13 @@ iptv_rtsp_stop rtsp_teardown(rp->hc, rp->path, ""); tvh_mutex_unlock(&iptv_lock); mtimer_disarm(&rp->alive_timer); - udp_multirecv_free(&rp->um); + udp_multirecv_free(&im->im_um1); + udp_multirecv_free(&im->im_um2); if (!play) http_client_close(rp->hc); free(rp->path); free(rp->query); - rtcp_destroy(rp->rtcp_info); - free(rp->rtcp_info); + rtcp_destroy(&im->im_rtcp_info); free(rp); tvh_mutex_lock(&iptv_lock); } @@ -241,8 +354,7 @@ iptv_rtsp_stop static void iptv_rtp_header_callback ( iptv_mux_t *im, uint8_t *rtp, int len ) { - rtsp_priv_t *rp = im->im_data; - iptv_rtcp_info_t *rtcp_info = rp->rtcp_info; + rtcp_t *rtcp_info = &im->im_rtcp_info; ssize_t hlen; /* Basic headers checks */ @@ -270,26 +382,174 @@ iptv_rtp_header_callback ( iptv_mux_t *im, uint8_t *rtp, int len ) static ssize_t iptv_rtsp_read ( iptv_input_t *mi, iptv_mux_t *im ) { - rtsp_priv_t *rp = im->im_data; - udp_multirecv_t *um = &rp->um; ssize_t r; - uint8_t buf[1500]; - - /* RTCP - ignore all incoming packets for now */ - do { - r = recv(im->mm_iptv_fd2, buf, sizeof(buf), MSG_DONTWAIT); - } while (r > 0); - - r = iptv_rtp_read(im, um, iptv_rtp_header_callback); + if (im->mm_iptv_send_reports) { + uint8_t buf[1500]; + /* RTCP - ignore all incoming packets for now */ + do { + r = recv(im->im_rtcp_info.connection_fd, buf, sizeof(buf), MSG_DONTWAIT); + } while (r > 0); + r = iptv_rtp_read(im, iptv_rtp_header_callback); + } else + r = iptv_rtp_read(im, NULL); if (r < 0 && ERRNO_AGAIN(errno)) r = 0; return r; } /* - * Initialise RTSP handler + * Send the status message */ +#if ENABLE_TIMESHIFT +static void rtsp_timeshift_fill_status(rtsp_st_t *ts, rtsp_priv_t *rp, + timeshift_status_t *status) { + int64_t start, end, current; + + if (rp == NULL) { + start = 0; + end = 3600; + current = 0; + } else { + start = 0; + end = rp->range_end - rp->range_start; + current = rp->position - rp->range_start; + } + status->full = 0; + tvhdebug(LS_TIMESHIFT, + "remote ts status start %"PRId64" end %"PRId64 " current %"PRId64, start, end, + current); + + status->shift = ts_rescale_inv(current, 1); + status->pts_start = ts_rescale_inv(start, 1); + status->pts_end = ts_rescale_inv(end, 1); +} + +static void rtsp_timeshift_status + ( rtsp_st_t *pd, rtsp_priv_t *rp ) +{ + streaming_message_t *tsm, *tsm2; + timeshift_status_t *status; + + status = calloc(1, sizeof(timeshift_status_t)); + rtsp_timeshift_fill_status(pd, rp, status); + tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status); + tsm2 = streaming_msg_clone(tsm); + streaming_target_deliver2(pd->output, tsm); + streaming_target_deliver2(pd->tsfix, tsm2); +} + +void *rtsp_status_thread(void *p) { + int64_t mono_now, mono_last_status = 0; + rtsp_st_t *pd = p; + rtsp_priv_t *rp; + + while (pd->run) { + mono_now = getfastmonoclock(); + if(pd->im == NULL) + continue; + rp = (rtsp_priv_t*) pd->im->im_data; + if(rp == NULL || !pd->rtsp_input_start) + continue; + if (mono_now >= (mono_last_status + sec2mono(1))) { + // In case no buffer updates available assume the buffer is being filled + if(rp->hc && rp->hc->hc_rtsp_keep_alive_cmd != RTSP_CMD_DESCRIBE) + rp->range_end++; + rtsp_timeshift_status(pd, rp); + mono_last_status = mono_now; + } + } + return NULL; +} + +static void rtsp_input(void *opaque, streaming_message_t *sm) { + int type = sm->sm_type; + rtsp_st_t *pd = (rtsp_st_t*) opaque; + iptv_mux_t *mux; + streaming_skip_t *data; + rtsp_priv_t *rp; + + if(pd == NULL) + return; + + switch (type) { + case SMT_GRACE: + if (sm->sm_s != NULL) + pd->im = (iptv_mux_t*) ((mpegts_service_t*) sm->sm_s)->s_dvb_mux; + streaming_target_deliver2(pd->output, sm); + break; + case SMT_START: + pd->rtsp_input_start = 1; + streaming_target_deliver2(pd->output, sm); + break; + case SMT_SKIP: + mux = (iptv_mux_t*) pd->im; + if (mux == NULL || mux->im_data == NULL) + break; + rp = (rtsp_priv_t*) mux->im_data; + if (rp->start_position == 0) + rp->start_position = rp->hc->hc_rtsp_stream_start; + rtsp_pause(rp->hc, rp->path, rp->query); + mux->mm_iptv_rtp_seq = -1; + data = (streaming_skip_t*) sm->sm_data; + rtsp_set_position(rp->hc, + rp->range_start + ts_rescale(data->time, 1)); + tvhinfo(LS_IPTV, "rtsp: skip: %" PRItime_t " + %" PRItime_t, rp->range_start, + ts_rescale(data->time, 1)); + streaming_msg_free(sm); + break; + case SMT_SPEED: + mux = (iptv_mux_t*) pd->im; + if (mux == NULL || mux->im_data == NULL) + break; + rp = (rtsp_priv_t*) mux->im_data; + tvhinfo(LS_IPTV, "rtsp: set speed: %i", sm->sm_code); + if (sm->sm_code == 0) { + rtsp_pause(rp->hc, rp->path, rp->query); + } else { + rtsp_set_speed(rp->hc, sm->sm_code / 100); + } + streaming_msg_free(sm); + break; + case SMT_EXIT: + pd->run = 0; + streaming_target_deliver2(pd->output, sm); + break; + default: + streaming_target_deliver2(pd->output, sm); + } +} + +static htsmsg_t* +rtsp_input_info(void *opaque, htsmsg_t *list) { + + return list; +} + +static streaming_ops_t rtsp_input_ops = +{ .st_cb = rtsp_input, .st_info = rtsp_input_info }; + +streaming_target_t* rtsp_st_create(streaming_target_t *out, profile_chain_t *prch) { + rtsp_st_t *h = calloc(1, sizeof(rtsp_st_t)); + + h->output = out; + h->tsfix = prch->prch_share; + h->run = 1; + tvh_thread_create(&h->st_thread, NULL, rtsp_status_thread, h, "rtsp-st"); + streaming_target_init(&h->input, &rtsp_input_ops, h, 0); + + return &h->input; +} + +void rtsp_st_destroy(streaming_target_t *st) { + rtsp_st_t *h = (rtsp_st_t*)st; + h->run = 0; + free(st); +} +#endif +/* + * Initialise RTSP handler + */ void iptv_rtsp_init ( void ) { diff --git a/src/input/mpegts/iptv/iptv_udp.c b/src/input/mpegts/iptv/iptv_udp.c index fbe0bb589..b77688556 100644 --- a/src/input/mpegts/iptv/iptv_udp.c +++ b/src/input/mpegts/iptv/iptv_udp.c @@ -19,6 +19,7 @@ #include "tvheadend.h" #include "iptv_private.h" +#include "iptv_rtcp.h" #include #include @@ -36,7 +37,7 @@ iptv_udp_start ( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url ) { udp_connection_t *conn; - udp_multirecv_t *um; + udp_multirecv_init(&im->im_um1, IPTV_PKTS, IPTV_PKT_PAYLOAD); /* Note: url->user is used for specifying multicast source address (SSM) here. The URL format is rtp://@: */ @@ -47,13 +48,19 @@ iptv_udp_start if (conn == NULL) return -1; - /* Done */ im->mm_iptv_fd = conn->fd; im->mm_iptv_connection = conn; - um = calloc(1, sizeof(*um)); - udp_multirecv_init(um, IPTV_PKTS, IPTV_PKT_PAYLOAD); - im->im_data = um; + /* Setup the RTCP Retransmission connection when configured */ + rtcp_init(&im->im_rtcp_info); + if(im->mm_iptv_ret_url && rtcp_connect(&im->im_rtcp_info, im->mm_iptv_ret_url, + NULL, 0, im->mm_iptv_interface, im->mm_nicename) == 0) { + im->im_use_retransmission = 1; + udp_multirecv_init(&im->im_um2, IPTV_PKTS, IPTV_PKT_PAYLOAD); + sbuf_reset_and_alloc(&im->im_temp_buffer, IPTV_BUF_SIZE); + } + + im->mm_iptv_rtp_seq = -1; iptv_input_mux_started(mi, im, 1); return 0; @@ -63,12 +70,14 @@ static void iptv_udp_stop ( iptv_input_t *mi, iptv_mux_t *im ) { - udp_multirecv_t *um = im->im_data; - im->im_data = NULL; tvh_mutex_unlock(&iptv_lock); - udp_multirecv_free(um); - free(um); + udp_multirecv_free(&im->im_um1); + if(&im->im_um2) { + udp_multirecv_free(&im->im_um2); + } + if(&im->im_temp_buffer) + sbuf_free(&im->im_temp_buffer); tvh_mutex_lock(&iptv_lock); } @@ -77,10 +86,9 @@ iptv_udp_read ( iptv_input_t *mi, iptv_mux_t *im ) { int i, n; struct iovec *iovec; - udp_multirecv_t *um = im->im_data; ssize_t res = 0; - n = udp_multirecv_read(um, im->mm_iptv_fd, IPTV_PKTS, &iovec); + n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec); if (n < 0) return -1; @@ -105,17 +113,33 @@ iptv_udp_read ( iptv_input_t *mi, iptv_mux_t *im ) } ssize_t -iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, - void (*pkt_cb)(iptv_mux_t *im, uint8_t *pkt, int len) ) +iptv_rtp_read(iptv_mux_t *im, void (*pkt_cb)(iptv_mux_t *im, uint8_t *pkt, int len)) { ssize_t len, hlen; uint8_t *rtp; - int i, n; - uint32_t seq, nseq, unc = 0; + int i, n = 0; + uint32_t seq, nseq, oseq, ssrc, unc = 0; struct iovec *iovec; ssize_t res = 0; + char is_ret_buffer = 0; + + if (im->im_use_retransmission) { + n = udp_multirecv_read(&im->im_um2, im->im_rtcp_info.connection_fd, IPTV_PKTS, &iovec); + if (n > 0 && !im->im_is_ce_detected) { + tvhwarn(LS_IPTV, "RET receiving %d unexpected packets for %s", n, + im->mm_nicename); + } + else if (n > 0) { + tvhtrace(LS_IPTV, "RET receiving %d packets for %s", n, im->mm_nicename); + is_ret_buffer = 1; + im->im_rtcp_info.ce_cnt -= n; + im->im_rtcp_info.last_received_sequence += n; + } else { + n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec); + } + } else + n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec); - n = udp_multirecv_read(um, im->mm_iptv_fd, IPTV_PKTS, &iovec); if (n < 0) return -1; @@ -138,12 +162,16 @@ iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, if ((rtp[0] & 0xC0) != 0x80) continue; - /* MPEG-TS */ - if ((rtp[1] & 0x7F) != 33) + /* MPEG-TS or DynamicRTP */ + if ((rtp[1] & 0x7F) != 33 && (rtp[1] & 0x7F) != 96) continue; /* Header length (4bytes per CSRC) */ hlen = ((rtp[0] & 0xf) * 4) + 12; + if (is_ret_buffer) { + /* Skip OSN (original sequence number) field for RET packets */ + hlen += 2; + } if (rtp[0] & 0x10) { if (len < hlen+4) continue; @@ -155,18 +183,59 @@ iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, len -= hlen; - /* Use uncorrectable value to notify RTP delivery issues */ nseq = (rtp[2] << 8) | rtp[3]; - if (seq == -1) + if (seq == -1 || nseq == 0) seq = nseq; - else if (((seq + 1) & 0xffff) != nseq) { - unc += (len / 188) * (uint32_t)((uint16_t)nseq-(uint16_t)(seq+1)); - tvhtrace(LS_IPTV, "RTP discontinuity (%i != %i)", seq + 1, nseq); + /* Some sources will send the retransmission packets as part of the regular + * stream, we can only detect them by checking for the expected seqn. */ + if (im->im_is_ce_detected && !is_ret_buffer && nseq == im->im_rtcp_info.last_received_sequence) { + is_ret_buffer = 1; + im->im_rtcp_info.ce_cnt --; + im->im_rtcp_info.last_received_sequence ++; } - seq = nseq; - /* Move data */ - sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len); + if (!is_ret_buffer) { + if(seq != nseq && ((seq + 1) & 0xffff) != nseq) { + unc += (len / 188) + * (uint32_t) ((uint16_t) nseq - (uint16_t) (seq + 1)); + ssrc = (rtp[8] << 24) | (rtp[9] << 16) | (rtp[10] << 8) | rtp[11]; + /* Use uncorrectable value to notify RTP delivery issues */ + tvhwarn(LS_IPTV, "RTP discontinuity for %s SSRC: 0x%x (%i != %i)", im->mm_nicename, + ssrc, seq + 1, nseq); + if (im->im_use_retransmission && !im->im_is_ce_detected) { + im->im_is_ce_detected = 1; + rtcp_send_nak(&im->im_rtcp_info, ssrc, seq + 1, nseq - seq - 1); + } + } + seq = nseq; + } + + if (im->im_is_ce_detected) { + /* Move data to RET buffer */ + ssrc = (rtp[8] << 24) | (rtp[9] << 16) | (rtp[10] << 8) | rtp[11]; + if (is_ret_buffer) { + oseq = (rtp[12] << 8) | rtp[13]; + tvhtrace(LS_IPTV, "RTP RET received SEQ %i OSN %i for SSRC: 0x%x", nseq, oseq, ssrc); + sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len); + } else + sbuf_append(&im->im_temp_buffer, rtp + hlen, len); + /* If we received all RET packets dump the temporary buffer back into the iptv buffer, + * or if it takes too long just continue as normal. RET packet rate can be a lot slower + * then the main stream so this can take some time. */ + if(im->im_rtcp_info.ce_cnt > 0 && im->im_temp_buffer.sb_ptr > 1600 * IPTV_PKT_PAYLOAD) { + tvhwarn(LS_IPTV, "RTP RET waiting for packets timeout for SSRC: 0x%x", ssrc); + im->im_rtcp_info.ce_cnt = 0; + } + if(im->im_rtcp_info.ce_cnt <= 0) { + im->im_rtcp_info.ce_cnt = 0; + im->im_is_ce_detected = 0; + sbuf_append_from_sbuf(&im->mm_iptv_buffer, &im->im_temp_buffer); + sbuf_reset_and_alloc(&im->im_temp_buffer, IPTV_BUF_SIZE); + } + } else { + /* Move data */ + sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len); + } res += len; } @@ -180,9 +249,7 @@ iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, static ssize_t iptv_udp_rtp_read ( iptv_input_t *mi, iptv_mux_t *im ) { - udp_multirecv_t *um = im->im_data; - - return iptv_rtp_read(im, um, NULL); + return iptv_rtp_read(im, NULL); } /* diff --git a/src/plumbing/tsfix.c b/src/plumbing/tsfix.c index 4b4843e5d..dc28e43bc 100644 --- a/src/plumbing/tsfix.c +++ b/src/plumbing/tsfix.c @@ -67,6 +67,8 @@ typedef struct tsfix { int tf_wait_for_video; int64_t tf_tsref; int64_t tf_start_time; + int64_t dts_offset; + int dts_offset_apply; struct th_pktref_queue tf_ptsq; struct th_pktref_queue tf_backlog; @@ -235,7 +237,10 @@ normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt, int backlog) pkt->pkt_dts &= PTS_MASK; /* Subtract the transport wide start offset */ - dts = pts_diff(ref, pkt->pkt_dts); + if (!tf->dts_offset_apply) + dts = pts_diff(ref, pkt->pkt_dts); + else + dts = pts_diff(ref, pkt->pkt_dts + tf->dts_offset); if (tfs->tfs_last_dts_norm == PTS_UNSET) { if (dts < 0 || pkt->pkt_err) { @@ -266,7 +271,7 @@ normalize_ts(tsfix_t *tf, tfstream_t *tfs, th_pkt_t *pkt, int backlog) } tfs->tfs_bad_dts++; if (tfs->tfs_bad_dts < 5) { - tvherror(LS_TSFIX, + tvhwarn(LS_TSFIX, "transport stream %s, DTS discontinuity. " "DTS = %" PRId64 ", last = %" PRId64, streaming_component_type2txt(tfs->tfs_type), @@ -627,6 +632,7 @@ static void tsfix_input(void *opaque, streaming_message_t *sm) { tsfix_t *tf = opaque; + timeshift_status_t *status; switch(sm->sm_type) { case SMT_PACKET: @@ -644,11 +650,25 @@ tsfix_input(void *opaque, streaming_message_t *sm) streaming_msg_free(sm); return; } + break; case SMT_STOP: tsfix_stop(tf); break; + case SMT_TIMESHIFT_STATUS: + if(tf->dts_offset == PTS_UNSET) { + status = sm->sm_data; + tf->dts_offset = status->shift; + } + streaming_msg_free(sm); + return; + + case SMT_SKIP: + if(tf->dts_offset != PTS_UNSET) { + tf->dts_offset_apply = 1; + } + break; case SMT_GRACE: case SMT_EXIT: @@ -659,8 +679,7 @@ tsfix_input(void *opaque, streaming_message_t *sm) case SMT_NOSTART_WARN: case SMT_MPEGTS: case SMT_SPEED: - case SMT_SKIP: - case SMT_TIMESHIFT_STATUS: + break; } @@ -694,7 +713,7 @@ tsfix_create(streaming_target_t *output) tf->tf_output = output; tf->tf_start_time = mclk(); - + tf->dts_offset = PTS_UNSET; streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0); return &tf->tf_input; } diff --git a/src/profile.c b/src/profile.c index 76aa1a8c1..27631f30d 100644 --- a/src/profile.c +++ b/src/profile.c @@ -30,6 +30,7 @@ #endif #if ENABLE_TIMESHIFT #include "timeshift.h" +#include "input/mpegts/iptv/iptv_private.h" #endif #include "dvr/dvr.h" @@ -640,6 +641,8 @@ profile_deliver(profile_chain_t *prch, streaming_message_t *sm) } sm2 = streaming_msg_create_data(SMT_START, streaming_start_copy(prsh->prsh_start_msg)); + if (sm) + sm2->sm_s = sm->sm_s; streaming_target_deliver(prch->prch_post_share, sm2); prch->prch_start_pending = 0; } @@ -1040,7 +1043,7 @@ profile_chain_init(profile_chain_t *prch, profile_t *pro, void *id, int queue) */ int profile_chain_work(profile_chain_t *prch, struct streaming_target *dst, - uint32_t timeshift_period, int flags) + uint32_t timeshift_period, profile_work_flags_t flags) { profile_t *pro = prch->prch_pro; if (pro && pro->pro_work) @@ -1153,6 +1156,10 @@ profile_chain_close(profile_chain_t *prch) timeshift_destroy(prch->prch_timeshift); prch->prch_timeshift = NULL; } + if(prch->prch_rtsp) { + rtsp_st_destroy(prch->prch_rtsp); + prch->prch_rtsp = NULL; + } #endif if (prch->prch_gh) { globalheaders_destroy(prch->prch_gh); @@ -1204,7 +1211,7 @@ const idclass_t profile_htsp_class = static int profile_htsp_work(profile_chain_t *prch, streaming_target_t *dst, - uint32_t timeshift_period, int flags) + uint32_t timeshift_period, profile_work_flags_t flags) { profile_sharer_t *prsh; @@ -1212,9 +1219,17 @@ profile_htsp_work(profile_chain_t *prch, if (!prsh) goto fail; + if (!prsh->prsh_tsfix) + prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input); + prch->prch_share = prsh->prsh_tsfix; + #if ENABLE_TIMESHIFT - if (timeshift_period > 0) - dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period); + if (flags & PROFILE_WORK_REMOTE_TS) { + dst = prch->prch_rtsp = rtsp_st_create(dst, prch); + } else { + if (timeshift_period > 0) + dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period); + } #endif dst = prch->prch_gh = globalheaders_create(dst); @@ -1222,10 +1237,6 @@ profile_htsp_work(profile_chain_t *prch, if (profile_sharer_create(prsh, prch, dst)) goto fail; - if (!prsh->prsh_tsfix) - prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input); - - prch->prch_share = prsh->prsh_tsfix; prch->prch_flags = SUBSCRIPTION_PACKET; streaming_target_init(&prch->prch_input, prsh->prsh_do_queue ? @@ -2398,7 +2409,7 @@ profile_transcode_can_share(profile_chain_t *prch, static int profile_transcode_work(profile_chain_t *prch, streaming_target_t *dst, - uint32_t timeshift_period, int flags) + uint32_t timeshift_period, profile_work_flags_t flags) { profile_sharer_t *prsh; profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro; diff --git a/src/profile.h b/src/profile.h index efccb0352..0496c6911 100644 --- a/src/profile.h +++ b/src/profile.h @@ -45,6 +45,11 @@ typedef enum { PROFILE_SVF_UHD } profile_svfilter_t; +typedef enum { + PROFILE_WORK_NONE = 0, + PROFILE_WORK_REMOTE_TS +} profile_work_flags_t; + struct profile; struct muxer; struct streaming_target; @@ -94,6 +99,7 @@ typedef struct profile_chain { struct streaming_target *prch_tsfix; #if ENABLE_TIMESHIFT struct streaming_target *prch_timeshift; + struct streaming_target *prch_rtsp; #endif struct streaming_target prch_input; struct streaming_target *prch_share; @@ -129,7 +135,7 @@ typedef struct profile { void (*pro_conf_changed)(struct profile *pro); int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst, - uint32_t timeshift_period, int flags); + uint32_t timeshift_period, profile_work_flags_t flags); int (*pro_reopen)(profile_chain_t *prch, muxer_config_t *m_cfg, muxer_hints_t *hints, int flags); int (*pro_open)(profile_chain_t *prch, muxer_config_t *m_cfg, @@ -175,7 +181,7 @@ static inline void profile_release( profile_t *pro ) } int profile_chain_work(profile_chain_t *prch, struct streaming_target *dst, - uint32_t timeshift_period, int flags); + uint32_t timeshift_period, profile_work_flags_t flags); int profile_chain_reopen(profile_chain_t *prch, muxer_config_t *m_cfg, muxer_hints_t *hints, int flags); diff --git a/src/rtsp.c b/src/rtsp.c index 9cf0b8751..6a7787910 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -37,7 +37,7 @@ rtsp_send_ext( http_client_t *hc, http_cmd_t cmd, (hc->hc_port != 554 ? 7 : 0) + (path ? strlen(path) : 1) + 1; char *buf = alloca(blen); - char buf2[11]; + char buf2[64]; char buf_body[size + 3]; if (hc->hc_rtsp_session) { @@ -118,11 +118,6 @@ rtsp_setup_decode( http_client_t *hc, int satip ) char *argv[32], *argv2[2], *p; int i, n, j; -#if 0 - { http_arg_t *ra; - TAILQ_FOREACH(ra, &hc->hc_args, link) - printf(" %s: %s\n", ra->key, ra->val); } -#endif rtsp_clear_session(hc); if (hc->hc_code != 200) return -EIO; @@ -186,7 +181,8 @@ rtsp_setup_decode( http_client_t *hc, int satip ) } } } else if (!strcasecmp(argv[0], "RTP/AVP") || - !strcasecmp(argv[0], "RTP/AVP/UDP")) { + !strcasecmp(argv[0], "RTP/AVP/UDP") || + !strcasecmp(argv[0], "RTP/AVPF/UDP")) { if (n < 3) return -EIO; hc->hc_rtp_multicast = strcasecmp(argv[1], "multicast") == 0; @@ -227,6 +223,28 @@ rtsp_setup_decode( http_client_t *hc, int satip ) return HTTP_CON_OK; } +int +rtsp_play_decode( http_client_t *hc ) +{ + char *argv[32], *p; + int n; + + if (hc->hc_code != 200) + return -EIO; + p = http_arg_get(&hc->hc_args, "Range"); + if (p == NULL) + return -EIO; + n = http_tokenize(p, argv, 32, '='); + if (n < 1 || strncmp(argv[0], "npt", 3)) + return -EIO; + hc->hc_rtsp_stream_start = strtoumax(argv[1], NULL, 10); + p = http_arg_get(&hc->hc_args, "Scale"); + if (p == NULL) + return -EIO; + hc->hc_rtsp_scale = strtof(p, NULL); + return 0; +} + int rtsp_setup( http_client_t *hc, const char *path, const char *query, @@ -243,6 +261,9 @@ rtsp_setup( http_client_t *hc, snprintf(transport, sizeof(transport), "RTP/AVP;multicast;destination=%s;ttl=1;client_port=%i-%i", multicast_addr, rtp_port, rtcp_port); + } else if(hc->hc_rtp_avpf) { + snprintf(transport, sizeof(transport), + "RTP/AVPF/UDP;unicast;client_port=%i-%i", rtp_port, rtcp_port); } else { snprintf(transport, sizeof(transport), "RTP/AVP;unicast;client_port=%i-%i", rtp_port, rtcp_port); @@ -253,16 +274,36 @@ rtsp_setup( http_client_t *hc, return rtsp_send(hc, RTSP_CMD_SETUP, path, query, &h); } -int -rtsp_describe_decode( http_client_t *hc ) -{ - http_arg_t *ra; +int rtsp_describe_decode(http_client_t *hc, const char *buf, size_t len) { + const char *p; + char transport[64]; + int n, t, transport_type; - /* TODO: Probably rewrite the data to the htsmsg tree ? */ - printf("describe: %i\n", hc->hc_code); - TAILQ_FOREACH(ra, &hc->hc_args, link) - printf(" %s: %s\n", ra->key, ra->val); - printf("data:\n%s\n", hc->hc_data); + p = http_arg_get(&hc->hc_args, "Content-Type"); + if (p == NULL || strncmp(p, "application/sdp", 15)) { + tvhwarn(LS_RTSP, "describe: unkwown response content"); + return -EIO; + } + for (n = 0; n < len; n++) { + p = buf + n; + if (strncmp(p, "a=range", 7) == 0) { + // Parse remote timeshift buffer info + if (strncmp(p + 8, "npt=", 4) == 0) { + sscanf(p + 8, "npt=%" PRItime_t "-%" PRItime_t, &hc->hc_rtsp_range_start, + &hc->hc_rtsp_range_end); + } + } + if (strncmp(p, "m=video", 7) == 0) { + // Parse and select RTP/AVPF stream if available for retransmission support + if (sscanf(p, "m=video %d %s %d\n", &t, transport, &transport_type) == 3) { + tvhtrace(LS_RTSP, "describe: found transport: %d %s %d", t, transport, + transport_type); + if (strncmp(transport, "RTP/AVPF", 8) == 0) { + hc->hc_rtp_avpf = 1; + } + } + } + } return HTTP_CON_OK; } @@ -273,3 +314,23 @@ rtsp_get_parameter( http_client_t *hc, const char *parameter ) { http_arg_set(&hdr, "Content-Type", "text/parameters"); return rtsp_send_ext(hc, RTSP_CMD_GET_PARAMETER, NULL, NULL, &hdr, parameter, strlen(parameter)); } + +int +rtsp_set_speed( http_client_t *hc, float speed ) { + char buf[64]; + http_arg_list_t h; + http_arg_init(&h); + snprintf(buf, sizeof(buf), "%.2f", speed); + http_arg_set(&h, "Scale", buf); + return rtsp_send(hc, RTSP_CMD_PLAY, NULL, NULL, &h); +} + +int +rtsp_set_position( http_client_t *hc, time_t position ) { + char buf[64]; + http_arg_list_t h; + http_arg_init(&h); + snprintf(buf, sizeof(buf), "npt=%" PRItime_t "-", position); + http_arg_set(&h, "Range", buf); + return rtsp_send(hc, RTSP_CMD_PLAY, NULL, NULL, &h); +} diff --git a/src/streaming.c b/src/streaming.c index aac5221ae..267ccdd9a 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -201,7 +201,7 @@ streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st) streaming_message_t * streaming_msg_create(streaming_message_type_t type) { - streaming_message_t *sm = malloc(sizeof(streaming_message_t)); + streaming_message_t *sm = calloc(1, sizeof(streaming_message_t)); memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*sm)); sm->sm_type = type; #if ENABLE_TIMESHIFT @@ -432,6 +432,7 @@ streaming_service_deliver(service_t *t, streaming_message_t *sm) { if (atomic_set(&t->s_pending_restart, 0)) service_restart_streams(t); + sm->sm_s = t; streaming_pad_deliver(&t->s_streaming_pad, sm); } diff --git a/src/streaming.h b/src/streaming.h index 3ea3514d9..c90034358 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -320,6 +320,7 @@ const char * signal2str ( signal_state_t st ); struct streaming_message { TAILQ_ENTRY(streaming_message) sm_link; streaming_message_type_t sm_type; + service_t *sm_s; #if ENABLE_TIMESHIFT int64_t sm_time; #endif