{
return child_is_working(pp_child) && pp_child->process.in > 0;
}
+static int child_is_sending_output(const struct parallel_child *pp_child)
+{
+ /*
+ * all pp children which buffer output through run_command via ungroup=0
+ * redirect stdout to stderr, so we just need to check process.err.
+ */
+ return child_is_working(pp_child) && pp_child->process.err > 0;
+}
struct parallel_processes {
size_t nr_processes;
CALLOC_ARRAY(pp->children, n);
if (!opts->ungroup)
- CALLOC_ARRAY(pp->pfd, n);
+ CALLOC_ARRAY(pp->pfd, n * 2);
for (size_t i = 0; i < n; i++) {
strbuf_init(&pp->children[i].err, 0);
}
}
-static void pp_buffer_stderr(struct parallel_processes *pp,
- const struct run_process_parallel_opts *opts,
- int output_timeout)
+static void pp_buffer_io(struct parallel_processes *pp,
+ const struct run_process_parallel_opts *opts,
+ int timeout)
{
- while (poll(pp->pfd, opts->processes, output_timeout) < 0) {
+ /* for each potential child slot, prepare two pollfd entries */
+ for (size_t i = 0; i < opts->processes; i++) {
+ if (child_is_sending_output(&pp->children[i])) {
+ pp->pfd[2*i].fd = pp->children[i].process.err;
+ pp->pfd[2*i].events = POLLIN | POLLHUP;
+ } else {
+ pp->pfd[2*i].fd = -1;
+ }
+
+ if (child_is_receiving_input(&pp->children[i])) {
+ pp->pfd[2*i+1].fd = pp->children[i].process.in;
+ pp->pfd[2*i+1].events = POLLOUT;
+ } else {
+ pp->pfd[2*i+1].fd = -1;
+ }
+ }
+
+ while (poll(pp->pfd, opts->processes * 2, timeout) < 0) {
if (errno == EINTR)
continue;
pp_cleanup(pp, opts);
die_errno("poll");
}
- /* Buffer output from all pipes. */
for (size_t i = 0; i < opts->processes; i++) {
+ /* Handle input feeding (stdin) */
+ if (pp->pfd[2*i+1].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ if (opts->feed_pipe) {
+ int ret = opts->feed_pipe(pp->children[i].process.in,
+ opts->data,
+ pp->children[i].data);
+ if (ret < 0)
+ die_errno("feed_pipe");
+ if (ret) {
+ /* done feeding */
+ close(pp->children[i].process.in);
+ pp->children[i].process.in = 0;
+ }
+ } else {
+ /*
+ * No feed_pipe means there is nothing to do, so
+ * close the fd. Child input can be fed by other
+ * methods, such as opts->path_to_stdin which
+ * slurps a file via dup2, so clean up here.
+ */
+ close(pp->children[i].process.in);
+ pp->children[i].process.in = 0;
+ }
+ }
+
+ /* Handle output reading (stderr) */
if (child_is_working(&pp->children[i]) &&
- pp->pfd[i].revents & (POLLIN | POLLHUP)) {
+ pp->pfd[2*i].revents & (POLLIN | POLLHUP)) {
int n = strbuf_read_once(&pp->children[i].err,
pp->children[i].process.err, 0);
if (n == 0) {
static void pp_handle_child_IO(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
- int output_timeout)
+ int timeout)
{
- /*
- * First push input, if any (it might no-op), to child tasks to avoid them blocking
- * after input. This also prevents deadlocks when ungrouping below, if a child blocks
- * while the parent also waits for them to finish.
- */
- pp_buffer_stdin(pp, opts);
-
if (opts->ungroup) {
+ pp_buffer_stdin(pp, opts);
for (size_t i = 0; i < opts->processes; i++)
if (child_is_ready_for_cleanup(&pp->children[i]))
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
- pp_buffer_stderr(pp, opts, output_timeout);
+ pp_buffer_io(pp, opts, timeout);
pp_output(pp);
}
}
void run_processes_parallel(const struct run_process_parallel_opts *opts)
{
int i, code;
- int output_timeout = 100;
+ int timeout = 100;
int spawn_cap = 4;
struct parallel_processes_for_signal pp_sig;
struct parallel_processes pp = {
}
if (!pp.nr_processes)
break;
- pp_handle_child_IO(&pp, opts, output_timeout);
+ pp_handle_child_IO(&pp, opts, timeout);
code = pp_collect_finished(&pp, opts);
if (code) {
pp.shutdown = 1;