]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
iptv: add libav input
authorJaroslav Kysela <perex@perex.cz>
Wed, 23 Aug 2017 09:25:17 +0000 (11:25 +0200)
committerJaroslav Kysela <perex@perex.cz>
Wed, 23 Aug 2017 09:28:25 +0000 (11:28 +0200)
12 files changed:
Makefile
src/input/mpegts/iptv/iptv.c
src/input/mpegts/iptv/iptv_libav.c [new file with mode: 0644]
src/input/mpegts/iptv/iptv_mux.c
src/input/mpegts/iptv/iptv_pipe.c
src/input/mpegts/iptv/iptv_private.h
src/input/mpegts/mpegts_input.c
src/libav.c
src/libav.h
src/main.c
src/tvheadend.h
src/utils.c

index 9dbb1c1fe0540edfa68cfde16f23de1dbd8a6bb6..d28fdcddcdbccc9552373c8d8f80451100db9771 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -480,6 +480,7 @@ DEPS-LIBAV = \
        src/tvhlog.c
 SRCS-LIBAV = \
        src/libav.c \
+       src/input/mpegts/iptv/iptv_libav.c \
        src/muxer/muxer_libav.c \
        src/plumbing/transcoding.c
 SRCS-$(CONFIG_LIBAV) += $(SRCS-LIBAV)
index cd01ffff8721f6a19c52d6b521b7dc8ca28d826f..a974a359f8ef95902fb896ce3994a95182b9d147 100644 (file)
@@ -296,7 +296,15 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh
   mpegts_mux_nice_name((mpegts_mux_t*)im, buf, sizeof(buf));
   urlinit(&url);
 
-  if (raw && !strncmp(raw, "pipe://", 7)) {
+#if ENABLE_LIBAV
+  if (im->mm_iptv_libav > 0 ||
+      (im->mm_iptv_libav == 0 && ((iptv_network_t *)im->mm_network)->in_libav)) {
+
+    scheme = "libav";
+
+  } else
+#endif
+         if (raw && !strncmp(raw, "pipe://", 7)) {
 
     scheme = "pipe";
 
@@ -304,6 +312,12 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh
 
     scheme = "file";
 
+#if ENABLE_LIBAV
+  } else if (raw && !strncmp(raw, "libav:", 6)) {
+
+    scheme = "libav";
+#endif
+
   } else {
 
     if (urlparse(raw ?: "", &url)) {
@@ -327,7 +341,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh
   im->mm_iptv_url_raw = strdup(raw);
   im->mm_active = mmi; // Note: must set here else mux_started call
                        // will not realise we're ready to accept pid open calls
-  ret = ih->start(im, raw, &url);
+  ret = ih->start(im, im->mm_iptv_url_raw, &url);
   if (!ret)
     im->im_handler = ih;
   else
@@ -384,9 +398,9 @@ iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
 static inline int
 iptv_input_pause_check ( iptv_mux_t *im )
 {
-  int64_t s64, limit;
+  int64_t old, s64, limit;
 
-  if (im->im_pcr == PTS_UNSET)
+  if ((old = im->im_pcr) == PTS_UNSET)
     return 0;
   limit = im->mm_iptv_buffer_limit;
   if (!limit)
@@ -398,8 +412,9 @@ iptv_input_pause_check ( iptv_mux_t *im )
   im->im_pcr_start += s64;
   im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL;
   im->im_pcr &= PTS_MASK;
-  tvhtrace(LS_IPTV_PCR, "pcr: updated %"PRId64", time start %"PRId64", limit %"PRId64,
-           im->im_pcr, im->im_pcr_start, limit);
+  if (old != im->im_pcr)
+    tvhtrace(LS_IPTV_PCR, "pcr: updated %"PRId64", time start %"PRId64", limit %"PRId64,
+             im->im_pcr, im->im_pcr_start, limit);
 
   /* queued more than 3 seconds? trigger the pause */
   return im->im_pcr_end - im->im_pcr_start >= limit;
@@ -701,6 +716,19 @@ const idclass_t iptv_network_class = {
   .ic_caption    = N_("IPTV Network"),
   .ic_delete     = iptv_network_class_delete,
   .ic_properties = (const property_t[]){
+#if ENABLE_LIBAV
+    {
+      .type     = PT_BOOL,
+      .id       = "use_libav",
+      .name     = N_("Use A/V library"),
+      .desc     = N_("The input stream is remuxed with A/V library (libav or"
+                     " or ffmpeg) to the MPEG-TS format which is accepted by"
+                     " tvheadend."),
+      .off      = offsetof(iptv_network_t, in_libav),
+      .def.i    = 1,
+      .opts     = PO_ADVANCED
+    },
+#endif
     {
       .type     = PT_BOOL,
       .id       = "scan_create",
@@ -1058,6 +1086,9 @@ void iptv_init ( void )
   iptv_rtsp_init();
   iptv_pipe_init();
   iptv_file_init();
+#if ENABLE_LIBAV
+  iptv_libav_init();
+#endif
 
   iptv_input = calloc(1, sizeof(iptv_input_t));
 
diff --git a/src/input/mpegts/iptv/iptv_libav.c b/src/input/mpegts/iptv/iptv_libav.c
new file mode 100644 (file)
index 0000000..e5bae57
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ *  IPTV - libav handler
+ *
+ *  Copyright (C) 2017 Jaroslav Kysela
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tvheadend.h"
+#include "iptv_private.h"
+#include "libav.h"
+
+#include <fcntl.h>
+#include <signal.h>
+
+#define WRITE_BUFFER_SIZE (188*500)
+
+typedef struct {
+  const char *url;
+  iptv_mux_t *mux;
+  int running;
+  int pause;
+  pthread_t thread;
+  pthread_mutex_t lock;
+  th_pipe_t pipe;
+  AVFormatContext *ictx;
+  AVFormatContext *octx;
+  sbuf_t sbuf;
+} iptv_libav_priv_t;
+
+/*
+ *
+ */
+static int
+iptv_libav_write_packet(void *opaque, uint8_t *buf, int buf_size)
+{
+  iptv_libav_priv_t *la = opaque;
+
+  if (buf_size > 0) {
+    pthread_mutex_lock(&la->lock);
+    if (la->sbuf.sb_ptr < 5*1024*1024) {
+      while (atomic_get(&la->pause)) {
+        if (!atomic_get(&la->running))
+          goto fin;
+        pthread_mutex_unlock(&la->lock);
+        tvh_usleep(500000);
+        pthread_mutex_lock(&la->lock);
+      }
+      sbuf_append(&la->sbuf, buf, buf_size);
+      /* notify iptv layer that we have new data to read */
+      write(la->pipe.wr, "", 1);
+    }
+fin:
+    pthread_mutex_unlock(&la->lock);
+  }
+  return 0;
+}
+
+/*
+ *
+ */
+static int
+iptv_libav_interrupt_callback(void *opaque)
+{
+  iptv_libav_priv_t *la = opaque;
+
+  return atomic_get(&la->running) == 0;
+}
+
+/*
+ *
+ */
+static void *
+iptv_libav_thread(void *aux)
+{
+  iptv_libav_priv_t *la = aux;
+  AVStream *in_stream, *out_stream;
+  AVPacket pkt;
+  uint8_t *buf = NULL;
+  int ret, i;
+
+  buf = malloc(WRITE_BUFFER_SIZE);
+  if (buf == NULL)
+    goto fail;
+
+  if ((ret = avformat_open_input(&la->ictx, la->url, 0, 0)) < 0) {
+    tvherror(LS_IPTV, "libav: Could not open input '%s': %s", la->url, av_err2str(ret));
+    goto fail;
+  }
+  la->ictx->interrupt_callback.callback = iptv_libav_interrupt_callback;
+  la->ictx->interrupt_callback.opaque = la;
+  if ((ret = avformat_find_stream_info(la->ictx, 0)) < 0) {
+    tvherror(LS_IPTV, "libav: Unable to find stream info for input '%s': %s", la->url, av_err2str(ret));
+    goto fail;
+  }
+
+  avformat_alloc_output_context2(&la->octx, NULL, "mpegts", NULL);
+  if (la->octx == NULL)
+    goto fail;
+  la->octx->pb = avio_alloc_context(buf, WRITE_BUFFER_SIZE, AVIO_FLAG_WRITE,
+                                    la, NULL, iptv_libav_write_packet, NULL);
+  la->octx->interrupt_callback.callback = iptv_libav_interrupt_callback;
+  la->octx->interrupt_callback.opaque = la;
+
+  for (i = 0; i < la->ictx->nb_streams; i++) {
+    in_stream = la->ictx->streams[i];
+    out_stream = avformat_new_stream(la->octx, in_stream->codec->codec);
+    if (out_stream == NULL) {
+      tvherror(LS_IPTV, "libav: Failed allocating output stream");
+      goto fail;
+    }
+    ret = avcodec_copy_context(out_stream->codec, in_stream->codec);
+    if (ret < 0) {
+      tvherror(LS_IPTV, "libav: Failed to copy context from input to output stream codec context: %s", av_err2str(ret));
+      goto fail;
+    }
+    out_stream->codec->codec_tag = 0;
+    if (la->octx->oformat->flags & AVFMT_GLOBALHEADER)
+      out_stream->codec->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
+  }
+
+  ret = avformat_write_header(la->octx, NULL);
+  if (ret < 0) {
+    tvherror(LS_IPTV, "libav: Unable to write header");
+    goto fail;
+  }
+
+  while (atomic_get(&la->running)) {
+    ret = av_read_frame(la->ictx, &pkt);
+    if (ret < 0) {
+      if (ret != AVERROR_EOF)
+        tvherror(LS_IPTV, "libav: unable to read frame: %s", av_err2str(ret));
+      break;
+    }
+    if (atomic_get(&la->running) == 0)
+      goto unref;
+    in_stream  = la->ictx->streams[pkt.stream_index];
+    out_stream = la->octx->streams[pkt.stream_index];
+    /* copy packet */
+    pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
+    pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
+    pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, out_stream->time_base);
+    pkt.pos = -1;
+    ret = av_interleaved_write_frame(la->octx, &pkt);
+    if (ret < 0) {
+      tvherror(LS_IPTV, "libav: Error muxing packet: %s", av_err2str(ret));
+      break;
+    }
+unref:
+    av_packet_unref(&pkt);
+  }
+  av_write_trailer(la->octx);
+
+fail:
+  free(buf);
+  return NULL;
+}
+
+/*
+ * Start new thread
+ */
+static int
+iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url )
+{
+  iptv_libav_priv_t *la = calloc(1, sizeof(*la));
+
+  pthread_mutex_init(&la->lock, NULL);
+  im->im_opaque = la;
+  if (strncmp(raw, "libav:", 6) == 0)
+    raw += 6;
+  la->url = raw;
+  la->mux = im;
+  tvh_pipe(O_NONBLOCK, &la->pipe);
+  im->mm_iptv_fd = la->pipe.rd;
+  iptv_input_fd_started(im);
+  atomic_set(&la->running, 1);
+  atomic_set(&la->pause, 0);
+  sbuf_init(&la->sbuf);
+  tvhthread_create(&la->thread, NULL, iptv_libav_thread, la, "libavinput");
+  return 0;
+}
+
+static void
+iptv_libav_stop
+  ( iptv_mux_t *im )
+{
+  iptv_libav_priv_t *la = im->im_opaque;
+
+  atomic_set(&la->running, 0);
+  im->im_opaque = NULL;
+  pthread_kill(la->thread, SIGUSR1);
+  pthread_join(la->thread, NULL);
+  tvh_pipe_close(&la->pipe);
+  avformat_close_input(&la->ictx);
+  avformat_free_context(la->octx);
+  sbuf_free(&la->sbuf);
+  free(la);
+}
+
+static ssize_t
+iptv_libav_read ( iptv_mux_t *im )
+{
+  iptv_libav_priv_t *la = im->im_opaque;
+  char buf[8192];
+  ssize_t ret;
+
+  if (la == NULL)
+    return 0;
+  pthread_mutex_lock(&la->lock);
+  ret = la->sbuf.sb_ptr;
+  sbuf_append_from_sbuf(&im->mm_iptv_buffer, &la->sbuf);
+  sbuf_reset(&la->sbuf, WRITE_BUFFER_SIZE * 2);
+  read(la->pipe.rd, buf, sizeof(buf));
+  pthread_mutex_unlock(&la->lock);
+  return ret;
+}
+
+static void
+iptv_libav_pause ( iptv_mux_t *im, int pause )
+{
+  iptv_libav_priv_t *la = im->im_opaque;
+
+  if (la)
+    atomic_set(&la->pause, pause);
+}
+
+/*
+ * Initialise libav handler
+ */
+void
+iptv_libav_init ( void )
+{
+  static iptv_handler_t ih[] = {
+    {
+      .scheme = "libav",
+      .buffer_limit = 5000,
+      .start  = iptv_libav_start,
+      .stop   = iptv_libav_stop,
+      .read   = iptv_libav_read,
+      .pause  = iptv_libav_pause,
+    },
+  };
+  iptv_handler_register(ih, ARRAY_SIZE(ih));
+}
index 27fe7e85c8dd8e9948b2db61b7e984de37e4f48a..7b2fa7ae257fd38bbaba488321491fed4ea378ae 100644 (file)
@@ -97,6 +97,17 @@ iptv_mux_url_set ( void *p, const void *v )
   return iptv_url_set(&im->mm_iptv_url, &im->mm_iptv_url_sane, v, 1, 1);
 }
 
+static htsmsg_t *
+iptv_mux_libav_enum ( void *o, const char *lang )
+{
+  static const struct strtab tab[] = {
+    { N_("Network settings"),   0 },
+    { N_("Use"),                1 },
+    { N_("Do not use"),        -1 },
+  };
+  return strtab2htsmsg(tab, 1, lang);
+}
+
 const idclass_t iptv_mux_class =
 {
   .ic_super      = &mpegts_mux_class,
@@ -134,6 +145,18 @@ const idclass_t iptv_mux_class =
       .off      = offsetof(iptv_mux_t, mm_iptv_substitute),
       .opts     = PO_ADVANCED
     },
+#if ENABLE_LIBAV
+    {
+      .type     = PT_INT,
+      .id       = "use_libav",
+      .name     = N_("Use A/V library"),
+      .desc     = N_("The input stream is remuxed with A/V library (libav or"
+                     " or ffmpeg) to the MPEG-TS format which is accepted by"
+                     " tvheadend."),
+      .list     = iptv_mux_libav_enum,
+      .off      = offsetof(iptv_mux_t, mm_iptv_libav),
+    },
+#endif
     {
       .type     = PT_STR,
       .id       = "iptv_interface",
index 43d5c0c362dc4b377bee7c3655ca4c5db0ab27aa..f75b1676eb1c2895c0952833a4632fce3cbb4629 100644 (file)
@@ -28,7 +28,7 @@
 #include <assert.h>
 
 /*
- * Connect UDP/RTP
+ * Spawn task and create pipes
  */
 static int
 iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url )
index 970afb2caad7954fd8dfdc73d912933880166d8c..851eef7d7d667f0ab3f2a83e2588c5cf0d179b26 100644 (file)
@@ -92,6 +92,7 @@ struct iptv_network
   int      in_ssl_peer_verify;
   char    *in_remove_args;
   int      in_tsid_accept_zero_value;
+  int      in_libav;
 
   void    *in_auto; /* private structure for auto-network */
 };
@@ -114,6 +115,7 @@ struct iptv_mux
   char                 *mm_iptv_interface;
 
   int                   mm_iptv_substitute;
+  int                   mm_iptv_libav;
   int                   mm_iptv_atsc;
 
   char                 *mm_iptv_muxname;
@@ -148,6 +150,8 @@ struct iptv_mux
   void                 *im_data;
 
   int                   im_delete_flag;
+
+  void                 *im_opaque;
 };
 
 iptv_mux_t* iptv_mux_create0
@@ -184,6 +188,7 @@ void iptv_udp_init     ( void );
 void iptv_rtsp_init    ( void );
 void iptv_pipe_init    ( void );
 void iptv_file_init    ( void );
+void iptv_libav_init   ( void );
 
 ssize_t iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um,
                         void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len) );
index 6afd06277d4f309fd5b163837271cdb611cc0985..15935ac5318aa6025e7854f4092a9fc35d91e9cf 100644 (file)
@@ -1047,6 +1047,8 @@ mpegts_input_recv_packets
 #define MIN_TS_PKT 100
 #define MIN_TS_SYN (5*188)
 
+  if (sb->sb_ptr == 0)
+    return;
 retry:
   len2 = 0;
   off  = 0;
index c7651a75e4bbfc0a87c6a5b46109570c47839c86..042b16f4949fe7587d9b0b6c91233c379feb2d34 100644 (file)
@@ -214,6 +214,13 @@ libav_init(void)
   av_log_set_callback(libav_log_callback);
   libav_set_loglevel();
   av_register_all();
+  avformat_network_init();
   avfilter_register_all();
   transcoding_init();
 }
+
+void
+libav_done(void)
+{
+  avformat_network_deinit();
+}
\ No newline at end of file
index b46ee0baf7cbb4659e8701511c49d750ea8890ae..ba8b092b8a9177279fe8f955f38925331722d9bf 100644 (file)
@@ -59,11 +59,13 @@ streaming_component_type_t codec_id2streaming_component_type(enum AVCodecID id);
 int libav_is_encoder(AVCodec *codec);
 void libav_set_loglevel(void);
 void libav_init(void);
+void libav_done(void);
 
 #else
 
 static inline void libav_set_loglevel(void) { };
 static inline void libav_init(void) { };
+static inline void libav_done(void) { };
 
 #endif
 
index fc73b19645fdc590e50b099e7a0d0c56f943c1d1..2456d14c205e3b676b9e67ddcb641d7ee654e4dc 100644 (file)
@@ -1345,6 +1345,8 @@ main(int argc, char **argv)
 
   if(opt_fork)
     unlink(opt_pidpath);
+
+  libav_done();
     
   /* OpenSSL - welcome to the "cleanup" hell */
   ENGINE_cleanup();
index 27850a45e7af58857f5d494e297c15872b1a2835..dcaf6b24aa3f5b3b3f4e217bc92b72a19fe6d712 100644 (file)
@@ -798,6 +798,8 @@ void sbuf_realloc(sbuf_t *sb, int len);
 
 void sbuf_append(sbuf_t *sb, const void *data, int len);
 
+void sbuf_append_from_sbuf(sbuf_t *sb, sbuf_t *src);
+
 void sbuf_cut(sbuf_t *sb, int off);
 
 void sbuf_put_be32(sbuf_t *sb, uint32_t u32);
index b2860f096b3f606a1adc50920b33fe093e2a4ce6..45d03e4786966067797d48cc5b3be4472009532a 100644 (file)
@@ -385,6 +385,21 @@ sbuf_append(sbuf_t *sb, const void *data, int len)
   sb->sb_ptr += len;
 }
 
+void
+sbuf_append_from_sbuf(sbuf_t *sb, sbuf_t *src)
+{
+  if (sb->sb_ptr == 0) {
+    sbuf_free(sb);
+    sb->sb_data = src->sb_data;
+    sb->sb_ptr = src->sb_ptr;
+    sb->sb_size = src->sb_size;
+    sbuf_steal_data(src);
+  } else {
+    sbuf_append(sb, src->sb_data, src->sb_ptr);
+    src->sb_ptr = 0;
+  }
+}
+
 void
 sbuf_put_be32(sbuf_t *sb, uint32_t u32)
 {