use Errno qw(EAGAIN EINTR);
use Carp qw(croak);
use PublicInbox::DS qw(awaitpid);
+use PublicInbox::IO;
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
$r_req = $w_res = undef;
$w_req->autoflush(1);
$self->{-ipc_req} = $w_req;
- $self->{-ipc_res} = $r_res;
+ $self->{-ipc_res} = PublicInbox::IO::attach_pid($r_res, $pid,
+ \&ipc_worker_reap, $self, @cb_args);
$self->{-ipc_ppid} = $$;
- awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
$self->{-ipc_pid} = $pid;
}
sub ipc_worker_reap { # awaitpid callback
my ($pid, $self, $cb, @args) = @_;
- delete $self->{-wq_workers}->{$pid};
return $cb->($pid, $self, @args) if $cb;
return if !$?;
my $s = $? & 127;
return; # idempotent
}
die 'no PID with IPC pipes' unless $pid;
- $w_req = $r_res = undef;
- awaitpid($pid) if $$ == $ppid; # for non-event loop
+ $w_req = undef; # order matters
+ $r_res = undef;
}
sub _wait_return ($$) {
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
undef $end; # trigger exit
- } else {
- $self->{-wq_workers}->{$pid} = $bcast1;
+ } elsif ($bcast1) {
+ $self->{-wq_workers}->{$pid} = PublicInbox::IO::attach_pid(
+ $bcast1, $pid, \&ipc_worker_reap, $self, @cb_args);
+ } else { # $one
+ $self->{-wq_workers}->{$pid} = undef;
awaitpid($pid, \&ipc_worker_reap, $self, @cb_args);
}
}
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
return if ($self->{-wq_ppid} // -1) != $$;
- awaitpid($_) for keys %{$self->{-wq_workers}};
+ delete $self->{-wq_workers};
}
sub wq_kill {