]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xapcmd: use DS instead of custom loop master
authorEric Wong <e@80x24.org>
Thu, 2 Apr 2026 19:44:47 +0000 (19:44 +0000)
committerEric Wong <e@80x24.org>
Fri, 10 Apr 2026 22:25:16 +0000 (22:25 +0000)
Using the DS event_loop ought to make our codebase more
accessible and eventually allow us to integrate compact
into LeiStore.pm more easily, I hope.

lib/PublicInbox/Xapcmd.pm

index e22971af8ae4490dce3a7d1526b071137daa52f5..86b51207a4ec7651d514d731eafd1ec9715ad611 100644 (file)
@@ -110,16 +110,26 @@ sub commit_changes ($$$$) {
        }
 }
 
-sub cb_spawn ($$$$) {
-       my ($cb, $ibxish, $args, $opt) = @_; # $cb = cpdb() or compact()
+sub cb_spawn ($$) {
+       my ($job, $args) = @_; # $job->{cb} = cpdb() or compact()
        my $pid = PublicInbox::DS::fork_persist;
-       return $pid if $pid > 0;
+       if ($pid > 0) {
+               $job->{pids}->{$pid} = 1;
+               return awaitpid($pid, \&cb_done, $job, $args);
+       }
        $SIG{PIPE} = 'DEFAULT'; # warn may fail
        $SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack
-       $cb->($ibxish, $args, $opt);
+       $job->{cb}->($job->{owner}, $args, $job->{opt});
        _exit(0);
 }
 
+sub cb_done { # awaitpid cb
+       my ($pid, $job, $args) = @_;
+       delete $job->{pids}->{$pid} // die "BUG: PID:$pid reaped";
+       die "E: @$args failed: $?" if $?;
+       cb_spawn($job, shift(@{$job->{q}}) // return);
+}
+
 sub runnable_or_die ($) {
        my ($exe) = @_;
        which($exe) or die "$exe not found in PATH\n";
@@ -153,8 +163,13 @@ sub same_fs_or_die ($$) {
 }
 
 sub kill_pids {
-       my ($sig, $pids) = @_;
-       kill($sig, keys %$pids); # pids may be empty
+       my ($sig, $job) = @_;
+       kill($sig, keys %{$job->{pids}}); # pids may be empty
+}
+
+sub pids_running { # PublicInbox::DS::post_loop_do cb
+       my ($job) = @_;
+       scalar keys %{$job->{pids}};
 }
 
 sub process_queue ($$$$) {
@@ -167,30 +182,14 @@ sub process_queue ($$$$) {
                }
                return;
        }
-
+       my $job = { q => $queue, owner => $ibxish, opt => $opt, cb => $cb };
        # run in parallel:
-       my %pids;
        local @SIG{keys %SIG} = values %SIG;
-       setup_signals(\&kill_pids, \%pids);
-       while (@$queue) {
-               while (scalar(keys(%pids)) < $max && scalar(@$queue)) {
-                       my $args = shift @$queue;
-                       $pids{cb_spawn($cb, $ibxish, $args, $opt)} = $args;
-               }
-
-               my $flags = 0;
-               while (scalar keys %pids) {
-                       my $pid = waitpid(-1, $flags) or last;
-                       last if $pid < 0;
-                       my $args = delete $pids{$pid};
-                       if ($args) {
-                               die "E: @$args failed: $?\n" if $?;
-                       } else {
-                               warn "unknown PID($pid) reaped: $?\n";
-                       }
-                       $flags = WNOHANG if scalar(@$queue);
-               }
-       }
+       setup_signals(\&kill_pids, $job);
+       $SIG{CHLD} = \&PublicInbox::DS::enqueue_reap;
+       local @PublicInbox::DS::post_loop_do = (\&pids_running, $job);
+       cb_spawn($job, shift(@$queue) // last) for (1..$max);
+       PublicInbox::DS::event_loop;
 }
 
 sub prepare_run {