use Compress::Zlib qw(compress);
use Carp qw(croak);
use Time::Local qw(timegm);
-use autodie qw(close pipe open sysread seek sysseek send);
+use Errno qw(EINTR);
+use autodie qw(close pipe open seek sysseek);
our $DO_QUIT = 15; # signal number
our (
$LIVE_JOBS, # integer
$fmt;
}
+sub xsend ($$) { # move to PerlIO if we need to
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined $n;
+ next if $! == EINTR;
+ croak "send: $!";
+ }
+}
+
sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
my $did = $repo->{docid};
$did ? $self->{xdb}->replace_document($did, $doc)
: ($did = $self->{xdb}->add_document($doc));
- send($op_p, "repo_stored $did", 0);
+ xsend $op_p, "repo_stored $did";
}
sub cidx_ckpoint ($;$) {
my ($pid, $cmd, $self, $op_p) = @_;
if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
($? & 127) == POSIX::SIGPIPE))) {
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
} else {
warn "W: @$cmd (\$?=$?)\n";
$self->{xdb}->cancel_transaction;
my ($opt, $self, $git, $op_p) = @_;
my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
- send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
+ xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest;
}
sub fp_done { # called parent via PktOp by fp_async_done
my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
}
sub dump_roots_start {
my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
- send($prune_op_p, "prune_done $self->{shard}", 0);
+ xsend $prune_op_p, "prune_done $self->{shard}";
}
sub shards_active { # post_loop_do
use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
use Getopt::Long ();
use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR);
use Cwd qw(getcwd);
use POSIX qw(strftime);
use IO::Handle ();
use PublicInbox::ContentHash qw(git_sha);
use PublicInbox::OnDestroy;
use PublicInbox::IPC;
+use PublicInbox::IO qw(poll_in);
use Time::HiRes qw(stat); # ctime comparisons for config cache
use File::Path ();
use File::Spec;
}
}
+sub send_gently ($$) {
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined($n) || $! != EINTR;
+ }
+}
+
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
$self->{pkt_op_p}->pkt_do('x_it', $code);
exit($code >> 8) if $$ != $daemon_pid;
} elsif ($self->{sock}) { # lei->daemon => lei(1) client
- send($self->{sock}, "x_it $code", 0);
+ send_gently $self->{sock}, "x_it $code";
} elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
exit($code >> 8);
} # else ignore if client disconnected
if ($self->{pkt_op_p}) { # to top lei-daemon
$self->{pkt_op_p}->pkt_do('child_error', $child_error);
} elsif ($self->{sock}) { # to lei(1) client
- send($self->{sock}, "child_error $child_error", 0);
+ send_gently $self->{sock}, "child_error $child_error";
} # else noop if client disconnected
}
while (my $op = shift(@$alerts)) {
if ($op eq ':WINCH') {
# hit the process group that started the MUA
- send($sock, '-WINCH', 0) if $sock;
+ send_gently $sock, '-WINCH' if $sock;
} elsif ($op eq ':bell') {
out($self, "\a");
} elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
my $cmd = $1; # run an arbitrary command
require Text::ParseWords;
$cmd = [ Text::ParseWords::shellwords($cmd) ];
- send($sock, exec_buf($cmd, {}), 0) if $sock;
+ send_gently $sock, exec_buf($cmd, {}) if $sock;
} else {
warn("W: unsupported --alert=$op\n"); # non-fatal
}
say { $self->{2} } @msg, '# -quit pager to continue-';
$self->{2}->autoflush(1);
stop_pager($self);
- send($self->{sock}, 'wait', 0); # wait for user to quit pager
+ send_gently $self->{sock}, 'wait'; # wait for user to quit pager
}
sub stop_pager {
my $self = bless { sock => $sock }, __PACKAGE__;
vec(my $rvec = '', fileno($sock), 1) = 1;
select($rvec, undef, undef, 60) or
- return send($sock, 'timed out waiting to recv FDs', 0);
+ return send_gently $sock, 'timed out waiting to recv FDs';
# (4096 * 33) >MAX_ARG_STRLEN
my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
return; # EOF
if (!defined($fds[0])) {
warn(my $msg = "recv_cmd failed: $!");
- return send($sock, $msg, 0);
+ return send_gently $sock, $msg;
} else {
my $i = 0;
open($self->{$i++}, '+<&=', $_) for @fds;
- $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
+ $i == 4 or return send_gently $sock, 'not enough FDs='.($i-1);
}
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z//
- return send($sock, 'request command truncated', 0);
+ return send_gently $sock, 'request command truncated';
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
die "unrecognized client signal: $buf";
}
my $s = $self->{-socks} // []; # lei up --all
- @$s = grep { send($_, $buf, 0) } @$s;
+ @$s = grep { send_gently $_, $buf } @$s;
};
if (my $err = $@) {
eval { $self->fail($err) };
undef;
}
-sub request_umask {
+sub request_umask { # assumes client is trusted and fast
my ($lei) = @_;
my $s = $lei->{sock} // return;
send($s, 'umask', 0) // die "send: $!";
- vec(my $rvec = '', fileno($s), 1) = 1;
- select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
- recv($s, my $v, 5, 0) // die "recv: $!";
- (my $u, $lei->{client_umask}) = unpack('AV', $v);
+ my ($v, $r, $u);
+ do { # n.b. poll_in returns -1 on EINTR
+ vec($v = '', fileno($s), 1) = 1;
+ $r = poll_in($s, 2) or
+ die 'timeout waiting for umask';
+ } while ($r < 0 && $! == EINTR);
+ do {
+ $r = recv $s, $v, 5, 0;
+ die "recv: $!" if !defined($r) && $! != EINTR;
+ } while (!defined $r);
+ ($u, $lei->{client_umask}) = unpack('AV', $v);
$u eq 'u' or warn "E: recv $v has no umask";
}
package PublicInbox::PktOp;
use v5.12;
use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN ECONNRESET);
+use Errno qw(EAGAIN ECONNRESET EINTR);
use PublicInbox::Syscall qw(EPOLLIN);
use Socket qw(AF_UNIX SOCK_SEQPACKET);
use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
sub event_step {
my ($self) = @_;
my $c = $self->{sock};
- my $n = recv($c, my $msg, 4096, 0);
- unless (defined $n) {
- return if $! == EAGAIN;
- die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
- }
- my ($cmd, @pargs);
+ my ($msg, $n, $cmd, @pargs);
+ do {
+ $n = recv($c, $msg, 4096, 0);
+ unless (defined $n) {
+ next if $! == EINTR;
+ return if $! == EAGAIN;
+ die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
+ }
+ } until (defined $n);
if (index($msg, "\0") > 0) {
($cmd, my $pargs) = split(/\0/, $msg, 2);
@pargs = @{ipc_thaw($pargs)};