From: Jaroslav Kysela Date: Tue, 22 Aug 2017 07:16:46 +0000 (+0200) Subject: mpegts pass muxer: add spawn functionality X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=5bbddc9e0143b5135be42714beb3201d9c2e0019;p=thirdparty%2Ftvheadend.git mpegts pass muxer: add spawn functionality --- diff --git a/src/muxer/muxer_pass.c b/src/muxer/muxer_pass.c index 00763fdc7..511b7f6d1 100644 --- a/src/muxer/muxer_pass.c +++ b/src/muxer/muxer_pass.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "tvheadend.h" #include "streaming.h" @@ -28,7 +29,7 @@ #include "service.h" #include "input/mpegts/dvb.h" #include "muxer_pass.h" -#include "dvr/dvr.h" +#include "spawn.h" typedef struct pass_muxer { muxer_t; @@ -36,8 +37,10 @@ typedef struct pass_muxer { /* File descriptor stuff */ off_t pm_off; int pm_fd; + int pm_ofd; int pm_seekable; int pm_error; + int pm_spawn_pid; /* Filename is also used for logging */ char *pm_filename; @@ -67,7 +70,6 @@ pass_muxer_write(muxer_t *m, const void *data, size_t size); /* * Rewrite a PAT packet to only include the service included in the transport stream. */ - static void pass_muxer_pat_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len) { @@ -276,6 +278,10 @@ pass_muxer_mime(muxer_t* m, const struct streaming_start *ss) muxer_container_type_t mc; const streaming_start_component_t *ssc; const source_info_t *si = &ss->ss_si; + const char *mime = m->m_config.u.pass.m_mime; + + if (mime && mime[0]) + return mime; has_audio = 0; has_video = 0; @@ -364,6 +370,36 @@ pass_muxer_init(muxer_t* m, struct streaming_start *ss, const char *name) return pass_muxer_reconfigure(m, ss); } +/** + * Open the spawned task on demand + */ +static int +pass_muxer_open2(pass_muxer_t *pm) +{ + const char *cmdline = pm->m_config.u.pass.m_cmdline; + char **argv = NULL; + + pm->pm_spawn_pid = -1; + if (cmdline && cmdline[0]) { + argv = NULL; + if (spawn_parse_args(&argv, 64, cmdline, NULL)) + goto error; + if (spawn_with_passthrough(argv[0], argv, NULL, pm->pm_ofd, &pm->pm_fd, &pm->pm_spawn_pid, 1)) { + tvherror(LS_PASS, "Unable to start pipe '%s' (wrong executable?)", cmdline); + goto error; + } + spawn_free_args(argv); + } else { + pm->pm_fd = pm->pm_ofd; + } + return 0; + +error: + if (argv) + spawn_free_args(argv); + pm->pm_error = ENOMEM; + return -1; +} /** * Open the muxer as a stream muxer (using a non-seekable socket) @@ -374,11 +410,11 @@ pass_muxer_open_stream(muxer_t *m, int fd) pass_muxer_t *pm = (pass_muxer_t*)m; pm->pm_off = 0; - pm->pm_fd = fd; + pm->pm_ofd = fd; pm->pm_seekable = 0; pm->pm_filename = strdup("Live stream"); - return 0; + return pass_muxer_open2(pm); } @@ -410,9 +446,10 @@ pass_muxer_open_file(muxer_t *m, const char *filename) pm->pm_off = 0; pm->pm_seekable = 1; - pm->pm_fd = fd; + pm->pm_ofd = fd; pm->pm_filename = strdup(filename); - return 0; + + return pass_muxer_open2(pm); } @@ -560,7 +597,9 @@ pass_muxer_close(muxer_t *m) { pass_muxer_t *pm = (pass_muxer_t*)m; - if(pm->pm_seekable && close(pm->pm_fd)) { + if(pm->pm_spawn_pid > 0) + spawn_kill(pm->pm_spawn_pid, SIGTERM, 15); + if(pm->pm_seekable && close(pm->pm_ofd)) { pm->pm_error = errno; tvherror(LS_PASS, "%s: Unable to close file, close failed -- %s", pm->pm_filename, strerror(errno)); @@ -617,6 +656,8 @@ pass_muxer_create(const muxer_config_t *m_cfg) pm->m_close = pass_muxer_close; pm->m_destroy = pass_muxer_destroy; pm->pm_fd = -1; + pm->pm_ofd = -1; + pm->pm_spawn_pid = -1; dvb_table_parse_init(&pm->pm_pat, "pass-pat", LS_TBL_PASS, DVB_PAT_PID, DVB_PAT_BASE, DVB_PAT_MASK, pm); diff --git a/src/spawn.c b/src/spawn.c index 731852431..98569626c 100644 --- a/src/spawn.c +++ b/src/spawn.c @@ -563,11 +563,138 @@ spawn_and_give_stdout(const char *prog, char *argv[], char *envp[], return 0; } +/** + * Execute the given program and return its standard input as file-descriptor (pipe). + * The standard output file-decriptor (od) must be valid, too. + */ +int +spawn_with_passthrough(const char *prog, char *argv[], char *envp[], + int od, int *wd, pid_t *pid, int redir_stderr) +{ + pid_t p; + int fd[2], f, i, maxfd; + char bin[256]; + const char *local_argv[2] = { NULL, NULL }; + char **e, **e0, **e2, **e3, *p1, *p2; + + if (*prog != '/' && *prog != '.') { + if (!find_exec(prog, bin, sizeof(bin))) return -1; + prog = bin; + } + + if (!argv) argv = (void *)local_argv; + if (!argv[0]) { + if (argv != (void *)local_argv) { + for (i = 1, e = argv + 1; *e; i++, e++); + i = (i + 1) * sizeof(char *); + e = alloca(i); + memcpy(e, argv, i); + argv = e; + } + argv[0] = (char *)prog; + } + + if (!envp || !envp[0]) { + e = environ; + } else { + for (i = 0, e2 = environ; *e2; i++, e2++); + for (f = 0, e2 = envp; *e2; f++, e2++); + e = alloca((i + f + 1) * sizeof(char *)); + memcpy(e, environ, i * sizeof(char *)); + e0 = e + i; + *e0 = NULL; + for (e2 = envp; *e2; e2++) { + for (e3 = e; *e3; e3++) { + p1 = strchr(*e2, '='); + p2 = strchr(*e3, '='); + if (p1 - *e2 == p2 - *e3 && !strncmp(*e2, *e3, p1 - *e2)) { + *e3 = *e2; + break; + } + } + if (!*e3) { + *e0++ = *e2; + *e0 = NULL; + } + } + *e0 = NULL; + } + + maxfd = sysconf(_SC_OPEN_MAX); + + pthread_mutex_lock(&fork_lock); + + if(pipe(fd) == -1) { + pthread_mutex_unlock(&fork_lock); + return -1; + } + + p = fork(); + + if(p == -1) { + pthread_mutex_unlock(&fork_lock); + tvherror(LS_SPAWN, "Unable to fork() for \"%s\" -- %s", + prog, strerror(errno)); + return -1; + } + + if(p == 0) { + if (redir_stderr) { + f = spawn_pipe_error.wr; + } else { + f = open("/dev/null", O_RDWR); + if(f == -1) { + spawn_error("pid %d cannot open /dev/null for redirect %s -- %s", + getpid(), prog, strerror(errno)); + exit(1); + } + } + + close(0); + close(1); + close(2); + + dup2(fd[0], 0); + dup2(od, 1); + dup2(f, 2); + + close(fd[0]); + close(fd[1]); + close(f); + + spawn_info("Executing \"%s\"\n", prog); + + for (f = 3; f < maxfd; f++) + close(f); + + execve(prog, argv, e); + spawn_error("pid %d cannot execute %s -- %s\n", + getpid(), prog, strerror(errno)); + exit(1); + } + + pthread_mutex_unlock(&fork_lock); + + spawn_enq(prog, p); + + close(fd[0]); + + *wd = fd[1]; + if (pid) { + *pid = p; + + // make the spawned process a session leader so killing the + // process group recursively kills any child process that + // might have been spawned + setpgid(p, p); + } + return 0; +} + /** * Execute the given program with arguments * * *outp will point to the allocated buffer - * The function will return the size of the buffer */ int spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr) diff --git a/src/spawn.h b/src/spawn.h index 12da725ea..12c0b2ec8 100644 --- a/src/spawn.h +++ b/src/spawn.h @@ -32,6 +32,9 @@ void spawn_free_args(char **argv); int spawn_and_give_stdout(const char *prog, char *argv[], char *envp[], int *rd, pid_t *pid, int redir_stderr); +int spawn_with_passthrough(const char *prog, char *argv[], char *envp[], + int od, int *wd, pid_t *pid, int redir_stderr); + int spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr); int spawn_reap(pid_t pid, char *stxt, size_t stxtlen);