sub sendmsg_retry ($) {
return 1 if $!{EINTR};
return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS});
- return if --$_[0] < 0;
+ return if $_[0]-- == 0;
warn "# sleeping on sendmsg: $! ($_[0] tries left)\n";
select(undef, undef, undef, 0.1);
1;
# any number of FDs per-sendmsg(2) + buffer
*send_cmd4 = sub ($$$$;$) { # (sock, fds, buf, flags) = @_;
my ($sock, $fds, undef, $flags, $tries) = @_;
- $tries //= 50;
+ $tries //= -1; # infinite
my $mh = Socket::MsgHdr->new(buf => $_[2]);
$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('i' x scalar(@$fds), @$fds));
my $s;
my $buf = ipc_freeze([$sub, @args]);
if ($self->{wqb}) { # saturated once, assume saturated forever
$self->{wqb}->flush_send($buf);
- } else {
- $send_cmd->($self->{-wq_s1}, [], $buf, 0) //
- ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
- : croak("sendmsg: $!"));
- }
+ } elsif (!defined $send_cmd->($self->{-wq_s1}, [], $buf, 0)) {
+ if ($!{EAGAIN} || $!{ENOBUFS} || $!{ENOMEM}) {
+ PublicInbox::WQBlocked->new($self, $buf);
+ } else {
+ croak "sendmsg: $!";
+ }
+ } # else success
}
sub _wq_worker_start {
return (int)pid;
}
-static int sendmsg_retry(int *tries)
+static int sendmsg_retry(long *tries)
{
const struct timespec req = { 0, 100000000 }; /* 100ms */
int err = errno;
switch (err) {
case EINTR: PERL_ASYNC_CHECK(); return 1;
case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
- if (--*tries < 0) return 0;
- fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n",
+ if (*tries-- == 0) return 0;
+ fprintf(stderr, "# sleeping on sendmsg: %s (%ld tries left)\n",
strerror(err), *tries);
nanosleep(&req, NULL);
PERL_ASYNC_CHECK();
char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
};
-SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, int tries)
+SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, long tries)
{
struct msghdr msg = { 0 };
union my_cmsg cmsg = { 0 };
$msg_controllen,
0); # msg_flags
my $s;
- $tries //= 50;
+ $tries //= -1;
do {
$s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
} while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($tries));
my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
0);
next if defined($n);
- Carp::croak("sendmsg: $!") unless $!{EAGAIN};
- PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
+ if ($!{EAGAIN}) {
+ PublicInbox::DS::epwait($wq_s1,
+ EPOLLOUT|EPOLLONESHOT);
+ } elsif ($!{ENOBUFS} || $!{ENOMEM}) {
+ PublicInbox::DS::add_timer(0.1, \&flush_send,
+ $self);
+ } else {
+ Carp::croak("sendmsg: $!");
+ }
unshift @{$self->{msgq}}, $buf;
last; # wait for ->event_step
}
use Socket qw(AF_UNIX SOCK_SEQPACKET);
use PublicInbox::IPC;
use autodie qw(pipe socketpair);
-our $tries = 50;
+our $tries = -1; # set to zero by read-only daemon
sub mkreq {
my ($self, $ios, @arg) = @_;