From 493f458ce6ab2777159f174911bfdfdba579f8d5 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Fri, 13 Nov 2015 11:25:19 +0100 Subject: [PATCH] IPTV: add pause support for the correct input data timing, file:// seems working, fixes #3240 --- src/input/mpegts.h | 2 +- src/input/mpegts/iptv/iptv.c | 95 +++++++++++++++- src/input/mpegts/iptv/iptv_file.c | 104 ++++++++++++------ src/input/mpegts/iptv/iptv_http.c | 18 ++- src/input/mpegts/iptv/iptv_pipe.c | 1 + src/input/mpegts/iptv/iptv_private.h | 10 +- src/input/mpegts/iptv/iptv_udp.c | 2 + src/input/mpegts/linuxdvb/linuxdvb_frontend.c | 2 +- src/input/mpegts/mpegts_input.c | 26 +++-- src/input/mpegts/satip/satip_frontend.c | 4 +- src/input/mpegts/tsfile/tsfile_input.c | 4 +- .../mpegts/tvhdhomerun/tvhdhomerun_frontend.c | 2 +- src/packet.h | 12 ++ src/parsers/parsers.c | 3 - src/plumbing/globalheaders.c | 1 - src/plumbing/tsfix.c | 2 - src/tvheadend.h | 1 + 17 files changed, 227 insertions(+), 62 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index d2b51ba90..6253f3393 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -894,7 +894,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm ); void mpegts_input_recv_packets (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, - int64_t *pcr, uint16_t *pcr_pid); + int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid); int mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags ); int mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags ); diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 23f1dc401..ed8a5d62c 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -24,6 +24,7 @@ #include "htsstr.h" #include "channels.h" #include "bouquet.h" +#include "packet.h" #include #include @@ -368,6 +369,8 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) pthread_mutex_lock(&iptv_lock); + gtimer_disarm(&im->im_pause_timer); + /* Stop */ if (im->im_handler->stop) im->im_handler->stop(im); @@ -404,10 +407,20 @@ iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len ) snprintf(buf, len, "IPTV"); } +static void +iptv_input_unpause ( void *aux ) +{ + iptv_mux_t *im = aux; + pthread_mutex_lock(&iptv_lock); + tvhtrace("iptv-pcr", "unpause timer callback"); + im->im_handler->pause(im, 0); + pthread_mutex_unlock(&iptv_lock); +} + static void * iptv_input_thread ( void *aux ) { - int nfds; + int nfds, r; ssize_t n; iptv_mux_t *im; tvhpoll_event_t ev; @@ -425,6 +438,7 @@ iptv_input_thread ( void *aux ) continue; } im = ev.data.ptr; + r = 0; pthread_mutex_lock(&iptv_lock); @@ -436,20 +450,51 @@ iptv_input_thread ( void *aux ) im->im_handler->stop(im); break; } - iptv_input_recv_packets(im, n); + r = iptv_input_recv_packets(im, n); + if (r == 1) + im->im_handler->pause(im, 1); } pthread_mutex_unlock(&iptv_lock); + + if (r == 1) { + pthread_mutex_lock(&global_lock); + if (im->mm_active) + gtimer_arm(&im->im_pause_timer, iptv_input_unpause, im, 1); + pthread_mutex_unlock(&global_lock); + } } return NULL; } void +iptv_input_pause_handler ( iptv_mux_t *im, int pause ) +{ + tvhpoll_event_t ev = { 0 }; + + ev.fd = im->mm_iptv_fd; + ev.events = TVHPOLL_IN; + ev.data.ptr = im; + if (pause) + tvhpoll_rem(iptv_poll, &ev, 1); + else + tvhpoll_add(iptv_poll, &ev, 1); +} + +static inline int +iptv_input_pause_check ( iptv_mux_t *im ) +{ + /* queued more than 3 seconds? trigger the pause */ + return im->im_pcr_end - im->im_pcr_start >= 3000000LL; +} + +int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ) { static time_t t1 = 0, t2; iptv_network_t *in = (iptv_network_t*)im->mm_network; mpegts_mux_instance_t *mmi; + int64_t pcr_first = PTS_UNSET, pcr_last = PTS_UNSET, s64; in->in_bps += len * 8; time(&t2); @@ -466,13 +511,50 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ) t1 = t2; } - /* Pass on */ + /* Pass on, but with timing */ mmi = im->mm_active; - if (mmi) + if (mmi) { + if (im->im_pcr != PTS_UNSET) { + s64 = getmonoclock() - im->im_pcr_start; + im->im_pcr_start += s64; + im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL; + im->im_pcr &= PTS_MASK; + tvhtrace("iptv-pcr", "pcr: updated %ld, time start %ld", im->im_pcr, im->im_pcr_start); + } + if (iptv_input_pause_check(im)) { + tvhtrace("iptv-pcr", "pcr: paused"); + return 1; + } mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi, - &im->mm_iptv_buffer, NULL, NULL); + &im->mm_iptv_buffer, + &pcr_first, &pcr_last, &im->im_pcr_pid); + if (pcr_first != PTS_UNSET && pcr_last != PTS_UNSET) { + if (im->im_pcr == PTS_UNSET) { + s64 = pts_diff(pcr_first, pcr_last); + if (s64 != PTS_UNSET) { + im->im_pcr = pcr_first; + im->im_pcr_start = getmonoclock(); + im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL; + tvhtrace("iptv-pcr", "pcr: first %ld last %ld, time start %ld, end %ld", + pcr_first, pcr_last, im->im_pcr_start, im->im_pcr_end); + } + } else { + s64 = pts_diff(im->im_pcr, pcr_last); + if (s64 != PTS_UNSET) { + im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL; + tvhtrace("iptv-pcr", "pcr: last %ld, time end %ld", pcr_last, im->im_pcr_end); + } + } + if (iptv_input_pause_check(im)) { + tvhtrace("iptv-pcr", "pcr: paused"); + return 1; + } + } + } + return 0; } + int iptv_input_fd_started ( iptv_mux_t *im ) { @@ -519,6 +601,9 @@ iptv_input_mux_started ( iptv_mux_t *im ) /* Allocate input buffer */ sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE); + im->im_pcr = PTS_UNSET; + im->im_pcr_pid = MPEGTS_PID_NONE; + if (iptv_input_fd_started(im)) return; diff --git a/src/input/mpegts/iptv/iptv_file.c b/src/input/mpegts/iptv/iptv_file.c index e7cfffda7..f8c43275c 100644 --- a/src/input/mpegts/iptv/iptv_file.c +++ b/src/input/mpegts/iptv/iptv_file.c @@ -25,23 +25,76 @@ #include #include #include +#include + +typedef struct file_priv { + int fd; + int shutdown; + pthread_t tid; + pthread_cond_t cond; +} file_priv_t; /* - * Connect UDP/RTP + * Read thread + */ +static void * +iptv_file_thread ( void *aux ) +{ + iptv_mux_t *im = aux; + file_priv_t *fp = im->im_data; + struct timespec ts; + int r, fd = fp->fd, pause = 0; + char buf[32*1024]; + + pthread_mutex_lock(&iptv_lock); + while (!fp->shutdown && fd > 0) { + while (!fp->shutdown && pause) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + if (pthread_cond_timedwait(&fp->cond, &iptv_lock, &ts) == ETIMEDOUT) + break; + } + if (fp->shutdown) + break; + pause = 0; + pthread_mutex_unlock(&iptv_lock); + r = read(fd, buf, sizeof(buf)); + pthread_mutex_lock(&iptv_lock); + if (r == 0) + break; + if (r < 0) { + if (ERRNO_AGAIN(errno)) + continue; + break; + } + sbuf_append(&im->mm_iptv_buffer, buf, r); + if (iptv_input_recv_packets(im, r) == 1) + pause = 1; + } + pthread_mutex_unlock(&iptv_lock); + return NULL; +} + +/* + * Open file */ static int iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url ) { - int fd = tvh_open(raw + 7, O_RDONLY | O_DIRECT, 0); + file_priv_t *fp; + int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0); if (fd < 0) { tvherror("iptv", "unable to open file '%s'", raw + 7); return -1; } - im->mm_iptv_fd = fd; - + fp = calloc(1, sizeof(*fp)); + fp->fd = fd; + pthread_cond_init(&fp->cond, NULL); + im->im_data = fp; iptv_input_mux_started(im); + tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile"); return 0; } @@ -49,40 +102,22 @@ static void iptv_file_stop ( iptv_mux_t *im ) { - int rd = im->mm_iptv_fd; + file_priv_t *fp = im->im_data; + int rd = fp->fd; if (rd > 0) close(rd); - im->mm_iptv_fd = -1; -} - -static ssize_t -iptv_file_read ( iptv_mux_t *im ) -{ - int r, fd = im->mm_iptv_fd; - ssize_t res = 0; - - while (fd > 0) { - sbuf_alloc(&im->mm_iptv_buffer, 32*1024); - r = sbuf_read(&im->mm_iptv_buffer, fd); - if (r == 0) { - close(fd); - im->mm_iptv_fd = -1; - break; - } - if (r < 0) { - if (errno == EAGAIN) - break; - if (ERRNO_AGAIN(errno)) - continue; - } - res += r; - } - - return res; + fp->shutdown = 1; + pthread_cond_signal(&fp->cond); + pthread_mutex_unlock(&iptv_lock); + pthread_join(fp->tid, NULL); + pthread_cond_destroy(&fp->cond); + pthread_mutex_lock(&iptv_lock); + free(im->im_data); + im->im_data = NULL; } /* - * Initialise pipe handler + * Initialise file handler */ void @@ -92,8 +127,7 @@ iptv_file_init ( void ) { .scheme = "file", .start = iptv_file_start, - .stop = iptv_file_stop, - .read = iptv_file_read, + .stop = iptv_file_stop }, }; iptv_handler_register(ih, ARRAY_SIZE(ih)); diff --git a/src/input/mpegts/iptv/iptv_http.c b/src/input/mpegts/iptv/iptv_http.c index c76932449..35616e976 100644 --- a/src/input/mpegts/iptv/iptv_http.c +++ b/src/input/mpegts/iptv/iptv_http.c @@ -87,7 +87,8 @@ iptv_http_data tsdebug_write((mpegts_mux_t *)im, buf, len); if (len > 0) - iptv_input_recv_packets(im, len); + if (iptv_input_recv_packets(im, len) == 1) + hc->hc_pause = 1; pthread_mutex_unlock(&iptv_lock); @@ -228,6 +229,19 @@ iptv_http_stop } +/* + * Pause/Unpause + */ +static void +iptv_http_pause + ( iptv_mux_t *im, int pause ) +{ + http_client_t *hc = im->im_data; + + assert(pause == 0); + http_client_unpause(hc); +} + /* * Initialise HTTP handler */ @@ -240,11 +254,13 @@ iptv_http_init ( void ) .scheme = "http", .start = iptv_http_start, .stop = iptv_http_stop, + .pause = iptv_http_pause }, { .scheme = "https", .start = iptv_http_start, .stop = iptv_http_stop, + .pause = iptv_http_pause } }; iptv_handler_register(ih, 2); diff --git a/src/input/mpegts/iptv/iptv_pipe.c b/src/input/mpegts/iptv/iptv_pipe.c index 77eb6e55a..2b9aaac24 100644 --- a/src/input/mpegts/iptv/iptv_pipe.c +++ b/src/input/mpegts/iptv/iptv_pipe.c @@ -165,6 +165,7 @@ iptv_pipe_init ( void ) .start = iptv_pipe_start, .stop = iptv_pipe_stop, .read = iptv_pipe_read, + .pause = iptv_input_pause_handler }, }; iptv_handler_register(ih, ARRAY_SIZE(ih)); diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index 3b127e09f..715ffc474 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -52,6 +52,7 @@ struct iptv_handler int (*start) ( iptv_mux_t *im, const char *raw, const url_t *url ); void (*stop) ( iptv_mux_t *im ); ssize_t (*read) ( iptv_mux_t *im ); + void (*pause) ( iptv_mux_t *im, int pause ); RB_ENTRY(iptv_handler) link; }; @@ -65,7 +66,8 @@ struct iptv_input int iptv_input_fd_started ( iptv_mux_t *im ); void iptv_input_mux_started ( iptv_mux_t *im ); -void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ); +int iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len ); +void iptv_input_pause_handler ( iptv_mux_t *im, int pause ); struct iptv_network { @@ -138,6 +140,12 @@ struct iptv_mux sbuf_t mm_iptv_buffer; iptv_handler_t *im_handler; + gtimer_t im_pause_timer; + + int64_t im_pcr; + int64_t im_pcr_start; + int64_t im_pcr_end; + uint16_t im_pcr_pid; void *im_data; diff --git a/src/input/mpegts/iptv/iptv_udp.c b/src/input/mpegts/iptv/iptv_udp.c index e2c909390..f701c2108 100644 --- a/src/input/mpegts/iptv/iptv_udp.c +++ b/src/input/mpegts/iptv/iptv_udp.c @@ -187,12 +187,14 @@ iptv_udp_init ( void ) .start = iptv_udp_start, .stop = iptv_udp_stop, .read = iptv_udp_read, + .pause = iptv_input_pause_handler }, { .scheme = "rtp", .start = iptv_udp_start, .stop = iptv_udp_stop, .read = iptv_udp_rtp_read, + .pause = iptv_input_pause_handler } }; iptv_handler_register(ih, 2); diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index c4ad1fcdf..aae3b756f 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -1136,7 +1136,7 @@ linuxdvb_frontend_input_thread ( void *aux ) } /* Process */ - mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL); + mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL, NULL); } sbuf_free(&sb); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 87132988b..c58fcea80 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -974,7 +974,7 @@ ts_sync_count ( const uint8_t *tsb, int len ) void mpegts_input_recv_packets ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb, - int64_t *pcr, uint16_t *pcr_pid ) + int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid ) { int len2 = 0, off = 0; mpegts_packet_t *mp; @@ -1005,17 +1005,29 @@ mpegts_input_recv_packets // require per mmi buffers, where this is generally not required) /* Extract PCR on demand */ - if (pcr && pcr_pid) { - uint8_t *tmp; - for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) { - uint16_t pid = ((tmp[1] & 0x1f) << 8) | tmp[2]; + if (pcr_first && pcr_last && pcr_pid) { + uint8_t *tmp, *end; + uint16_t pid; + for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) { + pid = ((tmp[1] & 0x1f) << 8) | tmp[2]; if (*pcr_pid == MPEGTS_PID_NONE || *pcr_pid == pid) { - if (get_pcr(tmp, pcr)) { - if (*pcr != PTS_UNSET) *pcr_pid = pid; + if (get_pcr(tmp, pcr_first)) { + *pcr_pid = pid; break; } } } + if (*pcr_pid != MPEGTS_PID_NONE) { + for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) { + pid = ((tmp[1] & 0x1f) << 8) | tmp[2]; + if (*pcr_pid == pid) { + if (get_pcr(tmp, pcr_last)) { + *pcr_pid = pid; + break; + } + } + } + } } /* Pass */ diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index acdb8a614..44dc0a65b 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -1022,7 +1022,7 @@ wrdata: mmi = lfe->sf_req->sf_mmi; mmi->tii_stats.unc += unc; mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, - &lfe->sf_sbuf, NULL, NULL); + &lfe->sf_sbuf, NULL, NULL, NULL); } pthread_mutex_unlock(&lfe->sf_dvr_lock); lfe->sf_last_data_tstamp = dispatch_clock; @@ -1571,7 +1571,7 @@ wrdata: if (lfe->sf_req == lfe->sf_req_thread) { mmi->tii_stats.unc += unc; mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, - sb, NULL, NULL); + sb, NULL, NULL, NULL); } else fatal = 1; pthread_mutex_unlock(&lfe->sf_dvr_lock); diff --git a/src/input/mpegts/tsfile/tsfile_input.c b/src/input/mpegts/tsfile/tsfile_input.c index 5e8d44edb..d430d5a60 100644 --- a/src/input/mpegts/tsfile/tsfile_input.c +++ b/src/input/mpegts/tsfile/tsfile_input.c @@ -44,7 +44,7 @@ tsfile_input_thread ( void *aux ) tvhpoll_event_t ev; struct stat st; sbuf_t buf; - int64_t pcr, pcr_last = PTS_UNSET; + int64_t pcr, pcr2, pcr_last = PTS_UNSET; #if PLATFORM_LINUX int64_t pcr_last_realtime = 0; #endif @@ -132,7 +132,7 @@ tsfile_input_thread ( void *aux ) if (c > 0) { pcr = PTS_UNSET; mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, - &pcr, &tmi->mmi_tsfile_pcr_pid); + &pcr, &pcr2, &tmi->mmi_tsfile_pcr_pid); /* Delay */ if (pcr != PTS_UNSET) { diff --git a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c index c5188dabd..5fc774999 100644 --- a/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c +++ b/src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c @@ -196,7 +196,7 @@ tvhdhomerun_frontend_input_thread ( void *aux ) //tvhdebug("tvhdhomerun", "got r=%d (thats %d)", r, (r == 7*188)); - mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL); + mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL, NULL); } tvhdebug("tvhdhomerun", "setting target to none"); diff --git a/src/packet.h b/src/packet.h index 30d646491..14d3c6c69 100644 --- a/src/packet.h +++ b/src/packet.h @@ -120,4 +120,16 @@ pktbuf_t *pktbuf_append(pktbuf_t *pb, const void *data, size_t size); static inline size_t pktbuf_len(pktbuf_t *pb) { return pb ? pb->pb_size : 0; } static inline uint8_t *pktbuf_ptr(pktbuf_t *pb) { return pb->pb_data; } +static inline int64_t pts_diff(int64_t a, int64_t b) +{ + a &= PTS_MASK; + b &= PTS_MASK; + if (b < (PTS_MASK / 4) && a > (PTS_MASK / 2)) + return b + PTS_MASK + 1 - a; + else if (b > a) + return b - a; + else + return PTS_UNSET; +} + #endif /* PACKET_H_ */ diff --git a/src/parsers/parsers.c b/src/parsers/parsers.c index 4451b9ede..0f8ead670 100644 --- a/src/parsers/parsers.c +++ b/src/parsers/parsers.c @@ -36,9 +36,6 @@ #include "packet.h" #include "streaming.h" -#define PTS_MASK 0x1ffffffffLL -//#define PTS_MASK 0x7ffffLL - /* parser states */ #define PARSER_APPEND 0 #define PARSER_RESET 1 diff --git a/src/plumbing/globalheaders.c b/src/plumbing/globalheaders.c index e5672d36b..83753d9f4 100644 --- a/src/plumbing/globalheaders.c +++ b/src/plumbing/globalheaders.c @@ -34,7 +34,6 @@ typedef struct globalheaders { } globalheaders_t; -#define PTS_MASK 0x1ffffffffLL #define MAX_SCAN_TIME 3500 // in ms /** diff --git a/src/plumbing/tsfix.c b/src/plumbing/tsfix.c index 5d53900cb..1bfba1147 100644 --- a/src/plumbing/tsfix.c +++ b/src/plumbing/tsfix.c @@ -20,8 +20,6 @@ #include "streaming.h" #include "tsfix.h" -#define PTS_MASK 0x1ffffffffLL - #define tsfixprintf(fmt...) // printf(fmt) LIST_HEAD(tfstream_list, tfstream); diff --git a/src/tvheadend.h b/src/tvheadend.h index 2527678d6..d06a858a1 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -84,6 +84,7 @@ typedef struct str_list } str_list_t; #define PTS_UNSET INT64_C(0x8000000000000000) +#define PTS_MASK INT64_C(0x00000001ffffffff) extern int tvheadend_running; -- 2.47.3