]> git.ipfire.org Git - thirdparty/tvheadend.git/commitdiff
mpegts pass muxer: add spawn functionality
authorJaroslav Kysela <perex@perex.cz>
Tue, 22 Aug 2017 07:16:46 +0000 (09:16 +0200)
committerJaroslav Kysela <perex@perex.cz>
Tue, 22 Aug 2017 08:14:58 +0000 (10:14 +0200)
src/muxer/muxer_pass.c
src/spawn.c
src/spawn.h

index 00763fdc752caa1e9b43b38778b70b1fc4487cf0..511b7f6d15aa451babe866740ffbe4633482dbe0 100644 (file)
@@ -21,6 +21,7 @@
 #include <fcntl.h>
 #include <assert.h>
 #include <sys/stat.h>
+#include <signal.h>
 
 #include "tvheadend.h"
 #include "streaming.h"
@@ -28,7 +29,7 @@
 #include "service.h"
 #include "input/mpegts/dvb.h"
 #include "muxer_pass.h"
-#include "dvr/dvr.h"
+#include "spawn.h"
 
 typedef struct pass_muxer {
   muxer_t;
@@ -36,8 +37,10 @@ typedef struct pass_muxer {
   /* File descriptor stuff */
   off_t pm_off;
   int   pm_fd;
+  int   pm_ofd;
   int   pm_seekable;
   int   pm_error;
+  int   pm_spawn_pid;
 
   /* Filename is also used for logging */
   char *pm_filename;
@@ -67,7 +70,6 @@ pass_muxer_write(muxer_t *m, const void *data, size_t size);
 /*
  * Rewrite a PAT packet to only include the service included in the transport stream.
  */
-
 static void
 pass_muxer_pat_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len)
 {
@@ -276,6 +278,10 @@ pass_muxer_mime(muxer_t* m, const struct streaming_start *ss)
   muxer_container_type_t mc;
   const streaming_start_component_t *ssc;
   const source_info_t *si = &ss->ss_si;
+  const char *mime = m->m_config.u.pass.m_mime;
+
+  if (mime && mime[0])
+    return mime;
 
   has_audio = 0;
   has_video = 0;
@@ -364,6 +370,36 @@ pass_muxer_init(muxer_t* m, struct streaming_start *ss, const char *name)
   return pass_muxer_reconfigure(m, ss);
 }
 
+/**
+ * Open the spawned task on demand
+ */
+static int
+pass_muxer_open2(pass_muxer_t *pm)
+{
+  const char *cmdline = pm->m_config.u.pass.m_cmdline;
+  char **argv = NULL;
+
+  pm->pm_spawn_pid = -1;
+  if (cmdline && cmdline[0]) {
+    argv = NULL;
+    if (spawn_parse_args(&argv, 64, cmdline, NULL))
+      goto error;
+    if (spawn_with_passthrough(argv[0], argv, NULL, pm->pm_ofd, &pm->pm_fd, &pm->pm_spawn_pid, 1)) {
+      tvherror(LS_PASS, "Unable to start pipe '%s' (wrong executable?)", cmdline);
+      goto error;
+    }
+    spawn_free_args(argv);
+  } else {
+    pm->pm_fd = pm->pm_ofd;
+  }
+  return 0;
+
+error:
+  if (argv)
+    spawn_free_args(argv);
+  pm->pm_error = ENOMEM;
+  return -1;
+}
 
 /**
  * Open the muxer as a stream muxer (using a non-seekable socket)
@@ -374,11 +410,11 @@ pass_muxer_open_stream(muxer_t *m, int fd)
   pass_muxer_t *pm = (pass_muxer_t*)m;
 
   pm->pm_off      = 0;
-  pm->pm_fd       = fd;
+  pm->pm_ofd      = fd;
   pm->pm_seekable = 0;
   pm->pm_filename = strdup("Live stream");
 
-  return 0;
+  return pass_muxer_open2(pm);
 }
 
 
@@ -410,9 +446,10 @@ pass_muxer_open_file(muxer_t *m, const char *filename)
 
   pm->pm_off      = 0;
   pm->pm_seekable = 1;
-  pm->pm_fd       = fd;
+  pm->pm_ofd      = fd;
   pm->pm_filename = strdup(filename);
-  return 0;
+
+  return pass_muxer_open2(pm);
 }
 
 
@@ -560,7 +597,9 @@ pass_muxer_close(muxer_t *m)
 {
   pass_muxer_t *pm = (pass_muxer_t*)m;
 
-  if(pm->pm_seekable && close(pm->pm_fd)) {
+  if(pm->pm_spawn_pid > 0)
+    spawn_kill(pm->pm_spawn_pid, SIGTERM, 15);
+  if(pm->pm_seekable && close(pm->pm_ofd)) {
     pm->pm_error = errno;
     tvherror(LS_PASS, "%s: Unable to close file, close failed -- %s",
             pm->pm_filename, strerror(errno));
@@ -617,6 +656,8 @@ pass_muxer_create(const muxer_config_t *m_cfg)
   pm->m_close        = pass_muxer_close;
   pm->m_destroy      = pass_muxer_destroy;
   pm->pm_fd          = -1;
+  pm->pm_ofd         = -1;
+  pm->pm_spawn_pid   = -1;
 
   dvb_table_parse_init(&pm->pm_pat, "pass-pat", LS_TBL_PASS, DVB_PAT_PID,
                        DVB_PAT_BASE, DVB_PAT_MASK, pm);
index 7318524319a9e70c7709d467df3e22ecedd699a1..98569626c3090aadfcf10ddca00444f9c74f401e 100644 (file)
@@ -563,11 +563,138 @@ spawn_and_give_stdout(const char *prog, char *argv[], char *envp[],
   return 0;
 }
 
+/**
+ * Execute the given program and return its standard input as file-descriptor (pipe).
+ * The standard output file-decriptor (od) must be valid, too.
+ */
+int
+spawn_with_passthrough(const char *prog, char *argv[], char *envp[],
+                       int od, int *wd, pid_t *pid, int redir_stderr)
+{
+  pid_t p;
+  int fd[2], f, i, maxfd;
+  char bin[256];
+  const char *local_argv[2] = { NULL, NULL };
+  char **e, **e0, **e2, **e3, *p1, *p2;
+
+  if (*prog != '/' && *prog != '.') {
+    if (!find_exec(prog, bin, sizeof(bin))) return -1;
+    prog = bin;
+  }
+
+  if (!argv) argv = (void *)local_argv;
+  if (!argv[0]) {
+    if (argv != (void *)local_argv) {
+      for (i = 1, e = argv + 1; *e; i++, e++);
+      i = (i + 1) * sizeof(char *);
+      e = alloca(i);
+      memcpy(e, argv, i);
+      argv = e;
+    }
+    argv[0] = (char *)prog;
+  }
+
+  if (!envp || !envp[0]) {
+    e = environ;
+  } else {
+    for (i = 0, e2 = environ; *e2; i++, e2++);
+    for (f = 0, e2 = envp; *e2; f++, e2++);
+    e = alloca((i + f + 1) * sizeof(char *));
+    memcpy(e, environ, i * sizeof(char *));
+    e0 = e + i;
+    *e0 = NULL;
+    for (e2 = envp; *e2; e2++) {
+      for (e3 = e; *e3; e3++) {
+        p1 = strchr(*e2, '=');
+        p2 = strchr(*e3, '=');
+        if (p1 - *e2 == p2 - *e3 && !strncmp(*e2, *e3, p1 - *e2)) {
+          *e3 = *e2;
+          break;
+        }
+      }
+      if (!*e3) {
+        *e0++ = *e2;
+        *e0 = NULL;
+      }
+    }
+    *e0 = NULL;
+  }
+
+  maxfd = sysconf(_SC_OPEN_MAX);
+
+  pthread_mutex_lock(&fork_lock);
+
+  if(pipe(fd) == -1) {
+    pthread_mutex_unlock(&fork_lock);
+    return -1;
+  }
+
+  p = fork();
+
+  if(p == -1) {
+    pthread_mutex_unlock(&fork_lock);
+    tvherror(LS_SPAWN, "Unable to fork() for \"%s\" -- %s",
+             prog, strerror(errno));
+    return -1;
+  }
+
+  if(p == 0) {
+    if (redir_stderr) {
+      f = spawn_pipe_error.wr;
+    } else {
+      f = open("/dev/null", O_RDWR);
+      if(f == -1) {
+        spawn_error("pid %d cannot open /dev/null for redirect %s -- %s",
+                    getpid(), prog, strerror(errno));
+        exit(1);
+      }
+    }
+
+    close(0);
+    close(1);
+    close(2);
+
+    dup2(fd[0], 0);
+    dup2(od, 1);
+    dup2(f, 2);
+
+    close(fd[0]);
+    close(fd[1]);
+    close(f);
+
+    spawn_info("Executing \"%s\"\n", prog);
+
+    for (f = 3; f < maxfd; f++)
+      close(f);
+
+    execve(prog, argv, e);
+    spawn_error("pid %d cannot execute %s -- %s\n",
+                getpid(), prog, strerror(errno));
+    exit(1);
+  }
+
+  pthread_mutex_unlock(&fork_lock);
+
+  spawn_enq(prog, p);
+
+  close(fd[0]);
+
+  *wd = fd[1];
+  if (pid) {
+    *pid = p;
+
+    // make the spawned process a session leader so killing the
+    // process group recursively kills any child process that
+    // might have been spawned
+    setpgid(p, p);
+  }
+  return 0;
+}
+
 /**
  * Execute the given program with arguments
  * 
  * *outp will point to the allocated buffer
- * The function will return the size of the buffer
  */
 int
 spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr)
index 12da725ea740834ddb87ab679a23837d90a6444f..12c0b2ec8e4a4bec76efd1d9e33f0aca8277461c 100644 (file)
@@ -32,6 +32,9 @@ void spawn_free_args(char **argv);
 int spawn_and_give_stdout(const char *prog, char *argv[], char *envp[],
                           int *rd, pid_t *pid, int redir_stderr);
 
+int spawn_with_passthrough(const char *prog, char *argv[], char *envp[],
+                           int od, int *wd, pid_t *pid, int redir_stderr);
+
 int spawnv(const char *prog, char *argv[], pid_t *pid, int redir_stdout, int redir_stderr);
 
 int spawn_reap(pid_t pid, char *stxt, size_t stxtlen);