]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
ipc: use PublicInbox::IO::attach_pid instead of awaitpid
authorEric Wong <e@80x24.org>
Tue, 19 Aug 2025 00:33:33 +0000 (00:33 +0000)
committerEric Wong <e@80x24.org>
Wed, 20 Aug 2025 19:10:13 +0000 (19:10 +0000)
Blessing and attaching PIDs to PublicInbox::IO objects will give
us access to my_readline and my_bufread subs to allow for
asynchronous callbacks for error handling (similar to
PublicInbox::Git->cat_async).

lib/PublicInbox/IPC.pm

index 1395c4625098a616a5dd7c3555874dc40c3f75ff..9e4fa96b43096fb8d37a15c2ff6576074859851f 100644 (file)
@@ -14,6 +14,7 @@ use autodie qw(close pipe read send socketpair);
 use Errno qw(EAGAIN EINTR);
 use Carp qw(croak);
 use PublicInbox::DS qw(awaitpid);
+use PublicInbox::IO;
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
@@ -129,15 +130,14 @@ sub ipc_worker_spawn {
        $r_req = $w_res = undef;
        $w_req->autoflush(1);
        $self->{-ipc_req} = $w_req;
-       $self->{-ipc_res} = $r_res;
+       $self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid,
+                                       \&ipc_worker_reap, $self, @cb_args);
        $self->{-ipc_ppid} = $$;
-       awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
        $self->{-ipc_pid} = $pid;
 }
 
 sub ipc_worker_reap { # awaitpid callback
        my ($pid, $self, $cb, @args) = @_;
-       delete $self->{-wq_workers}->{$pid};
        return $cb->($pid, $self, @args) if $cb;
        return if !$?;
        my $s = $? & 127;
@@ -167,8 +167,8 @@ sub ipc_worker_stop {
                return; # idempotent
        }
        die 'no PID with IPC pipes' unless $pid;
-       $w_req = $r_res = undef;
-       awaitpid($pid) if $$ == $ppid; # for non-event loop
+       $w_req = undef; # order matters
+       $r_res = undef;
 }
 
 sub _wait_return ($$) {
@@ -364,8 +364,11 @@ sub _wq_worker_start {
                };
                warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
                undef $end; # trigger exit
-       } else {
-               $self->{-wq_workers}->{$pid} = $bcast1;
+       } elsif ($bcast1) {
+               $self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid(
+                       $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args);
+       } else { # $one
+               $self->{-wq_workers}->{$pid} = undef;
                awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
        }
 }
@@ -397,7 +400,7 @@ sub wq_close {
        }
        delete @$self{qw(-wq_s1 -wq_s2)} or return;
        return if ($self->{-wq_ppid} // -1) != $$;
-       awaitpid($_) for keys %{$self->{-wq_workers}};
+       delete $self->{-wq_workers};
 }
 
 sub wq_kill {