use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
use Socket qw(AF_UNIX SOCK_STREAM SOCK_SEQPACKET);
+use Scalar::Util qw(blessed reftype);
my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards);
my ($enc, $dec);
ipc_thaw($buf);
}
+sub ipc_fail ($@) {
+ my ($self, @msg) = @_;
+ eval { ipc_worker_stop($self) };
+ unshift @msg, " (stop: $@)" if $@;
+ eval { delete $self->{-ipc_res} };
+ unshift @msg, " (delete -ipc_res: $@)" if $@;
+ croak @msg;
+}
+
+sub ipc_get_res ($) {
+ my ($self) = @_;
+ my $r_res = $self->{-ipc_res} // croak 'BUG: no {-ipc_res}';
+ my ($len, $bref);
+ chop($len = $r_res->my_readline) eq "\n" or
+ ipc_fail $self, "no LF byte in $len";
+ $bref = $r_res->my_bufread($len) or
+ ipc_fail $self, defined($bref) ? 'read EOF' : "read: $!";
+ ipc_thaw($$bref); # may croak
+}
+
+sub ipc_read_step ($$) {
+ my ($self, $inflight) = @_;
+ croak 'BUG: -ipc_inflight too small' if @$inflight < 4;
+ my ($sub, $sub_arg, $acb, $acb_arg) = @$inflight[0..3];
+ my $ret = ipc_get_res $self;
+ splice @$inflight, 0, 4;
+ eval { $acb->($self, $sub, $sub_arg, $acb_arg, $ret) };
+ ipc_fail $self, "E: $sub $@" if $@;
+}
+
sub _send_rec ($$) {
my ($w, $ref) = @_;
my $buf = ipc_freeze($ref);
print $w length($buf), "\n", $buf or croak "print: $!";
}
+sub ipc_req_async ($$) {
+ my ($self, $ref) = @_;
+ my $buf = ipc_freeze($ref);
+ substr $buf, 0, 0, length($buf)."\n";
+ my $inflight;
+ while ($self->{-ipc_req}) {
+ if (defined(my $w = syswrite $self->{-ipc_req}, $buf)) {
+ return if $w == length($buf);
+ substr $buf, 0, $w, ''; # sv_chop
+ } elsif ($! != EAGAIN) {
+ ipc_fail $self, "write: $!";
+ }
+ $inflight //= $self->{-ipc_inflight};
+ ipc_read_step($self, $inflight) if @$inflight;
+ }
+ ipc_fail $self, '-ipc_req gone (closed in callback?)';
+}
+
sub ipc_return ($$$) {
my ($w, $ret, $exc) = @_;
- _send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+ if ($exc) {
+ # C/C++ exceptions from some XS|SWIG bindings have pointers
+ # when serialized and will segfault if attempting to use
+ # the deserialized result in a different address space, so
+ # we stringify them:
+ blessed($exc) && reftype($exc) eq 'SCALAR' and
+ $exc = ref($exc).": $exc";
+ $ret = bless \$exc, 'PublicInbox::IPC::Die';
+ }
+ _send_rec $w, $ret;
}
sub ipc_worker_loop ($$$) {
# this is the overwhelmingly likely case
if (!defined($wantarray)) {
eval { $self->$sub(@args) };
- warn "$$ die: $@ (from nowait $sub)\n" if $@;
+ ipc_return($w_res, \undef, $@);
} elsif ($wantarray) {
my @ret = eval { $self->$sub(@args) };
ipc_return($w_res, \@ret, $@);
sub ipc_worker_spawn {
my ($self, $ident, $oldset, $fields, @cb_args) = @_;
return if $self->{-ipc_res} && $self->{-ipc_res}->can_reap; # idempotent
- delete(@$self{qw(-ipc_req -ipc_res)});
+ delete(@$self{qw(-ipc_req -ipc_res -ipc_inflight)});
# n.b. we use 2 pipes here instead of a single socketpair since
# Linux (as of v6.15) allows a 1MB pipe buffer but only 0.5MB
PublicInbox::DS::sig_setmask($sigset) unless $oldset;
$r_req = $w_res = undef;
$w_req->autoflush(1);
+ my $inflight = $self->{-ipc_inflight} = [];
+ $r_res->blocking(0);
+ $w_req->blocking(0);
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid,
- \&ipc_worker_reap, $self, @cb_args);
+ \&ipc_worker_reap, $self, $inflight, @cb_args);
$pid; # used by tests
}
+# n.b. we don't rely on {-ipc_inflight} and instead pass $inflight
+# explicitly since we need to ensure $inflight is tied to the correct
+# $pid and $self fields can be clobbered on respawn
sub ipc_worker_reap { # awaitpid callback
- my ($pid, $self, $cb, @args) = @_;
+ my ($pid, $self, $inflight, $cb, @args) = @_;
+ while (defined($inflight) && @$inflight) {
+ my ($sub, $sub_arg, $acb, $acb_arg) = splice @$inflight, 0, 4;
+ my $exc = bless \(my $x = 'aborted'), 'PublicInbox::IPC::Die';
+ eval { $acb->($self, $sub, $sub_arg, $acb_arg, $exc) };
+ warn "E: (in abort): $sub: $@" if $@;
+ }
return $cb->($pid, $self, @args) if $cb;
return if !$?;
my $s = $? & 127;
# idempotent, can be called regardless of whether worker is active or not
sub ipc_worker_stop {
my ($self) = @_;
- delete $self->{$_} for qw(-ipc_req -ipc_res); # order matters
+ if (my $w_req = delete $self->{-ipc_req}) {
+ close $w_req; # invalidate if referenced upstack
+ ipc_wait_all $self;
+ delete $self->{-ipc_res}; # ipc_worker_reap will fire
+ }
}
sub _wait_return ($$) {
wantarray ? @$ret : $$ret;
}
+my $ipc_die = sub { # default ipc_async acb
+ my ($self, undef, undef, undef, $ret) = @_;
+ if (ref($ret) eq 'PublicInbox::IPC::Die') {
+ ipc_worker_stop $self;
+ croak $$ret;
+ }
+};
+
+sub ipc_wait_all ($) {
+ my ($self) = @_;
+ my $inflight = $self->{-ipc_inflight} // return;
+ ipc_read_step($self, $inflight) while @$inflight;
+}
+
# call $self->$sub(@args), on a worker if ipc_worker_spawn was used
sub ipc_do {
my ($self, $sub, @args) = @_;
- if (my $w_req = $self->{-ipc_req}) { # run in worker
+ if ($self->{-ipc_req}) { # run in worker
if (defined(wantarray)) {
- my $r_res = $self->{-ipc_res} or die 'no ipc_res';
- _send_rec($w_req, [ wantarray, $sub, @args ]);
- _wait_return($r_res, $sub);
- } else { # likely, fire-and-forget into pipe
- _send_rec($w_req, [ undef , $sub, @args ]);
+ ipc_wait_all $self;
+ ipc_req_async $self, [ wantarray, $sub, @args ];
+ my $ret = ipc_get_res($self);
+ die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+ wantarray ? @$ret : $$ret;
+ } else { # likely, fire-and-forget into pipe, but dies async
+ ipc_req_async $self, [ undef, $sub, @args ];
+ push @{$self->{-ipc_inflight}}, $sub, \@args,
+ $ipc_die, undef;
}
} else { # run locally
$self->$sub(@args);
}
}
+sub ipc_async {
+ my ($self, $sub, $sub_arg, $acb, $acb_arg) = @_;
+ $sub_arg //= [];
+ $acb //= $ipc_die;
+ if ($self->{-ipc_req}) { # run in worker
+ ipc_req_async $self, [ 1, $sub, @$sub_arg ];
+ push @{$self->{-ipc_inflight}}, $sub, $sub_arg, $acb, $acb_arg;
+ } else { # run locally
+ my @ret = eval { $self->$sub(@$sub_arg) };
+ my $exc = $@;
+ my $ret = $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : \@ret;
+ $acb->($self, $sub, $sub_arg, $acb_arg, $ret);
+ undef;
+ }
+}
+
# needed when there's multiple IPC workers and the parent forking
# causes newer siblings to inherit older siblings sockets
sub ipc_sibling_atfork_child {
undef $end; # trigger exit
} elsif ($bcast1) {
$self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid(
- $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args);
+ $bcast1, $pid,
+ \&ipc_worker_reap, $self, undef, @cb_args);
} else { # $one
$self->{-wq_workers}->{$pid} = undef;
- awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
+ awaitpid($pid, \&ipc_worker_reap, $self, undef, @cb_args);
}
}