- Implement RTCP Negative Acknowledge (a.k.a. Retransmission) support for RTP streams.
When packet loss is detected the client will send a RTCP Generic Feedback report to the server. The server can than resend these lost packets.
Retransmitted packets are send through a second connection or as part of the main stream, both cases are supported.
For Multicast manual setup of the RTCP server is required, for RTSP automatic setup (was already implemented for Receiver Reports) or manual override is possible.
- General clean-up of unused RTCP code and restructure to allow for easy implementation of different types of RTCP messages.
- Make RTCP Receiver Reports optional.
- RTSP start session with DESCRIBE and parse response content.
- RTSP DESCRIBE redirect support.
- Parse DESCRIBE response for AVPF support (required for Retransmission).
- Implement remote time shift support for RTSP streams.
This option can be enabled for a channel to pass-through time shift commands to the RTSP server, the internal time shift buffer is then disabled.
.list = channel_class_epg_running_list,
.opts = PO_EXPERT | PO_DOC_NLIST,
},
+#if ENABLE_TIMESHIFT
+ {
+ .type = PT_BOOL,
+ .id = "remote_timeshift",
+ .name = N_("Remote timeshift"),
+ .desc = N_("Pass timeshift commands to a remote RTSP server"),
+ .off = offsetof(channel_t, ch_remote_timeshift),
+ .opts = PO_ADVANCED,
+ },
+#endif
{
.type = PT_STR,
.islist = 1,
int ch_dvr_extra_time_pre;
int ch_dvr_extra_time_post;
int ch_epg_running;
+ int ch_remote_timeshift;
LIST_HEAD(, dvr_entry) ch_dvrs;
LIST_HEAD(, dvr_autorec_entry) ch_autorecs;
LIST_HEAD(, dvr_timerec_entry) ch_timerecs;
profile_id = htsmsg_get_str(in, "profile");
#if ENABLE_TIMESHIFT
- if (timeshift_conf.enabled) {
- timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0);
- if (!timeshift_conf.unlimited_period)
- timeshiftPeriod = MIN(timeshiftPeriod, timeshift_conf.max_period * 60);
+ if(ch->ch_remote_timeshift) {
+ timeshiftPeriod = ~0;
+ } else {
+ if (timeshift_conf.enabled) {
+ timeshiftPeriod = htsmsg_get_u32_or_default(in, "timeshiftPeriod", 0);
+ if (!timeshift_conf.unlimited_period)
+ timeshiftPeriod = MIN(timeshiftPeriod, timeshift_conf.max_period * 60);
+ }
}
#endif
streaming_target_init(&hs->hs_input, &htsp_streaming_input_ops, hs, 0);
#if ENABLE_TIMESHIFT
- if (timeshiftPeriod != 0) {
- if (timeshiftPeriod == ~0)
- tvhdebug(LS_HTSP, "using timeshift buffer (unlimited)");
- else
- tvhdebug(LS_HTSP, "using timeshift buffer (%u mins)", timeshiftPeriod / 60);
+ if (ch->ch_remote_timeshift) {
+ tvhdebug(LS_HTSP, "using remote timeshift (RTSP)");
+ } else {
+ if (timeshiftPeriod != 0) {
+ if (timeshiftPeriod == ~0)
+ tvhdebug(LS_HTSP, "using timeshift buffer (unlimited)");
+ else
+ tvhdebug(LS_HTSP, "using timeshift buffer (%u mins)",
+ timeshiftPeriod / 60);
+ }
}
#endif
pro = profile_find_by_list(htsp->htsp_granted_access->aa_profiles, profile_id,
"htsp", SUBSCRIPTION_PACKET | SUBSCRIPTION_HTSP);
profile_chain_init(&hs->hs_prch, pro, ch, 1);
- if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, 0)) {
+ if (profile_chain_work(&hs->hs_prch, &hs->hs_input, timeshiftPeriod, ch->ch_remote_timeshift ?
+ PROFILE_WORK_REMOTE_TS : PROFILE_WORK_NONE)) {
tvherror(LS_HTSP, "unable to create profile chain '%s'", profile_get_name(pro));
profile_chain_close(&hs->hs_prch);
free(hs);
htsmsg_add_u32(rep, "weight", hs->hs_s->ths_weight >= 0 ? hs->hs_s->ths_weight : 0);
#if ENABLE_TIMESHIFT
- if(timeshiftPeriod)
+ if (ch->ch_remote_timeshift) {
htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod);
+ } else {
+ if (timeshiftPeriod)
+ htsmsg_add_u32(rep, "timeshiftPeriod", timeshiftPeriod);
+ }
#endif
htsp_reply(htsp, in, rep);
int hc_rtcp_tcp;
int hc_rtcp_server_port;
int hc_rtp_multicast:1;
+ int hc_rtp_avpf:1;
long hc_rtsp_stream_id;
int hc_rtp_timeout;
char *hc_rtsp_user;
char *hc_rtsp_pass;
char hc_rtsp_keep_alive_cmd;
+ time_t hc_rtsp_stream_start;
+ time_t hc_rtsp_range_start;
+ time_t hc_rtsp_range_end;
+ float hc_rtsp_scale;
struct http_client_ssl *hc_ssl; /* ssl internals */
void rtsp_clear_session( http_client_t *hc );
-int rtsp_options_decode( http_client_t *hc );
static inline int rtsp_options( http_client_t *hc ) {
return rtsp_send(hc, RTSP_CMD_OPTIONS, NULL, NULL, NULL);
}
-int rtsp_setup_decode( http_client_t *hc, int satip );
+static inline int
+rtsp_describe( http_client_t *hc, const char *path, const char *query ) {
+ return rtsp_send(hc, RTSP_CMD_DESCRIBE, path, query, NULL);
+}
+
int rtsp_setup( http_client_t *hc, const char *path, const char *query,
const char *multicast_addr, int rtp_port, int rtcp_port );
int rtsp_get_parameter( http_client_t *hc, const char *parameter );
-int rtsp_describe_decode( http_client_t *hc );
-static inline int
-rtsp_describe( http_client_t *hc, const char *path, const char *query ) {
- return rtsp_send(hc, RTSP_CMD_DESCRIBE, path, query, NULL);
-}
+int rtsp_set_speed( http_client_t *hc, float speed );
+
+int rtsp_set_position( http_client_t *hc, time_t position );
+
+int rtsp_describe_decode( http_client_t *hc, const char *buf, size_t len );
+
+int rtsp_setup_decode( http_client_t *hc, int satip );
+
+int rtsp_options_decode( http_client_t *hc );
+int rtsp_play_decode( http_client_t *hc );
#endif /* HTTP_H_ */
{
iptv_thread_pool_t *pool = mi->mi_tpool;
- if (im->mm_iptv_fd > 0 || im->mm_iptv_fd2 > 0)
- tvhtrace(LS_IPTV, "iptv_input_close_fds %d %d", im->mm_iptv_fd, im->mm_iptv_fd2);
+ if (im->mm_iptv_fd > 0 || im->im_rtcp_info.connection_fd > 0)
+ tvhtrace(LS_IPTV, "iptv_input_close_fds %d %d", im->mm_iptv_fd, im->im_rtcp_info.connection_fd);
/* Close file */
if (im->mm_iptv_fd > 0) {
}
/* Close file2 */
- if (im->mm_iptv_fd2 > 0) {
- tvhpoll_rem1(pool->poll, im->mm_iptv_fd2);
- udp_close(im->mm_iptv_connection2);
- im->mm_iptv_connection2 = NULL;
- im->mm_iptv_fd2 = -1;
+ if (im->im_rtcp_info.connection_fd > 0) {
+ tvhpoll_rem1(pool->poll, im->im_rtcp_info.connection_fd);
+ udp_close(im->im_rtcp_info.connection);
+ im->im_rtcp_info.connection = NULL;
+ im->im_rtcp_info.connection_fd = -1;
}
}
}
/* Setup poll2 */
- if (im->mm_iptv_fd2 > 0) {
+ if (im->im_rtcp_info.connection_fd > 0) {
/* Error? */
- if (tvhpoll_add1(tpool->poll, im->mm_iptv_fd2, TVHPOLL_IN, im) < 0) {
+ if (tvhpoll_add1(tpool->poll, im->im_rtcp_info.connection_fd, TVHPOLL_IN, im) < 0) {
tvherror(LS_IPTV, "%s - failed to add to poll q (2)", im->mm_nicename);
- close(im->mm_iptv_fd2);
- im->mm_iptv_fd2 = -1;
+ close(im->im_rtcp_info.connection_fd);
+ im->im_rtcp_info.connection_fd = -1;
return -1;
}
}
}
return 0;
-}
+}
static int
iptv_mux_url_set ( void *p, const void *v )
return iptv_url_set(&im->mm_iptv_url, &im->mm_iptv_url_sane, v, 1, 1);
}
+static int
+iptv_mux_ret_url_set ( void *p, const void *v )
+{
+ iptv_mux_t *im = p;
+ return iptv_url_set(&im->mm_iptv_ret_url, &im->mm_iptv_ret_url_sane, v, 0, 0);
+}
+
#if ENABLE_LIBAV
static htsmsg_t *
iptv_mux_libav_enum ( void *o, const char *lang )
.set = iptv_mux_url_set,
.opts = PO_MULTILINE
},
+ {
+ .type = PT_BOOL,
+ .id = "iptv_send_reports",
+ .name = N_("Send RTCP status reports"),
+ .off = offsetof(iptv_mux_t, mm_iptv_send_reports),
+ .opts = PO_ADVANCED
+ },
+ {
+ .type = PT_STR,
+ .id = "iptv_ret_url",
+ .name = N_("Retransmission URL"),
+ .desc = N_("Manually setup a retransmission URL for Multicast streams."
+ " For RTSP streams this URL is automatically setup"
+ " if this value is not set."),
+ .off = offsetof(iptv_mux_t, mm_iptv_ret_url),
+ .set = iptv_mux_ret_url_set,
+ .opts = PO_ADVANCED
+ },
{
.type = PT_STR,
.id = "iptv_url_cmpid",
free(im->mm_iptv_url_sane);
free(im->mm_iptv_url_raw);
free(im->mm_iptv_url_cmpid);
+ free(im->mm_iptv_ret_url);
+ free(im->mm_iptv_ret_url_sane);
+ free(im->mm_iptv_ret_url_raw);
+ free(im->mm_iptv_ret_url_cmpid);
free(im->mm_iptv_muxname);
free(im->mm_iptv_interface);
free(im->mm_iptv_svcname);
#include "url.h"
#include "udp.h"
#include "tvhpoll.h"
+#include "profile.h"
-#define IPTV_BUF_SIZE (300*188)
+#define IPTV_BUF_SIZE (2000*188)
#define IPTV_PKTS 32
#define IPTV_PKT_PAYLOAD 1472
iptv_network_t *iptv_network_create0 ( const char *uuid, htsmsg_t *conf, const idclass_t *idc );
+typedef struct {
+ /* Last transmitted packet timestamp */
+ time_t last_ts;
+ /* Next scheduled packet sending timestamp */
+ time_t next_ts;
+
+ double average_packet_size;
+
+ int members;
+ int senders;
+
+ uint16_t last_received_sequence;
+ uint16_t ce_cnt;
+ uint16_t sequence_cycle;
+ uint16_t nak_req_limit;
+
+ /* Connection to the RTCP remote */
+ udp_connection_t *connection;
+ int connection_fd;
+
+ uint32_t source_ssrc;
+ uint32_t my_ssrc;
+} rtcp_t;
+
struct iptv_mux
{
mpegts_mux_t;
int mm_iptv_streaming_priority;
int mm_iptv_fd;
udp_connection_t *mm_iptv_connection;
- int mm_iptv_fd2;
- udp_connection_t *mm_iptv_connection2;
char *mm_iptv_url;
char *mm_iptv_url_sane;
char *mm_iptv_url_raw;
char *mm_iptv_url_cmpid;
+ char *mm_iptv_ret_url;
+ char *mm_iptv_ret_url_sane;
+ char *mm_iptv_ret_url_raw;
+ char *mm_iptv_ret_url_cmpid;
char *mm_iptv_interface;
+ int mm_iptv_send_reports;
int mm_iptv_substitute;
int mm_iptv_libav;
int mm_iptv_atsc;
int im_delete_flag;
void *im_opaque;
+
+ udp_multirecv_t im_um1;
+ udp_multirecv_t im_um2;
+ char im_use_retransmission;
+ sbuf_t im_temp_buffer;
+ char im_is_ce_detected;
+ rtcp_t im_rtcp_info;
};
iptv_mux_t* iptv_mux_create0
void iptv_file_init ( void );
void iptv_libav_init ( void );
-ssize_t iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um,
- void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len) );
+ssize_t iptv_rtp_read(iptv_mux_t *im, void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len));
void iptv_input_unpause ( void *aux );
+struct rtsp_st {
+ // Note: input MUST BE FIRST in struct
+ streaming_target_t input; ///< Input source
+ streaming_target_t *output; ///< Output dest
+ streaming_target_t *tsfix;
+ pthread_t st_thread;
+ volatile int run;
+ volatile int rtsp_input_start;
+ iptv_mux_t *im;
+};
+
+typedef struct rtsp_st rtsp_st_t;
+#if ENABLE_TIMESHIFT
+void *rtsp_status_thread(void *p) ;
+streaming_target_t* rtsp_st_create(streaming_target_t *out, profile_chain_t *prch);
+void rtsp_st_destroy(streaming_target_t *st);
+#endif
#endif /* __IPTV_PRIVATE_H__ */
/******************************************************************************
Version and padding are set to fixed values, i.e. 2 and 0;
*/
static void
-rtcp_append_headers(sbuf_t *buffer, rtcp_t *packet)
+rtcp_append_headers(sbuf_t *buffer, rtcp_header_t *packet)
{
- packet->common.version = 2;
- packet->common.p = 0;
-
uint8_t byte = 0;
- byte |= packet->common.version << 6;
- byte |= packet->common.p << 5;
- byte |= packet->common.count;
+ packet->version = 2;
+ packet->p = 0;
+
+ byte |= packet->version << 6;
+ byte |= packet->p << 5;
+ byte |= packet->count;
sbuf_put_byte(buffer, byte);
- byte = packet->common.pt;
+ byte = packet->pt;
sbuf_put_byte(buffer, byte);
- sbuf_append(buffer, &packet->common.length, sizeof(packet->common.length));
+ sbuf_append(buffer, &packet->length, sizeof(packet->length));
}
/*
Append RTCP receiver report data to the buffer.
*/
static void
-rtcp_append_rr(sbuf_t *buffer, rtcp_t *packet)
+rtcp_append_rr(sbuf_t *buffer, rtcp_rr_t *packet)
{
uint8_t byte = 0;
- rtcp_rr_t report = packet->r.rr.rr[0];
+ rtcp_rr_block_t report = packet->rr[0];
- sbuf_append(buffer, &packet->r.rr.ssrc, sizeof(packet->r.rr.ssrc));
+ sbuf_append(buffer, &packet->ssrc, sizeof(packet->ssrc));
sbuf_append(buffer, &report.ssrc, sizeof(report.ssrc));
byte = report.fraction;
sbuf_put_byte(buffer, byte);
sbuf_append(buffer, &report.dlsr, sizeof(report.dlsr));
}
+/*
+ Append RTCP NAK data to the buffer.
+ */
+static void
+rtcp_append_nak(sbuf_t *buffer, rtcp_gf_t *packet)
+{
+ sbuf_append(buffer, &packet->my_ssrc, sizeof(packet->my_ssrc));
+ sbuf_append(buffer, &packet->ssrc, sizeof(packet->ssrc));
+ sbuf_append(buffer, &packet->pid, sizeof(packet->pid));
+ sbuf_append(buffer, &packet->blp, sizeof(packet->blp));
+}
+
/*
Just send the buffer to the host in the rtcp_info.
*/
static void
-rtcp_send(iptv_rtcp_info_t *info, sbuf_t *buffer)
+rtcp_send(rtcp_t *info, sbuf_t *buffer)
{
tvhdebug(LS_IPTV, "RTCP: Sending receiver report");
// We don't care of the result right now
It uses the actual informations stored in rtcp_info.
*/
static void
-rtcp_send_rr(iptv_rtcp_info_t *info)
+rtcp_send_rr(rtcp_t *info)
{
- rtcp_rr_t report;
-
+ rtcp_rr_block_t report;
+ rtcp_header_t header;
+ rtcp_rr_t packet;
report.ssrc = htonl(info->source_ssrc);
// Fill in the extended last sequence
report.jitter = htonl(12);
// Build the full packet
- rtcp_t packet;
- packet.common.pt = RTCP_RR;
- packet.common.count = 1;
+ header.pt = RTCP_RR;
+ header.count = 1;
// TODO : set the real length
- packet.common.length = htons(7);
- packet.r.rr.ssrc = htonl(info->my_ssrc);
- packet.r.rr.rr[0] = report;
+ header.length = htons(7);
+ packet.ssrc = htonl(info->my_ssrc);
+ packet.rr[0] = report;
// Build the network packet
sbuf_t network_buffer;
sbuf_init(&network_buffer);
- rtcp_append_headers(&network_buffer, &packet);
+ rtcp_append_headers(&network_buffer, &header);
rtcp_append_rr(&network_buffer, &packet);
// Send it
sbuf_free(&network_buffer);
}
-int
-rtcp_init(iptv_rtcp_info_t * info)
+ssize_t
+rtcp_send_nak(rtcp_t *rtcp_info, uint32_t ssrc, uint16_t seqn, uint16_t len)
+{
+ rtcp_header_t rtcp_header;
+ rtcp_gf_t rtcp_data;
+ sbuf_t network_buffer;
+ uint32_t n;
+ uint16_t blp = 0;
+ udp_connection_t *uc = rtcp_info->connection;
+
+ if (len > rtcp_info->nak_req_limit) {
+ seqn += len - rtcp_info->nak_req_limit;
+ len = rtcp_info->nak_req_limit;
+ }
+ tvhinfo(LS_IPTV,
+ "RTCP: Sending NAK report for SSRC 0x%x, missing: %d, following packets: %d",
+ ssrc, seqn, len - 1);
+
+ rtcp_info->last_received_sequence = seqn;
+ rtcp_info->ce_cnt = len;
+
+ // Build the full packet
+ rtcp_header.version = 2;
+ rtcp_header.p = 0;
+ rtcp_header.count = 1; // Generic NAK
+ rtcp_header.pt = RTCP_GF;
+ rtcp_header.length = htons(3);
+ rtcp_data.my_ssrc = 0;
+ rtcp_data.ssrc = htonl(ssrc);
+
+ while (len > 0) {
+ len--;
+ rtcp_data.pid = htons(seqn);
+ if (len > 16) {
+ blp = 0xffff;
+ len -= 16;
+ seqn += 17;
+ } else {
+ blp = 0;
+ for (n = 0; n < len; n++) {
+ blp |= 1 << n;
+ }
+ len = 0;
+ }
+ rtcp_data.blp = htons(blp);
+
+ // Build the network packet
+ sbuf_init(&network_buffer);
+ rtcp_append_headers(&network_buffer, &rtcp_header);
+ rtcp_append_nak(&network_buffer, &rtcp_data);
+
+ // Send it
+ n = udp_write(uc, network_buffer.sb_data, network_buffer.sb_ptr, &uc->peer);
+ if (n) {
+ tvhwarn(LS_IPTV,
+ "RTCP: Sending NAK report for SSRC 0x%x failed, no data send %d %d",
+ ssrc, n, (uint32_t )sizeof(network_buffer));
+ }
+ // Cleanup
+ sbuf_free(&network_buffer);
+ }
+ return 0;
+}
+
+int
+rtcp_connect(rtcp_t * info, char *url, char *host, int port, char *interface, char *nicename)
{
- uint32_t rnd;
+ udp_connection_t *rtcp_conn;
+ url_t rtcp_url;
+
+ if (info->connection == NULL) {
+ rtcp_conn = udp_bind(LS_IPTV, nicename, NULL, 0, NULL, interface,
+ IPTV_BUF_SIZE, 1024);
+ if (rtcp_conn == NULL || rtcp_conn == UDP_FATAL_ERROR) {
+ tvhwarn(LS_IPTV, "%s - Unable to bind, RTCP won't be available",
+ nicename);
+ return -1;
+ }
+ info->connection_fd = rtcp_conn->fd;
+ info->connection = rtcp_conn;
+ }
+ urlinit(&rtcp_url);
+ if (host == NULL) {
+ if (urlparse(url ? : "", &rtcp_url)) {
+ tvhwarn(LS_IPTV, "%s - invalid RTCP URL, should be rtp://HOST:PORT [%s]",
+ nicename, url);
+ goto fail;
+ }
+ host = rtcp_url.host;
+ port = rtcp_url.port;
+ }
+
+ if (udp_connect(info->connection, "rtcp", host, port)) {
+ tvhwarn(LS_IPTV, "%s - Unable to connect, RTCP won't be available",
+ nicename);
+ goto fail;
+ }
+ urlreset(&rtcp_url);
+ return 0;
+fail:
+ urlreset(&rtcp_url);
+ return -1;
+}
+
+int
+rtcp_init(rtcp_t * info)
+{
info->last_ts = 0;
info->next_ts = 0;
info->members = 2;
info->sequence_cycle = 1;
info->source_ssrc = 0;
info->average_packet_size = 52;
-
- // Fill my SSRC
- uuid_random((uint8_t *)&rnd, sizeof(random));
- info->my_ssrc = rnd;
- srand48(rnd * 0x4232a9b9);
+ info->my_ssrc = 0; // Since we are not transmitting, set this to 0
+ info->nak_req_limit = 128; // This appears to be a safe limit
return 0;
}
int
-rtcp_destroy(iptv_rtcp_info_t *info)
+rtcp_destroy(rtcp_t *info)
{
return 0;
}
* Buffer is a raw RTP buffer
*/
int
-rtcp_receiver_update(iptv_rtcp_info_t *info, uint8_t *buffer)
+rtcp_receiver_update(rtcp_t *info, uint8_t *buffer)
{
union {
uint8_t bytes[2];
RTCP_RR = 201,
RTCP_SDES = 202,
RTCP_BYE = 203,
- RTCP_APP = 204
+ RTCP_APP = 204,
+ RTCP_GF = 205
} rtcp_type_t;
typedef enum
unsigned int count : 5; /* varies by packet type */
unsigned int pt : 8; /* RTCP packet type */
uint16_t length; /* pkt len in words, w/o this word */
-} rtcp_common_t;
+} rtcp_header_t;
/*
uint32_t jitter; /* interarrival jitter */
uint32_t lsr; /* last SR packet from this source */
uint32_t dlsr; /* delay since last SR packet */
-} rtcp_rr_t;
+} rtcp_rr_block_t;
+
+
+
+/*
+ * Generic RTC Feedback block
+ */
+typedef struct
+{
+ uint32_t my_ssrc; /* not used */
+ uint32_t ssrc; /* data source being reported */
+ uint16_t pid; /* first missing frame id */
+ uint16_t blp; /* missing frame count */
+} rtcp_gf_t;
/*
* SDES item
} rtcp_sdes_t;
/*
- * One RTCP packet
+ * reception report (RR)
*/
typedef struct
{
- rtcp_common_t common; /* common header */
-
- union
- {
-
- /* sender report (SR) */
- struct
- {
- uint32_t ssrc; /* sender generating this report */
- uint32_t ntp_sec; /* NTP timestamp */
- uint32_t ntp_frac;
- uint32_t rtp_ts; /* RTP timestamp */
- uint32_t psent; /* packets sent */
- uint32_t osent; /* octets sent */
- rtcp_rr_t rr[1]; /* variable-length list */
- } sr;
-
- /* reception report (RR) */
- struct
- {
- uint32_t ssrc; /* receiver generating this report */
- rtcp_rr_t rr[1]; /* variable-length list */
- } rr;
-
- /* source description (SDES) */
- rtcp_sdes_t sdes;
-
- /* BYE */
- struct
- {
- uint32_t src[1]; /* list of sources */
- /* can't express trailing text for reason */
- } bye;
- } r;
-} rtcp_t;
-
-typedef struct iptv_rtcp_info {
- /* Last transmitted packet timestamp */
- time_t last_ts;
- /* Next scheduled packet sending timestamp */
- time_t next_ts;
-
- double average_packet_size;
-
- int members;
- int senders;
-
- uint16_t last_received_sequence;
- uint16_t sequence_cycle;
-
- /* Connection to the RTCP remote. Initialized by the RTSP code. */
- udp_connection_t *connection;
-
- uint32_t source_ssrc;
- uint32_t my_ssrc;
-} iptv_rtcp_info_t;
+ uint32_t ssrc; /* receiver generating this report */
+ rtcp_rr_block_t rr[1]; /* variable-length list */
+} rtcp_rr_t;
/*
Init rtcp_info field of the rtsp_info.
Return 0 if everything was OK.
rtcp_destroy must be called when rtsp_info is destroyed.
*/
-int rtcp_init(iptv_rtcp_info_t *info);
+int rtcp_init(rtcp_t *info);
/*
Destroy rtcp_info field of rtsp_info.
*/
-int rtcp_destroy(iptv_rtcp_info_t *info);
+int rtcp_destroy(rtcp_t *info);
/*
Update RTCP informations.
It can also send a RTCP RR packet if the timer has expired.
*/
-int rtcp_receiver_update(iptv_rtcp_info_t *info, uint8_t *rtp_packet);
+int rtcp_receiver_update(rtcp_t *info, uint8_t *rtp_packet);
+ssize_t rtcp_send_nak(rtcp_t *rtcp_info, uint32_t ssrc, uint16_t seqn, uint16_t len);
+int rtcp_connect(rtcp_t * info, char *url, char *host, int port, char *interface, char *nicename);
#endif /* IPTV_RTCP_H */
typedef struct {
http_client_t *hc;
- udp_multirecv_t um;
char *path;
char *query;
mtimer_t alive_timer;
int play;
- iptv_rtcp_info_t * rtcp_info;
+ time_t start_position;
+ time_t range_start;
+ time_t range_end;
+ time_t position;
} rtsp_priv_t;
/*
rtsp_get_parameter(rp->hc, "position");
else if(rp->hc->hc_rtsp_keep_alive_cmd == RTSP_CMD_OPTIONS)
rtsp_send(rp->hc, RTSP_CMD_OPTIONS, rp->path, rp->query, NULL);
- else
+ else if(rp->hc->hc_rtsp_keep_alive_cmd == RTSP_CMD_DESCRIBE) {
+ rtsp_send(rp->hc, RTSP_CMD_DESCRIBE, rp->path, rp->query, NULL);
+ rtsp_get_parameter(rp->hc, "position");
+ } else
return;
mtimer_arm_rel(&rp->alive_timer, iptv_rtsp_alive_cb, im,
sec2mono(MAX(1, (rp->hc->hc_rtp_timeout / 2) - 1)));
{
iptv_mux_t *im = hc->hc_aux;
rtsp_priv_t *rp;
+ url_t url;
int r;
+ char *p;
if (im == NULL) {
/* teardown (or teardown timeout) */
return 0;
}
- if (hc->hc_code != HTTP_STATUS_OK) {
+ if (hc->hc_cmd != RTSP_CMD_DESCRIBE && hc->hc_code != HTTP_STATUS_OK) {
tvherror(LS_IPTV, "invalid error code %d for '%s'", hc->hc_code, im->mm_iptv_url_raw);
return 0;
}
+ if (hc->hc_cmd == RTSP_CMD_DESCRIBE && hc->hc_code != HTTP_STATUS_OK && hc->hc_code != HTTP_STATUS_SEE_OTHER) {
+ tvherror(LS_IPTV, "DESCRIBE request returned an invalid error code (%d) for '%s', "
+ "fall back to GET_PARAMETER in keep alive loop.", hc->hc_code, im->mm_iptv_url_raw);
+ hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_GET_PARAMETER;
+ return 0;
+ }
+
rp = im->im_data;
+ if(rp == NULL)
+ return 0;
switch (hc->hc_cmd) {
+ case RTSP_CMD_DESCRIBE:
+ if(rp->play) {
+ // Already active, most probably a keep-alive response
+ break;
+ }
+ if (hc->hc_code == HTTP_STATUS_SEE_OTHER) {
+ if (!hc->hc_handle_location) {
+ tvherror(LS_IPTV, "received code 303 from RTSP server but redirects disabled '%s'",
+ im->mm_iptv_url_raw);
+ return -1;
+ }
+ // Redirect from RTSP server, parse new location and use that instead
+ p = http_arg_get(&hc->hc_args, "Location");
+ if (p == NULL) {
+ tvherror(LS_IPTV, "received code 303 from RTSP server but no new location given for '%s'",
+ im->mm_iptv_url_raw);
+ return -1;
+ }
+ tvhinfo(LS_IPTV, "received new location from RTSP server '%s' was '%s'", p,
+ im->mm_iptv_url_raw);
+ urlinit(&url);
+ if (urlparse(p, &url) || strncmp(url.scheme, "rtsp", 4) != 0) {
+ tvherror(LS_IPTV, "%s - invalid URL [%s]", im->mm_nicename, p);
+ return -1;
+ }
+ if(rp->path)
+ free(rp->path);
+ if(rp->query)
+ free(rp->query);
+ rp->path = strdup(url.path ? : "");
+ rp->query = strdup(url.query ? : "");
+ urlreset(&url);
+ r = rtsp_describe(hc, rp->path, rp->query);
+ if (r < 0) {
+ tvherror(LS_IPTV, "rtsp: DESCRIBE failed");
+ return -1;
+ }
+ }
+ break;
case RTSP_CMD_SETUP:
r = rtsp_setup_decode(hc, 0);
if (r >= 0) {
+ if(hc->hc_rtp_timeout > 20)
+ hc->hc_rtp_timeout = 20;
rtsp_play(hc, rp->path, rp->query);
rp->play = 1;
}
case RTSP_CMD_PLAY:
// Now let's set peer port for RTCP
// Use the HTTP host for sending RTCP reports, NOT the hc_rtp_dest (which is where the stream is sent)
- if (udp_connect(rp->rtcp_info->connection, "rtcp", hc->hc_host, hc->hc_rtcp_server_port)) {
- tvhwarn(LS_RTSP, "Can't connect to remote, RTCP receiver reports won't be sent");
+ if (im->mm_iptv_ret_url) {
+ if (rtcp_connect(&im->im_rtcp_info, im->mm_iptv_ret_url, NULL, 0,
+ im->mm_iptv_interface, im->mm_nicename) == 0) {
+ im->im_use_retransmission = 1;
+ }
+ } else if (rtcp_connect(&im->im_rtcp_info, NULL, hc->hc_host,
+ hc->hc_rtcp_server_port, im->mm_iptv_interface, im->mm_nicename) == 0) {
+ im->im_use_retransmission = 1;
+ }
+ if (rp->start_position == 0) {
+ if (rtsp_play_decode(hc) == 0)
+ rp->position = rp->start_position = hc->hc_rtsp_stream_start;
+ else
+ rp->position = rp->start_position = time(NULL);
+ } else if (rtsp_play_decode(hc) == 0) {
+ rp->position = hc->hc_rtsp_stream_start;
+ tvhdebug(LS_IPTV, "rtsp: position update: %" PRItime_t,
+ hc->hc_rtsp_stream_start);
}
hc->hc_cmd = HTTP_CMD_NONE;
tvh_mutex_lock(&global_lock);
iptv_rtsp_data
( http_client_t *hc, void *buf, size_t len )
{
+ rtsp_priv_t *rp;
iptv_mux_t *im = hc->hc_aux;
+ int r;
if (im == NULL)
return 0;
- if (len > 0)
- tvherror(LS_IPTV, "unknown data %zd received for '%s'", len, im->mm_iptv_url_raw);
+ if (hc->hc_code != HTTP_STATUS_OK)
+ return 0;
+ rp = (rtsp_priv_t*) im->im_data;
+
+ switch (hc->hc_cmd) {
+ case RTSP_CMD_DESCRIBE:
+ if (rp == NULL)
+ break;
+ if (rtsp_describe_decode(hc, buf, len) >= 0) {
+ if(rp->range_start == 0)
+ rp->range_start = hc->hc_rtsp_range_start;
+ rp->range_end = hc->hc_rtsp_range_end;
+ tvhdebug(LS_IPTV, "rtsp: buffer update, start: %" PRItime_t ", end: %" PRItime_t,
+ rp->range_start, rp->range_end);
+ }
+ if(rp->play) {
+ // Already active, most probably a keep-alive response
+ break;
+ }
+ r = rtsp_setup(hc, rp->path, rp->query, NULL,
+ ntohs(IP_PORT(im->mm_iptv_connection->ip)),
+ ntohs(IP_PORT(im->im_rtcp_info.connection->ip)));
+ if (r < 0) {
+ udp_close(im->im_rtcp_info.connection);
+ udp_close(im->mm_iptv_connection);
+ http_client_close(hc);
+ return -1;
+ }
+ udp_multirecv_init(&im->im_um1, IPTV_PKTS, IPTV_PKT_PAYLOAD);
+ udp_multirecv_init(&im->im_um2, IPTV_PKTS, IPTV_PKT_PAYLOAD);
+ sbuf_alloc_(&im->im_temp_buffer, IPTV_BUF_SIZE);
+ break;
+ case RTSP_CMD_GET_PARAMETER:
+ if (rp == NULL)
+ break;
+ // Generic position update
+ if (strncmp(buf, "position", 8) == 0) {
+ rp->position = strtoumax(buf + 10, NULL, 10);
+ tvhdebug(LS_IPTV, "rtsp: position update: %" PRItime_t, rp->position);
+ }
+ break;
+ default:
+ if (len > 0) {
+ tvherror(LS_IPTV, "unknown data %zd received for '%s':\n%s", len,
+ im->mm_iptv_url_raw, (char* )buf);
+ }
+ }
return 0;
}
hc->hc_hdr_received = iptv_rtsp_header;
hc->hc_data_received = iptv_rtsp_data;
hc->hc_handle_location = 1; /* allow redirects */
- hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_GET_PARAMETER; /* start keep alive loop with GET_PARAMETER */
+ hc->hc_rtsp_keep_alive_cmd = RTSP_CMD_DESCRIBE; /* start keep alive loop with DESCRIBE */
http_client_register(hc); /* register to the HTTP thread */
- r = rtsp_setup(hc, u->path, u->query, NULL,
- ntohs(IP_PORT(rtp->ip)),
- ntohs(IP_PORT(rtcp->ip)));
+ r = rtsp_describe(hc, u->path, u->query);
if (r < 0) {
- udp_close(rtcp);
- udp_close(rtp);
- http_client_close(hc);
+ tvherror(LS_IPTV, "rtsp: DESCRIBE failed");
return SM_CODE_TUNING_FAILED;
}
-
rp = calloc(1, sizeof(*rp));
- rp->rtcp_info = calloc(1, sizeof(iptv_rtcp_info_t));
- rtcp_init(rp->rtcp_info);
- rp->rtcp_info->connection = rtcp;
+ rtcp_init(&im->im_rtcp_info);
+ im->im_rtcp_info.connection = rtcp;
rp->hc = hc;
- udp_multirecv_init(&rp->um, IPTV_PKTS, IPTV_PKT_PAYLOAD);
rp->path = strdup(u->path ?: "");
rp->query = strdup(u->query ?: "");
im->im_data = rp;
im->mm_iptv_fd = rtp->fd;
im->mm_iptv_connection = rtp;
- im->mm_iptv_fd2 = rtcp->fd;
- im->mm_iptv_connection2 = rtcp;
-
+ im->im_rtcp_info.connection_fd = rtcp->fd;
+ im->im_rtcp_info.connection = rtcp;
+ im->mm_iptv_rtp_seq = -1;
return 0;
}
rtsp_teardown(rp->hc, rp->path, "");
tvh_mutex_unlock(&iptv_lock);
mtimer_disarm(&rp->alive_timer);
- udp_multirecv_free(&rp->um);
+ udp_multirecv_free(&im->im_um1);
+ udp_multirecv_free(&im->im_um2);
if (!play)
http_client_close(rp->hc);
free(rp->path);
free(rp->query);
- rtcp_destroy(rp->rtcp_info);
- free(rp->rtcp_info);
+ rtcp_destroy(&im->im_rtcp_info);
free(rp);
tvh_mutex_lock(&iptv_lock);
}
static void
iptv_rtp_header_callback ( iptv_mux_t *im, uint8_t *rtp, int len )
{
- rtsp_priv_t *rp = im->im_data;
- iptv_rtcp_info_t *rtcp_info = rp->rtcp_info;
+ rtcp_t *rtcp_info = &im->im_rtcp_info;
ssize_t hlen;
/* Basic headers checks */
static ssize_t
iptv_rtsp_read ( iptv_input_t *mi, iptv_mux_t *im )
{
- rtsp_priv_t *rp = im->im_data;
- udp_multirecv_t *um = &rp->um;
ssize_t r;
- uint8_t buf[1500];
-
- /* RTCP - ignore all incoming packets for now */
- do {
- r = recv(im->mm_iptv_fd2, buf, sizeof(buf), MSG_DONTWAIT);
- } while (r > 0);
-
- r = iptv_rtp_read(im, um, iptv_rtp_header_callback);
+ if (im->mm_iptv_send_reports) {
+ uint8_t buf[1500];
+ /* RTCP - ignore all incoming packets for now */
+ do {
+ r = recv(im->im_rtcp_info.connection_fd, buf, sizeof(buf), MSG_DONTWAIT);
+ } while (r > 0);
+ r = iptv_rtp_read(im, iptv_rtp_header_callback);
+ } else
+ r = iptv_rtp_read(im, NULL);
if (r < 0 && ERRNO_AGAIN(errno))
r = 0;
return r;
}
/*
- * Initialise RTSP handler
+ * Send the status message
*/
+#if ENABLE_TIMESHIFT
+static void rtsp_timeshift_fill_status(rtsp_st_t *ts, rtsp_priv_t *rp,
+ timeshift_status_t *status) {
+ int64_t start, end, current;
+
+ if (rp == NULL) {
+ start = 0;
+ end = 3600;
+ current = 0;
+ } else {
+ start = 0;
+ end = rp->range_end - rp->range_start;
+ current = rp->position - rp->range_start;
+ }
+ status->full = 0;
+ tvhdebug(LS_TIMESHIFT,
+ "remote ts status start %"PRId64" end %"PRId64 " current %"PRId64, start, end,
+ current);
+
+ status->shift = ts_rescale_inv(current, 1);
+ status->pts_start = ts_rescale_inv(start, 1);
+ status->pts_end = ts_rescale_inv(end, 1);
+}
+
+static void rtsp_timeshift_status
+ ( rtsp_st_t *pd, rtsp_priv_t *rp )
+{
+ streaming_message_t *tsm, *tsm2;
+ timeshift_status_t *status;
+
+ status = calloc(1, sizeof(timeshift_status_t));
+ rtsp_timeshift_fill_status(pd, rp, status);
+ tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
+ tsm2 = streaming_msg_clone(tsm);
+ streaming_target_deliver2(pd->output, tsm);
+ streaming_target_deliver2(pd->tsfix, tsm2);
+}
+
+void *rtsp_status_thread(void *p) {
+ int64_t mono_now, mono_last_status = 0;
+ rtsp_st_t *pd = p;
+ rtsp_priv_t *rp;
+
+ while (pd->run) {
+ mono_now = getfastmonoclock();
+ if(pd->im == NULL)
+ continue;
+ rp = (rtsp_priv_t*) pd->im->im_data;
+ if(rp == NULL || !pd->rtsp_input_start)
+ continue;
+ if (mono_now >= (mono_last_status + sec2mono(1))) {
+ // In case no buffer updates available assume the buffer is being filled
+ if(rp->hc && rp->hc->hc_rtsp_keep_alive_cmd != RTSP_CMD_DESCRIBE)
+ rp->range_end++;
+ rtsp_timeshift_status(pd, rp);
+ mono_last_status = mono_now;
+ }
+ }
+ return NULL;
+}
+
+static void rtsp_input(void *opaque, streaming_message_t *sm) {
+ int type = sm->sm_type;
+ rtsp_st_t *pd = (rtsp_st_t*) opaque;
+ iptv_mux_t *mux;
+ streaming_skip_t *data;
+ rtsp_priv_t *rp;
+
+ if(pd == NULL)
+ return;
+
+ switch (type) {
+ case SMT_GRACE:
+ if (sm->sm_s != NULL)
+ pd->im = (iptv_mux_t*) ((mpegts_service_t*) sm->sm_s)->s_dvb_mux;
+ streaming_target_deliver2(pd->output, sm);
+ break;
+ case SMT_START:
+ pd->rtsp_input_start = 1;
+ streaming_target_deliver2(pd->output, sm);
+ break;
+ case SMT_SKIP:
+ mux = (iptv_mux_t*) pd->im;
+ if (mux == NULL || mux->im_data == NULL)
+ break;
+ rp = (rtsp_priv_t*) mux->im_data;
+ if (rp->start_position == 0)
+ rp->start_position = rp->hc->hc_rtsp_stream_start;
+ rtsp_pause(rp->hc, rp->path, rp->query);
+ mux->mm_iptv_rtp_seq = -1;
+ data = (streaming_skip_t*) sm->sm_data;
+ rtsp_set_position(rp->hc,
+ rp->range_start + ts_rescale(data->time, 1));
+ tvhinfo(LS_IPTV, "rtsp: skip: %" PRItime_t " + %" PRItime_t, rp->range_start,
+ ts_rescale(data->time, 1));
+ streaming_msg_free(sm);
+ break;
+ case SMT_SPEED:
+ mux = (iptv_mux_t*) pd->im;
+ if (mux == NULL || mux->im_data == NULL)
+ break;
+ rp = (rtsp_priv_t*) mux->im_data;
+ tvhinfo(LS_IPTV, "rtsp: set speed: %i", sm->sm_code);
+ if (sm->sm_code == 0) {
+ rtsp_pause(rp->hc, rp->path, rp->query);
+ } else {
+ rtsp_set_speed(rp->hc, sm->sm_code / 100);
+ }
+ streaming_msg_free(sm);
+ break;
+ case SMT_EXIT:
+ pd->run = 0;
+ streaming_target_deliver2(pd->output, sm);
+ break;
+ default:
+ streaming_target_deliver2(pd->output, sm);
+ }
+}
+
+static htsmsg_t*
+rtsp_input_info(void *opaque, htsmsg_t *list) {
+
+ return list;
+}
+
+static streaming_ops_t rtsp_input_ops =
+{ .st_cb = rtsp_input, .st_info = rtsp_input_info };
+
+streaming_target_t* rtsp_st_create(streaming_target_t *out, profile_chain_t *prch) {
+ rtsp_st_t *h = calloc(1, sizeof(rtsp_st_t));
+
+ h->output = out;
+ h->tsfix = prch->prch_share;
+ h->run = 1;
+ tvh_thread_create(&h->st_thread, NULL, rtsp_status_thread, h, "rtsp-st");
+ streaming_target_init(&h->input, &rtsp_input_ops, h, 0);
+
+ return &h->input;
+}
+
+void rtsp_st_destroy(streaming_target_t *st) {
+ rtsp_st_t *h = (rtsp_st_t*)st;
+ h->run = 0;
+ free(st);
+}
+#endif
+/*
+ * Initialise RTSP handler
+ */
void
iptv_rtsp_init ( void )
{
#include "tvheadend.h"
#include "iptv_private.h"
+#include "iptv_rtcp.h"
#include <sys/socket.h>
#include <sys/types.h>
( iptv_input_t *mi, iptv_mux_t *im, const char *raw, const url_t *url )
{
udp_connection_t *conn;
- udp_multirecv_t *um;
+ udp_multirecv_init(&im->im_um1, IPTV_PKTS, IPTV_PKT_PAYLOAD);
/* Note: url->user is used for specifying multicast source address (SSM)
here. The URL format is rtp://<srcaddr>@<grpaddr>:<port> */
if (conn == NULL)
return -1;
- /* Done */
im->mm_iptv_fd = conn->fd;
im->mm_iptv_connection = conn;
- um = calloc(1, sizeof(*um));
- udp_multirecv_init(um, IPTV_PKTS, IPTV_PKT_PAYLOAD);
- im->im_data = um;
+ /* Setup the RTCP Retransmission connection when configured */
+ rtcp_init(&im->im_rtcp_info);
+ if(im->mm_iptv_ret_url && rtcp_connect(&im->im_rtcp_info, im->mm_iptv_ret_url,
+ NULL, 0, im->mm_iptv_interface, im->mm_nicename) == 0) {
+ im->im_use_retransmission = 1;
+ udp_multirecv_init(&im->im_um2, IPTV_PKTS, IPTV_PKT_PAYLOAD);
+ sbuf_reset_and_alloc(&im->im_temp_buffer, IPTV_BUF_SIZE);
+ }
+
+ im->mm_iptv_rtp_seq = -1;
iptv_input_mux_started(mi, im, 1);
return 0;
iptv_udp_stop
( iptv_input_t *mi, iptv_mux_t *im )
{
- udp_multirecv_t *um = im->im_data;
-
im->im_data = NULL;
tvh_mutex_unlock(&iptv_lock);
- udp_multirecv_free(um);
- free(um);
+ udp_multirecv_free(&im->im_um1);
+ if(&im->im_um2) {
+ udp_multirecv_free(&im->im_um2);
+ }
+ if(&im->im_temp_buffer)
+ sbuf_free(&im->im_temp_buffer);
tvh_mutex_lock(&iptv_lock);
}
{
int i, n;
struct iovec *iovec;
- udp_multirecv_t *um = im->im_data;
ssize_t res = 0;
- n = udp_multirecv_read(um, im->mm_iptv_fd, IPTV_PKTS, &iovec);
+ n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec);
if (n < 0)
return -1;
}
ssize_t
-iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um,
- void (*pkt_cb)(iptv_mux_t *im, uint8_t *pkt, int len) )
+iptv_rtp_read(iptv_mux_t *im, void (*pkt_cb)(iptv_mux_t *im, uint8_t *pkt, int len))
{
ssize_t len, hlen;
uint8_t *rtp;
- int i, n;
- uint32_t seq, nseq, unc = 0;
+ int i, n = 0;
+ uint32_t seq, nseq, oseq, ssrc, unc = 0;
struct iovec *iovec;
ssize_t res = 0;
+ char is_ret_buffer = 0;
+
+ if (im->im_use_retransmission) {
+ n = udp_multirecv_read(&im->im_um2, im->im_rtcp_info.connection_fd, IPTV_PKTS, &iovec);
+ if (n > 0 && !im->im_is_ce_detected) {
+ tvhwarn(LS_IPTV, "RET receiving %d unexpected packets for %s", n,
+ im->mm_nicename);
+ }
+ else if (n > 0) {
+ tvhtrace(LS_IPTV, "RET receiving %d packets for %s", n, im->mm_nicename);
+ is_ret_buffer = 1;
+ im->im_rtcp_info.ce_cnt -= n;
+ im->im_rtcp_info.last_received_sequence += n;
+ } else {
+ n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec);
+ }
+ } else
+ n = udp_multirecv_read(&im->im_um1, im->mm_iptv_fd, IPTV_PKTS, &iovec);
- n = udp_multirecv_read(um, im->mm_iptv_fd, IPTV_PKTS, &iovec);
if (n < 0)
return -1;
if ((rtp[0] & 0xC0) != 0x80)
continue;
- /* MPEG-TS */
- if ((rtp[1] & 0x7F) != 33)
+ /* MPEG-TS or DynamicRTP */
+ if ((rtp[1] & 0x7F) != 33 && (rtp[1] & 0x7F) != 96)
continue;
/* Header length (4bytes per CSRC) */
hlen = ((rtp[0] & 0xf) * 4) + 12;
+ if (is_ret_buffer) {
+ /* Skip OSN (original sequence number) field for RET packets */
+ hlen += 2;
+ }
if (rtp[0] & 0x10) {
if (len < hlen+4)
continue;
len -= hlen;
- /* Use uncorrectable value to notify RTP delivery issues */
nseq = (rtp[2] << 8) | rtp[3];
- if (seq == -1)
+ if (seq == -1 || nseq == 0)
seq = nseq;
- else if (((seq + 1) & 0xffff) != nseq) {
- unc += (len / 188) * (uint32_t)((uint16_t)nseq-(uint16_t)(seq+1));
- tvhtrace(LS_IPTV, "RTP discontinuity (%i != %i)", seq + 1, nseq);
+ /* Some sources will send the retransmission packets as part of the regular
+ * stream, we can only detect them by checking for the expected seqn. */
+ if (im->im_is_ce_detected && !is_ret_buffer && nseq == im->im_rtcp_info.last_received_sequence) {
+ is_ret_buffer = 1;
+ im->im_rtcp_info.ce_cnt --;
+ im->im_rtcp_info.last_received_sequence ++;
}
- seq = nseq;
- /* Move data */
- sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len);
+ if (!is_ret_buffer) {
+ if(seq != nseq && ((seq + 1) & 0xffff) != nseq) {
+ unc += (len / 188)
+ * (uint32_t) ((uint16_t) nseq - (uint16_t) (seq + 1));
+ ssrc = (rtp[8] << 24) | (rtp[9] << 16) | (rtp[10] << 8) | rtp[11];
+ /* Use uncorrectable value to notify RTP delivery issues */
+ tvhwarn(LS_IPTV, "RTP discontinuity for %s SSRC: 0x%x (%i != %i)", im->mm_nicename,
+ ssrc, seq + 1, nseq);
+ if (im->im_use_retransmission && !im->im_is_ce_detected) {
+ im->im_is_ce_detected = 1;
+ rtcp_send_nak(&im->im_rtcp_info, ssrc, seq + 1, nseq - seq - 1);
+ }
+ }
+ seq = nseq;
+ }
+
+ if (im->im_is_ce_detected) {
+ /* Move data to RET buffer */
+ ssrc = (rtp[8] << 24) | (rtp[9] << 16) | (rtp[10] << 8) | rtp[11];
+ if (is_ret_buffer) {
+ oseq = (rtp[12] << 8) | rtp[13];
+ tvhtrace(LS_IPTV, "RTP RET received SEQ %i OSN %i for SSRC: 0x%x", nseq, oseq, ssrc);
+ sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len);
+ } else
+ sbuf_append(&im->im_temp_buffer, rtp + hlen, len);
+ /* If we received all RET packets dump the temporary buffer back into the iptv buffer,
+ * or if it takes too long just continue as normal. RET packet rate can be a lot slower
+ * then the main stream so this can take some time. */
+ if(im->im_rtcp_info.ce_cnt > 0 && im->im_temp_buffer.sb_ptr > 1600 * IPTV_PKT_PAYLOAD) {
+ tvhwarn(LS_IPTV, "RTP RET waiting for packets timeout for SSRC: 0x%x", ssrc);
+ im->im_rtcp_info.ce_cnt = 0;
+ }
+ if(im->im_rtcp_info.ce_cnt <= 0) {
+ im->im_rtcp_info.ce_cnt = 0;
+ im->im_is_ce_detected = 0;
+ sbuf_append_from_sbuf(&im->mm_iptv_buffer, &im->im_temp_buffer);
+ sbuf_reset_and_alloc(&im->im_temp_buffer, IPTV_BUF_SIZE);
+ }
+ } else {
+ /* Move data */
+ sbuf_append(&im->mm_iptv_buffer, rtp + hlen, len);
+ }
res += len;
}
static ssize_t
iptv_udp_rtp_read ( iptv_input_t *mi, iptv_mux_t *im )
{
- udp_multirecv_t *um = im->im_data;
-
- return iptv_rtp_read(im, um, NULL);
+ return iptv_rtp_read(im, NULL);
}
/*
int tf_wait_for_video;
int64_t tf_tsref;
int64_t tf_start_time;
+ int64_t dts_offset;
+ int dts_offset_apply;
struct th_pktref_queue tf_ptsq;
struct th_pktref_queue tf_backlog;
pkt->pkt_dts &= PTS_MASK;
/* Subtract the transport wide start offset */
- dts = pts_diff(ref, pkt->pkt_dts);
+ if (!tf->dts_offset_apply)
+ dts = pts_diff(ref, pkt->pkt_dts);
+ else
+ dts = pts_diff(ref, pkt->pkt_dts + tf->dts_offset);
if (tfs->tfs_last_dts_norm == PTS_UNSET) {
if (dts < 0 || pkt->pkt_err) {
}
tfs->tfs_bad_dts++;
if (tfs->tfs_bad_dts < 5) {
- tvherror(LS_TSFIX,
+ tvhwarn(LS_TSFIX,
"transport stream %s, DTS discontinuity. "
"DTS = %" PRId64 ", last = %" PRId64,
streaming_component_type2txt(tfs->tfs_type),
tsfix_input(void *opaque, streaming_message_t *sm)
{
tsfix_t *tf = opaque;
+ timeshift_status_t *status;
switch(sm->sm_type) {
case SMT_PACKET:
streaming_msg_free(sm);
return;
}
+
break;
case SMT_STOP:
tsfix_stop(tf);
break;
+ case SMT_TIMESHIFT_STATUS:
+ if(tf->dts_offset == PTS_UNSET) {
+ status = sm->sm_data;
+ tf->dts_offset = status->shift;
+ }
+ streaming_msg_free(sm);
+ return;
+
+ case SMT_SKIP:
+ if(tf->dts_offset != PTS_UNSET) {
+ tf->dts_offset_apply = 1;
+ }
+ break;
case SMT_GRACE:
case SMT_EXIT:
case SMT_NOSTART_WARN:
case SMT_MPEGTS:
case SMT_SPEED:
- case SMT_SKIP:
- case SMT_TIMESHIFT_STATUS:
+
break;
}
tf->tf_output = output;
tf->tf_start_time = mclk();
-
+ tf->dts_offset = PTS_UNSET;
streaming_target_init(&tf->tf_input, &tsfix_input_ops, tf, 0);
return &tf->tf_input;
}
#endif
#if ENABLE_TIMESHIFT
#include "timeshift.h"
+#include "input/mpegts/iptv/iptv_private.h"
#endif
#include "dvr/dvr.h"
}
sm2 = streaming_msg_create_data(SMT_START,
streaming_start_copy(prsh->prsh_start_msg));
+ if (sm)
+ sm2->sm_s = sm->sm_s;
streaming_target_deliver(prch->prch_post_share, sm2);
prch->prch_start_pending = 0;
}
*/
int
profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
- uint32_t timeshift_period, int flags)
+ uint32_t timeshift_period, profile_work_flags_t flags)
{
profile_t *pro = prch->prch_pro;
if (pro && pro->pro_work)
timeshift_destroy(prch->prch_timeshift);
prch->prch_timeshift = NULL;
}
+ if(prch->prch_rtsp) {
+ rtsp_st_destroy(prch->prch_rtsp);
+ prch->prch_rtsp = NULL;
+ }
#endif
if (prch->prch_gh) {
globalheaders_destroy(prch->prch_gh);
static int
profile_htsp_work(profile_chain_t *prch,
streaming_target_t *dst,
- uint32_t timeshift_period, int flags)
+ uint32_t timeshift_period, profile_work_flags_t flags)
{
profile_sharer_t *prsh;
if (!prsh)
goto fail;
+ if (!prsh->prsh_tsfix)
+ prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input);
+ prch->prch_share = prsh->prsh_tsfix;
+
#if ENABLE_TIMESHIFT
- if (timeshift_period > 0)
- dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
+ if (flags & PROFILE_WORK_REMOTE_TS) {
+ dst = prch->prch_rtsp = rtsp_st_create(dst, prch);
+ } else {
+ if (timeshift_period > 0)
+ dst = prch->prch_timeshift = timeshift_create(dst, timeshift_period);
+ }
#endif
dst = prch->prch_gh = globalheaders_create(dst);
if (profile_sharer_create(prsh, prch, dst))
goto fail;
- if (!prsh->prsh_tsfix)
- prsh->prsh_tsfix = tsfix_create(&prsh->prsh_input);
-
- prch->prch_share = prsh->prsh_tsfix;
prch->prch_flags = SUBSCRIPTION_PACKET;
streaming_target_init(&prch->prch_input,
prsh->prsh_do_queue ?
static int
profile_transcode_work(profile_chain_t *prch,
streaming_target_t *dst,
- uint32_t timeshift_period, int flags)
+ uint32_t timeshift_period, profile_work_flags_t flags)
{
profile_sharer_t *prsh;
profile_transcode_t *pro = (profile_transcode_t *)prch->prch_pro;
PROFILE_SVF_UHD
} profile_svfilter_t;
+typedef enum {
+ PROFILE_WORK_NONE = 0,
+ PROFILE_WORK_REMOTE_TS
+} profile_work_flags_t;
+
struct profile;
struct muxer;
struct streaming_target;
struct streaming_target *prch_tsfix;
#if ENABLE_TIMESHIFT
struct streaming_target *prch_timeshift;
+ struct streaming_target *prch_rtsp;
#endif
struct streaming_target prch_input;
struct streaming_target *prch_share;
void (*pro_conf_changed)(struct profile *pro);
int (*pro_work)(profile_chain_t *prch, struct streaming_target *dst,
- uint32_t timeshift_period, int flags);
+ uint32_t timeshift_period, profile_work_flags_t flags);
int (*pro_reopen)(profile_chain_t *prch, muxer_config_t *m_cfg,
muxer_hints_t *hints, int flags);
int (*pro_open)(profile_chain_t *prch, muxer_config_t *m_cfg,
}
int profile_chain_work(profile_chain_t *prch, struct streaming_target *dst,
- uint32_t timeshift_period, int flags);
+ uint32_t timeshift_period, profile_work_flags_t flags);
int profile_chain_reopen(profile_chain_t *prch,
muxer_config_t *m_cfg,
muxer_hints_t *hints, int flags);
(hc->hc_port != 554 ? 7 : 0) +
(path ? strlen(path) : 1) + 1;
char *buf = alloca(blen);
- char buf2[11];
+ char buf2[64];
char buf_body[size + 3];
if (hc->hc_rtsp_session) {
char *argv[32], *argv2[2], *p;
int i, n, j;
-#if 0
- { http_arg_t *ra;
- TAILQ_FOREACH(ra, &hc->hc_args, link)
- printf(" %s: %s\n", ra->key, ra->val); }
-#endif
rtsp_clear_session(hc);
if (hc->hc_code != 200)
return -EIO;
}
}
} else if (!strcasecmp(argv[0], "RTP/AVP") ||
- !strcasecmp(argv[0], "RTP/AVP/UDP")) {
+ !strcasecmp(argv[0], "RTP/AVP/UDP") ||
+ !strcasecmp(argv[0], "RTP/AVPF/UDP")) {
if (n < 3)
return -EIO;
hc->hc_rtp_multicast = strcasecmp(argv[1], "multicast") == 0;
return HTTP_CON_OK;
}
+int
+rtsp_play_decode( http_client_t *hc )
+{
+ char *argv[32], *p;
+ int n;
+
+ if (hc->hc_code != 200)
+ return -EIO;
+ p = http_arg_get(&hc->hc_args, "Range");
+ if (p == NULL)
+ return -EIO;
+ n = http_tokenize(p, argv, 32, '=');
+ if (n < 1 || strncmp(argv[0], "npt", 3))
+ return -EIO;
+ hc->hc_rtsp_stream_start = strtoumax(argv[1], NULL, 10);
+ p = http_arg_get(&hc->hc_args, "Scale");
+ if (p == NULL)
+ return -EIO;
+ hc->hc_rtsp_scale = strtof(p, NULL);
+ return 0;
+}
+
int
rtsp_setup( http_client_t *hc,
const char *path, const char *query,
snprintf(transport, sizeof(transport),
"RTP/AVP;multicast;destination=%s;ttl=1;client_port=%i-%i",
multicast_addr, rtp_port, rtcp_port);
+ } else if(hc->hc_rtp_avpf) {
+ snprintf(transport, sizeof(transport),
+ "RTP/AVPF/UDP;unicast;client_port=%i-%i", rtp_port, rtcp_port);
} else {
snprintf(transport, sizeof(transport),
"RTP/AVP;unicast;client_port=%i-%i", rtp_port, rtcp_port);
return rtsp_send(hc, RTSP_CMD_SETUP, path, query, &h);
}
-int
-rtsp_describe_decode( http_client_t *hc )
-{
- http_arg_t *ra;
+int rtsp_describe_decode(http_client_t *hc, const char *buf, size_t len) {
+ const char *p;
+ char transport[64];
+ int n, t, transport_type;
- /* TODO: Probably rewrite the data to the htsmsg tree ? */
- printf("describe: %i\n", hc->hc_code);
- TAILQ_FOREACH(ra, &hc->hc_args, link)
- printf(" %s: %s\n", ra->key, ra->val);
- printf("data:\n%s\n", hc->hc_data);
+ p = http_arg_get(&hc->hc_args, "Content-Type");
+ if (p == NULL || strncmp(p, "application/sdp", 15)) {
+ tvhwarn(LS_RTSP, "describe: unkwown response content");
+ return -EIO;
+ }
+ for (n = 0; n < len; n++) {
+ p = buf + n;
+ if (strncmp(p, "a=range", 7) == 0) {
+ // Parse remote timeshift buffer info
+ if (strncmp(p + 8, "npt=", 4) == 0) {
+ sscanf(p + 8, "npt=%" PRItime_t "-%" PRItime_t, &hc->hc_rtsp_range_start,
+ &hc->hc_rtsp_range_end);
+ }
+ }
+ if (strncmp(p, "m=video", 7) == 0) {
+ // Parse and select RTP/AVPF stream if available for retransmission support
+ if (sscanf(p, "m=video %d %s %d\n", &t, transport, &transport_type) == 3) {
+ tvhtrace(LS_RTSP, "describe: found transport: %d %s %d", t, transport,
+ transport_type);
+ if (strncmp(transport, "RTP/AVPF", 8) == 0) {
+ hc->hc_rtp_avpf = 1;
+ }
+ }
+ }
+ }
return HTTP_CON_OK;
}
http_arg_set(&hdr, "Content-Type", "text/parameters");
return rtsp_send_ext(hc, RTSP_CMD_GET_PARAMETER, NULL, NULL, &hdr, parameter, strlen(parameter));
}
+
+int
+rtsp_set_speed( http_client_t *hc, float speed ) {
+ char buf[64];
+ http_arg_list_t h;
+ http_arg_init(&h);
+ snprintf(buf, sizeof(buf), "%.2f", speed);
+ http_arg_set(&h, "Scale", buf);
+ return rtsp_send(hc, RTSP_CMD_PLAY, NULL, NULL, &h);
+}
+
+int
+rtsp_set_position( http_client_t *hc, time_t position ) {
+ char buf[64];
+ http_arg_list_t h;
+ http_arg_init(&h);
+ snprintf(buf, sizeof(buf), "npt=%" PRItime_t "-", position);
+ http_arg_set(&h, "Range", buf);
+ return rtsp_send(hc, RTSP_CMD_PLAY, NULL, NULL, &h);
+}
streaming_message_t *
streaming_msg_create(streaming_message_type_t type)
{
- streaming_message_t *sm = malloc(sizeof(streaming_message_t));
+ streaming_message_t *sm = calloc(1, sizeof(streaming_message_t));
memoryinfo_alloc(&streaming_msg_memoryinfo, sizeof(*sm));
sm->sm_type = type;
#if ENABLE_TIMESHIFT
{
if (atomic_set(&t->s_pending_restart, 0))
service_restart_streams(t);
+ sm->sm_s = t;
streaming_pad_deliver(&t->s_streaming_pad, sm);
}
struct streaming_message {
TAILQ_ENTRY(streaming_message) sm_link;
streaming_message_type_t sm_type;
+ service_t *sm_s;
#if ENABLE_TIMESHIFT
int64_t sm_time;
#endif