}
}
-sub cb_spawn ($$$$) {
- my ($cb, $ibxish, $args, $opt) = @_; # $cb = cpdb() or compact()
+sub cb_spawn ($$) {
+ my ($job, $args) = @_; # $job->{cb} = cpdb() or compact()
my $pid = PublicInbox::DS::fork_persist;
- return $pid if $pid > 0;
+ if ($pid > 0) {
+ $job->{pids}->{$pid} = 1;
+ return awaitpid($pid, \&cb_done, $job, $args);
+ }
$SIG{PIPE} = 'DEFAULT'; # warn may fail
$SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack
- $cb->($ibxish, $args, $opt);
+ $job->{cb}->($job->{owner}, $args, $job->{opt});
_exit(0);
}
+sub cb_done { # awaitpid cb
+ my ($pid, $job, $args) = @_;
+ delete $job->{pids}->{$pid} // die "BUG: PID:$pid reaped";
+ die "E: @$args failed: $?" if $?;
+ cb_spawn($job, shift(@{$job->{q}}) // return);
+}
+
sub runnable_or_die ($) {
my ($exe) = @_;
which($exe) or die "$exe not found in PATH\n";
}
sub kill_pids {
- my ($sig, $pids) = @_;
- kill($sig, keys %$pids); # pids may be empty
+ my ($sig, $job) = @_;
+ kill($sig, keys %{$job->{pids}}); # pids may be empty
+}
+
+sub pids_running { # PublicInbox::DS::post_loop_do cb
+ my ($job) = @_;
+ scalar keys %{$job->{pids}};
}
sub process_queue ($$$$) {
}
return;
}
-
+ my $job = { q => $queue, owner => $ibxish, opt => $opt, cb => $cb };
# run in parallel:
- my %pids;
local @SIG{keys %SIG} = values %SIG;
- setup_signals(\&kill_pids, \%pids);
- while (@$queue) {
- while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
- my $args = shift @$queue;
- $pids{cb_spawn($cb, $ibxish, $args, $opt)} = $args;
- }
-
- my $flags = 0;
- while (scalar keys %pids) {
- my $pid = waitpid(-1, $flags) or last;
- last if $pid < 0;
- my $args = delete $pids{$pid};
- if ($args) {
- die "E: @$args failed: $?\n" if $?;
- } else {
- warn "unknown PID($pid) reaped: $?\n";
- }
- $flags = WNOHANG if scalar(@$queue);
- }
- }
+ setup_signals(\&kill_pids, $job);
+ $SIG{CHLD} = \&PublicInbox::DS::enqueue_reap;
+ local @PublicInbox::DS::post_loop_do = (\&pids_running, $job);
+ cb_spawn($job, shift(@$queue) // last) for (1..$max);
+ PublicInbox::DS::event_loop;
}
sub prepare_run {