git.ipfire.org Git - thirdparty/git.git/commitdiff
run-command: add stdin callback for parallelization
author Emily Shaffer <emilyshaffer@google.com>
Thu, 18 Dec 2025 17:11:16 +0000 (19:11 +0200)
committer Junio C Hamano <gitster@pobox.com>
Fri, 19 Dec 2025 04:46:26 +0000 (13:46 +0900)
If a user of the run_processes_parallel() API wants to pipe a large
amount of information to the stdin of each parallel command, that
data could exceed the pipe buffer of the process's stdin and can be
too big to store in-memory via strbuf & friends or to slurp to a file.

Generally this is solved by repeatedly writing to child_process.in
between calls to start_command() and finish_command(). For a specific
pre-existing example of this, see transport.c:run_pre_push_hook().

This adds a generic callback API to run_processes_parallel() to do
exactly that in a unified manner, similar to the existing callback APIs,
which can then be used by hooks.h to convert the remaining hooks to the
new, simpler parallel interface.

Signed-off-by: Emily Shaffer <emilyshaffer@google.com>
Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com>
Signed-off-by: Adrian Ratiu <adrian.ratiu@collabora.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
run-command.c
run-command.h
t/helper/test-run-command.c
t/t0061-run-command.sh

index 82eeac38bf0c10d2432b2b3b67b2125f997db728..a608d37fb22d983bc0431ae861a6cc1ba2f5a9bb 100644 (file)
@@ -1490,6 +1490,16 @@ static int child_is_working(const struct parallel_child *pp_child)
        return pp_child->state == GIT_CP_WORKING;
 }
 
+/*
+ * A working child with no stdin fd left open towards it (process.in == 0)
+ * has nothing more to be fed and can be handed over to cleanup.
+ */
+static int child_is_ready_for_cleanup(const struct parallel_child *pp_child)
+{
+       if (!child_is_working(pp_child))
+               return 0;
+       return pp_child->process.in == 0;
+}
+
+/*
+ * A working child whose process.in holds a positive fd still has its stdin
+ * open and may be fed more input.
+ */
+static int child_is_receiving_input(const struct parallel_child *pp_child)
+{
+       if (!child_is_working(pp_child))
+               return 0;
+       return pp_child->process.in > 0;
+}
+
 struct parallel_processes {
        size_t nr_processes;
 
@@ -1659,6 +1669,44 @@ static int pp_start_one(struct parallel_processes *pp,
        return 0;
 }
 
+/*
+ * Give every child that is still receiving input one chance to be fed.
+ *
+ * A child's stdin fd is closed, and process.in reset to 0, as soon as there
+ * is nothing more to feed it. A negative return from the feed_pipe callback
+ * is fatal.
+ */
+static void pp_buffer_stdin(struct parallel_processes *pp,
+                           const struct run_process_parallel_opts *opts)
+{
+       for (size_t i = 0; i < opts->processes; i++) {
+               struct parallel_child *child = &pp->children[i];
+               /*
+                * child input is provided via path_to_stdin when the feed_pipe
+                * cb is missing, so by default we just signal an EOF.
+                */
+               int finished = 1;
+
+               if (!child_is_receiving_input(child))
+                       continue;
+
+               if (opts->feed_pipe) {
+                       /*
+                        * Feed the pipe:
+                        *   < 0 means error
+                        *   == 0 means there is more data to be fed
+                        *   > 0 means feeding finished
+                        */
+                       finished = opts->feed_pipe(child->process.in,
+                                                  opts->data, child->data);
+                       if (finished < 0)
+                               die_errno("feed_pipe");
+               }
+
+               if (!finished)
+                       continue;
+
+               close(child->process.in);
+               child->process.in = 0;
+       }
+}
+
 static void pp_buffer_stderr(struct parallel_processes *pp,
                             const struct run_process_parallel_opts *opts,
                             int output_timeout)
@@ -1729,6 +1777,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
                pp->children[i].state = GIT_CP_FREE;
                if (pp->pfd)
                        pp->pfd[i].fd = -1;
+               pp->children[i].process.in = 0;
                child_process_init(&pp->children[i].process);
 
                if (opts->ungroup) {
@@ -1763,6 +1812,27 @@ static int pp_collect_finished(struct parallel_processes *pp,
        return result;
 }
 
+/*
+ * Run one round of child I/O: feed pending stdin first, then either mark
+ * children that need no more input for cleanup (ungroup mode) or run
+ * pp_buffer_stderr() and pp_output() for the grouped case.
+ */
+static void pp_handle_child_IO(struct parallel_processes *pp,
+                               const struct run_process_parallel_opts *opts,
+                               int output_timeout)
+{
+       /*
+        * Input goes first (this may be a no-op) so that children are not left
+        * blocked waiting for it. It also prevents a deadlock in the ungroup
+        * case below, where a child blocked on input would never finish while
+        * the parent waits for it.
+        */
+       pp_buffer_stdin(pp, opts);
+
+       if (!opts->ungroup) {
+               pp_buffer_stderr(pp, opts, output_timeout);
+               pp_output(pp);
+               return;
+       }
+
+       for (size_t i = 0; i < opts->processes; i++) {
+               struct parallel_child *child = &pp->children[i];
+
+               if (child_is_ready_for_cleanup(child))
+                       child->state = GIT_CP_WAIT_CLEANUP;
+       }
+}
+
 void run_processes_parallel(const struct run_process_parallel_opts *opts)
 {
        int i, code;
@@ -1782,6 +1852,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
                                           "max:%"PRIuMAX,
                                           (uintmax_t)opts->processes);
 
+       /*
+        * Children fed via stdin may exit before consuming all of their input.
+        * Ignore SIGPIPE so that such writes fail with EPIPE instead of killing
+        * us; each feed_pipe_fn, which does the writing, is expected to handle it.
+        */
+       sigchain_push(SIGPIPE, SIG_IGN);
+
        pp_init(&pp, opts, &pp_sig);
        while (1) {
                for (i = 0;
@@ -1799,13 +1876,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
                }
                if (!pp.nr_processes)
                        break;
-               if (opts->ungroup) {
-                       for (size_t i = 0; i < opts->processes; i++)
-                               pp.children[i].state = GIT_CP_WAIT_CLEANUP;
-               } else {
-                       pp_buffer_stderr(&pp, opts, output_timeout);
-                       pp_output(&pp);
-               }
+               pp_handle_child_IO(&pp, opts, output_timeout);
                code = pp_collect_finished(&pp, opts);
                if (code) {
                        pp.shutdown = 1;
@@ -1816,6 +1887,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
 
        pp_cleanup(&pp, opts);
 
+       sigchain_pop(SIGPIPE);
+
        if (do_trace2)
                trace2_region_leave(tr2_category, tr2_label, NULL);
 }
index 0df25e445f001cebf0d5ac33d7e6dbe3f779aa6e..e1ca965b5b19882ef1c16b9407eec13ff3e926fe 100644 (file)
@@ -420,6 +420,21 @@ typedef int (*start_failure_fn)(struct strbuf *out,
                                void *pp_cb,
                                void *pp_task_cb);
 
+/**
+ * This callback is called repeatedly for every child process that asked
+ * start_command() to create a pipe by setting child_process.in < 0.
+ *
+ * child_in is the child's child_process.in file descriptor, i.e. the fd the
+ * callback should write the child's input to.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel, and
+ * pp_task_cb is the callback cookie as passed into get_next_task_fn.
+ *
+ * Returns < 0 for error (the caller dies)
+ * Returns == 0 when there is more data to be fed (will be called again)
+ * Returns > 0 when finished (child closed fd or no more data to be fed);
+ *   the caller then closes child_in, so the callback must not close it.
+ */
+typedef int (*feed_pipe_fn)(int child_in,
+                               void *pp_cb,
+                               void *pp_task_cb);
+
 /**
  * This callback is called on every child process that finished processing.
  *
@@ -473,6 +488,12 @@ struct run_process_parallel_opts
         */
        start_failure_fn start_failure;
 
+       /**
+        * feed_pipe: See feed_pipe_fn() above. This can be NULL to omit any
+        * special handling.
+        */
+       feed_pipe_fn feed_pipe;
+
        /**
         * task_finished: See task_finished_fn() above. This can be
         * NULL to omit any special handling.
index 3719f23cc2d02f0a6c1760a3263bbc6a0f335fcb..4a56456894ccff9b38aa5bfb3c286897083ca6a5 100644 (file)
@@ -23,19 +23,26 @@ static int number_callbacks;
 static int parallel_next(struct child_process *cp,
                         struct strbuf *err,
                         void *cb,
-                        void **task_cb UNUSED)
+                        void **task_cb)
 {
        struct child_process *d = cb;
        if (number_callbacks >= 4)
                return 0;
 
        strvec_pushv(&cp->args, d->args.v);
+       cp->in = d->in;
+       cp->no_stdin = d->no_stdin;
        if (err)
                strbuf_addstr(err, "preloaded output of a child\n");
        else
                fprintf(stderr, "preloaded output of a child\n");
 
        number_callbacks++;
+
+       /*
+        * Per-task countdown of stdin lines still to be written; it is
+        * decremented by test_stdin_pipe_feed() and freed by the
+        * task_finished callbacks.
+        */
+       *task_cb = xmalloc(sizeof(int));
+       *(int*)(*task_cb) = 2;
+
        return 1;
 }
 
@@ -54,15 +61,48 @@ static int no_job(struct child_process *cp UNUSED,
 static int task_finished(int result UNUSED,
                         struct strbuf *err,
                         void *pp_cb UNUSED,
-                        void *pp_task_cb UNUSED)
+                        void *pp_task_cb)
 {
        if (err)
                strbuf_addstr(err, "asking for a quick stop\n");
        else
                fprintf(stderr, "asking for a quick stop\n");
+       /*
+        * Release the per-task counter allocated by parallel_next(); note
+        * that only this function's local copy of the pointer is NULLed.
+        */
+       FREE_AND_NULL(pp_task_cb);
+
        return 1;
 }
 
+/*
+ * task_finished callback that prints nothing: it only releases the per-task
+ * data and returns 0.
+ */
+static int task_finished_quiet(int result UNUSED,
+                              struct strbuf *err UNUSED,
+                              void *pp_cb UNUSED,
+                              void *pp_task_cb)
+{
+       /* Nulling the by-value pointer would not be visible to the caller. */
+       free(pp_task_cb);
+       return 0;
+}
+
+/*
+ * feed_pipe callback used by the "run-command-stdin" test mode.
+ *
+ * Each call writes one "sample stdin <n>" line to hook_stdin_fd, counting
+ * down the int that task_cb points to. Returns 0 while lines remain, and 1
+ * once the counter is zero or the write failed with EPIPE. Any other write
+ * error is fatal.
+ */
+static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
+{
+       int *remaining = task_cb;
+       struct strbuf line = STRBUF_INIT;
+       int child_gone = 0;
+
+       if (!*remaining)
+               return 1;
+
+       strbuf_addf(&line, "sample stdin %d\n", --(*remaining));
+
+       /* Inspect errno before anything else gets a chance to clobber it. */
+       if (write_in_full(hook_stdin_fd, line.buf, line.len) < 0) {
+               if (errno != EPIPE)
+                       die_errno("write");
+               /* child closed stdin, nothing more to do */
+               child_gone = 1;
+       }
+       strbuf_release(&line);
+
+       return child_gone || !*remaining;
+}
+
 struct testsuite {
        struct string_list tests, failed;
        int next;
@@ -157,6 +197,7 @@ static int testsuite(int argc, const char **argv)
        struct run_process_parallel_opts opts = {
                .get_next_task = next_test,
                .start_failure = test_failed,
+               .feed_pipe = test_stdin_pipe_feed,
                .task_finished = test_finished,
                .data = &suite,
        };
@@ -460,12 +501,19 @@ int cmd__run_command(int argc, const char **argv)
 
        if (!strcmp(argv[1], "run-command-parallel")) {
                opts.get_next_task = parallel_next;
+               opts.task_finished = task_finished_quiet;
        } else if (!strcmp(argv[1], "run-command-abort")) {
                opts.get_next_task = parallel_next;
                opts.task_finished = task_finished;
        } else if (!strcmp(argv[1], "run-command-no-jobs")) {
                opts.get_next_task = no_job;
                opts.task_finished = task_finished;
+       } else if (!strcmp(argv[1], "run-command-stdin")) {
+               proc.in = -1;
+               proc.no_stdin = 0;
+               opts.get_next_task = parallel_next;
+               opts.task_finished = task_finished_quiet;
+               opts.feed_pipe = test_stdin_pipe_feed;
        } else {
                ret = 1;
                fprintf(stderr, "check usage\n");
index 76d4936a879afdfcc8961117084174d88da96c26..2f77fde0d964c8afd2bd175b77263226aea92d2f 100755 (executable)
@@ -164,6 +164,37 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
        test_line_count = 4 err
 '
 
+test_expect_success 'run_command listens to stdin' '
+       cat >expect <<-\EOF &&
+       preloaded output of a child
+       listening for stdin:
+       sample stdin 1
+       sample stdin 0
+       preloaded output of a child
+       listening for stdin:
+       sample stdin 1
+       sample stdin 0
+       preloaded output of a child
+       listening for stdin:
+       sample stdin 1
+       sample stdin 0
+       preloaded output of a child
+       listening for stdin:
+       sample stdin 1
+       sample stdin 0
+       EOF
+
+       write_script stdin-script <<-\EOF &&
+       echo "listening for stdin:"
+       while read line
+       do
+               echo "$line"
+       done
+       EOF
+       test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
+       test_cmp expect actual
+'
+
 cat >expect <<-EOF
 preloaded output of a child
 asking for a quick stop