]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
*index: propagate exceptions from shard processes
authorEric Wong <e@80x24.org>
Thu, 4 Sep 2025 19:22:27 +0000 (19:22 +0000)
committerEric Wong <e@80x24.org>
Sat, 6 Sep 2025 18:36:05 +0000 (18:36 +0000)
We'll introduce a new ->ipc_async internal API and use it for
->ipc_do calls where the return value is ignored.  This new
API is modeled after our async API for accessing
`git cat-file --batch*' and remains compatible with synchronous
callers who want the return value of ->ipc_do.

Processes no longer spin and burn CPU after hitting ENOSPC or
dealing with database or FS corruption.  Instead, they should
now die properly in most cases.  However, hangs and crashes may
still possible since Xapian may abort(3) in some ENOSPC cases
and SQLite's may trigger SIGBUS via mmap(2) (if using WAL or
forcing mmap use).

lib/PublicInbox/IO.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/V2Writable.pm
t/ipc.t

index 8640f11292a63a25af9d22ecd1c265b3d1cc34ce..bb458b207e954e2c348faa1f8494efbf27428e65 100644 (file)
@@ -100,6 +100,8 @@ sub try_cat ($) {
 }
 
 # TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
+# this does not return partial data; only a scalar ref on success,
+# 0 on EOF, and undef on error.
 sub my_bufread {
        my ($io, $len) = @_;
        my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
index 5e6ff0ecbcfd27b9d5d50594912e5929755816a7..a013031d27acfce855586c70f7ab6c2060e89060 100644 (file)
@@ -19,6 +19,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX SOCK_STREAM SOCK_SEQPACKET);
+use Scalar::Util qw(blessed reftype);
 my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards);
 my ($enc, $dec);
@@ -63,15 +64,72 @@ sub _get_rec ($) {
        ipc_thaw($buf);
 }
 
+sub ipc_fail ($@) {
+       my ($self, @msg) = @_;
+       eval { ipc_worker_stop($self) };
+       unshift @msg, " (stop: $@)" if $@;
+       eval { delete $self->{-ipc_res} };
+       unshift @msg, " (delete -ipc_res: $@)" if $@;
+       croak @msg;
+}
+
+sub ipc_get_res ($) {
+       my ($self) = @_;
+       my $r_res = $self->{-ipc_res} // croak 'BUG: no {-ipc_res}';
+       my ($len, $bref);
+       chop($len = $r_res->my_readline) eq "\n" or
+               ipc_fail $self, "no LF byte in $len";
+       $bref = $r_res->my_bufread($len) or
+               ipc_fail $self, defined($bref) ? 'read EOF' : "read: $!";
+       ipc_thaw($$bref); # may croak
+}
+
+sub ipc_read_step ($$) {
+       my ($self, $inflight) = @_;
+       croak 'BUG: -ipc_inflight too small' if @$inflight < 4;
+       my ($sub, $sub_arg, $acb, $acb_arg) = @$inflight[0..3];
+       my $ret = ipc_get_res $self;
+       splice @$inflight, 0, 4;
+       eval { $acb->($self, $sub, $sub_arg, $acb_arg, $ret) };
+       ipc_fail $self, "E: $sub $@" if $@;
+}
+
 sub _send_rec ($$) {
        my ($w, $ref) = @_;
        my $buf = ipc_freeze($ref);
        print $w length($buf), "\n", $buf or croak "print: $!";
 }
 
+sub ipc_req_async ($$) {
+       my ($self, $ref) = @_;
+       my $buf = ipc_freeze($ref);
+       substr $buf, 0, 0, length($buf)."\n";
+       my $inflight;
+       while ($self->{-ipc_req}) {
+               if (defined(my $w = syswrite $self->{-ipc_req}, $buf)) {
+                       return if $w == length($buf);
+                       substr $buf, 0, $w, ''; # sv_chop
+               } elsif ($! != EAGAIN) {
+                       ipc_fail $self, "write: $!";
+               }
+               $inflight //= $self->{-ipc_inflight};
+               ipc_read_step($self, $inflight) if @$inflight;
+       }
+       ipc_fail $self, '-ipc_req gone (closed in callback?)';
+}
+
 sub ipc_return ($$$) {
        my ($w, $ret, $exc) = @_;
-       _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+       if ($exc) {
+               # C/C++ exceptions from some XS|SWIG bindings have pointers
+               # when serialized and will segfault if attempting to use
+               # the deserialized result in a different address space, so
+               # we stringify them:
+               blessed($exc) && reftype($exc) eq 'SCALAR' and
+                       $exc = ref($exc).": $exc";
+               $ret = bless \$exc, 'PublicInbox::IPC::Die';
+       }
+       _send_rec $w, $ret;
 }
 
 sub ipc_worker_loop ($$$) {
@@ -84,7 +142,7 @@ sub ipc_worker_loop ($$$) {
                # this is the overwhelmingly likely case
                if (!defined($wantarray)) {
                        eval { $self->$sub(@args) };
-                       warn "$$ die: $@ (from nowait $sub)\n" if $@;
+                       ipc_return($w_res, \undef, $@);
                } elsif ($wantarray) {
                        my @ret = eval { $self->$sub(@args) };
                        ipc_return($w_res, \@ret, $@);
@@ -101,7 +159,7 @@ sub exit_exception { exit(!!$@) }
 sub ipc_worker_spawn {
        my ($self, $ident, $oldset, $fields, @cb_args) = @_;
        return if $self->{-ipc_res} && $self->{-ipc_res}->can_reap; # idempotent
-       delete(@$self{qw(-ipc_req -ipc_res)});
+       delete(@$self{qw(-ipc_req -ipc_res -ipc_inflight)});
 
        # n.b. we use 2 pipes here instead of a single socketpair since
        # Linux (as of v6.15) allows a 1MB pipe buffer but only 0.5MB
@@ -134,14 +192,26 @@ sub ipc_worker_spawn {
        PublicInbox::DS::sig_setmask($sigset) unless $oldset;
        $r_req = $w_res = undef;
        $w_req->autoflush(1);
+       my $inflight = $self->{-ipc_inflight} = [];
+       $r_res->blocking(0);
+       $w_req->blocking(0);
        $self->{-ipc_req} = $w_req;
        $self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid,
-                                       \&ipc_worker_reap, $self, @cb_args);
+                               \&ipc_worker_reap, $self, $inflight, @cb_args);
        $pid; # used by tests
 }
 
+# n.b. we don't rely on {-ipc_inflight} and instead pass $inflight
+# explicitly since we need to ensure $inflight is tied to the correct
+# $pid and $self fields can be clobbered on respawn
 sub ipc_worker_reap { # awaitpid callback
-       my ($pid, $self, $cb, @args) = @_;
+       my ($pid, $self, $inflight, $cb, @args) = @_;
+       while (defined($inflight) && @$inflight) {
+               my ($sub, $sub_arg, $acb, $acb_arg) = splice @$inflight, 0, 4;
+               my $exc = bless \(my $x = 'aborted'), 'PublicInbox::IPC::Die';
+               eval { $acb->($self, $sub, $sub_arg, $acb_arg, $exc) };
+               warn "E: (in abort): $sub: $@" if $@;
+       }
        return $cb->($pid, $self, @args) if $cb;
        return if !$?;
        my $s = $? & 127;
@@ -164,7 +234,11 @@ sub ipc_atfork_child {
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
        my ($self) = @_;
-       delete $self->{$_} for qw(-ipc_req -ipc_res); # order matters
+       if (my $w_req = delete $self->{-ipc_req}) {
+               close $w_req; # invalidate if referenced upstack
+               ipc_wait_all $self;
+               delete $self->{-ipc_res}; # ipc_worker_reap will fire
+       }
 }
 
 sub _wait_return ($$) {
@@ -174,22 +248,56 @@ sub _wait_return ($$) {
        wantarray ? @$ret : $$ret;
 }
 
+my $ipc_die = sub { # default ipc_async acb
+       my ($self, undef, undef, undef, $ret) = @_;
+       if (ref($ret) eq 'PublicInbox::IPC::Die') {
+               ipc_worker_stop $self;
+               croak $$ret;
+       }
+};
+
+sub ipc_wait_all ($) {
+       my ($self) = @_;
+       my $inflight = $self->{-ipc_inflight} // return;
+       ipc_read_step($self, $inflight) while @$inflight;
+}
+
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
        my ($self, $sub, @args) = @_;
-       if (my $w_req = $self->{-ipc_req}) { # run in worker
+       if ($self->{-ipc_req}) { # run in worker
                if (defined(wantarray)) {
-                       my $r_res = $self->{-ipc_res} or die 'no ipc_res';
-                       _send_rec($w_req, [ wantarray, $sub, @args ]);
-                       _wait_return($r_res, $sub);
-               } else { # likely, fire-and-forget into pipe
-                       _send_rec($w_req, [ undef , $sub, @args ]);
+                       ipc_wait_all $self;
+                       ipc_req_async $self, [ wantarray, $sub, @args ];
+                       my $ret = ipc_get_res($self);
+                       die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+                       wantarray ? @$ret : $$ret;
+               } else { # likely, fire-and-forget into pipe, but dies async
+                       ipc_req_async $self, [ undef, $sub, @args ];
+                       push @{$self->{-ipc_inflight}}, $sub, \@args,
+                                               $ipc_die, undef;
                }
        } else { # run locally
                $self->$sub(@args);
        }
 }
 
+sub ipc_async {
+       my ($self, $sub, $sub_arg, $acb, $acb_arg) = @_;
+       $sub_arg //= [];
+       $acb //= $ipc_die;
+       if ($self->{-ipc_req}) { # run in worker
+               ipc_req_async $self, [ 1, $sub, @$sub_arg ];
+               push @{$self->{-ipc_inflight}}, $sub, $sub_arg, $acb, $acb_arg;
+       } else { # run locally
+               my @ret = eval { $self->$sub(@$sub_arg) };
+               my $exc = $@;
+               my $ret = $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : \@ret;
+               $acb->($self, $sub, $sub_arg, $acb_arg, $ret);
+               undef;
+       }
+}
+
 # needed when there's multiple IPC workers and the parent forking
 # causes newer siblings to inherit older siblings sockets
 sub ipc_sibling_atfork_child {
@@ -362,10 +470,11 @@ sub _wq_worker_start {
                undef $end; # trigger exit
        } elsif ($bcast1) {
                $self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid(
-                       $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args);
+                       $bcast1, $pid,
+                       \&ipc_worker_reap, $self, undef, @cb_args);
        } else { # $one
                $self->{-wq_workers}->{$pid} = undef;
-               awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
+               awaitpid($pid, \&ipc_worker_reap, $self, undef, @cb_args);
        }
 }
 
index aea215cff0e626ccd6011f2ef6d949a59308cc46..8fa7daa3448ed31e28dab21077e2b1104022a407 100644 (file)
@@ -560,18 +560,9 @@ sub checkpoint ($;$) {
                        ($self->{oidx}->{-art_max}//0) >= $self->{defrag_at} and
                        do_defrag $self;
 
-               # transactions started on parallel shards,
-               # wait for them by issuing an echo command (echo can only
-               # run after commit_txn_lazy is done)
+               # wait for all transactions started on parallel shards
                if ($wait && $self->{parallel}) {
-                       my $i = 0;
-                       for my $shard (@$shards) {
-                               my $echo = $shard->ipc_do('echo', $i);
-                               $echo == $i or die <<"";
-shard[$i] bad echo:$echo != $i waiting for txn commit
-
-                               ++$i;
-                       }
+                       $_->ipc_wait_all for @$shards;
                }
 
                my $midx = $self->{midx}; # misc index
diff --git a/t/ipc.t b/t/ipc.t
index 0ebe0417f3b1c094c2763f79b5b12de58b5e903a..fbf0dfa7f647ffef0c2c0b76ca2c86a9e0a93ef0 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -92,8 +92,21 @@ $test->('local');
        is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
        $test->('worker');
        is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
-       $ipc->ipc_worker_stop;
+       my @x;
+       $ipc->ipc_async('test_pid', ['hi'],
+               sub { (undef, @x) = @_ }, [ 'acb_arg' ]);
+       $ipc->ipc_async('test_die', ['DIE']);
+       eval { $ipc->ipc_worker_stop };
+       like $@, qr/\bDIE\b/, 'exception propagated';
+       is_deeply \@x, [ 'test_pid', ['hi'], [ 'acb_arg' ], [ $pid ] ],
+               'ipc_async (worker)';
        ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
+
+       $ipc->ipc_async('test_pid', ['hi'],
+               sub { (undef, @x) = @_ }, [ 'acb_arg' ]);
+       is_deeply \@x, [ 'test_pid', ['hi'], [ 'acb_arg' ], [ $$ ] ],
+               'ipc_async (same process)';
+
 }
 $ipc->ipc_worker_stop; # idempotent