# use ipc_do when you need work done on a certain process
# use wq_io_do when your work can be done on any idle worker
package PublicInbox::IPC;
-use strict;
-use v5.10.1;
+use v5.12;
use parent qw(Exporter);
+use autodie qw(fork pipe read socketpair sysread);
use Carp qw(croak);
use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
sub _get_rec ($) {
my ($r) = @_;
- defined(my $len = <$r>) or return;
+ my $len = <$r> // return;
chop($len) eq "\n" or croak "no LF byte in $len";
- defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+ my $n = read($r, my $buf, $len);
$n == $len or croak "short read: $n != $len";
ipc_thaw($buf);
}
my ($self, $ident, $oldset, $fields, @cb_args) = @_;
return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
- pipe(my ($r_req, $w_req)) or die "pipe: $!";
- pipe(my ($r_res, $w_res)) or die "pipe: $!";
+ pipe(my $r_req, my $w_req);
+ pipe(my $r_res, my $w_res);
my $sigset = $oldset // PublicInbox::DS::block_signals();
$self->ipc_atfork_prepare;
my $seed = rand(0xffffffff);
- my $pid = fork // die "fork: $!";
+ my $pid = fork;
if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
my $n = length($buf) or return 0;
my $nfd = 0;
for my $fd (@fds) {
- if (open(my $cmdfh, '+<&=', $fd)) {
- $self->{$nfd++} = $cmdfh;
- $cmdfh->autoflush(1);
- } else {
- die "$$ open(+<&=$fd) (FD:$nfd): $!";
- }
+ open(my $cmdfh, '+<&=', $fd);
+ $self->{$nfd++} = $cmdfh;
+ $cmdfh->autoflush(1);
}
while ($full_stream && $n < $len) {
- my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!";
+ my $r = sysread($s2, $buf, $len - $n, $n);
croak "read EOF after $n/$len bytes" if $r == 0;
$n = length($buf);
}
sub stream_in_full ($$$) {
my ($s1, $fds, $buf) = @_;
- socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
- croak "socketpair: $!";
+ socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0);
my $n = $send_cmd->($s1, [ fileno($r) ],
ipc_freeze(['do_sock_stream', length($buf)]),
0) // croak "sendmsg: $!";
sub wq_do {
my ($self, $sub, @args) = @_;
if (defined(wantarray)) {
- pipe(my ($r, $w)) or die "pipe: $!";
+ pipe(my $r, my $w);
wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
undef $w;
_wait_return($r, $sub);
sub _wq_worker_start {
my ($self, $oldset, $fields, $one, @cb_args) = @_;
my ($bcast1, $bcast2);
- $one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0) or
- die "socketpair: $!";
+ $one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0);
my $seed = rand(0xffffffff);
- my $pid = fork // die "fork: $!";
+ my $pid = fork;
if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
($send_cmd && $recv_cmd) or return;
return if $self->{-wq_s1}; # idempotent
- $self->{-wq_s1} = $self->{-wq_s2} = undef;
- socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, SOCK_SEQPACKET, 0)
- or die "socketpair: $!";
+ socketpair($self->{-wq_s1}, $self->{-wq_s2},AF_UNIX, SOCK_SEQPACKET, 0);
$self->ipc_atfork_prepare;
$nr_workers //= $self->{-wq_nr_workers}; # was set earlier
my $sigset = $oldset // PublicInbox::DS::block_signals();