From: Eric Wong Date: Tue, 19 Aug 2025 00:33:33 +0000 (+0000) Subject: ipc: use PublicInbox::IO::attach_pid instead of awaitpid X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=365ef27928e46b9745b684ad3869369336247ed1;p=thirdparty%2Fpublic-inbox.git ipc: use PublicInbox::IO::attach_pid instead of awaitpid Blessing and attaching PIDs to PublicInbox::IO objects will give us access to my_readline and my_bufread subs to allow for asynchronous callbacks for error handling (similar to PublicInbox::Git->cat_async). --- diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 1395c4625..9e4fa96b4 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -14,6 +14,7 @@ use autodie qw(close pipe read send socketpair); use Errno qw(EAGAIN EINTR); use Carp qw(croak); use PublicInbox::DS qw(awaitpid); +use PublicInbox::IO; use PublicInbox::Spawn; use PublicInbox::OnDestroy; use PublicInbox::WQWorker; @@ -129,15 +130,14 @@ sub ipc_worker_spawn { $r_req = $w_res = undef; $w_req->autoflush(1); $self->{-ipc_req} = $w_req; - $self->{-ipc_res} = $r_res; + $self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid, + \&ipc_worker_reap, $self, @cb_args); $self->{-ipc_ppid} = $$; - awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); $self->{-ipc_pid} = $pid; } sub ipc_worker_reap { # awaitpid callback my ($pid, $self, $cb, @args) = @_; - delete $self->{-wq_workers}->{$pid}; return $cb->($pid, $self, @args) if $cb; return if !$?; my $s = $? & 127; @@ -167,8 +167,8 @@ sub ipc_worker_stop { return; # idempotent } die 'no PID with IPC pipes' unless $pid; - $w_req = $r_res = undef; - awaitpid($pid) if $$ == $ppid; # for non-event loop + $w_req = undef; # order matters + $r_res = undef; } sub _wait_return ($$) { @@ -364,8 +364,11 @@ sub _wq_worker_start { }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; undef $end; # trigger exit - } else { - $self->{-wq_workers}->{$pid} = $bcast1; + } elsif ($bcast1) { + $self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid( + $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args); + } else { # $one + $self->{-wq_workers}->{$pid} = undef; awaitpid($pid, \&ipc_worker_reap, $self, @cb_args); } } @@ -397,7 +400,7 @@ sub wq_close { } delete @$self{qw(-wq_s1 -wq_s2)} or return; return if ($self->{-wq_ppid} // -1) != $$; - awaitpid($_) for keys %{$self->{-wq_workers}}; + delete $self->{-wq_workers}; } sub wq_kill {