From: Jaroslav Kysela Date: Wed, 23 Aug 2017 09:25:17 +0000 (+0200) Subject: iptv: add libav input X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=25265a078faa92be686bba2fdb4dae1edee02918;p=thirdparty%2Ftvheadend.git iptv: add libav input --- diff --git a/Makefile b/Makefile index 9dbb1c1fe..d28fdcddc 100644 --- a/Makefile +++ b/Makefile @@ -480,6 +480,7 @@ DEPS-LIBAV = \ src/tvhlog.c SRCS-LIBAV = \ src/libav.c \ + src/input/mpegts/iptv/iptv_libav.c \ src/muxer/muxer_libav.c \ src/plumbing/transcoding.c SRCS-$(CONFIG_LIBAV) += $(SRCS-LIBAV) diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index cd01ffff8..a974a359f 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -296,7 +296,15 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh mpegts_mux_nice_name((mpegts_mux_t*)im, buf, sizeof(buf)); urlinit(&url); - if (raw && !strncmp(raw, "pipe://", 7)) { +#if ENABLE_LIBAV + if (im->mm_iptv_libav > 0 || + (im->mm_iptv_libav == 0 && ((iptv_network_t *)im->mm_network)->in_libav)) { + + scheme = "libav"; + + } else +#endif + if (raw && !strncmp(raw, "pipe://", 7)) { scheme = "pipe"; @@ -304,6 +312,12 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh scheme = "file"; +#if ENABLE_LIBAV + } else if (raw && !strncmp(raw, "libav:", 6)) { + + scheme = "libav"; +#endif + } else { if (urlparse(raw ?: "", &url)) { @@ -327,7 +341,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi, int weigh im->mm_iptv_url_raw = strdup(raw); im->mm_active = mmi; // Note: must set here else mux_started call // will not realise we're ready to accept pid open calls - ret = ih->start(im, raw, &url); + ret = ih->start(im, im->mm_iptv_url_raw, &url); if (!ret) im->im_handler = ih; else @@ -384,9 +398,9 @@ iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len ) static inline int iptv_input_pause_check ( iptv_mux_t *im ) { - int64_t s64, limit; + int64_t old, s64, limit; - if (im->im_pcr == PTS_UNSET) + if ((old = im->im_pcr) == PTS_UNSET) return 0; limit = im->mm_iptv_buffer_limit; if (!limit) @@ -398,8 +412,9 @@ iptv_input_pause_check ( iptv_mux_t *im ) im->im_pcr_start += s64; im->im_pcr += (((s64 / 10LL) * 9LL) + 4LL) / 10LL; im->im_pcr &= PTS_MASK; - tvhtrace(LS_IPTV_PCR, "pcr: updated %"PRId64", time start %"PRId64", limit %"PRId64, - im->im_pcr, im->im_pcr_start, limit); + if (old != im->im_pcr) + tvhtrace(LS_IPTV_PCR, "pcr: updated %"PRId64", time start %"PRId64", limit %"PRId64, + im->im_pcr, im->im_pcr_start, limit); /* queued more than 3 seconds? trigger the pause */ return im->im_pcr_end - im->im_pcr_start >= limit; @@ -701,6 +716,19 @@ const idclass_t iptv_network_class = { .ic_caption = N_("IPTV Network"), .ic_delete = iptv_network_class_delete, .ic_properties = (const property_t[]){ +#if ENABLE_LIBAV + { + .type = PT_BOOL, + .id = "use_libav", + .name = N_("Use A/V library"), + .desc = N_("The input stream is remuxed with A/V library (libav or" + " or ffmpeg) to the MPEG-TS format which is accepted by" + " tvheadend."), + .off = offsetof(iptv_network_t, in_libav), + .def.i = 1, + .opts = PO_ADVANCED + }, +#endif { .type = PT_BOOL, .id = "scan_create", @@ -1058,6 +1086,9 @@ void iptv_init ( void ) iptv_rtsp_init(); iptv_pipe_init(); iptv_file_init(); +#if ENABLE_LIBAV + iptv_libav_init(); +#endif iptv_input = calloc(1, sizeof(iptv_input_t)); diff --git a/src/input/mpegts/iptv/iptv_libav.c b/src/input/mpegts/iptv/iptv_libav.c new file mode 100644 index 000000000..e5bae572e --- /dev/null +++ b/src/input/mpegts/iptv/iptv_libav.c @@ -0,0 +1,255 @@ +/* + * IPTV - libav handler + * + * Copyright (C) 2017 Jaroslav Kysela + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "tvheadend.h" +#include "iptv_private.h" +#include "libav.h" + +#include +#include + +#define WRITE_BUFFER_SIZE (188*500) + +typedef struct { + const char *url; + iptv_mux_t *mux; + int running; + int pause; + pthread_t thread; + pthread_mutex_t lock; + th_pipe_t pipe; + AVFormatContext *ictx; + AVFormatContext *octx; + sbuf_t sbuf; +} iptv_libav_priv_t; + +/* + * + */ +static int +iptv_libav_write_packet(void *opaque, uint8_t *buf, int buf_size) +{ + iptv_libav_priv_t *la = opaque; + + if (buf_size > 0) { + pthread_mutex_lock(&la->lock); + if (la->sbuf.sb_ptr < 5*1024*1024) { + while (atomic_get(&la->pause)) { + if (!atomic_get(&la->running)) + goto fin; + pthread_mutex_unlock(&la->lock); + tvh_usleep(500000); + pthread_mutex_lock(&la->lock); + } + sbuf_append(&la->sbuf, buf, buf_size); + /* notify iptv layer that we have new data to read */ + write(la->pipe.wr, "", 1); + } +fin: + pthread_mutex_unlock(&la->lock); + } + return 0; +} + +/* + * + */ +static int +iptv_libav_interrupt_callback(void *opaque) +{ + iptv_libav_priv_t *la = opaque; + + return atomic_get(&la->running) == 0; +} + +/* + * + */ +static void * +iptv_libav_thread(void *aux) +{ + iptv_libav_priv_t *la = aux; + AVStream *in_stream, *out_stream; + AVPacket pkt; + uint8_t *buf = NULL; + int ret, i; + + buf = malloc(WRITE_BUFFER_SIZE); + if (buf == NULL) + goto fail; + + if ((ret = avformat_open_input(&la->ictx, la->url, 0, 0)) < 0) { + tvherror(LS_IPTV, "libav: Could not open input '%s': %s", la->url, av_err2str(ret)); + goto fail; + } + la->ictx->interrupt_callback.callback = iptv_libav_interrupt_callback; + la->ictx->interrupt_callback.opaque = la; + if ((ret = avformat_find_stream_info(la->ictx, 0)) < 0) { + tvherror(LS_IPTV, "libav: Unable to find stream info for input '%s': %s", la->url, av_err2str(ret)); + goto fail; + } + + avformat_alloc_output_context2(&la->octx, NULL, "mpegts", NULL); + if (la->octx == NULL) + goto fail; + la->octx->pb = avio_alloc_context(buf, WRITE_BUFFER_SIZE, AVIO_FLAG_WRITE, + la, NULL, iptv_libav_write_packet, NULL); + la->octx->interrupt_callback.callback = iptv_libav_interrupt_callback; + la->octx->interrupt_callback.opaque = la; + + for (i = 0; i < la->ictx->nb_streams; i++) { + in_stream = la->ictx->streams[i]; + out_stream = avformat_new_stream(la->octx, in_stream->codec->codec); + if (out_stream == NULL) { + tvherror(LS_IPTV, "libav: Failed allocating output stream"); + goto fail; + } + ret = avcodec_copy_context(out_stream->codec, in_stream->codec); + if (ret < 0) { + tvherror(LS_IPTV, "libav: Failed to copy context from input to output stream codec context: %s", av_err2str(ret)); + goto fail; + } + out_stream->codec->codec_tag = 0; + if (la->octx->oformat->flags & AVFMT_GLOBALHEADER) + out_stream->codec->flags |= AV_CODEC_FLAG_GLOBAL_HEADER; + } + + ret = avformat_write_header(la->octx, NULL); + if (ret < 0) { + tvherror(LS_IPTV, "libav: Unable to write header"); + goto fail; + } + + while (atomic_get(&la->running)) { + ret = av_read_frame(la->ictx, &pkt); + if (ret < 0) { + if (ret != AVERROR_EOF) + tvherror(LS_IPTV, "libav: unable to read frame: %s", av_err2str(ret)); + break; + } + if (atomic_get(&la->running) == 0) + goto unref; + in_stream = la->ictx->streams[pkt.stream_index]; + out_stream = la->octx->streams[pkt.stream_index]; + /* copy packet */ + pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, out_stream->time_base, AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); + pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, out_stream->time_base); + pkt.pos = -1; + ret = av_interleaved_write_frame(la->octx, &pkt); + if (ret < 0) { + tvherror(LS_IPTV, "libav: Error muxing packet: %s", av_err2str(ret)); + break; + } +unref: + av_packet_unref(&pkt); + } + av_write_trailer(la->octx); + +fail: + free(buf); + return NULL; +} + +/* + * Start new thread + */ +static int +iptv_libav_start ( iptv_mux_t *im, const char *raw, const url_t *url ) +{ + iptv_libav_priv_t *la = calloc(1, sizeof(*la)); + + pthread_mutex_init(&la->lock, NULL); + im->im_opaque = la; + if (strncmp(raw, "libav:", 6) == 0) + raw += 6; + la->url = raw; + la->mux = im; + tvh_pipe(O_NONBLOCK, &la->pipe); + im->mm_iptv_fd = la->pipe.rd; + iptv_input_fd_started(im); + atomic_set(&la->running, 1); + atomic_set(&la->pause, 0); + sbuf_init(&la->sbuf); + tvhthread_create(&la->thread, NULL, iptv_libav_thread, la, "libavinput"); + return 0; +} + +static void +iptv_libav_stop + ( iptv_mux_t *im ) +{ + iptv_libav_priv_t *la = im->im_opaque; + + atomic_set(&la->running, 0); + im->im_opaque = NULL; + pthread_kill(la->thread, SIGUSR1); + pthread_join(la->thread, NULL); + tvh_pipe_close(&la->pipe); + avformat_close_input(&la->ictx); + avformat_free_context(la->octx); + sbuf_free(&la->sbuf); + free(la); +} + +static ssize_t +iptv_libav_read ( iptv_mux_t *im ) +{ + iptv_libav_priv_t *la = im->im_opaque; + char buf[8192]; + ssize_t ret; + + if (la == NULL) + return 0; + pthread_mutex_lock(&la->lock); + ret = la->sbuf.sb_ptr; + sbuf_append_from_sbuf(&im->mm_iptv_buffer, &la->sbuf); + sbuf_reset(&la->sbuf, WRITE_BUFFER_SIZE * 2); + read(la->pipe.rd, buf, sizeof(buf)); + pthread_mutex_unlock(&la->lock); + return ret; +} + +static void +iptv_libav_pause ( iptv_mux_t *im, int pause ) +{ + iptv_libav_priv_t *la = im->im_opaque; + + if (la) + atomic_set(&la->pause, pause); +} + +/* + * Initialise libav handler + */ +void +iptv_libav_init ( void ) +{ + static iptv_handler_t ih[] = { + { + .scheme = "libav", + .buffer_limit = 5000, + .start = iptv_libav_start, + .stop = iptv_libav_stop, + .read = iptv_libav_read, + .pause = iptv_libav_pause, + }, + }; + iptv_handler_register(ih, ARRAY_SIZE(ih)); +} diff --git a/src/input/mpegts/iptv/iptv_mux.c b/src/input/mpegts/iptv/iptv_mux.c index 27fe7e85c..7b2fa7ae2 100644 --- a/src/input/mpegts/iptv/iptv_mux.c +++ b/src/input/mpegts/iptv/iptv_mux.c @@ -97,6 +97,17 @@ iptv_mux_url_set ( void *p, const void *v ) return iptv_url_set(&im->mm_iptv_url, &im->mm_iptv_url_sane, v, 1, 1); } +static htsmsg_t * +iptv_mux_libav_enum ( void *o, const char *lang ) +{ + static const struct strtab tab[] = { + { N_("Network settings"), 0 }, + { N_("Use"), 1 }, + { N_("Do not use"), -1 }, + }; + return strtab2htsmsg(tab, 1, lang); +} + const idclass_t iptv_mux_class = { .ic_super = &mpegts_mux_class, @@ -134,6 +145,18 @@ const idclass_t iptv_mux_class = .off = offsetof(iptv_mux_t, mm_iptv_substitute), .opts = PO_ADVANCED }, +#if ENABLE_LIBAV + { + .type = PT_INT, + .id = "use_libav", + .name = N_("Use A/V library"), + .desc = N_("The input stream is remuxed with A/V library (libav or" + " or ffmpeg) to the MPEG-TS format which is accepted by" + " tvheadend."), + .list = iptv_mux_libav_enum, + .off = offsetof(iptv_mux_t, mm_iptv_libav), + }, +#endif { .type = PT_STR, .id = "iptv_interface", diff --git a/src/input/mpegts/iptv/iptv_pipe.c b/src/input/mpegts/iptv/iptv_pipe.c index 43d5c0c36..f75b1676e 100644 --- a/src/input/mpegts/iptv/iptv_pipe.c +++ b/src/input/mpegts/iptv/iptv_pipe.c @@ -28,7 +28,7 @@ #include /* - * Connect UDP/RTP + * Spawn task and create pipes */ static int iptv_pipe_start ( iptv_mux_t *im, const char *raw, const url_t *url ) diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index 970afb2ca..851eef7d7 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -92,6 +92,7 @@ struct iptv_network int in_ssl_peer_verify; char *in_remove_args; int in_tsid_accept_zero_value; + int in_libav; void *in_auto; /* private structure for auto-network */ }; @@ -114,6 +115,7 @@ struct iptv_mux char *mm_iptv_interface; int mm_iptv_substitute; + int mm_iptv_libav; int mm_iptv_atsc; char *mm_iptv_muxname; @@ -148,6 +150,8 @@ struct iptv_mux void *im_data; int im_delete_flag; + + void *im_opaque; }; iptv_mux_t* iptv_mux_create0 @@ -184,6 +188,7 @@ void iptv_udp_init ( void ); void iptv_rtsp_init ( void ); void iptv_pipe_init ( void ); void iptv_file_init ( void ); +void iptv_libav_init ( void ); ssize_t iptv_rtp_read ( iptv_mux_t *im, udp_multirecv_t *um, void (*pkt_cb)(iptv_mux_t *im, uint8_t *buf, int len) ); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 6afd06277..15935ac53 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -1047,6 +1047,8 @@ mpegts_input_recv_packets #define MIN_TS_PKT 100 #define MIN_TS_SYN (5*188) + if (sb->sb_ptr == 0) + return; retry: len2 = 0; off = 0; diff --git a/src/libav.c b/src/libav.c index c7651a75e..042b16f49 100644 --- a/src/libav.c +++ b/src/libav.c @@ -214,6 +214,13 @@ libav_init(void) av_log_set_callback(libav_log_callback); libav_set_loglevel(); av_register_all(); + avformat_network_init(); avfilter_register_all(); transcoding_init(); } + +void +libav_done(void) +{ + avformat_network_deinit(); +} \ No newline at end of file diff --git a/src/libav.h b/src/libav.h index b46ee0baf..ba8b092b8 100644 --- a/src/libav.h +++ b/src/libav.h @@ -59,11 +59,13 @@ streaming_component_type_t codec_id2streaming_component_type(enum AVCodecID id); int libav_is_encoder(AVCodec *codec); void libav_set_loglevel(void); void libav_init(void); +void libav_done(void); #else static inline void libav_set_loglevel(void) { }; static inline void libav_init(void) { }; +static inline void libav_done(void) { }; #endif diff --git a/src/main.c b/src/main.c index fc73b1964..2456d14c2 100644 --- a/src/main.c +++ b/src/main.c @@ -1345,6 +1345,8 @@ main(int argc, char **argv) if(opt_fork) unlink(opt_pidpath); + + libav_done(); /* OpenSSL - welcome to the "cleanup" hell */ ENGINE_cleanup(); diff --git a/src/tvheadend.h b/src/tvheadend.h index 27850a45e..dcaf6b24a 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -798,6 +798,8 @@ void sbuf_realloc(sbuf_t *sb, int len); void sbuf_append(sbuf_t *sb, const void *data, int len); +void sbuf_append_from_sbuf(sbuf_t *sb, sbuf_t *src); + void sbuf_cut(sbuf_t *sb, int off); void sbuf_put_be32(sbuf_t *sb, uint32_t u32); diff --git a/src/utils.c b/src/utils.c index b2860f096..45d03e478 100644 --- a/src/utils.c +++ b/src/utils.c @@ -385,6 +385,21 @@ sbuf_append(sbuf_t *sb, const void *data, int len) sb->sb_ptr += len; } +void +sbuf_append_from_sbuf(sbuf_t *sb, sbuf_t *src) +{ + if (sb->sb_ptr == 0) { + sbuf_free(sb); + sb->sb_data = src->sb_data; + sb->sb_ptr = src->sb_ptr; + sb->sb_size = src->sb_size; + sbuf_steal_data(src); + } else { + sbuf_append(sb, src->sb_data, src->sb_ptr); + src->sb_ptr = 0; + } +} + void sbuf_put_be32(sbuf_t *sb, uint32_t u32) {