* When get_next_task added messages to the buffer in its last
* iteration, the buffered output is non empty.
*/
- strbuf_write(&pp->buffered_output, stderr);
+ if (opts->consume_output)
+ opts->consume_output(&pp->buffered_output, opts->data);
+ else
+ strbuf_write(&pp->buffered_output, stderr);
strbuf_release(&pp->buffered_output);
sigchain_pop_common();
}
}
-static void pp_output(const struct parallel_processes *pp)
+static void pp_output(const struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts)
{
size_t i = pp->output_owner;
if (child_is_working(&pp->children[i]) &&
pp->children[i].err.len) {
- strbuf_write(&pp->children[i].err, stderr);
+ if (opts->consume_output)
+ opts->consume_output(&pp->children[i].err, opts->data);
+ else
+ strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err);
}
}
} else {
const size_t n = opts->processes;
- strbuf_write(&pp->children[i].err, stderr);
+ /* Output errors, then all other finished child processes */
+ if (opts->consume_output) {
+ opts->consume_output(&pp->children[i].err, opts->data);
+ opts->consume_output(&pp->buffered_output, opts->data);
+ } else {
+ strbuf_write(&pp->children[i].err, stderr);
+ strbuf_write(&pp->buffered_output, stderr);
+ }
strbuf_reset(&pp->children[i].err);
-
- /* Output all other finished child processes */
- strbuf_write(&pp->buffered_output, stderr);
strbuf_reset(&pp->buffered_output);
/*
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
pp_buffer_stderr(pp, opts, output_timeout);
- pp_output(pp);
+ pp_output(pp, opts);
}
}
"max:%"PRIuMAX,
(uintmax_t)opts->processes);
+ if (opts->ungroup && opts->consume_output)
+ BUG("ungroup and reading output are mutualy exclusive");
+
/*
* Child tasks might receive input via stdin, terminating early (or not), so
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
void *pp_cb,
void *pp_task_cb);
+/**
+ * If this callback is provided, output is collated into a new pipe instead
+ * of the process stderr. Then `consume_output_fn` will be called repeatedly
+ * with output contained in the `output` arg. It will also be called with an
+ * empty `output` to allow for keepalives or similar operations if necessary.
+ *
+ * pp_cb is the callback cookie as passed into run_processes_parallel.
+ * No task cookie is provided because the callback receives collated output.
+ */
+typedef void (*consume_output_fn)(struct strbuf *output, void *pp_cb);
+
/**
* This callback is called on every child process that finished processing.
*
*/
feed_pipe_fn feed_pipe;
+ /*
+ * consume_output: see consume_output_fn() above. This can be NULL
+ * to omit any special handling.
+ */
+ consume_output_fn consume_output;
+
/**
* task_finished: See task_finished_fn() above. This can be
* NULL to omit any special handling.
return 0;
}
+static void test_divert_output(struct strbuf *output, void *cb UNUSED)
+{
+ FILE *output_file;
+
+ output_file = fopen("./output_file", "a");
+
+ strbuf_write(output, output_file);
+ fclose(output_file);
+}
+
static int task_finished(int result UNUSED,
struct strbuf *err,
void *pp_cb UNUSED,
.get_next_task = next_test,
.start_failure = test_failed,
.feed_pipe = test_stdin_pipe_feed,
+ .consume_output = test_divert_output,
.task_finished = test_finished,
.data = &suite,
};
opts.get_next_task = parallel_next;
opts.task_finished = task_finished_quiet;
opts.feed_pipe = test_stdin_pipe_feed;
+ } else if (!strcmp(argv[1], "run-command-divert-output")) {
+ opts.get_next_task = parallel_next;
+ opts.consume_output = test_divert_output;
+ opts.task_finished = task_finished_quiet;
} else {
ret = 1;
fprintf(stderr, "check usage\n");