]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
IPTV: add pause support for the correct input data timing, file:// seems working...
authorJaroslav Kysela <perex@perex.cz>
Fri, 13 Nov 2015 10:25:19 +0000 (11:25 +0100)
committerJaroslav Kysela <perex@perex.cz>
Fri, 13 Nov 2015 17:01:00 +0000 (18:01 +0100)
17 files changed:
src/input/mpegts.h
src/input/mpegts/iptv/iptv.c
src/input/mpegts/iptv/iptv_file.c
src/input/mpegts/iptv/iptv_http.c
src/input/mpegts/iptv/iptv_pipe.c
src/input/mpegts/iptv/iptv_private.h
src/input/mpegts/iptv/iptv_udp.c
src/input/mpegts/linuxdvb/linuxdvb_frontend.c
src/input/mpegts/mpegts_input.c
src/input/mpegts/satip/satip_frontend.c
src/input/mpegts/tsfile/tsfile_input.c
src/input/mpegts/tvhdhomerun/tvhdhomerun_frontend.c
src/packet.h
src/parsers/parsers.c
src/plumbing/globalheaders.c
src/plumbing/tsfix.c
src/tvheadend.h

index d2b51ba903fe053622f314acb77aa2f753651904..6253f339361e7d6798b934dc008787c6a79767f6 100644 (file)
@@ -894,7 +894,7 @@ void mpegts_mux_update_pids ( mpegts_mux_t *mm );
 
 void mpegts_input_recv_packets
   (mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
-   int64_t *pcr, uint16_t *pcr_pid);
+   int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid);
 
 int mpegts_input_get_weight ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
 int mpegts_input_get_priority ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags );
index 23f1dc4015b39a2389f40dcb0bd7b38efe8a1004..ed8a5d62c2ce1de960ba73cdd0c17debbf0efe4a 100644 (file)
@@ -24,6 +24,7 @@
 #include "htsstr.h"
 #include "channels.h"
 #include "bouquet.h"
+#include "packet.h"
 
 #include <sys/socket.h>
 #include <sys/types.h>
@@ -368,6 +369,8 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
 
   pthread_mutex_lock(&iptv_lock);
 
+  gtimer_disarm(&im->im_pause_timer);
+
   /* Stop */
   if (im->im_handler->stop)
     im->im_handler->stop(im);
@@ -404,10 +407,20 @@ iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
   snprintf(buf, len, "IPTV");
 }
 
+static void
+iptv_input_unpause ( void *aux )
+{
+  iptv_mux_t *im = aux;
+  pthread_mutex_lock(&iptv_lock);
+  tvhtrace("iptv-pcr", "unpause timer callback");
+  im->im_handler->pause(im, 0);
+  pthread_mutex_unlock(&iptv_lock);
+}
+
 static void *
 iptv_input_thread ( void *aux )
 {
-  int nfds;
+  int nfds, r;
   ssize_t n;
   iptv_mux_t *im;
   tvhpoll_event_t ev;
@@ -425,6 +438,7 @@ iptv_input_thread ( void *aux )
       continue;
     }
     im = ev.data.ptr;
+    r  = 0;
 
     pthread_mutex_lock(&iptv_lock);
 
@@ -436,20 +450,51 @@ iptv_input_thread ( void *aux )
         im->im_handler->stop(im);
         break;
       }
-      iptv_input_recv_packets(im, n);
+      r = iptv_input_recv_packets(im, n);
+      if (r == 1)
+        im->im_handler->pause(im, 1);
     }
 
     pthread_mutex_unlock(&iptv_lock);
+
+    if (r == 1) {
+      pthread_mutex_lock(&global_lock);
+      if (im->mm_active)
+        gtimer_arm(&im->im_pause_timer, iptv_input_unpause, im, 1);
+      pthread_mutex_unlock(&global_lock);
+    }
   }
   return NULL;
 }
 
 void
+iptv_input_pause_handler ( iptv_mux_t *im, int pause )
+{
+  tvhpoll_event_t ev = { 0 };
+
+  ev.fd       = im->mm_iptv_fd;
+  ev.events   = TVHPOLL_IN;
+  ev.data.ptr = im;
+  if (pause)
+    tvhpoll_rem(iptv_poll, &ev, 1);
+  else
+    tvhpoll_add(iptv_poll, &ev, 1);
+}
+
+static inline int
+iptv_input_pause_check ( iptv_mux_t *im )
+{
+  /* queued more than 3 seconds? trigger the pause */
+  return im->im_pcr_end - im->im_pcr_start >= 3000000LL;
+}
+
+int
 iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
 {
   static time_t t1 = 0, t2;
   iptv_network_t *in = (iptv_network_t*)im->mm_network;
   mpegts_mux_instance_t *mmi;
+  int64_t pcr_first = PTS_UNSET, pcr_last = PTS_UNSET, s64;
 
   in->in_bps += len * 8;
   time(&t2);
@@ -466,13 +511,50 @@ iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len )
     t1 = t2;
   }
 
-  /* Pass on */
+  /* Pass on, but with timing */
   mmi = im->mm_active;
-  if (mmi)
+  if (mmi) {
+    if (im->im_pcr != PTS_UNSET) {
+      s64 = getmonoclock() - im->im_pcr_start;
+      im->im_pcr_start += s64;
+      im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL;
+      im->im_pcr &= PTS_MASK;
+      tvhtrace("iptv-pcr", "pcr: updated %ld, time start %ld", im->im_pcr, im->im_pcr_start);
+    }
+    if (iptv_input_pause_check(im)) {
+      tvhtrace("iptv-pcr", "pcr: paused");
+      return 1;
+    }
     mpegts_input_recv_packets((mpegts_input_t*)iptv_input, mmi,
-                              &im->mm_iptv_buffer, NULL, NULL);
+                              &im->mm_iptv_buffer,
+                              &pcr_first, &pcr_last, &im->im_pcr_pid);
+    if (pcr_first != PTS_UNSET && pcr_last != PTS_UNSET) {
+      if (im->im_pcr == PTS_UNSET) {
+        s64 = pts_diff(pcr_first, pcr_last);
+        if (s64 != PTS_UNSET) {
+          im->im_pcr = pcr_first;
+          im->im_pcr_start = getmonoclock();
+          im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
+          tvhtrace("iptv-pcr", "pcr: first %ld last %ld, time start %ld, end %ld",
+                   pcr_first, pcr_last, im->im_pcr_start, im->im_pcr_end);
+        }
+      } else {
+        s64 = pts_diff(im->im_pcr, pcr_last);
+        if (s64 != PTS_UNSET) {
+          im->im_pcr_end = im->im_pcr_start + ((s64 * 100LL) + 50LL) / 9LL;
+          tvhtrace("iptv-pcr", "pcr: last %ld, time end %ld", pcr_last, im->im_pcr_end);
+        }
+      }
+      if (iptv_input_pause_check(im)) {
+        tvhtrace("iptv-pcr", "pcr: paused");
+        return 1;
+      }
+    }
+  }
+  return 0;
 }
 
+
 int
 iptv_input_fd_started ( iptv_mux_t *im )
 {
@@ -519,6 +601,9 @@ iptv_input_mux_started ( iptv_mux_t *im )
   /* Allocate input buffer */
   sbuf_reset_and_alloc(&im->mm_iptv_buffer, IPTV_BUF_SIZE);
 
+  im->im_pcr = PTS_UNSET;
+  im->im_pcr_pid = MPEGTS_PID_NONE;
+
   if (iptv_input_fd_started(im))
     return;
 
index e7cfffda746863120a61c8950d2726af15516520..f8c43275c4998f2d49d18a0d97c599f1f7cecb2a 100644 (file)
 #include <sys/types.h>
 #include <fcntl.h>
 #include <assert.h>
+#include <signal.h>
+
+typedef struct file_priv {
+  int fd;
+  int shutdown;
+  pthread_t tid;
+  pthread_cond_t cond;
+} file_priv_t;
 
 /*
- * Connect UDP/RTP
+ * Read thread
+ */
+static void *
+iptv_file_thread ( void *aux )
+{
+  iptv_mux_t *im = aux;
+  file_priv_t *fp = im->im_data;
+  struct timespec ts;
+  int r, fd = fp->fd, pause = 0;
+  char buf[32*1024];
+
+  pthread_mutex_lock(&iptv_lock);
+  while (!fp->shutdown && fd > 0) {
+    while (!fp->shutdown && pause) {
+      clock_gettime(CLOCK_REALTIME, &ts);
+      ts.tv_sec += 1;
+      if (pthread_cond_timedwait(&fp->cond, &iptv_lock, &ts) == ETIMEDOUT)
+        break;
+    }
+    if (fp->shutdown)
+      break;
+    pause = 0;
+    pthread_mutex_unlock(&iptv_lock);
+    r = read(fd, buf, sizeof(buf));
+    pthread_mutex_lock(&iptv_lock);
+    if (r == 0)
+      break;
+    if (r < 0) {
+      if (ERRNO_AGAIN(errno))
+        continue;
+      break;
+    }
+    sbuf_append(&im->mm_iptv_buffer, buf, r);
+    if (iptv_input_recv_packets(im, r) == 1)
+      pause = 1;
+  }
+  pthread_mutex_unlock(&iptv_lock);
+  return NULL;
+}
+
+/*
+ * Open file
  */
 static int
 iptv_file_start ( iptv_mux_t *im, const char *raw, const url_t *url )
 {
-  int fd = tvh_open(raw + 7, O_RDONLY | O_DIRECT, 0);
+  file_priv_t *fp;
+  int fd = tvh_open(raw + 7, O_RDONLY | O_NONBLOCK, 0);
 
   if (fd < 0) {
     tvherror("iptv", "unable to open file '%s'", raw + 7);
     return -1;
   }
 
-  im->mm_iptv_fd = fd;
-
+  fp = calloc(1, sizeof(*fp));
+  fp->fd = fd;
+  pthread_cond_init(&fp->cond, NULL);
+  im->im_data = fp;
   iptv_input_mux_started(im);
+  tvhthread_create(&fp->tid, NULL, iptv_file_thread, im, "iptvfile");
   return 0;
 }
 
@@ -49,40 +102,22 @@ static void
 iptv_file_stop
   ( iptv_mux_t *im )
 {
-  int rd = im->mm_iptv_fd;
+  file_priv_t *fp = im->im_data;
+  int rd = fp->fd;
   if (rd > 0)
     close(rd);
-  im->mm_iptv_fd = -1;
-}
-
-static ssize_t
-iptv_file_read ( iptv_mux_t *im )
-{
-  int r, fd = im->mm_iptv_fd;
-  ssize_t res = 0;
-
-  while (fd > 0) {
-    sbuf_alloc(&im->mm_iptv_buffer, 32*1024);
-    r = sbuf_read(&im->mm_iptv_buffer, fd);
-    if (r == 0) {
-      close(fd);
-      im->mm_iptv_fd = -1;
-      break;
-    }
-    if (r < 0) {
-      if (errno == EAGAIN)
-        break;
-      if (ERRNO_AGAIN(errno))
-        continue;
-    }
-    res += r;
-  }
-
-  return res;
+  fp->shutdown = 1;
+  pthread_cond_signal(&fp->cond);
+  pthread_mutex_unlock(&iptv_lock);
+  pthread_join(fp->tid, NULL);
+  pthread_cond_destroy(&fp->cond);
+  pthread_mutex_lock(&iptv_lock);
+  free(im->im_data);
+  im->im_data = NULL;
 }
 
 /*
- * Initialise pipe handler
+ * Initialise file handler
  */
 
 void
@@ -92,8 +127,7 @@ iptv_file_init ( void )
     {
       .scheme = "file",
       .start  = iptv_file_start,
-      .stop   = iptv_file_stop,
-      .read   = iptv_file_read,
+      .stop   = iptv_file_stop
     },
   };
   iptv_handler_register(ih, ARRAY_SIZE(ih));
index c769324490114487c40678db1a3b0d4a58f447e8..35616e976114f1ca6b7c933b52ca12c48edcdaee 100644 (file)
@@ -87,7 +87,8 @@ iptv_http_data
   tsdebug_write((mpegts_mux_t *)im, buf, len);
 
   if (len > 0)
-    iptv_input_recv_packets(im, len);
+    if (iptv_input_recv_packets(im, len) == 1)
+      hc->hc_pause = 1;
 
   pthread_mutex_unlock(&iptv_lock);
 
@@ -228,6 +229,19 @@ iptv_http_stop
 }
 
 
+/*
+ * Pause/Unpause
+ */
+static void
+iptv_http_pause
+  ( iptv_mux_t *im, int pause )
+{
+  http_client_t *hc = im->im_data;
+
+  assert(pause == 0);
+  http_client_unpause(hc);
+}
+
 /*
  * Initialise HTTP handler
  */
@@ -240,11 +254,13 @@ iptv_http_init ( void )
       .scheme = "http",
       .start  = iptv_http_start,
       .stop   = iptv_http_stop,
+      .pause  = iptv_http_pause
     },
     {
       .scheme  = "https",
       .start  = iptv_http_start,
       .stop   = iptv_http_stop,
+      .pause  = iptv_http_pause
     }
   };
   iptv_handler_register(ih, 2);
index 77eb6e55aff68b934c064d4bc1d4afc715d1adbe..2b9aaac24ff1418d2894c521aedaa4f6cfddaf36 100644 (file)
@@ -165,6 +165,7 @@ iptv_pipe_init ( void )
       .start  = iptv_pipe_start,
       .stop   = iptv_pipe_stop,
       .read   = iptv_pipe_read,
+      .pause  = iptv_input_pause_handler
     },
   };
   iptv_handler_register(ih, ARRAY_SIZE(ih));
index 3b127e09ff23d04b1f78d5b4bef36d7f06cc6a7c..715ffc474e59a55e1d5425ab035cb05494b02667 100644 (file)
@@ -52,6 +52,7 @@ struct iptv_handler
   int     (*start) ( iptv_mux_t *im, const char *raw, const url_t *url );
   void    (*stop)  ( iptv_mux_t *im );
   ssize_t (*read)  ( iptv_mux_t *im );
+  void    (*pause) ( iptv_mux_t *im, int pause );
   
   RB_ENTRY(iptv_handler) link;
 };
@@ -65,7 +66,8 @@ struct iptv_input
 
 int  iptv_input_fd_started ( iptv_mux_t *im );
 void iptv_input_mux_started ( iptv_mux_t *im );
-void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
+int  iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len );
+void iptv_input_pause_handler ( iptv_mux_t *im, int pause );
 
 struct iptv_network
 {
@@ -138,6 +140,12 @@ struct iptv_mux
   sbuf_t                mm_iptv_buffer;
 
   iptv_handler_t       *im_handler;
+  gtimer_t              im_pause_timer;
+
+  int64_t               im_pcr;
+  int64_t               im_pcr_start;
+  int64_t               im_pcr_end;
+  uint16_t              im_pcr_pid;
 
   void                 *im_data;
 
index e2c909390512d1c82c981ad211fe9a4fa5ee20a6..f701c210893119cac828b39ca0a03ece73bdfbfb 100644 (file)
@@ -187,12 +187,14 @@ iptv_udp_init ( void )
       .start  = iptv_udp_start,
       .stop   = iptv_udp_stop,
       .read   = iptv_udp_read,
+      .pause  = iptv_input_pause_handler
     },
     {
       .scheme = "rtp",
       .start  = iptv_udp_start,
       .stop   = iptv_udp_stop,
       .read   = iptv_udp_rtp_read,
+      .pause  = iptv_input_pause_handler
     }
   };
   iptv_handler_register(ih, 2);
index c4ad1fcdf4573a148093a46ade3161435a2bf4ce..aae3b756ff1cd6b013802f8533481efb94ba0f55 100644 (file)
@@ -1136,7 +1136,7 @@ linuxdvb_frontend_input_thread ( void *aux )
     }
     
     /* Process */
-    mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL);
+    mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, &sb, NULL, NULL, NULL);
   }
 
   sbuf_free(&sb);
index 87132988b0da5031f09b144120099d8149362dd2..c58fcea8073f39549df42b69a4453ba5ed109f4c 100644 (file)
@@ -974,7 +974,7 @@ ts_sync_count ( const uint8_t *tsb, int len )
 void
 mpegts_input_recv_packets
   ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, sbuf_t *sb,
-    int64_t *pcr, uint16_t *pcr_pid )
+    int64_t *pcr_first, int64_t *pcr_last, uint16_t *pcr_pid )
 {
   int len2 = 0, off = 0;
   mpegts_packet_t *mp;
@@ -1005,17 +1005,29 @@ mpegts_input_recv_packets
   //       require per mmi buffers, where this is generally not required)
 
   /* Extract PCR on demand */
-  if (pcr && pcr_pid) {
-    uint8_t *tmp;
-    for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
-      uint16_t pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
+  if (pcr_first && pcr_last && pcr_pid) {
+    uint8_t *tmp, *end;
+    uint16_t pid;
+    for (tmp = tsb, end = tsb + len2; tmp < end; tmp += 188) {
+      pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
       if (*pcr_pid == MPEGTS_PID_NONE || *pcr_pid == pid) {
-        if (get_pcr(tmp, pcr)) {
-          if (*pcr != PTS_UNSET) *pcr_pid = pid;
+        if (get_pcr(tmp, pcr_first)) {
+          *pcr_pid = pid;
           break;
         }
       }
     }
+    if (*pcr_pid != MPEGTS_PID_NONE) {
+      for (tmp = tsb + len2 - 188; tmp >= tsb; tmp -= 188) {
+        pid = ((tmp[1] & 0x1f) << 8) | tmp[2];
+        if (*pcr_pid == pid) {
+          if (get_pcr(tmp, pcr_last)) {
+            *pcr_pid = pid;
+            break;
+          }
+        }
+      }
+    }
   }
 
   /* Pass */
index acdb8a61490ce7f7bccbb6596b7c6e5ec59bd5de..44dc0a65bb4df6aa8495f060c524dd6c5f4c9100 100644 (file)
@@ -1022,7 +1022,7 @@ wrdata:
         mmi = lfe->sf_req->sf_mmi;
         mmi->tii_stats.unc += unc;
         mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
-                                  &lfe->sf_sbuf, NULL, NULL);
+                                  &lfe->sf_sbuf, NULL, NULL, NULL);
       }
       pthread_mutex_unlock(&lfe->sf_dvr_lock);
       lfe->sf_last_data_tstamp = dispatch_clock;
@@ -1571,7 +1571,7 @@ wrdata:
     if (lfe->sf_req == lfe->sf_req_thread) {
       mmi->tii_stats.unc += unc;
       mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi,
-                                sb, NULL, NULL);
+                                sb, NULL, NULL, NULL);
     } else
       fatal = 1;
     pthread_mutex_unlock(&lfe->sf_dvr_lock);
index 5e8d44edb56e27d5653332eb0a3131e8f6d6f34e..d430d5a60fe080db112991bb2a9ee97dbd1a4208 100644 (file)
@@ -44,7 +44,7 @@ tsfile_input_thread ( void *aux )
   tvhpoll_event_t ev;
   struct stat st;
   sbuf_t buf;
-  int64_t pcr, pcr_last = PTS_UNSET;
+  int64_t pcr, pcr2, pcr_last = PTS_UNSET;
 #if PLATFORM_LINUX
   int64_t pcr_last_realtime = 0;
 #endif
@@ -132,7 +132,7 @@ tsfile_input_thread ( void *aux )
     if (c > 0) {
       pcr = PTS_UNSET;
       mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf,
-                                &pcr, &tmi->mmi_tsfile_pcr_pid);
+                                &pcr, &pcr2, &tmi->mmi_tsfile_pcr_pid);
 
       /* Delay */
       if (pcr != PTS_UNSET) {
index c5188dabdfc58295be8a0b5c86d6f6d6d04dd531..5fc774999e8a7463f486ad0166ec012a318f6f02 100644 (file)
@@ -196,7 +196,7 @@ tvhdhomerun_frontend_input_thread ( void *aux )
 
     //tvhdebug("tvhdhomerun", "got r=%d (thats %d)", r, (r == 7*188));
 
-    mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL);
+    mpegts_input_recv_packets((mpegts_input_t*) hfe, mmi, &sb, NULL, NULL, NULL);
   }
 
   tvhdebug("tvhdhomerun", "setting target to none");
index 30d646491702df30823525eaf81bfaffae71b15a..14d3c6c697096308473ccddf7371827cd4fea00e 100644 (file)
@@ -120,4 +120,16 @@ pktbuf_t *pktbuf_append(pktbuf_t *pb, const void *data, size_t size);
 static inline size_t   pktbuf_len(pktbuf_t *pb) { return pb ? pb->pb_size : 0; }
 static inline uint8_t *pktbuf_ptr(pktbuf_t *pb) { return pb->pb_data; }
 
+static inline int64_t pts_diff(int64_t a, int64_t b)
+{
+  a &= PTS_MASK;
+  b &= PTS_MASK;
+  if (b < (PTS_MASK / 4) && a > (PTS_MASK / 2))
+    return b + PTS_MASK + 1 - a;
+  else if (b > a)
+    return b - a;
+  else
+    return PTS_UNSET;
+}
+
 #endif /* PACKET_H_ */
index 4451b9edef2856a0b4a86b7c935317904fcf6f90..0f8ead6705db0b2b8d398903c6d361cd3ac6365f 100644 (file)
@@ -36,9 +36,6 @@
 #include "packet.h"
 #include "streaming.h"
 
-#define PTS_MASK 0x1ffffffffLL
-//#define PTS_MASK 0x7ffffLL
-
 /* parser states */
 #define PARSER_APPEND 0
 #define PARSER_RESET  1
index e5672d36b7e308951f217b35eea03fb98ea4a185..83753d9f4819299ad584bab1c28919bf16809739 100644 (file)
@@ -34,7 +34,6 @@ typedef struct globalheaders {
 
 } globalheaders_t;
 
-#define PTS_MASK      0x1ffffffffLL
 #define MAX_SCAN_TIME 3500  // in ms
 
 /**
index 5d53900cb331f89e6fad02fc9254c22569c91444..1bfba1147bd0da62a3592a7bf3769a4eaca917f2 100644 (file)
@@ -20,8 +20,6 @@
 #include "streaming.h"
 #include "tsfix.h"
 
-#define PTS_MASK 0x1ffffffffLL
-
 #define tsfixprintf(fmt...) // printf(fmt)
 
 LIST_HEAD(tfstream_list, tfstream);
index 2527678d605c4b9ddf578d5c38c4461b082ad87e..d06a858a184579da269cfb2535b94b32c427d1d1 100644 (file)
@@ -84,6 +84,7 @@ typedef struct str_list
 } str_list_t;
 
 #define PTS_UNSET INT64_C(0x8000000000000000)
+#define PTS_MASK  INT64_C(0x00000001ffffffff)
 
 extern int tvheadend_running;