From f0b9f90a233876d832f4235b253adac88932746a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 4 Sep 2025 19:22:27 +0000 Subject: [PATCH] *index: propagate exceptions from shard processes We'll introduce a new ->ipc_async internal API and use it for ->ipc_do calls where the return value is ignored. This new API is modeled after our async API for accessing `git cat-file --batch*' and remains compatible with synchronous callers who want the return value of ->ipc_do. Processes no longer spin and burn CPU after hitting ENOSPC or dealing with database or FS corruption. Instead, they should now die properly in most cases. However, hangs and crashes may still be possible since Xapian may abort(3) in some ENOSPC cases and SQLite may trigger SIGBUS via mmap(2) (if using WAL or forcing mmap use). --- lib/PublicInbox/IO.pm | 2 + lib/PublicInbox/IPC.pm | 137 ++++++++++++++++++++++++++++++---- lib/PublicInbox/V2Writable.pm | 13 +--- t/ipc.t | 15 +++- 4 files changed, 141 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm index 8640f1129..bb458b207 100644 --- a/lib/PublicInbox/IO.pm +++ b/lib/PublicInbox/IO.pm @@ -100,6 +100,8 @@ sub try_cat ($) { } # TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here +# this does not return partial data; only a scalar ref on success, +# 0 on EOF, and undef on error. 
sub my_bufread { my ($io, $len) = @_; my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 5e6ff0ecb..a013031d2 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -19,6 +19,7 @@ use PublicInbox::Spawn; use PublicInbox::OnDestroy; use PublicInbox::WQWorker; use Socket qw(AF_UNIX SOCK_STREAM SOCK_SEQPACKET); +use Scalar::Util qw(blessed reftype); my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards); my ($enc, $dec); @@ -63,15 +64,72 @@ sub _get_rec ($) { ipc_thaw($buf); } +sub ipc_fail ($@) { + my ($self, @msg) = @_; + eval { ipc_worker_stop($self) }; + unshift @msg, " (stop: $@)" if $@; + eval { delete $self->{-ipc_res} }; + unshift @msg, " (delete -ipc_res: $@)" if $@; + croak @msg; +} + +sub ipc_get_res ($) { + my ($self) = @_; + my $r_res = $self->{-ipc_res} // croak 'BUG: no {-ipc_res}'; + my ($len, $bref); + chop($len = $r_res->my_readline) eq "\n" or + ipc_fail $self, "no LF byte in $len"; + $bref = $r_res->my_bufread($len) or + ipc_fail $self, defined($bref) ? 'read EOF' : "read: $!"; + ipc_thaw($$bref); # may croak +} + +sub ipc_read_step ($$) { + my ($self, $inflight) = @_; + croak 'BUG: -ipc_inflight too small' if @$inflight < 4; + my ($sub, $sub_arg, $acb, $acb_arg) = @$inflight[0..3]; + my $ret = ipc_get_res $self; + splice @$inflight, 0, 4; + eval { $acb->($self, $sub, $sub_arg, $acb_arg, $ret) }; + ipc_fail $self, "E: $sub $@" if $@; +} + sub _send_rec ($$) { my ($w, $ref) = @_; my $buf = ipc_freeze($ref); print $w length($buf), "\n", $buf or croak "print: $!"; } +sub ipc_req_async ($$) { + my ($self, $ref) = @_; + my $buf = ipc_freeze($ref); + substr $buf, 0, 0, length($buf)."\n"; + my $inflight; + while ($self->{-ipc_req}) { + if (defined(my $w = syswrite $self->{-ipc_req}, $buf)) { + return if $w == length($buf); + substr $buf, 0, $w, ''; # sv_chop + } elsif ($! 
!= EAGAIN) { + ipc_fail $self, "write: $!"; + } + $inflight //= $self->{-ipc_inflight}; + ipc_read_step($self, $inflight) if @$inflight; + } + ipc_fail $self, '-ipc_req gone (closed in callback?)'; +} + sub ipc_return ($$$) { my ($w, $ret, $exc) = @_; - _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret); + if ($exc) { + # C/C++ exceptions from some XS|SWIG bindings have pointers + # when serialized and will segfault if attempting to use + # the deserialized result in a different address space, so + # we stringify them: + blessed($exc) && reftype($exc) eq 'SCALAR' and + $exc = ref($exc).": $exc"; + $ret = bless \$exc, 'PublicInbox::IPC::Die'; + } + _send_rec $w, $ret; } sub ipc_worker_loop ($$$) { @@ -84,7 +142,7 @@ sub ipc_worker_loop ($$$) { # this is the overwhelmingly likely case if (!defined($wantarray)) { eval { $self->$sub(@args) }; - warn "$$ die: $@ (from nowait $sub)\n" if $@; + ipc_return($w_res, \undef, $@); } elsif ($wantarray) { my @ret = eval { $self->$sub(@args) }; ipc_return($w_res, \@ret, $@); @@ -101,7 +159,7 @@ sub exit_exception { exit(!!$@) } sub ipc_worker_spawn { my ($self, $ident, $oldset, $fields, @cb_args) = @_; return if $self->{-ipc_res} && $self->{-ipc_res}->can_reap; # idempotent - delete(@$self{qw(-ipc_req -ipc_res)}); + delete(@$self{qw(-ipc_req -ipc_res -ipc_inflight)}); # n.b. we use 2 pipes here instead of a single socketpair since # Linux (as of v6.15) allows a 1MB pipe buffer but only 0.5MB @@ -134,14 +192,26 @@ sub ipc_worker_spawn { PublicInbox::DS::sig_setmask($sigset) unless $oldset; $r_req = $w_res = undef; $w_req->autoflush(1); + my $inflight = $self->{-ipc_inflight} = []; + $r_res->blocking(0); + $w_req->blocking(0); $self->{-ipc_req} = $w_req; $self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid, - \&ipc_worker_reap, $self, @cb_args); + \&ipc_worker_reap, $self, $inflight, @cb_args); $pid; # used by tests } +# n.b. 
we don't rely on {-ipc_inflight} and instead pass $inflight +# explicitly since we need to ensure $inflight is tied to the correct +# $pid and $self fields can be clobbered on respawn sub ipc_worker_reap { # awaitpid callback - my ($pid, $self, $cb, @args) = @_; + my ($pid, $self, $inflight, $cb, @args) = @_; + while (defined($inflight) && @$inflight) { + my ($sub, $sub_arg, $acb, $acb_arg) = splice @$inflight, 0, 4; + my $exc = bless \(my $x = 'aborted'), 'PublicInbox::IPC::Die'; + eval { $acb->($self, $sub, $sub_arg, $acb_arg, $exc) }; + warn "E: (in abort): $sub: $@" if $@; + } return $cb->($pid, $self, @args) if $cb; return if !$?; my $s = $? & 127; @@ -164,7 +234,11 @@ sub ipc_atfork_child { # idempotent, can be called regardless of whether worker is active or not sub ipc_worker_stop { my ($self) = @_; - delete $self->{$_} for qw(-ipc_req -ipc_res); # order matters + if (my $w_req = delete $self->{-ipc_req}) { + close $w_req; # invalidate if referenced upstack + ipc_wait_all $self; + delete $self->{-ipc_res}; # ipc_worker_reap will fire + } } sub _wait_return ($$) { @@ -174,22 +248,56 @@ sub _wait_return ($$) { wantarray ? 
@$ret : $$ret; } +my $ipc_die = sub { # default ipc_async acb + my ($self, undef, undef, undef, $ret) = @_; + if (ref($ret) eq 'PublicInbox::IPC::Die') { + ipc_worker_stop $self; + croak $$ret; + } +}; + +sub ipc_wait_all ($) { + my ($self) = @_; + my $inflight = $self->{-ipc_inflight} // return; + ipc_read_step($self, $inflight) while @$inflight; +} + # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; - if (my $w_req = $self->{-ipc_req}) { # run in worker + if ($self->{-ipc_req}) { # run in worker if (defined(wantarray)) { - my $r_res = $self->{-ipc_res} or die 'no ipc_res'; - _send_rec($w_req, [ wantarray, $sub, @args ]); - _wait_return($r_res, $sub); - } else { # likely, fire-and-forget into pipe - _send_rec($w_req, [ undef , $sub, @args ]); + ipc_wait_all $self; + ipc_req_async $self, [ wantarray, $sub, @args ]; + my $ret = ipc_get_res($self); + die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; + wantarray ? @$ret : $$ret; + } else { # likely, fire-and-forget into pipe, but dies async + ipc_req_async $self, [ undef, $sub, @args ]; + push @{$self->{-ipc_inflight}}, $sub, \@args, + $ipc_die, undef; } } else { # run locally $self->$sub(@args); } } +sub ipc_async { + my ($self, $sub, $sub_arg, $acb, $acb_arg) = @_; + $sub_arg //= []; + $acb //= $ipc_die; + if ($self->{-ipc_req}) { # run in worker + ipc_req_async $self, [ 1, $sub, @$sub_arg ]; + push @{$self->{-ipc_inflight}}, $sub, $sub_arg, $acb, $acb_arg; + } else { # run locally + my @ret = eval { $self->$sub(@$sub_arg) }; + my $exc = $@; + my $ret = $exc ? 
bless(\$exc, 'PublicInbox::IPC::Die') : \@ret; + $acb->($self, $sub, $sub_arg, $acb_arg, $ret); + undef; + } +} + # needed when there's multiple IPC workers and the parent forking # causes newer siblings to inherit older siblings sockets sub ipc_sibling_atfork_child { @@ -362,10 +470,11 @@ sub _wq_worker_start { undef $end; # trigger exit } elsif ($bcast1) { $self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid( - $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args); + $bcast1, $pid, + \&ipc_worker_reap, $self, undef, @cb_args); } else { # $one $self->{-wq_workers}->{$pid} = undef; - awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); + awaitpid($pid, \&ipc_worker_reap, $self, undef, @cb_args); } } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index aea215cff..8fa7daa34 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -560,18 +560,9 @@ sub checkpoint ($;$) { ($self->{oidx}->{-art_max}//0) >= $self->{defrag_at} and do_defrag $self; - # transactions started on parallel shards, - # wait for them by issuing an echo command (echo can only - # run after commit_txn_lazy is done) + # wait for all transactions started on parallel shards if ($wait && $self->{parallel}) { - my $i = 0; - for my $shard (@$shards) { - my $echo = $shard->ipc_do('echo', $i); - $echo == $i or die <<""; -shard[$i] bad echo:$echo != $i waiting for txn commit - - ++$i; - } + $_->ipc_wait_all for @$shards; } my $midx = $self->{midx}; # misc index diff --git a/t/ipc.t b/t/ipc.t index 0ebe0417f..fbf0dfa7f 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -92,8 +92,21 @@ $test->('local'); is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); $test->('worker'); is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned'); - $ipc->ipc_worker_stop; + my @x; + $ipc->ipc_async('test_pid', ['hi'], + sub { (undef, @x) = @_ }, [ 'acb_arg' ]); + $ipc->ipc_async('test_die', ['DIE']); + eval { $ipc->ipc_worker_stop }; + like $@, qr/\bDIE\b/, 'exception 
propagated'; + is_deeply \@x, [ 'test_pid', ['hi'], [ 'acb_arg' ], [ $pid ] ], + 'ipc_async (worker)'; ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped'); + + $ipc->ipc_async('test_pid', ['hi'], + sub { (undef, @x) = @_ }, [ 'acb_arg' ]); + is_deeply \@x, [ 'test_pid', ['hi'], [ 'acb_arg' ], [ $$ ] ], + 'ipc_async (same process)'; + } $ipc->ipc_worker_stop; # idempotent -- 2.47.3