From: Eric Wong Date: Thu, 2 Apr 2026 19:44:47 +0000 (+0000) Subject: xapcmd: use DS instead of custom loop X-Git-Url: http://git.ipfire.org/index.cgi?a=commitdiff_plain;h=HEAD;p=thirdparty%2Fpublic-inbox.git xapcmd: use DS instead of custom loop Using the DS event_loop ought to make our codebase more accessible and eventually allow us to integrate compact into LeiStore.pm more easily, I hope. --- diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index e22971af8..86b51207a 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -110,16 +110,26 @@ sub commit_changes ($$$$) { } } -sub cb_spawn ($$$$) { - my ($cb, $ibxish, $args, $opt) = @_; # $cb = cpdb() or compact() +sub cb_spawn ($$) { + my ($job, $args) = @_; # $job->{cb} = cpdb() or compact() my $pid = PublicInbox::DS::fork_persist; - return $pid if $pid > 0; + if ($pid > 0) { + $job->{pids}->{$pid} = 1; + return awaitpid($pid, \&cb_done, $job, $args); + } $SIG{PIPE} = 'DEFAULT'; # warn may fail $SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack - $cb->($ibxish, $args, $opt); + $job->{cb}->($job->{owner}, $args, $job->{opt}); _exit(0); } +sub cb_done { # awaitpid cb + my ($pid, $job, $args) = @_; + delete $job->{pids}->{$pid} // die "BUG: PID:$pid reaped"; + die "E: @$args failed: $?" if $?; + cb_spawn($job, shift(@{$job->{q}}) // return); +} + sub runnable_or_die ($) { my ($exe) = @_; which($exe) or die "$exe not found in PATH\n"; @@ -153,8 +163,13 @@ sub same_fs_or_die ($$) { } sub kill_pids { - my ($sig, $pids) = @_; - kill($sig, keys %$pids); # pids may be empty + my ($sig, $job) = @_; + kill($sig, keys %{$job->{pids}}); # pids may be empty +} + +sub pids_running { # PublicInbox::DS::post_loop_do cb + my ($job) = @_; + scalar keys %{$job->{pids}}; } sub process_queue ($$$$) { @@ -167,30 +182,14 @@ sub process_queue ($$$$) { } return; } - + my $job = { q => $queue, owner => $ibxish, opt => $opt, cb => $cb }; # run in parallel: - my %pids; local @SIG{keys %SIG} = values %SIG; - setup_signals(\&kill_pids, \%pids); - while (@$queue) { - while (scalar(keys(%pids)) < $max && scalar(@$queue)) { - my $args = shift @$queue; - $pids{cb_spawn($cb, $ibxish, $args, $opt)} = $args; - } - - my $flags = 0; - while (scalar keys %pids) { - my $pid = waitpid(-1, $flags) or last; - last if $pid < 0; - my $args = delete $pids{$pid}; - if ($args) { - die "E: @$args failed: $?\n" if $?; - } else { - warn "unknown PID($pid) reaped: $?\n"; - } - $flags = WNOHANG if scalar(@$queue); - } - } + setup_signals(\&kill_pids, $job); + $SIG{CHLD} = \&PublicInbox::DS::enqueue_reap; + local @PublicInbox::DS::post_loop_do = (\&pids_running, $job); + cb_spawn($job, shift(@$queue) // last) for (1..$max); + PublicInbox::DS::event_loop; } sub prepare_run {