#include <fcntl.h>
#include <assert.h>
#include <sys/stat.h>
+#include <signal.h>
#include "tvheadend.h"
#include "streaming.h"
#include "service.h"
#include "input/mpegts/dvb.h"
#include "muxer_pass.h"
-#include "dvr/dvr.h"
+#include "spawn.h"
typedef struct pass_muxer {
muxer_t;
/* File descriptor stuff */
off_t pm_off;
int pm_fd;
+ int pm_ofd;
int pm_seekable;
int pm_error;
+ int pm_spawn_pid;
/* Filename is also used for logging */
char *pm_filename;
/*
* Rewrite a PAT packet to only include the service included in the transport stream.
*/
-
static void
pass_muxer_pat_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len)
{
muxer_container_type_t mc;
const streaming_start_component_t *ssc;
const source_info_t *si = &ss->ss_si;
+ const char *mime = m->m_config.u.pass.m_mime;
+
+ if (mime && mime[0])
+ return mime;
has_audio = 0;
has_video = 0;
return pass_muxer_reconfigure(m, ss);
}
+/**
+ * Open the spawned task on demand
+ */
+static int
+pass_muxer_open2(pass_muxer_t *pm)
+{
+ const char *cmdline = pm->m_config.u.pass.m_cmdline;
+ char **argv = NULL;
+
+ pm->pm_spawn_pid = -1;
+ if (cmdline && cmdline[0]) {
+ argv = NULL;
+ if (spawn_parse_args(&argv, 64, cmdline, NULL))
+ goto error;
+ if (spawn_with_passthrough(argv[0], argv, NULL, pm->pm_ofd, &pm->pm_fd, &pm->pm_spawn_pid, 1)) {
+ tvherror(LS_PASS, "Unable to start pipe '%s' (wrong executable?)", cmdline);
+ goto error;
+ }
+ spawn_free_args(argv);
+ } else {
+ pm->pm_fd = pm->pm_ofd;
+ }
+ return 0;
+
+error:
+ if (argv)
+ spawn_free_args(argv);
+ pm->pm_error = ENOMEM;
+ return -1;
+}
/**
* Open the muxer as a stream muxer (using a non-seekable socket)
pass_muxer_t *pm = (pass_muxer_t*)m;
pm->pm_off = 0;
- pm->pm_fd = fd;
+ pm->pm_ofd = fd;
pm->pm_seekable = 0;
pm->pm_filename = strdup("Live stream");
- return 0;
+ return pass_muxer_open2(pm);
}
pm->pm_off = 0;
pm->pm_seekable = 1;
- pm->pm_fd = fd;
+ pm->pm_ofd = fd;
pm->pm_filename = strdup(filename);
- return 0;
+
+ return pass_muxer_open2(pm);
}
{
pass_muxer_t *pm = (pass_muxer_t*)m;
- if(pm->pm_seekable && close(pm->pm_fd)) {
+ if(pm->pm_spawn_pid > 0)
+ spawn_kill(pm->pm_spawn_pid, SIGTERM, 15);
+ if(pm->pm_seekable && close(pm->pm_ofd)) {
pm->pm_error = errno;
tvherror(LS_PASS, "%s: Unable to close file, close failed -- %s",
pm->pm_filename, strerror(errno));
pm->m_close = pass_muxer_close;
pm->m_destroy = pass_muxer_destroy;
pm->pm_fd = -1;
+ pm->pm_ofd = -1;
+ pm->pm_spawn_pid = -1;
dvb_table_parse_init(&pm->pm_pat, "pass-pat", LS_TBL_PASS, DVB_PAT_PID,
DVB_PAT_BASE, DVB_PAT_MASK, pm);
return 0;
}
+/**
+ * Execute the given program and return its standard input as file-descriptor (pipe).
+ * The standard output file-decriptor (od) must be valid, too.
+ */
+int
+spawn_with_passthrough(const char *prog, char *argv[], char *envp[],
+ int od, int *wd, pid_t *pid, int redir_stderr)
+{
+ pid_t p;
+ int fd[2], f, i, maxfd;
+ char bin[256];
+ const char *local_argv[2] = { NULL, NULL };
+ char **e, **e0, **e2, **e3, *p1, *p2;
+
+ if (*prog != '/' && *prog != '.') {
+ if (!find_exec(prog, bin, sizeof(bin))) return -1;
+ prog = bin;
+ }
+
+ if (!argv) argv = (void *)local_argv;
+ if (!argv[0]) {
+ if (argv != (void *)local_argv) {
+ for (i = 1, e = argv + 1; *e; i++, e++);
+ i = (i + 1) * sizeof(char *);
+ e = alloca(i);
+ memcpy(e, argv, i);
+ argv = e;
+ }
+ argv[0] = (char *)prog;
+ }
+
+ if (!envp || !envp[0]) {
+ e = environ;
+ } else {
+ for (i = 0, e2 = environ; *e2; i++, e2++);
+ for (f = 0, e2 = envp; *e2; f++, e2++);
+ e = alloca((i + f + 1) * sizeof(char *));
+ memcpy(e, environ, i * sizeof(char *));
+ e0 = e + i;
+ *e0 = NULL;
+ for (e2 = envp; *e2; e2++) {
+ for (e3 = e; *e3; e3++) {
+ p1 = strchr(*e2, '=');
+ p2 = strchr(*e3, '=');
+ if (p1 - *e2 == p2 - *e3 && !strncmp(*e2, *e3, p1 - *e2)) {
+ *e3 = *e2;
+ break;
+ }
+ }
+ if (!*e3) {
+ *e0++ = *e2;
+ *e0 = NULL;
+ }
+ }
+ *e0 = NULL;
+ }
+
+ maxfd = sysconf(_SC_OPEN_MAX);
+
+ pthread_mutex_lock(&fork_lock);
+
+ if(pipe(fd) == -1) {
+ pthread_mutex_unlock(&fork_lock);
+ return -1;
+ }
+
+ p = fork();
+
+ if(p == -1) {
+ pthread_mutex_unlock(&fork_lock);
+ tvherror(LS_SPAWN, "Unable to fork() for \"%s\" -- %s",
+ prog, strerror(errno));
+ return -1;
+ }
+
+ if(p == 0) {
+ if (redir_stderr) {
+ f = spawn_pipe_error.wr;
+ } else {
+ f = open("/dev/null", O_RDWR);
+ if(f == -1) {
+ spawn_error("pid %d cannot open /dev/null for redirect %s -- %s",
+ getpid(), prog, strerror(errno));
+ exit(1);
+ }
+ }
+
+ close(0);
+ close(1);
+ close(2);
+
+ dup2(fd[0], 0);
+ dup2(od, 1);
+ dup2(f, 2);
+
+ close(fd[0]);
+ close(fd[1]);
+ close(f);
+
+ spawn_info("Executing \"%s\"\n", prog);
+
+ for (f = 3; f < maxfd; f++)
+ close(f);
+
+ execve(prog, argv, e);
+ spawn_error("pid %d cannot execute %s -- %s\n",
+ getpid(), prog, strerror(errno));
+ exit(1);
+ }
+
+ pthread_mutex_unlock(&fork_lock);
+
+ spawn_enq(prog, p);
+
+ close(fd[0]);
+
+ *wd = fd[1];
+ if (pid) {
+ *pid = p;
+
+ // make the spawned process a session leader so killing the
+ // process group recursively kills any child process that
+ // might have been spawned
+ setpgid(p, p);
+ }
+ return 0;
+}
+
/**
* Execute the given program with arguments
*
* *outp will point to the allocated buffer
- * The function will return the size of the buffer
*/
int
spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr)
int spawn_and_give_stdout(const char *prog, char *argv[], char *envp[],
int *rd, pid_t *pid, int redir_stderr);
+int spawn_with_passthrough(const char *prog, char *argv[], char *envp[],
+ int od, int *wd, pid_t *pid, int redir_stderr);
+
int spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr);
int spawn_reap(pid_t pid, char *stxt, size_t stxtlen);